Skip to content

Commit 1a692ce

Browse files
committed
first step at introducing a scheduling policy
This makes the worker implementation plolicy based
1 parent 58e6ffe commit 1a692ce

File tree

3 files changed

+107
-49
lines changed

3 files changed

+107
-49
lines changed
+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright (C) 2022 TU Dresden
3+
* All rights reserved.
4+
*
5+
* Authors:
6+
* Christian Menard
7+
*/
8+
9+
#ifndef REACTOR_CPP_IMPL_SCHEDULER_IMPL_HH
10+
#define REACTOR_CPP_IMPL_SCHEDULER_IMPL_HH
11+
12+
#include "reactor-cpp/trace.hh"
13+
#include <reactor-cpp/logging.hh>
14+
#include <reactor-cpp/reaction.hh>
15+
16+
namespace reactor {
17+
18+
template <class SchedulingPolicy>
19+
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
20+
thread_local const Worker<SchedulingPolicy>* Worker<SchedulingPolicy>::current_worker = nullptr;
21+
22+
template <class SchedulingPolicy>
23+
Worker<SchedulingPolicy>::Worker(Worker<SchedulingPolicy>&& worker) // NOLINT(performance-noexcept-move-constructor)
24+
: policy_{worker.policy_}
25+
, identity_{worker.identity_} {
26+
// Need to provide the move constructor in order to organize workers in a
27+
// std::vector. However, moving is not save if the thread is already running,
28+
// thus we throw an exception here if the worker is moved but the
29+
// internal thread is already running.
30+
31+
if (worker.thread_.joinable()) {
32+
throw std::runtime_error{"Running workers cannot be moved!"};
33+
}
34+
}
35+
36+
template <class SchedulingPolicy> void Worker<SchedulingPolicy>::work() const {
37+
// initialize the current worker thread local variable
38+
current_worker = this;
39+
40+
log::Debug() << "(Worker " << identity_ << ") Starting";
41+
42+
policy_.worker_function(*this);
43+
44+
log::Debug() << "(Worker " << identity_ << ") terminates";
45+
}
46+
47+
template <class SchedulingPolicy> void Worker<SchedulingPolicy>::execute_reaction(Reaction* reaction) const {
48+
log::Debug() << "(Worker " << identity_ << ") "
49+
<< "execute reaction " << reaction->fqn();
50+
51+
tracepoint(reactor_cpp, reaction_execution_starts, id, reaction->fqn(), scheduler.logical_time());
52+
reaction->trigger();
53+
tracepoint(reactor_cpp, reaction_execution_finishes, id, reaction->fqn(), scheduler.logical_time());
54+
}
55+
56+
} // namespace reactor
57+
58+
#endif // REACTOR_CPP_IMPL_SCHEDULER_IMPL_HH

include/reactor-cpp/scheduler.hh

+41-10
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#define REACTOR_CPP_SCHEDULER_HH
1111

