Skip to content

Commit dc724ba

Browse files
author
Shayne Fletcher
committed
[hyperactor_mesh]: comm: handle undeliverable messages in casts
Differential Revision: [D77398378](https://our.internmc.facebook.com/intern/diff/D77398378/)

**NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments; please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D77398378/)!

ghstack-source-id: 292941204

Pull Request resolved: #360
1 parent b775a32 commit dc724ba

File tree

4 files changed

+93
-12
lines changed

4 files changed

+93
-12
lines changed

hyperactor/src/mailbox/mod.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,11 @@ impl MessageEnvelope {
248248
&self.dest
249249
}
250250

251+
/// The message headers.
252+
pub fn headers(&self) -> &Attrs {
253+
&self.headers
254+
}
255+
251256
/// Tells whether this is a signal message.
252257
pub fn is_signal(&self) -> bool {
253258
self.dest.index() == Signal::port()
@@ -2141,8 +2146,15 @@ impl MailboxSender for WeakMailboxRouter {
21412146
}
21422147
}
21432148

2144-
/// A serializable [`MailboxRouter`]. It keeps a serializable address book so that
2145-
/// the mailbox sender can be recovered.
2149+
/// A dynamic mailbox router that supports remote delivery.
2150+
///
2151+
/// `DialMailboxRouter` maintains a runtime address book mapping
2152+
/// references to `ChannelAddr`s. It holds a cache of active
2153+
/// connections and forwards messages to the appropriate
2154+
/// `MailboxClient`.
2155+
///
2156+
/// Messages sent to unknown destinations are routed to the `default`
2157+
/// sender, if present.
21462158
#[derive(Debug, Clone)]
21472159
pub struct DialMailboxRouter {
21482160
address_book: Arc<RwLock<BTreeMap<Reference, ChannelAddr>>>,

hyperactor_mesh/src/actor_mesh.rs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -994,6 +994,10 @@ mod tests {
994994

995995
let stop = alloc.stopper();
996996
let mut mesh = ProcMesh::allocate(alloc).await.unwrap();
997+
let mut undeliverable_rx = mesh
998+
.client_undeliverable_receiver()
999+
.take()
1000+
.expect("client_undeliverable_receiver should be available");
9971001
let mut events = mesh.events().unwrap();
9981002

9991003
let actor_mesh = mesh
@@ -1016,14 +1020,21 @@ mod tests {
10161020
ProcEvent::Crashed(0, reason) if reason.contains("intentional error!")
10171021
);
10181022

1019-
// Uncomment this to cause an infinite hang.
1020-
/*
1021-
let (reply_handle, mut reply_receiver) = actor_mesh.open_port();
1022-
actor_mesh
1023-
.cast(sel!(*), GetRank(false, reply_handle.bind()))
1024-
.unwrap();
1025-
let rank = reply_receiver.recv().await.unwrap();
1026-
*/
1023+
// Cast the message.
1024+
let (reply_handle, _) = actor_mesh.open_port();
1025+
actor_mesh
1026+
.cast(sel!(*), GetRank(false, reply_handle.bind()))
1027+
.unwrap();
1028+
// The message will be returned.
1029+
let Undeliverable(msg) = undeliverable_rx.recv().await.unwrap();
1030+
assert_eq!(
1031+
msg.sender(),
1032+
&ActorId(
1033+
ProcId(actor_mesh.world_id().clone(), 0),
1034+
"comm".to_owned(),
1035+
0
1036+
)
1037+
);
10271038

10281039
// Stop the mesh.
10291040
stop();

hyperactor_mesh/src/comm/mod.rs

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,14 @@ use hyperactor::ActorRef;
2020
use hyperactor::Handler;
2121
use hyperactor::Instance;
2222
use hyperactor::Named;
23+
use hyperactor::PortRef;
2324
use hyperactor::WorldId;
2425
use hyperactor::attrs::Attrs;
26+
use hyperactor::attrs::declare_attrs;
2527
use hyperactor::data::Serialized;
28+
use hyperactor::mailbox::DeliveryError;
29+
use hyperactor::mailbox::Undeliverable;
30+
use hyperactor::mailbox::UndeliverableMessageError;
2631
use hyperactor::reference::UnboundPort;
2732
use ndslice::Slice;
2833
use ndslice::selection::routing::RoutingFrame;
@@ -145,6 +150,10 @@ impl CommActorMode {
145150
}
146151
}
147152

153+
declare_attrs! {
154+
pub attr CAST_ORIGINATING_SENDER: ActorId;
155+
}
156+
148157
#[async_trait]
149158
impl Actor for CommActor {
150159
type Params = CommActorParams;
@@ -156,6 +165,47 @@ impl Actor for CommActor {
156165
mode: Default::default(),
157166
})
158167
}
168+
169+
// Overrides the default actor behavior: an undeliverable cast message is returned to the sender that originated it.
170+
async fn handle_undeliverable_message(
171+
&mut self,
172+
this: &Instance<Self>,
173+
undelivered: hyperactor::mailbox::Undeliverable<hyperactor::mailbox::MessageEnvelope>,
174+
) -> Result<(), anyhow::Error> {
175+
let Undeliverable(mut message_envelope) = undelivered;
176+
177+
// 1. Case: delivery failure at a "forwarding" step.
178+
if let Ok(ForwardMessage {
179+
message: CastMessageEnvelope { sender, .. },
180+
..
181+
}) = message_envelope.deserialized::<ForwardMessage>()
182+
{
183+
let return_port = PortRef::attest_message_port(&sender);
184+
return_port
185+
.send(this, Undeliverable(message_envelope.clone()))
186+
.map_err(|err| {
187+
message_envelope
188+
.try_set_error(DeliveryError::BrokenLink(format!("send failure: {err}")));
189+
UndeliverableMessageError::return_failure(&message_envelope)
190+
})?;
191+
return Ok(());
192+
}
193+
194+
// 2. Case: delivery failure at a "deliver here" step.
195+
if let Some(sender) = message_envelope.headers().get(CAST_ORIGINATING_SENDER) {
196+
let return_port = PortRef::attest_message_port(sender);
197+
return_port
198+
.send(this, Undeliverable(message_envelope.clone()))
199+
.map_err(|err| {
200+
message_envelope
201+
.try_set_error(DeliveryError::BrokenLink(format!("send failure: {err}")));
202+
UndeliverableMessageError::return_failure(&message_envelope)
203+
})?;
204+
return Ok(());
205+
}
206+
207+
unreachable!()
208+
}
159209
}
160210

