Skip to content

Commit 58e6ffe

Browse files
committed
copy the original scheduler to grouped_scheduler
1 parent 16b4708 commit 58e6ffe

File tree

7 files changed

+553
-5
lines changed

7 files changed

+553
-5
lines changed

include/reactor-cpp/action.hh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public:
4545

4646
friend class Reaction;
4747
friend class Scheduler;
48+
friend class GroupScheduler;
4849
};
4950

5051
template <class T> class Action : public BaseAction {

include/reactor-cpp/environment.hh

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@
1111

1212
#include <set>
1313
#include <string>
14+
#include <utility>
1415
#include <vector>
1516

16-
#include "reactor-cpp/dependency_graph.hh"
17+
#include "dependency_graph.hh"
18+
#include "group_scheduler.hh"
1719
#include "reactor.hh"
18-
#include "scheduler.hh"
1920

2021
namespace reactor {
2122

@@ -39,7 +40,7 @@ private:
3940
std::set<Reaction*> reactions_{};
4041
std::vector<Dependency> dependencies_{};
4142

42-
Scheduler scheduler_;
43+
GroupScheduler scheduler_;
4344
Phase phase_{Phase::Construction};
4445
TimePoint start_time_{};
4546

@@ -73,9 +74,9 @@ public:
7374

7475
[[nodiscard]] auto top_level_reactors() const noexcept -> const auto& { return top_level_reactors_; }
7576
[[nodiscard]] auto phase() const noexcept -> Phase { return phase_; }
76-
[[nodiscard]] auto scheduler() const noexcept -> const Scheduler* { return &scheduler_; }
77+
[[nodiscard]] auto scheduler() const noexcept -> const GroupScheduler* { return &scheduler_; }
7778

78-
auto scheduler() noexcept -> Scheduler* { return &scheduler_; }
79+
auto scheduler() noexcept -> GroupScheduler* { return &scheduler_; }
7980

8081
[[nodiscard]] auto logical_time() const noexcept -> const LogicalTime& { return scheduler_.logical_time(); }
8182
[[nodiscard]] auto start_time() const noexcept -> const TimePoint& { return start_time_; }

include/reactor-cpp/fwd.hh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ class Environment;
1717
class Reaction;
1818
class Reactor;
1919
class Scheduler;
20+
class GroupScheduler;
2021
class Tag;
2122

2223
template <class T> class Action;
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
* Copyright (C) 2019 TU Dresden
3+
* All rights reserved.
4+
*
5+
* Authors:
6+
* Christian Menard
7+
*/
8+
9+
#ifndef REACTOR_CPP_GROUP_SCHEDULER_HH
10+
#define REACTOR_CPP_GROUP_SCHEDULER_HH
11+
12+
#include <condition_variable>
13+
#include <functional>
14+
#include <future>
15+
#include <map>
16+
#include <mutex>
17+
#include <set>
18+
#include <thread>
19+
#include <vector>
20+
21+
#include "fwd.hh"
22+
#include "logical_time.hh"
23+
#include "semaphore.hh"
24+
25+
namespace reactor {
26+
27+
// forward declarations
28+
class GroupScheduler;
29+
class GroupWorker;
30+
31+
class GroupWorker { // NOLINT
32+
public:
33+
GroupScheduler& scheduler_;
34+
const unsigned int identity_{0};
35+
std::thread thread_{};
36+
37+
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
38+
static thread_local const GroupWorker* current_worker;
39+
40+
void work() const;
41+
void execute_reaction(Reaction* reaction) const;
42+
43+
GroupWorker(GroupScheduler& scheduler, unsigned int identity)
44+
: scheduler_{scheduler}
45+
, identity_{identity} {}
46+
GroupWorker(GroupWorker&& worker); // NOLINT(performance-noexcept-move-constructor)
47+
GroupWorker(const GroupWorker& worker) = delete;
48+
49+
void start_thread() { thread_ = std::thread(&GroupWorker::work, this); }
50+
void join_thread() { thread_.join(); }
51+
52+
static auto current_worker_id() -> unsigned { return current_worker->identity_; }
53+
};
54+
55+
class GroupReadyQueue {
56+
private:
57+
std::vector<Reaction*> queue_{};
58+
std::atomic<std::ptrdiff_t> size_{0};
59+
Semaphore sem_{0};
60+
std::ptrdiff_t waiting_workers_{0};
61+
const unsigned int num_workers_;
62+
63+
public:
64+
explicit GroupReadyQueue(unsigned num_workers)
65+
: num_workers_(num_workers) {}
66+
67+
/**
68+
* Retrieve a ready reaction from the queue.
69+
*
70+
* This method may be called concurrently. In case the queue is empty, the
71+
* method blocks and waits until a ready reaction becomes available.
72+
*/
73+
auto pop() -> Reaction*;
74+
75+
/**
76+
* Fill the queue up with ready reactions.
77+
*
78+
* This method assumes that the internal queue is empty. It moves all
79+
* reactions from the provided `ready_reactions` vector to the internal
80+
* queue, leaving `ready_reactions` empty.
81+
*
82+
* Note that this method is not thread-safe. The caller needs to ensure that
83+
* no other thread will try to read from the queue during this operation.
84+
*/
85+
void fill_up(std::vector<Reaction*>& ready_reactions);
86+
};
87+
88+
using EventMap = std::map<BaseAction*, std::function<void(void)>>;
89+
90+
class GroupScheduler { // NOLINT
91+
private:
92+
const bool using_workers_;
93+
LogicalTime logical_time_{};
94+
95+
Environment* environment_;
96+
std::vector<GroupWorker> workers_{};
97+
98+
std::mutex scheduling_mutex_;
99+
std::unique_lock<std::mutex> scheduling_lock_{scheduling_mutex_, std::defer_lock};
100+
std::condition_variable cv_schedule_;
101+
102+
std::mutex lock_event_queue_;
103+
std::map<Tag, EventMap> event_queue_;
104+
105+
std::vector<std::vector<BasePort*>> set_ports_;
106+
std::vector<std::vector<Reaction*>> triggered_reactions_;
107+
108+
std::vector<std::vector<Reaction*>> reaction_queue_;
109+
unsigned int reaction_queue_pos_{std::numeric_limits<unsigned>::max()};
110+
111+
GroupReadyQueue ready_queue_;
112+
std::atomic<std::ptrdiff_t> reactions_to_process_{0}; // NOLINT
113+
114+
std::atomic<bool> stop_{false};
115+
bool continue_execution_{true};
116+
117+
void schedule() noexcept;
118+
auto schedule_ready_reactions() -> bool;
119+
void next();
120+
void terminate_all_workers();
121+
void set_port_helper(BasePort* port);
122+
123+
public:
124+
explicit GroupScheduler(Environment* env);
125+
~GroupScheduler();
126+
127+
void schedule_sync(const Tag& tag, BaseAction* action, std::function<void(void)> pre_handler);
128+
void schedule_async(const Tag& tag, BaseAction* action, std::function<void(void)> pre_handler);
129+
130+
void inline lock() noexcept { scheduling_lock_.lock(); }
131+
void inline unlock() noexcept { scheduling_lock_.unlock(); }
132+
133+
void set_port(BasePort* port);
134+
135+
[[nodiscard]] inline auto logical_time() const noexcept -> const auto& { return logical_time_; }
136+
137+
void start();
138+
void stop();
139+
140+
friend GroupWorker;
141+
};
142+
143+
} // namespace reactor
144+
145+
#endif // REACTOR_CPP_GROUP_SCHEDULER_HH

include/reactor-cpp/port.hh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public:
5757

5858
friend class Reaction;
5959
friend class Scheduler;
60+
friend class GroupScheduler;
6061
};
6162

6263
template <class T> class Port : public BasePort {

lib/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ set(SOURCE_FILES
33
assert.cc
44
dependency_graph.cc
55
environment.cc
6+
group_scheduler.cc
67
logical_time.cc
78
port.cc
89
reaction.cc

0 commit comments

Comments
 (0)