Skip to content

Commit 25316ff

Browse files
Sam Luryefacebook-github-bot
authored andcommitted
Replace &Instance<A> with &Context<'_, A> in the Handler trait (#368)
Summary: Pull Request resolved: #368 In order to make the headers accessible to message handlers in a cleaner way, this diff introduces a new struct `Context<'_, A>` which contains a `&Instance<A>` and the headers associated with the current message. As per T228176204, the `Handler` trait now takes a `&Context<Self>` for the `this` parameter. `Context` has the same capabilities as the contained `Instance`. Moreover, I have implemented `Deref` for `Context` to return `&Instance` -- with deref coercion, this means that no code has to change except for inside the method signatures (this deref coercion is also why the `Handler` takes `&Context` instead of `&Instance`. Disclaimer: many of these changes were made by devmate. Reviewed By: shayne-fletcher, mariusae Differential Revision: D77341169 fbshipit-source-id: 4b7309b04beca43249a881d4c8a435f007caea3f
1 parent abbc708 commit 25316ff

File tree

35 files changed

+394
-333
lines changed

35 files changed

+394
-333
lines changed

controller/src/lib.rs

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use async_trait::async_trait;
2222
use hyperactor::Actor;
2323
use hyperactor::ActorId;
2424
use hyperactor::ActorRef;
25+
use hyperactor::Context;
2526
use hyperactor::GangId;
2627
use hyperactor::Handler;
2728
use hyperactor::Named;
@@ -212,7 +213,7 @@ impl ControllerActor {
212213
// M = self.worker_progress_check_interval
213214
async fn request_status_if_needed(
214215
&mut self,
215-
this: &hyperactor::Instance<Self>,
216+
this: &Context<'_, Self>,
216217
) -> Result<(), anyhow::Error> {
217218
if let Some((expected_seq, ..)) = self.history.deadline(
218219
self.operations_per_worker_progress_request,
@@ -259,7 +260,7 @@ struct CheckWorkerProgress;
259260
impl Handler<CheckWorkerProgress> for ControllerActor {
260261
async fn handle(
261262
&mut self,
262-
this: &hyperactor::Instance<Self>,
263+
this: &Context<Self>,
263264
_check_worker_progress: CheckWorkerProgress,
264265
) -> Result<(), anyhow::Error> {
265266
let client = self.client()?;
@@ -359,7 +360,7 @@ fn slice_to_selection(slice: Slice) -> Selection {
359360
impl ControllerMessageHandler for ControllerActor {
360361
async fn attach(
361362
&mut self,
362-
this: &hyperactor::Instance<Self>,
363+
this: &Context<Self>,
363364
client_actor: ActorRef<ClientActor>,
364365
) -> Result<(), anyhow::Error> {
365366
tracing::debug!("attaching client actor {}", client_actor);
@@ -378,7 +379,7 @@ impl ControllerMessageHandler for ControllerActor {
378379

379380
async fn node(
380381
&mut self,
381-
this: &hyperactor::Instance<Self>,
382+
this: &Context<Self>,
382383
seq: Seq,
383384
defs: Vec<Ref>,
384385
uses: Vec<Ref>,
@@ -395,7 +396,7 @@ impl ControllerMessageHandler for ControllerActor {
395396

396397
async fn drop_refs(
397398
&mut self,
398-
_this: &hyperactor::Instance<Self>,
399+
_this: &Context<Self>,
399400
refs: Vec<Ref>,
400401
) -> Result<(), anyhow::Error> {
401402
self.history.delete_invocations_for_refs(refs);
@@ -404,7 +405,7 @@ impl ControllerMessageHandler for ControllerActor {
404405

405406
async fn send(
406407
&mut self,
407-
this: &hyperactor::Instance<Self>,
408+
this: &Context<Self>,
408409
ranks: Ranks,
409410
message: Serialized,
410411
) -> Result<(), anyhow::Error> {
@@ -456,7 +457,7 @@ impl ControllerMessageHandler for ControllerActor {
456457

457458
async fn remote_function_failed(
458459
&mut self,
459-
this: &hyperactor::Instance<Self>,
460+
this: &Context<Self>,
460461
seq: Seq,
461462
error: WorkerError,
462463
) -> Result<(), anyhow::Error> {
@@ -469,7 +470,7 @@ impl ControllerMessageHandler for ControllerActor {
469470

470471
async fn status(
471472
&mut self,
472-
_this: &hyperactor::Instance<Self>,
473+
_this: &Context<Self>,
473474
seq: Seq,
474475
worker_actor_id: ActorId,
475476
controller: bool,
@@ -486,18 +487,15 @@ impl ControllerMessageHandler for ControllerActor {
486487

487488
async fn fetch_result(
488489
&mut self,
489-
_this: &hyperactor::Instance<Self>,
490+
_this: &Context<Self>,
490491
seq: Seq,
491492
result: Result<Serialized, WorkerError>,
492493
) -> Result<(), anyhow::Error> {
493494
self.history.set_result(seq, result);
494495
Ok(())
495496
}
496497

497-
async fn check_supervision(
498-
&mut self,
499-
this: &hyperactor::Instance<Self>,
500-
) -> Result<(), anyhow::Error> {
498+
async fn check_supervision(&mut self, this: &Context<Self>) -> Result<(), anyhow::Error> {
501499
let gang_id: GangId = self.worker_gang_ref.clone().into();
502500
let world_state = self
503501
.system_supervision_actor_ref
@@ -565,7 +563,7 @@ impl ControllerMessageHandler for ControllerActor {
565563

566564
async fn debugger_message(
567565
&mut self,
568-
this: &hyperactor::Instance<Self>,
566+
this: &Context<Self>,
569567
debugger_actor_id: ActorId,
570568
action: DebuggerAction,
571569
) -> Result<(), anyhow::Error> {
@@ -577,15 +575,15 @@ impl ControllerMessageHandler for ControllerActor {
577575
#[cfg(test)]
578576
async fn get_first_incomplete_seqs_unit_tests_only(
579577
&mut self,
580-
_this: &hyperactor::Instance<Self>,
578+
_this: &Context<Self>,
581579
) -> Result<Vec<Seq>, anyhow::Error> {
582580
Ok(self.history.first_incomplete_seqs().to_vec())
583581
}
584582

585583
#[cfg(not(test))]
586584
async fn get_first_incomplete_seqs_unit_tests_only(
587585
&mut self,
588-
_this: &hyperactor::Instance<Self>,
586+
_this: &Context<Self>,
589587
) -> Result<Vec<Seq>, anyhow::Error> {
590588
unimplemented!("get_first_incomplete_seqs_unit_tests_only is only for unit tests")
591589
}
@@ -616,7 +614,6 @@ mod tests {
616614

617615
use hyperactor::HandleClient;
618616
use hyperactor::Handler;
619-
use hyperactor::Instance;
620617
use hyperactor::RefClient;
621618
use hyperactor::channel;
622619
use hyperactor::channel::ChannelTransport;
@@ -1801,7 +1798,7 @@ mod tests {
18011798
impl PanickingMessageHandler for PanickingActor {
18021799
async fn panic(
18031800
&mut self,
1804-
_this: &Instance<Self>,
1801+
_this: &Context<Self>,
18051802
err_msg: String,
18061803
) -> Result<(), anyhow::Error> {
18071804
panic!("{}", err_msg);

hyper/src/commands/demo.rs

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use std::time::Duration;
1212
use async_trait::async_trait;
1313
use clap::Subcommand;
1414
use hyperactor::Actor;
15+
use hyperactor::Context;
1516
use hyperactor::HandleClient;
1617
use hyperactor::Handler;
1718
use hyperactor::Named;
@@ -229,40 +230,29 @@ impl Actor for DemoActor {
229230
impl DemoMessageHandler for DemoActor {
230231
async fn echo(
231232
&mut self,
232-
_this: &hyperactor::Instance<Self>,
233+
_this: &Context<Self>,
233234
message: String,
234235
) -> Result<String, anyhow::Error> {
235236
tracing::info!("demo: message: {}", message);
236237
Ok(message)
237238
}
238239

239-
async fn increment(
240-
&mut self,
241-
_this: &hyperactor::Instance<Self>,
242-
num: u64,
243-
) -> Result<u64, anyhow::Error> {
240+
async fn increment(&mut self, _this: &Context<Self>, num: u64) -> Result<u64, anyhow::Error> {
244241
tracing::info!("demo: increment: {}", num);
245242
Ok(num + 1)
246243
}
247244

248-
async fn panic(&mut self, _this: &hyperactor::Instance<Self>) -> Result<(), anyhow::Error> {
245+
async fn panic(&mut self, _this: &Context<Self>) -> Result<(), anyhow::Error> {
249246
tracing::info!("demo: panic!");
250247
panic!()
251248
}
252249

253-
async fn spawn_child(
254-
&mut self,
255-
this: &hyperactor::Instance<Self>,
256-
) -> Result<ActorRef<Self>, anyhow::Error> {
250+
async fn spawn_child(&mut self, this: &Context<Self>) -> Result<ActorRef<Self>, anyhow::Error> {
257251
tracing::info!("demo: spawn child");
258252
Ok(Self::spawn(this, ()).await?.bind())
259253
}
260254

261-
async fn error(
262-
&mut self,
263-
_this: &hyperactor::Instance<Self>,
264-
message: String,
265-
) -> Result<(), anyhow::Error> {
255+
async fn error(&mut self, _this: &Context<Self>, message: String) -> Result<(), anyhow::Error> {
266256
tracing::info!("demo: message: {}", message);
267257
anyhow::bail!("{}", message)
268258
}

hyperactor/example/derive.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ use std::time::Duration;
1111

1212
use async_trait::async_trait;
1313
use hyperactor::Actor;
14+
use hyperactor::Context;
1415
use hyperactor::HandleClient;
1516
use hyperactor::Handler;
16-
use hyperactor::Instance;
1717
use hyperactor::Named;
1818
use hyperactor::OncePortRef;
1919
use hyperactor::RefClient;
@@ -68,27 +68,23 @@ impl Actor for ShoppingListActor {
6868
#[async_trait]
6969
#[hyperactor::forward(ShoppingList)]
7070
impl ShoppingListHandler for ShoppingListActor {
71-
async fn add(&mut self, _this: &Instance<Self>, item: String) -> Result<(), anyhow::Error> {
71+
async fn add(&mut self, _this: &Context<Self>, item: String) -> Result<(), anyhow::Error> {
7272
eprintln!("insert {}", item);
7373
self.0.insert(item);
7474
Ok(())
7575
}
7676

77-
async fn remove(&mut self, _this: &Instance<Self>, item: String) -> Result<(), anyhow::Error> {
77+
async fn remove(&mut self, _this: &Context<Self>, item: String) -> Result<(), anyhow::Error> {
7878
eprintln!("remove {}", item);
7979
self.0.remove(&item);
8080
Ok(())
8181
}
8282

83-
async fn exists(
84-
&mut self,
85-
_this: &Instance<Self>,
86-
item: String,
87-
) -> Result<bool, anyhow::Error> {
83+
async fn exists(&mut self, _this: &Context<Self>, item: String) -> Result<bool, anyhow::Error> {
8884
Ok(self.0.contains(&item))
8985
}
9086

91-
async fn list(&mut self, _this: &Instance<Self>) -> Result<Vec<String>, anyhow::Error> {
87+
async fn list(&mut self, _this: &Context<Self>) -> Result<Vec<String>, anyhow::Error> {
9288
Ok(self.0.iter().cloned().collect())
9389
}
9490
}

hyperactor/src/actor.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ use crate::mailbox::UndeliverableMessageError;
4949
use crate::mailbox::log::MessageLogError;
5050
use crate::message::Castable;
5151
use crate::message::IndexedErasedUnbound;
52+
use crate::proc::Context;
5253
use crate::proc::Instance;
5354
use crate::proc::InstanceCell;
5455
use crate::proc::Ports;
@@ -142,7 +143,7 @@ pub trait Actor: Sized + Send + Debug + 'static {
142143
#[async_trait]
143144
pub trait Handler<M>: Actor {
144145
/// Handle the next M-typed message.
145-
async fn handle(&mut self, this: &Instance<Self>, message: M) -> Result<(), anyhow::Error>;
146+
async fn handle(&mut self, this: &Context<Self>, message: M) -> Result<(), anyhow::Error>;
146147
}
147148

148149
/// We provide this handler to indicate that actors can handle the [`Signal`] message.
@@ -151,7 +152,7 @@ pub trait Handler<M>: Actor {
151152
impl<A: Actor> Handler<Signal> for A {
152153
async fn handle(
153154
&mut self,
154-
_this: &Instance<Self>,
155+
_this: &Context<Self>,
155156
_message: Signal,
156157
) -> Result<(), anyhow::Error> {
157158
unimplemented!("signal handler should not be called directly")
@@ -164,7 +165,7 @@ impl<A: Actor> Handler<Signal> for A {
164165
impl<A: Actor> Handler<Undeliverable<MessageEnvelope>> for A {
165166
async fn handle(
166167
&mut self,
167-
this: &Instance<Self>,
168+
this: &Context<Self>,
168169
message: Undeliverable<MessageEnvelope>,
169170
) -> Result<(), anyhow::Error> {
170171
self.handle_undeliverable_message(this, message).await
@@ -181,7 +182,7 @@ where
181182
{
182183
async fn handle(
183184
&mut self,
184-
this: &Instance<Self>,
185+
this: &Context<Self>,
185186
msg: IndexedErasedUnbound<M>,
186187
) -> anyhow::Result<()> {
187188
let message = msg.downcast()?.bind()?;
@@ -712,7 +713,7 @@ mod tests {
712713
impl Handler<u64> for EchoActor {
713714
async fn handle(
714715
&mut self,
715-
this: &Instance<Self>,
716+
this: &Context<Self>,
716717
message: u64,
717718
) -> Result<(), anyhow::Error> {
718719
let Self(port) = self;
@@ -818,7 +819,7 @@ mod tests {
818819
impl Handler<OncePortHandle<bool>> for InitActor {
819820
async fn handle(
820821
&mut self,
821-
_this: &Instance<Self>,
822+
_this: &Context<Self>,
822823
port: OncePortHandle<bool>,
823824
) -> Result<(), anyhow::Error> {
824825
port.send(self.0)?;
@@ -861,7 +862,7 @@ mod tests {
861862

862863
#[async_trait]
863864
impl Handler<u64> for CheckpointActor {
864-
async fn handle(&mut self, this: &Instance<Self>, value: u64) -> Result<(), anyhow::Error> {
865+
async fn handle(&mut self, this: &Context<Self>, value: u64) -> Result<(), anyhow::Error> {
865866
self.sum += value;
866867
self.port.send(this, self.sum)?;
867868
Ok(())
@@ -946,7 +947,7 @@ mod tests {
946947
impl Handler<u64> for MultiActor {
947948
async fn handle(
948949
&mut self,
949-
_this: &Instance<Self>,
950+
_this: &Context<Self>,
950951
message: u64,
951952
) -> Result<(), anyhow::Error> {
952953
let mut vals = self.0.lock().unwrap();
@@ -959,7 +960,7 @@ mod tests {
959960
impl Handler<String> for MultiActor {
960961
async fn handle(
961962
&mut self,
962-
_this: &Instance<Self>,
963+
_this: &Context<Self>,
963964
message: String,
964965
) -> Result<(), anyhow::Error> {
965966
let mut vals = self.0.lock().unwrap();
@@ -972,7 +973,7 @@ mod tests {
972973
impl Handler<OncePortHandle<bool>> for MultiActor {
973974
async fn handle(
974975
&mut self,
975-
_this: &Instance<Self>,
976+
_this: &Context<Self>,
976977
message: OncePortHandle<bool>,
977978
) -> Result<(), anyhow::Error> {
978979
message.send(true).unwrap();

hyperactor/src/actor/remote.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,8 @@ mod tests {
139139

140140
use super::*;
141141
use crate as hyperactor; // for macros
142+
use crate::Context;
142143
use crate::Handler;
143-
use crate::Instance;
144144

145145
#[derive(Debug)]
146146
#[hyperactor::export(handlers = [()])]
@@ -161,7 +161,7 @@ mod tests {
161161

162162
#[async_trait]
163163
impl Handler<()> for MyActor {
164-
async fn handle(&mut self, _this: &Instance<Self>, _message: ()) -> anyhow::Result<()> {
164+
async fn handle(&mut self, _this: &Context<Self>, _message: ()) -> anyhow::Result<()> {
165165
unimplemented!()
166166
}
167167
}

hyperactor/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ pub use mailbox::RemoteMessage;
149149
pub use opentelemetry;
150150
#[doc(hidden)]
151151
pub use paste::paste;
152+
pub use proc::Context;
152153
pub use proc::Instance;
153154
pub use reference::ActorId;
154155
pub use reference::ActorRef;

0 commit comments

Comments
 (0)