Skip to content

Commit 3db67ee

Browse files
committed
Finished initial Worker implementation - needs testing
Signed-off-by: Michael X. Grey <[email protected]>
1 parent 49379f1 commit 3db67ee

File tree

7 files changed

+171
-27
lines changed

7 files changed

+171
-27
lines changed

rclrs/src/executor.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -267,24 +267,21 @@ pub(crate) struct WorkerCommands {
267267
}
268268

269269
impl WorkerCommands {
270-
pub(crate) fn new(
271-
channel: Arc<dyn WorkerChannel>,
272-
wakeup_wait_set: Arc<GuardCondition>,
273-
) -> Self {
274-
Self { channel, wakeup_wait_set }
275-
}
276-
277270
pub(crate) fn add_to_wait_set(&self, waitable: Waitable) {
278271
self.channel.add_to_waitset(waitable);
279272
}
280273

281-
pub(crate) fn run<F>(&self, f: F)
274+
pub(crate) fn run_async<F>(&self, f: F)
282275
where
283276
F: 'static + Future<Output = ()> + Send,
284277
{
285278
self.channel.add_async_task(Box::pin(f));
286279
}
287280

281+
pub(crate) fn run_on_payload(&self, task: PayloadTask) {
282+
self.channel.send_payload_task(task);
283+
}
284+
288285
/// Get a guard condition that can be used to wake up the wait set of the executor.
289286
pub(crate) fn get_guard_condition(&self) -> &Arc<GuardCondition> {
290287
&self.wakeup_wait_set
@@ -298,8 +295,14 @@ pub trait WorkerChannel: Send + Sync {
298295

299296
/// Add new entities to the waitset of the executor.
300297
fn add_to_waitset(&self, new_entity: Waitable);
298+
299+
/// Send a one-time task for the worker to run with its payload.
300+
fn send_payload_task(&self, f: PayloadTask);
301301
}
302302

303+
/// Encapsulates a task that can operate on the payload of a worker
304+
pub type PayloadTask = Box<dyn FnOnce(&mut dyn Any) + Send>;
305+
303306
/// This is constructed by [`ExecutorCommands`] and passed to the [`ExecutorRuntime`]
304307
/// to create a new worker. Downstream users of rclrs should not be using this class
305308
/// unless you are implementing your own [`ExecutorRuntime`].

rclrs/src/executor/basic_executor.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use std::{
1818
use crate::{
1919
ExecutorChannel, ExecutorRuntime, SpinConditions, WorkerChannel,
2020
RclrsError, WaitSetRunner, WaitSetRunConditions, Waitable, log_warn, log_fatal, ToLogParams,
21-
GuardCondition, ExecutorWorkerOptions,
21+
GuardCondition, ExecutorWorkerOptions, PayloadTask,
2222
};
2323

2424
static FAILED_TO_SEND_WORKER: &'static str =
@@ -278,7 +278,8 @@ impl ExecutorChannel for BasicExecutorChannel {
278278
options: ExecutorWorkerOptions,
279279
) -> Arc<dyn WorkerChannel> {
280280
let runner = WaitSetRunner::new(options);
281-
let waitable_sender = runner.sender();
281+
let waitable_sender = runner.waitable_sender();
282+
let payload_task_sender = runner.payload_task_sender();
282283

283284
if let Err(err) = self.new_worker_sender.unbounded_send(runner) {
284285
log_fatal!(
@@ -290,6 +291,7 @@ impl ExecutorChannel for BasicExecutorChannel {
290291
Arc::new(BasicWorkerChannel {
291292
waitable_sender,
292293
task_sender: self.task_sender.clone(),
294+
payload_task_sender,
293295
})
294296
}
295297

@@ -301,6 +303,7 @@ impl ExecutorChannel for BasicExecutorChannel {
301303
struct BasicWorkerChannel {
302304
task_sender: TaskSender,
303305
waitable_sender: UnboundedSender<Waitable>,
306+
payload_task_sender: UnboundedSender<PayloadTask>,
304307
}
305308

306309
impl WorkerChannel for BasicWorkerChannel {
@@ -312,6 +315,10 @@ impl WorkerChannel for BasicWorkerChannel {
312315
fn add_async_task(&self, f: BoxFuture<'static, ()>) {
313316
self.task_sender.add_async_task(f);
314317
}
318+
319+
fn send_payload_task(&self, f: PayloadTask) {
320+
self.payload_task_sender.unbounded_send(f).ok();
321+
}
315322
}
316323

317324
#[derive(Clone)]

rclrs/src/node.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use crate::{
3434
Publisher, PublisherOptions, PublisherState, RclrsError, Service, IntoAsyncServiceCallback,
3535
IntoNodeServiceCallback, ServiceOptions, ServiceState, Subscription, IntoAsyncSubscriptionCallback,
3636
IntoNodeSubscriptionCallback, SubscriptionOptions, SubscriptionState, TimeSource, ToLogParams,
37-
ENTITY_LIFECYCLE_MUTEX,
37+
ENTITY_LIFECYCLE_MUTEX, IntoWorkerOptions, Worker, WorkerState,
3838
};
3939

4040
/// A processing unit that can communicate with other nodes.
@@ -236,6 +236,21 @@ impl NodeState {
236236
self.call_string_getter(rcl_node_get_fully_qualified_name)
237237
}
238238

239+
/// Create a new [`Worker`] for this Node.
240+
//
241+
// TODO(@mxgrey): Write some usage examples.
242+
pub fn create_worker<'a, Payload>(
243+
&self,
244+
options: impl IntoWorkerOptions<Payload>,
245+
) -> Worker<Payload>
246+
where
247+
Payload: 'static + Send,
248+
{
249+
let options = options.into_worker_options();
250+
let commands = self.commands.create_worker_commands(Box::new(options.payload));
251+
WorkerState::create(Arc::clone(&self.handle), commands)
252+
}
253+
239254
/// Creates a [`Client`][1].
240255
///
241256
/// Pass in only the service name for the `options` argument to use all default client options:

rclrs/src/service/node_service_callback.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ impl<T: Service> NodeServiceCallback<T> {
3434
NodeServiceCallback::OnlyRequest(cb) => {
3535
let (msg, mut rmw_request_id) = handle.take_request::<T>()?;
3636
let response = cb(msg);
37-
commands.run(async move {
37+
commands.run_async(async move {
3838
if let Err(err) = handle.send_response::<T>(&mut rmw_request_id, response.await) {
3939
log_service_send_error(&*handle, rmw_request_id, err);
4040
}
@@ -44,7 +44,7 @@ impl<T: Service> NodeServiceCallback<T> {
4444
let (msg, mut rmw_request_id) = handle.take_request::<T>()?;
4545
let request_id = RequestId::from_rmw_request_id(&rmw_request_id);
4646
let response = cb(msg, request_id);
47-
commands.run(async move {
47+
commands.run_async(async move {
4848
if let Err(err) = handle.send_response::<T>(&mut rmw_request_id, response.await) {
4949
log_service_send_error(&*handle, rmw_request_id, err);
5050
}
@@ -55,7 +55,7 @@ impl<T: Service> NodeServiceCallback<T> {
5555
let mut rmw_request_id = rmw_service_info.rmw_request_id();
5656
let service_info = ServiceInfo::from_rmw_service_info(&rmw_service_info);
5757
let response = cb(msg, service_info);
58-
commands.run(async move {
58+
commands.run_async(async move {
5959
if let Err(err) = handle.send_response::<T>(&mut rmw_request_id, response.await) {
6060
log_service_send_error(&*handle, rmw_request_id, err);
6161
}

rclrs/src/subscription/node_subscription_callback.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,27 +44,27 @@ impl<T: Message> NodeSubscriptionCallback<T> {
4444
match self {
4545
NodeSubscriptionCallback::Regular(cb) => {
4646
let (msg, _) = handle.take::<T>()?;
47-
commands.run(cb(msg));
47+
commands.run_async(cb(msg));
4848
}
4949
NodeSubscriptionCallback::RegularWithMessageInfo(cb) => {
5050
let (msg, msg_info) = handle.take::<T>()?;
51-
commands.run(cb(msg, msg_info));
51+
commands.run_async(cb(msg, msg_info));
5252
}
5353
NodeSubscriptionCallback::Boxed(cb) => {
5454
let (msg, _) = handle.take_boxed::<T>()?;
55-
commands.run(cb(msg));
55+
commands.run_async(cb(msg));
5656
}
5757
NodeSubscriptionCallback::BoxedWithMessageInfo(cb) => {
5858
let (msg, msg_info) = handle.take_boxed::<T>()?;
59-
commands.run(cb(msg, msg_info));
59+
commands.run_async(cb(msg, msg_info));
6060
}
6161
NodeSubscriptionCallback::Loaned(cb) => {
6262
let (msg, _) = handle.take_loaned::<T>()?;
63-
commands.run(cb(msg));
63+
commands.run_async(cb(msg));
6464
}
6565
NodeSubscriptionCallback::LoanedWithMessageInfo(cb) => {
6666
let (msg, msg_info) = handle.take_loaned::<T>()?;
67-
commands.run(cb(msg, msg_info));
67+
commands.run_async(cb(msg, msg_info));
6868
}
6969
}
7070
Ok(())

rclrs/src/wait_set/wait_set_runner.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use std::{
1111

1212
use crate::{
1313
Context, Promise, RclrsError, WaitSet, Waitable, GuardCondition, ExecutorWorkerOptions,
14+
PayloadTask,
1415
};
1516

1617
/// This is a utility class that executors can use to easily run and manage
@@ -19,6 +20,8 @@ pub struct WaitSetRunner {
1920
wait_set: WaitSet,
2021
waitable_sender: UnboundedSender<Waitable>,
2122
waitable_receiver: UnboundedReceiver<Waitable>,
23+
task_sender: UnboundedSender<PayloadTask>,
24+
task_receiver: UnboundedReceiver<PayloadTask>,
2225
guard_condition: Arc<GuardCondition>,
2326
payload: Box<dyn Any + Send>,
2427
}
@@ -49,24 +52,33 @@ impl WaitSetRunner {
4952
worker_options: ExecutorWorkerOptions,
5053
) -> Self {
5154
let (waitable_sender, waitable_receiver) = unbounded();
55+
let (task_sender, task_receiver) = unbounded();
5256
Self {
5357
wait_set: WaitSet::new(&worker_options.context)
5458
// SAFETY: This only gets called from Context which ensures that
5559
// everything is valid when creating a wait set.
5660
.expect("Unable to create wait set for basic executor"),
5761
waitable_sender,
5862
waitable_receiver,
63+
task_sender,
64+
task_receiver,
5965
guard_condition: worker_options.guard_condition,
6066
payload: worker_options.payload,
6167
}
6268
}
6369

64-
/// Get the sender that allows users to send new [`Waitables`] to this
70+
/// Get the sender that allows users to send new [`Waitable`]s to this
6571
/// `WaitSetRunner`.
66-
pub fn sender(&self) -> UnboundedSender<Waitable> {
72+
pub fn waitable_sender(&self) -> UnboundedSender<Waitable> {
6773
self.waitable_sender.clone()
6874
}
6975

76+
/// Get the sender that allows users to send new [`PayloadTask`]s to this
77+
/// `WaitSetRunner`.
78+
pub fn payload_task_sender(&self) -> UnboundedSender<PayloadTask> {
79+
self.task_sender.clone()
80+
}
81+
7082
/// Get the guard condition associated with the wait set of this runner.
7183
pub fn guard_condition(&self) -> &Arc<GuardCondition> {
7284
&self.guard_condition
@@ -111,6 +123,10 @@ impl WaitSetRunner {
111123
self.wait_set.add(new_waitables).ok();
112124
}
113125

126+
while let Ok(Some(task)) = self.task_receiver.try_next() {
127+
task(&mut self.payload);
128+
}
129+
114130
if conditions.only_next_available_work && !first_spin {
115131
// We've already completed a spin and were asked to only do one,
116132
// so break here

rclrs/src/worker.rs

Lines changed: 108 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
1-
use std::sync::Arc;
2-
use crate::{WorkerCommands};
1+
use rosidl_runtime_rs::{Message, Service as IdlService};
2+
use std::{any::Any, sync::Arc};
3+
use futures::channel::oneshot;
4+
use crate::{
5+
WorkerCommands, NodeHandle, ToLogParams, Promise, log_fatal,
6+
IntoWorkerSubscriptionCallback, IntoWorkerServiceCallback,
7+
WorkerSubscription, SubscriptionState, WorkerService, ServiceState,
8+
SubscriptionOptions, ServiceOptions, RclrsError,
9+
};
310

411
/// A worker that carries a payload and synchronizes callbacks for subscriptions
512
/// and services.
@@ -25,12 +32,108 @@ pub type Worker<Payload> = Arc<WorkerState<Payload>>;
2532
///
2633
/// [1]: std::sync::Weak
2734
pub struct WorkerState<Payload> {
35+
/// The node that this worker is associated with
36+
node: Arc<NodeHandle>,
37+
/// The commands to communicate with the runtime of the worker
2838
commands: Arc<WorkerCommands>,
2939
_ignore: std::marker::PhantomData<Payload>,
3040
}
3141

32-
impl<T> WorkerState<T> {
33-
pub(crate) fn new(commands: Arc<WorkerCommands>) -> Self {
34-
Self { commands, _ignore: Default::default() }
42+
impl<Payload: 'static + Send> WorkerState<Payload> {
43+
/// Run a task on this worker. This allows you to view and modify the payload
44+
/// of the worker. You will receive a [`Promise`] which you can use to view
45+
/// what happened inside the callback.
46+
pub fn run<Out, F>(&self, f: F) -> Promise<Out>
47+
where
48+
F: FnOnce(&mut Payload) -> Out + 'static + Send,
49+
Out: 'static + Send,
50+
{
51+
let (sender, receiver) = oneshot::channel();
52+
self.commands.run_on_payload(Box::new(move |any_payload: &mut dyn Any| {
53+
let Some(payload) = any_payload.downcast_mut::<Payload>() else {
54+
log_fatal!(
55+
"rclrs.worker",
56+
"Received invalid payload from worker. Expected: {:?}, received: {:?}. \
57+
This should never happen. Please report this to the maintainers of rclrs \
58+
with a minimal reproducible example.",
59+
std::any::TypeId::of::<Payload>(),
60+
any_payload,
61+
);
62+
return;
63+
};
64+
65+
let out = f(payload);
66+
sender.send(out).ok();
67+
}));
68+
receiver
69+
}
70+
71+
/// Creates a [`WorkerSubscription`].
72+
pub fn create_subscription<'a, T, Args>(
73+
&self,
74+
options: impl Into<SubscriptionOptions<'a>>,
75+
callback: impl IntoWorkerSubscriptionCallback<T, Payload, Args>,
76+
) -> Result<WorkerSubscription<T, Payload>, RclrsError>
77+
where
78+
T: Message,
79+
{
80+
SubscriptionState::<T, Worker<Payload>>::create(
81+
options,
82+
callback.into_worker_subscription_callback(),
83+
&self.node,
84+
&self.commands,
85+
)
86+
}
87+
88+
/// Creates a [`WorkerService`].
89+
pub fn create_service<'a, T, Args>(
90+
&self,
91+
options: impl Into<ServiceOptions<'a>>,
92+
callback: impl IntoWorkerServiceCallback<T, Payload, Args>,
93+
) -> Result<WorkerService<T, Payload>, RclrsError>
94+
where
95+
T: IdlService,
96+
{
97+
ServiceState::<T, Worker<Payload>>::create(
98+
options,
99+
callback.into_worker_service_callback(),
100+
&self.node,
101+
&self.commands,
102+
)
103+
}
104+
105+
/// Used by [`Node`][crate::Node] to create a `WorkerState`. Users should
106+
/// call [`Node::create_worker`][crate::NodeState::create_worker] instead of
107+
/// this.
108+
pub(crate) fn create(
109+
node: Arc<NodeHandle>,
110+
commands: Arc<WorkerCommands>,
111+
) -> Arc<Self> {
112+
Arc::new(Self { node, commands, _ignore: Default::default() })
113+
}
114+
}
115+
116+
/// Options used while creating a new [`Worker`].
117+
pub struct WorkerOptions<Payload> {
118+
/// The value of the initial payload for the [`Worker`].
119+
pub payload: Payload,
120+
}
121+
122+
impl<Payload> WorkerOptions<Payload> {
123+
/// Create a new `WorkerOptions`.
124+
pub fn new(payload: Payload) -> Self {
125+
Self { payload }
126+
}
127+
}
128+
129+
/// Implicitly convert something into [`WorkerOptions`].
130+
pub trait IntoWorkerOptions<Payload> {
131+
/// Convert an object into [`WorkerOptions`]. Users do not need to call this.
132+
fn into_worker_options(self) -> WorkerOptions<Payload>;
133+
}
134+
135+
impl<Payload> IntoWorkerOptions<Payload> for Payload {
136+
fn into_worker_options(self) -> WorkerOptions<Payload> {
137+
WorkerOptions::new(self)
35138
}
36139
}

0 commit comments

Comments
 (0)