1212
#include <condition_variable>
13+
#include <cstddef>
1314
#include <functional>
1415
#include <future>
1516
#include <map>
@@ -26,12 +27,11 @@ namespace reactor {
2627

2728
// forward declarations
2829
class Scheduler;
29-
class Worker;
3030

31-
class Worker { // NOLINT
32-
public:
33-
Scheduler& scheduler_;
34-
const unsigned int identity_{0};
31+
template <class SchedulingPolicy> class Worker {
32+
private:
33+
SchedulingPolicy& policy_;
34+
const std::size_t identity_{0};
3535
std::thread thread_{};
3636

3737
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
@@ -40,20 +40,44 @@ public:
4040
void work() const;
4141
void execute_reaction(Reaction* reaction) const;
4242

43-
Worker(Scheduler& scheduler, unsigned int identity)
44-
: scheduler_{scheduler}
43+
public:
44+
Worker(SchedulingPolicy& policy, std::size_t identity)
45+
: policy_{policy}
4546
, identity_{identity} {}
4647
Worker(Worker&& worker); // NOLINT(performance-noexcept-move-constructor)
4748
Worker(const Worker& worker) = delete;
49+
~Worker() = default;
50+
51+
auto operator=(const Worker& worker) -> Worker& = delete;
52+
auto operator=(Worker&& worker) -> Worker& = delete;
4853

49-
void start_thread() { thread_ = std::thread(&Worker::work, this); }
50-
void join_thread() { thread_.join(); }
54+
void start() { thread_ = std::thread(&Worker::work, this); }
55+
void join() { thread_.join(); }
56+
57+
[[nodiscard]] auto id() const -> std::size_t { return identity_; }
5158

5259
static auto current_worker_id() -> unsigned { return current_worker->identity_; }
60+
61+
friend SchedulingPolicy;
62+
};
63+
64+
class DefaultSchedulingPolicy {
65+
Scheduler& scheduler_;
66+
std::size_t identity_counter{0};
67+
68+
public:
69+
DefaultSchedulingPolicy(Scheduler& scheduler)
70+
: scheduler_(scheduler) {}
71+
72+
void worker_function(const Worker<DefaultSchedulingPolicy>& worker) const;
73+
74+
auto create_worker() -> Worker<DefaultSchedulingPolicy> { return {*this, identity_counter++}; }
5375
};
5476

5577
class ReadyQueue {
5678
private:
79+
using Worker = Worker<DefaultSchedulingPolicy>;
80+
5781
std::vector<Reaction*> queue_{};
5882
std::atomic<std::ptrdiff_t> size_{0};
5983
Semaphore sem_{0};
@@ -89,6 +113,10 @@ using EventMap = std::map<BaseAction*, std::function<void(void)>>;
89113

90114
class Scheduler { // NOLINT
91115
private:
116+
using Worker = Worker<DefaultSchedulingPolicy>;
117+
118+
DefaultSchedulingPolicy policy_;
119+
92120
const bool using_workers_;
93121
LogicalTime logical_time_{};
94122

@@ -137,9 +165,12 @@ public:
137165
void start();
138166
void stop();
139167

140-
friend Worker;
168+
// FIXME: this needs to be removed in the final version, as we cannot make all policies friends...
169+
friend DefaultSchedulingPolicy;
141170
};
142171

143172
} // namespace reactor
144173

174+
#include "impl/scheduler_impl.hh"
175+
145176
#endif // REACTOR_CPP_SCHEDULER_HH

lib/scheduler.cc

+8-39
Original file line numberDiff line numberDiff line change
@@ -20,29 +20,8 @@
2020

2121
namespace reactor {
2222

23-
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
24-
thread_local const Worker* Worker::current_worker = nullptr;
25-
26-
Worker::Worker(Worker&& work) // NOLINT(performance-noexcept-move-constructor)
27-
: scheduler_{work.scheduler_}
28-
, identity_{work.identity_} {
29-
// Need to provide the move constructor in order to organize workers in a
30-
// std::vector. However, moving is not save if the thread is already running,
31-
// thus we throw an exception here if the worker is moved but the
32-
// internal thread is already running.
33-
34-
if (work.thread_.joinable()) {
35-
throw std::runtime_error{"Running workers cannot be moved!"};
36-
}
37-
}
38-
39-
void Worker::work() const {
40-
// initialize the current worker thread local variable
41-
current_worker = this;
42-
43-
log::Debug() << "(Worker " << this->identity_ << ") Starting";
44-
45-
if (identity_ == 0) {
23+
void DefaultSchedulingPolicy::worker_function(const Worker<DefaultSchedulingPolicy>& worker) const {
24+
if (worker.id() == 0) {
4625
log::Debug() << "(Worker 0) do the initial scheduling";
4726
scheduler_.schedule();
4827
}
@@ -57,7 +36,7 @@ void Worker::work() const {
5736
}
5837

5938
// execute the reaction
60-
execute_reaction(reaction);
39+
worker.execute_reaction(reaction);
6140

6241
// was this the very last reaction?
6342
if (scheduler_.reactions_to_process_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
@@ -67,17 +46,6 @@ void Worker::work() const {
6746
}
6847
// continue otherwise
6948
}
70-
71-
log::Debug() << "(Worker " << identity_ << ") terminates";
72-
}
73-
74-
void Worker::execute_reaction(Reaction* reaction) const {
75-
log::Debug() << "(Worker " << identity_ << ") "
76-
<< "execute reaction " << reaction->fqn();
77-
78-
tracepoint(reactor_cpp, reaction_execution_starts, id, reaction->fqn(), scheduler.logical_time());
79-
reaction->trigger();
80-
tracepoint(reactor_cpp, reaction_execution_finishes, id, reaction->fqn(), scheduler.logical_time());
8149
}
8250

8351
void Scheduler::schedule() noexcept {
@@ -212,13 +180,13 @@ void Scheduler::start() {
212180
// be moved.
213181
workers_.reserve(num_workers);
214182
for (unsigned i = 0; i < num_workers; i++) {
215-
workers_.emplace_back(*this, i);
216-
workers_.back().start_thread();
183+
workers_.emplace_back(policy_.create_worker());
184+
workers_.back().start();
217185
}
218186

219187
// join all worker threads
220188
for (auto& worker : workers_) {
221-
worker.join_thread();
189+
worker.join();
222190
}
223191
}
224192

@@ -336,7 +304,8 @@ void Scheduler::next() { // NOLINT
336304
}
337305

338306
Scheduler::Scheduler(Environment* env)
339-
: using_workers_(env->num_workers() > 1)
307+
: policy_(*this)
308+
, using_workers_(env->num_workers() > 1)
340309
, environment_(env)
341310
, ready_queue_(env->num_workers()) {}
342311

0 commit comments

Comments
 (0)