161211
impl CommActor {
@@ -181,6 +231,7 @@ impl CommActor {
181231
seq: usize,
182232
last_seqs: &mut HashMap<usize, usize>,
183233
) -> Result<()> {
234+
tracing::info!("sf: comm handle_message sender {:#?}", sender);
184235
// Split ports, if any, and update message with new ports. In this
185236
// way, children actors will reply to this comm actor's ports, instead
186237
// of to the original ports provided by parent.
@@ -202,14 +253,21 @@ impl CommActor {
202253
*r = CastRank(mode.self_rank(this.self_id()));
203254
Ok(())
204255
})?;
256+
257+
// Preserve the original sender in the headers so that if
258+
// it turns out the message is returned undeliverable, we
259+
// can recover it.
260+
let mut headers = Attrs::new();
261+
headers.set(CAST_ORIGINATING_SENDER, message.sender.clone());
262+
205263
// TODO(pzhang) split reply ports so children can reply to this comm
206264
// actor instead of parent.
207265
this.post(
208266
this.self_id()
209267
.proc_id()
210268
.actor_id(message.dest_port().actor_name(), 0)
211269
.port_id(message.dest_port().port()),
212-
Attrs::new(),
270+
headers,
213271
Serialized::serialize(message.data())?,
214272
);
215273
}

hyperactor_mesh/src/comm/multicast.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ pub struct Uslice {
4141
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Named)]
4242
pub struct CastMessageEnvelope {
4343
/// The sender of this message.
44-
sender: ActorId,
44+
pub sender: ActorId,
4545
/// The destination port of the message. It could match multiple actors with
4646
/// rank wildcard.
4747
dest_port: DestinationPort,

0 commit comments

Comments
 (0)