Skip to content

Commit c4ab7a0

Browse files
author
Shayne Fletcher
committed
[hyperactor_mesh]: comm: handle undeliverable messages in casts
Provide `CommActor` with a custom `handle_undeliverable_message` implementation that routes delivery failures back to the original cast sender. Differential Revision: [D77398378](https://our.internmc.facebook.com/intern/diff/D77398378/) ghstack-source-id: 292942224 Pull Request resolved: #361
1 parent 8e9ee1b commit c4ab7a0

File tree

5 files changed

+97
-17
lines changed

5 files changed

+97
-17
lines changed

hyperactor/src/mailbox/mod.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,11 @@ impl MessageEnvelope {
248248
&self.dest
249249
}
250250

251+
/// The message headers.
252+
pub fn headers(&self) -> &Attrs {
253+
&self.headers
254+
}
255+
251256
/// Tells whether this is a signal message.
252257
pub fn is_signal(&self) -> bool {
253258
self.dest.index() == Signal::port()
@@ -2141,8 +2146,15 @@ impl MailboxSender for WeakMailboxRouter {
21412146
}
21422147
}
21432148

2144-
/// A serializable [`MailboxRouter`]. It keeps a serializable address book so that
2145-
/// the mailbox sender can be recovered.
2149+
/// A dynamic mailbox router that supports remote delivery.
2150+
///
2151+
/// `DialMailboxRouter` maintains a runtime address book mapping
2152+
/// references to `ChannelAddr`s. It holds a cache of active
2153+
/// connections and forwards messages to the appropriate
2154+
/// `MailboxClient`.
2155+
///
2156+
/// Messages sent to unknown destinations are routed to the `default`
2157+
/// sender, if present.
21462158
#[derive(Debug, Clone)]
21472159
pub struct DialMailboxRouter {
21482160
address_book: Arc<RwLock<BTreeMap<Reference, ChannelAddr>>>,

hyperactor_mesh/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ buck-resources = "1"
4040
dir-diff = "0.3"
4141
maplit = "1.0"
4242
timed_test = { version = "0.0.0", path = "../timed_test" }
43-
tracing-test = { version = "0.2.3", features = ["no-env-filter"] }
4443

4544
[lints]
4645
rust = { unexpected_cfgs = { check-cfg = ["cfg(fbcode_build)"], level = "warn" } }

hyperactor_mesh/src/actor_mesh.rs

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -974,12 +974,8 @@ mod tests {
974974
);
975975
}
976976

977-
// The intent is to emulate the behaviors of the Python
978-
// interaction of T225230867 "process hangs when i send
979-
// messages to a dead actor".
980-
#[tracing_test::traced_test]
981977
#[tokio::test]
982-
async fn test_behaviors_on_actor_error() {
978+
async fn test_cast_failure() {
983979
use crate::alloc::ProcStopReason;
984980
use crate::proc_mesh::ProcEvent;
985981
use crate::sel;
@@ -994,6 +990,10 @@ mod tests {
994990

995991
let stop = alloc.stopper();
996992
let mut mesh = ProcMesh::allocate(alloc).await.unwrap();
993+
let mut undeliverable_rx = mesh
994+
.client_undeliverable_receiver()
995+
.take()
996+
.expect("client_undeliverable_receiver should be available");
997997
let mut events = mesh.events().unwrap();
998998

999999
let actor_mesh = mesh
@@ -1016,14 +1016,22 @@ mod tests {
10161016
ProcEvent::Crashed(0, reason) if reason.contains("intentional error!")
10171017
);
10181018

1019-
// Uncomment this to cause an infinite hang.
1020-
/*
1021-
let (reply_handle, mut reply_receiver) = actor_mesh.open_port();
1022-
actor_mesh
1023-
.cast(sel!(*), GetRank(false, reply_handle.bind()))
1024-
.unwrap();
1025-
let rank = reply_receiver.recv().await.unwrap();
1026-
*/
1019+
// Cast the message.
1020+
let (reply_handle, _) = actor_mesh.open_port();
1021+
actor_mesh
1022+
.cast(sel!(*), GetRank(false, reply_handle.bind()))
1023+
.unwrap();
1024+
1025+
// The message will be returned.
1026+
let Undeliverable(msg) = undeliverable_rx.recv().await.unwrap();
1027+
assert_eq!(
1028+
msg.sender(),
1029+
&ActorId(
1030+
ProcId(actor_mesh.world_id().clone(), 0),
1031+
"comm".to_owned(),
1032+
0
1033+
)
1034+
);
10271035

10281036
// Stop the mesh.
10291037
stop();

hyperactor_mesh/src/comm/mod.rs

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,14 @@ use hyperactor::ActorRef;
2020
use hyperactor::Handler;
2121
use hyperactor::Instance;
2222
use hyperactor::Named;
23+
use hyperactor::PortRef;
2324
use hyperactor::WorldId;
2425
use hyperactor::attrs::Attrs;
26+
use hyperactor::attrs::declare_attrs;
2527
use hyperactor::data::Serialized;
28+
use hyperactor::mailbox::DeliveryError;
29+
use hyperactor::mailbox::Undeliverable;
30+
use hyperactor::mailbox::UndeliverableMessageError;
2631
use hyperactor::reference::UnboundPort;
2732
use ndslice::Slice;
2833
use ndslice::selection::routing::RoutingFrame;
@@ -145,6 +150,10 @@ impl CommActorMode {
145150
}
146151
}
147152

153+
declare_attrs! {
154+
pub attr CAST_ORIGINATING_SENDER: ActorId;
155+
}
156+
148157
#[async_trait]
149158
impl Actor for CommActor {
150159
type Params = CommActorParams;
@@ -156,6 +165,46 @@ impl Actor for CommActor {
156165
mode: Default::default(),
157166
})
158167
}
168+
169+
// This is an override of the default actor behavior.
170+
async fn handle_undeliverable_message(
171+
&mut self,
172+
this: &Instance<Self>,
173+
undelivered: hyperactor::mailbox::Undeliverable<hyperactor::mailbox::MessageEnvelope>,
174+
) -> Result<(), anyhow::Error> {
175+
let Undeliverable(mut message_envelope) = undelivered;
176+
177+
// Case 1: delivery failure at a "forwarding" step.
178+
if let Ok(ForwardMessage { message, .. }) =
179+
message_envelope.deserialized::<ForwardMessage>()
180+
{
181+
let sender = message.sender();
182+
let return_port = PortRef::attest_message_port(sender);
183+
return_port
184+
.send(this, Undeliverable(message_envelope.clone()))
185+
.map_err(|err| {
186+
message_envelope
187+
.try_set_error(DeliveryError::BrokenLink(format!("send failure: {err}")));
188+
UndeliverableMessageError::return_failure(&message_envelope)
189+
})?;
190+
return Ok(());
191+
}
192+
193+
// Case 2: delivery failure at a "deliver here" step.
194+
if let Some(sender) = message_envelope.headers().get(CAST_ORIGINATING_SENDER) {
195+
let return_port = PortRef::attest_message_port(sender);
196+
return_port
197+
.send(this, Undeliverable(message_envelope.clone()))
198+
.map_err(|err| {
199+
message_envelope
200+
.try_set_error(DeliveryError::BrokenLink(format!("send failure: {err}")));
201+
UndeliverableMessageError::return_failure(&message_envelope)
202+
})?;
203+
return Ok(());
204+
}
205+
206+
unreachable!()
207+
}
159208
}
160209

161210
impl CommActor {
@@ -181,6 +230,7 @@ impl CommActor {
181230
seq: usize,
182231
last_seqs: &mut HashMap<usize, usize>,
183232
) -> Result<()> {
233+
tracing::info!("sf: comm handle_message sender {:#?}", sender);
184234
// Split ports, if any, and update message with new ports. In this
185235
// way, children actors will reply to this comm actor's ports, instead
186236
// of to the original ports provided by parent.
@@ -202,14 +252,21 @@ impl CommActor {
202252
*r = CastRank(mode.self_rank(this.self_id()));
203253
Ok(())
204254
})?;
255+
256+
// Preserve the original sender in the headers so that if
257+
// it turns out the message is returned undeliverable, we
258+
// can recover it.
259+
let mut headers = Attrs::new();
260+
headers.set(CAST_ORIGINATING_SENDER, message.sender().clone());
261+
205262
// TODO(pzhang) split reply ports so children can reply to this comm
206263
// actor instead of parent.
207264
this.post(
208265
this.self_id()
209266
.proc_id()
210267
.actor_id(message.dest_port().actor_name(), 0)
211268
.port_id(message.dest_port().port()),
212-
Attrs::new(),
269+
headers,
213270
Serialized::serialize(message.data())?,
214271
);
215272
}

hyperactor_mesh/src/comm/multicast.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@ impl CastMessageEnvelope {
7575
}
7676
}
7777

78+
pub(crate) fn sender(&self) -> &ActorId {
79+
&self.sender
80+
}
81+
7882
pub(crate) fn dest_port(&self) -> &DestinationPort {
7983
&self.dest_port
8084
}

0 commit comments

Comments
 (0)