
Commit f85a3b8

fix bug in termination procedure
Parent: 27abf26

2 files changed: +26 -16 lines

include/reactor-cpp/grouped_scheduling_policy.hh (+1)

@@ -61,6 +61,7 @@ private:
   void schedule();
   auto finalize_group_and_notify_successors(ReactionGroup* group, std::vector<ReactionGroup*>& out_ready_groups) -> bool;
   void notify_groups(const std::vector<ReactionGroup*>& groups, std::vector<ReactionGroup*>& out_ready_groups);
+  void terminate_workers();
 
 public:
   GroupedSchedulingPolicy(Scheduler<GroupedSchedulingPolicy>& scheduler, Environment& env);

lib/grouped_scheduling_policy.cc (+25 -16)

@@ -127,21 +127,13 @@ auto GroupedSchedulingPolicy::create_worker() -> Worker<GroupedSchedulingPolicy>
 
 void GroupedSchedulingPolicy::schedule() {
   group_queue_.reset();
-  if (continue_execution_.load(std::memory_order_acquire)) {
-    log::Debug() << "(Worker " << Worker<GroupedSchedulingPolicy>::current_worker_id() << ") call next";
-    bool continue_execution = scheduler_.next();
-    std::atomic_thread_fence(std::memory_order_release);
-    if (!continue_execution) {
-      continue_execution_.store(false, std::memory_order_relaxed);
-    }
-    groups_to_process_.store(num_groups_, std::memory_order_relaxed);
-  } else {
-    log::Debug() << "(Worker " << Worker<GroupedSchedulingPolicy>::current_worker_id()
-                 << ") signal all workers to terminate";
-    std::vector<ReactionGroup*> null_groups_(environment_.num_workers() + 1, nullptr);
-    group_queue_.push(null_groups_);
-    groups_to_process_.store(environment_.num_workers(), std::memory_order_release);
+  log::Debug() << "(Worker " << Worker<GroupedSchedulingPolicy>::current_worker_id() << ") call next";
+  bool continue_execution = scheduler_.next();
+  std::atomic_thread_fence(std::memory_order_release);
+  if (!continue_execution) {
+    continue_execution_.store(false, std::memory_order_relaxed);
   }
+  groups_to_process_.store(num_groups_, std::memory_order_relaxed);
 }
 
 auto GroupedSchedulingPolicy::finalize_group_and_notify_successors(ReactionGroup* group,
@@ -173,6 +165,14 @@ void GroupedSchedulingPolicy::notify_groups(const std::vector<ReactionGroup*>& g
   std::atomic_thread_fence(std::memory_order_acquire);
 }
 
+void GroupedSchedulingPolicy::terminate_workers() {
+  log::Debug() << "(Worker " << Worker<GroupedSchedulingPolicy>::current_worker_id()
+               << ") signal all workers to terminate";
+  std::vector<ReactionGroup*> null_groups_(environment_.num_workers() + 1, nullptr);
+  group_queue_.push(null_groups_);
+  groups_to_process_.store(environment_.num_workers(), std::memory_order_release);
+}
+
 void GroupedSchedulingPolicy::worker_function(const Worker<GroupedSchedulingPolicy>& worker) {
   // This is used as a list for storing new ready groups found while processing a group.
   std::vector<ReactionGroup*> ready_groups;
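The terminate_workers() helper added above implements the shutdown signal: it pushes one null group per worker (plus one) onto the shared group queue and publishes the count with a release store, so any worker that pops a null entry knows to stop. As a rough illustration of this poison-pill pattern, here is a minimal standalone sketch; Item, SimpleQueue, and worker_loop are made-up names, and the mutex/condition-variable queue is an assumption for the sketch, not reactor-cpp's group_queue_ implementation.

#include <condition_variable>
#include <deque>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

// Illustrative only: a blocking queue of raw pointers where a nullptr entry
// acts as a poison pill telling a worker to stop.
struct Item {
  int id;
};

class SimpleQueue {
public:
  void push(const std::vector<Item*>& items) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.insert(queue_.end(), items.begin(), items.end());
    }
    cv_.notify_all();
  }

  auto pop() -> Item* {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return !queue_.empty(); });
    Item* item = queue_.front();
    queue_.pop_front();
    return item;
  }

private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<Item*> queue_;
};

void worker_loop(SimpleQueue& queue) {
  while (true) {
    Item* item = queue.pop();
    if (item == nullptr) {
      // A null entry is the termination signal; leave the loop.
      return;
    }
    // Process the item here (omitted in this sketch).
  }
}

int main() {
  constexpr int num_workers = 4;
  SimpleQueue queue;

  std::vector<std::thread> workers;
  workers.reserve(num_workers);
  for (int i = 0; i < num_workers; i++) {
    workers.emplace_back([&queue] { worker_loop(queue); });
  }

  // Terminate: push one null entry per worker so each one observes the signal.
  std::vector<Item*> nulls(num_workers, nullptr);
  queue.push(nulls);

  for (auto& thread : workers) {
    thread.join();
  }
  std::cout << "all workers joined\n";
  return 0;
}

In the actual change, environment_.num_workers() + 1 null entries are pushed and groups_to_process_ is stored with memory_order_release; the sketch keeps only the core idea that one null entry per waiting worker lets every worker observe the termination signal and exit its loop.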
@@ -206,8 +206,17 @@ void GroupedSchedulingPolicy::worker_function(const Worker<GroupedSchedulingPoli
     bool need_to_schedule = finalize_group_and_notify_successors(group, ready_groups);
 
     if (need_to_schedule) {
-      schedule();
-      notify_groups(initial_groups_, ready_groups);
+      // We use a do-while loop here as we could have scheduled events that do not trigger any reactions.
+      // In this case, ready_groups will be empty and we can simply call schedule again.
+      do {
+        if (continue_execution_.load(std::memory_order_acquire)) {
+          schedule();
+          notify_groups(initial_groups_, ready_groups);
+        } else {
+          terminate_workers();
+          break;
+        }
+      } while (ready_groups.empty());
     }
 
     log::Debug() << "(Worker " << worker.id() << ") found " << ready_groups.size()
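The comment in the hunk gives the reason for the retry loop: a call to scheduler_.next() can process events that trigger no reactions, leaving ready_groups empty, so scheduling has to be attempted again until either work appears or continue_execution_ goes false and terminate_workers() broadcasts the shutdown. The following is a rough, self-contained sketch of that control flow; advance_scheduler, collect_ready_work, and signal_termination are hypothetical stand-ins for scheduler_.next(), notify_groups(), and terminate_workers(), and the simulated event pattern is made up for illustration.

#include <atomic>
#include <iostream>
#include <vector>

// Hypothetical stand-ins for the scheduler interactions in the real code; the
// names and behavior are illustrative only.
namespace {

std::atomic<bool> continue_execution{true};
int remaining_events = 5;  // pretend there are five scheduled events left

// Like scheduler_.next(): advance to the next event, return false when done.
auto advance_scheduler() -> bool {
  remaining_events--;
  return remaining_events > 0;
}

// Like notify_groups(...): only every other event actually triggers work here,
// so the output vector is sometimes left empty.
void collect_ready_work(std::vector<int>& out_ready) {
  if (remaining_events % 2 == 0) {
    out_ready.push_back(remaining_events);
  }
}

// Like terminate_workers(): in the real code this wakes all workers with null
// groups; here it only logs.
void signal_termination() { std::cout << "terminating workers\n"; }

// Retry scheduling until some work becomes ready or termination is requested.
// This mirrors the do-while introduced in worker_function().
void schedule_until_work_or_termination(std::vector<int>& ready) {
  do {
    if (continue_execution.load(std::memory_order_acquire)) {
      if (!advance_scheduler()) {
        continue_execution.store(false, std::memory_order_relaxed);
      }
      collect_ready_work(ready);
    } else {
      signal_termination();
      break;
    }
  } while (ready.empty());
}

} // namespace

int main() {
  std::vector<int> ready;
  while (true) {
    ready.clear();
    schedule_until_work_or_termination(ready);
    if (ready.empty()) {
      break;  // termination was signaled and no work remains
    }
    std::cout << "processing group " << ready.front() << "\n";
  }
  return 0;
}

Run standalone, this toy processes the work produced by events that trigger something and prints the termination message once the simulated scheduler runs out of events, mirroring how the new loop keeps a worker retrying schedule() until there is either work to hand out or a termination signal to broadcast.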
