Skip to content

Commit f761590

Browse files
committed
Finished and tested basic features of Worker
Signed-off-by: Michael X. Grey <[email protected]>
1 parent 3db67ee commit f761590

File tree

6 files changed

+300
-16
lines changed

6 files changed

+300
-16
lines changed

rclrs/src/executor.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
mod basic_executor;
22
pub use self::basic_executor::*;
33

4-
use crate::{Context, ContextHandle, GuardCondition, IntoNodeOptions, Node, RclrsError, Waitable};
4+
use crate::{WeakActivityListener, Context, ContextHandle, GuardCondition, IntoNodeOptions, Node, RclrsError, Waitable};
55
pub use futures::channel::oneshot::Receiver as Promise;
66
use futures::{
77
channel::oneshot,
@@ -210,6 +210,23 @@ impl ExecutorCommands {
210210
receiver
211211
}
212212

213+
/// Pass in a promise to get a second promise that will notify when the main
214+
/// promise is fulfilled. This second promise can be passed into
215+
/// [`SpinOptions::until_promise_resolved`].
216+
pub fn create_notice<Out>(&self, promise: Promise<Out>) -> (Promise<Out>, Promise<()>)
217+
where
218+
Out: 'static + Send,
219+
{
220+
let (main_sender, main_receiver) = oneshot::channel();
221+
let notice_receiver = self.run(async move {
222+
if let Ok(out) = promise.await {
223+
main_sender.send(out).ok();
224+
}
225+
});
226+
227+
(main_receiver, notice_receiver)
228+
}
229+
213230
/// Get the context that the executor is associated with.
214231
pub fn context(&self) -> &Context {
215232
&self.context
@@ -282,6 +299,10 @@ impl WorkerCommands {
282299
self.channel.send_payload_task(task);
283300
}
284301

302+
pub(crate) fn add_activity_listener(&self, listener: WeakActivityListener) {
303+
self.channel.add_activity_listener(listener);
304+
}
305+
285306
/// Get a guard condition that can be used to wake up the wait set of the executor.
286307
pub(crate) fn get_guard_condition(&self) -> &Arc<GuardCondition> {
287308
&self.wakeup_wait_set
@@ -298,6 +319,9 @@ pub trait WorkerChannel: Send + Sync {
298319

299320
/// Send a one-time task for the worker to run with its payload.
300321
fn send_payload_task(&self, f: PayloadTask);
322+
323+
/// Send something to listen to worker activity.
324+
fn add_activity_listener(&self, listener: WeakActivityListener);
301325
}
302326

303327
/// Encapsulates a task that can operate on the payload of a worker

rclrs/src/executor/basic_executor.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use std::{
1616
};
1717

1818
use crate::{
19-
ExecutorChannel, ExecutorRuntime, SpinConditions, WorkerChannel,
19+
WeakActivityListener, ExecutorChannel, ExecutorRuntime, SpinConditions, WorkerChannel,
2020
RclrsError, WaitSetRunner, WaitSetRunConditions, Waitable, log_warn, log_fatal, ToLogParams,
2121
GuardCondition, ExecutorWorkerOptions, PayloadTask,
2222
};
@@ -280,6 +280,7 @@ impl ExecutorChannel for BasicExecutorChannel {
280280
let runner = WaitSetRunner::new(options);
281281
let waitable_sender = runner.waitable_sender();
282282
let payload_task_sender = runner.payload_task_sender();
283+
let activity_listeners = runner.activity_listeners();
283284

284285
if let Err(err) = self.new_worker_sender.unbounded_send(runner) {
285286
log_fatal!(
@@ -292,6 +293,7 @@ impl ExecutorChannel for BasicExecutorChannel {
292293
waitable_sender,
293294
task_sender: self.task_sender.clone(),
294295
payload_task_sender,
296+
activity_listeners,
295297
})
296298
}
297299

@@ -304,6 +306,7 @@ struct BasicWorkerChannel {
304306
task_sender: TaskSender,
305307
waitable_sender: UnboundedSender<Waitable>,
306308
payload_task_sender: UnboundedSender<PayloadTask>,
309+
activity_listeners: Arc<Mutex<Vec<WeakActivityListener>>>,
307310
}
308311

309312
impl WorkerChannel for BasicWorkerChannel {
@@ -319,6 +322,10 @@ impl WorkerChannel for BasicWorkerChannel {
319322
fn send_payload_task(&self, f: PayloadTask) {
320323
self.payload_task_sender.unbounded_send(f).ok();
321324
}
325+
326+
fn add_activity_listener(&self, listener: WeakActivityListener) {
327+
self.activity_listeners.lock().unwrap().push(listener);
328+
}
322329
}
323330

324331
#[derive(Clone)]

rclrs/src/service/worker_service_callback.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ where
3636
any_payload: &mut dyn Any,
3737
) -> Result<(), RclrsError> {
3838
let Some(payload) = any_payload.downcast_mut::<Payload>() else {
39+
dbg!();
3940
return Err(RclrsError::InvalidPayload {
4041
expected: std::any::TypeId::of::<Payload>(),
4142
received: (*any_payload).type_id(),

rclrs/src/subscription/worker_subscription_callback.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ impl<T: Message, Payload: 'static> WorkerSubscriptionCallback<T, Payload> {
4040
any_payload: &mut dyn Any,
4141
) -> Result<(), RclrsError> {
4242
let Some(payload) = any_payload.downcast_mut::<Payload>() else {
43+
dbg!();
4344
return Err(RclrsError::InvalidPayload {
4445
expected: std::any::TypeId::of::<Payload>(),
4546
received: (*any_payload).type_id(),

rclrs/src/wait_set/wait_set_runner.rs

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@ use futures::channel::{
55

66
use std::{
77
any::Any,
8-
sync::{atomic::{AtomicBool, Ordering}, Arc},
8+
sync::{atomic::{AtomicBool, Ordering}, Arc, Mutex},
99
time::{Duration, Instant},
1010
};
1111

1212
use crate::{
1313
Context, Promise, RclrsError, WaitSet, Waitable, GuardCondition, ExecutorWorkerOptions,
14-
PayloadTask,
14+
PayloadTask, WeakActivityListener, ActivityListenerCallback,
1515
};
1616

1717
/// This is a utility class that executors can use to easily run and manage
@@ -22,6 +22,7 @@ pub struct WaitSetRunner {
2222
waitable_receiver: UnboundedReceiver<Waitable>,
2323
task_sender: UnboundedSender<PayloadTask>,
2424
task_receiver: UnboundedReceiver<PayloadTask>,
25+
activity_listeners: Arc<Mutex<Vec<WeakActivityListener>>>,
2526
guard_condition: Arc<GuardCondition>,
2627
payload: Box<dyn Any + Send>,
2728
}
@@ -62,6 +63,7 @@ impl WaitSetRunner {
6263
waitable_receiver,
6364
task_sender,
6465
task_receiver,
66+
activity_listeners: Arc::default(),
6567
guard_condition: worker_options.guard_condition,
6668
payload: worker_options.payload,
6769
}
@@ -79,6 +81,12 @@ impl WaitSetRunner {
7981
self.task_sender.clone()
8082
}
8183

84+
/// Get the group of senders that will be triggered each time the wait set
85+
/// is woken up. This is used
86+
pub fn activity_listeners(&self) -> Arc<Mutex<Vec<WeakActivityListener>>> {
87+
Arc::clone(&self.activity_listeners)
88+
}
89+
8290
/// Get the guard condition associated with the wait set of this runner.
8391
pub fn guard_condition(&self) -> &Arc<GuardCondition> {
8492
&self.guard_condition
@@ -111,6 +119,7 @@ impl WaitSetRunner {
111119
/// will be triggered after the user-provided promise is resolved.
112120
pub fn run_blocking(&mut self, conditions: WaitSetRunConditions) -> Result<(), RclrsError> {
113121
let mut first_spin = true;
122+
let mut listeners = Vec::new();
114123
loop {
115124
// TODO(@mxgrey): SmallVec would be better suited here if we are
116125
// okay with adding that as a dependency.
@@ -124,7 +133,7 @@ impl WaitSetRunner {
124133
}
125134

126135
while let Ok(Some(task)) = self.task_receiver.try_next() {
127-
task(&mut self.payload);
136+
task(&mut *self.payload);
128137
}
129138

130139
if conditions.only_next_available_work && !first_spin {
@@ -154,14 +163,57 @@ impl WaitSetRunner {
154163
}
155164
});
156165

166+
let mut at_least_one = false;
157167
self.wait_set.wait(timeout, |executable| {
168+
at_least_one = true;
158169
// SAFETY: The user of WaitSetRunner is responsible for ensuring
159170
// the runner has the same payload type as the executables that
160171
// are given to it.
161172
unsafe {
162-
executable.execute(&mut self.payload)
173+
executable.execute(&mut *self.payload)
163174
}
164175
})?;
176+
177+
if at_least_one {
178+
// We drain all listeners from activity_listeners to ensure that we
179+
// don't get a deadlock from double-locking the activity_listeners
180+
// mutex while executing one of the listeners. If the listener has
181+
// access to the Worker<T> then it could attempt to add another
182+
// listener while we have the vector locked, which would cause a
183+
// deadlock.
184+
listeners.extend(
185+
self.activity_listeners.lock().unwrap().drain(..)
186+
.filter_map(|x| x.upgrade())
187+
);
188+
189+
for arc_listener in &listeners {
190+
// We pull the callback out of its mutex entirely and release
191+
// the lock on the mutex before executing the callback. Otherwise
192+
// if the callback triggers its own WorkerActivity to change the
193+
// callback then we would get a deadlock from double-locking the
194+
// mutex.
195+
let listener = { arc_listener.lock().unwrap().take() };
196+
if let Some(mut listener) = listener {
197+
match &mut listener {
198+
ActivityListenerCallback::Listen(listen) => {
199+
listen(&mut *self.payload);
200+
}
201+
ActivityListenerCallback::Inert => {
202+
// Do nothing
203+
}
204+
}
205+
206+
// We replace instead of assigning in case the callback
207+
// inserted its own
208+
arc_listener.lock().unwrap().replace(listener);
209+
}
210+
}
211+
212+
self.activity_listeners.lock().unwrap().extend(
213+
listeners.drain(..)
214+
.map(|x| Arc::downgrade(&x))
215+
);
216+
}
165217
}
166218
}
167219
}

0 commit comments

Comments
 (0)