Skip to content

[hyperactor_mesh]: comm: handle undeliverable messages in casts #360

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: gh/shayne-fletcher/32/base
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions hyperactor/src/mailbox/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,11 @@ impl MessageEnvelope {
&self.dest
}

/// The message headers.
pub fn headers(&self) -> &Attrs {
&self.headers
}

/// Tells whether this is a signal message.
pub fn is_signal(&self) -> bool {
self.dest.index() == Signal::port()
Expand Down Expand Up @@ -2141,8 +2146,15 @@ impl MailboxSender for WeakMailboxRouter {
}
}

/// A serializable [`MailboxRouter`]. It keeps a serializable address book so that
/// the mailbox sender can be recovered.
/// A dynamic mailbox router that supports remote delivery.
///
/// `DialMailboxRouter` maintains a runtime address book mapping
/// references to `ChannelAddr`s. It holds a cache of active
/// connections and forwards messages to the appropriate
/// `MailboxClient`.
///
/// Messages sent to unknown destinations are routed to the `default`
/// sender, if present.
#[derive(Debug, Clone)]
pub struct DialMailboxRouter {
address_book: Arc<RwLock<BTreeMap<Reference, ChannelAddr>>>,
Expand Down
34 changes: 21 additions & 13 deletions hyperactor_mesh/src/actor_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -974,12 +974,8 @@ mod tests {
);
}

// The intent is to emulate the behaviors of the Python
// interaction of T225230867 "process hangs when i send
// messages to a dead actor".
#[tracing_test::traced_test]
#[tokio::test]
async fn test_behaviors_on_actor_error() {
async fn test_cast_failure() {
use crate::alloc::ProcStopReason;
use crate::proc_mesh::ProcEvent;
use crate::sel;
Expand All @@ -994,6 +990,10 @@ mod tests {

let stop = alloc.stopper();
let mut mesh = ProcMesh::allocate(alloc).await.unwrap();
let mut undeliverable_rx = mesh
.client_undeliverable_receiver()
.take()
.expect("client_undeliverable_receiver should be available");
let mut events = mesh.events().unwrap();

let actor_mesh = mesh
Expand All @@ -1016,14 +1016,22 @@ mod tests {
ProcEvent::Crashed(0, reason) if reason.contains("intentional error!")
);

// Uncomment this to cause an infinite hang.
/*
let (reply_handle, mut reply_receiver) = actor_mesh.open_port();
actor_mesh
.cast(sel!(*), GetRank(false, reply_handle.bind()))
.unwrap();
let rank = reply_receiver.recv().await.unwrap();
*/
// Cast the message.
let (reply_handle, _) = actor_mesh.open_port();
actor_mesh
.cast(sel!(*), GetRank(false, reply_handle.bind()))
.unwrap();

// The message will be returned.
let Undeliverable(msg) = undeliverable_rx.recv().await.unwrap();
assert_eq!(
msg.sender(),
&ActorId(
ProcId(actor_mesh.world_id().clone(), 0),
"comm".to_owned(),
0
)
);

// Stop the mesh.
stop();
Expand Down
60 changes: 59 additions & 1 deletion hyperactor_mesh/src/comm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@ use hyperactor::ActorRef;
use hyperactor::Handler;
use hyperactor::Instance;
use hyperactor::Named;
use hyperactor::PortRef;
use hyperactor::WorldId;
use hyperactor::attrs::Attrs;
use hyperactor::attrs::declare_attrs;
use hyperactor::data::Serialized;
use hyperactor::mailbox::DeliveryError;
use hyperactor::mailbox::Undeliverable;
use hyperactor::mailbox::UndeliverableMessageError;
use hyperactor::reference::UnboundPort;
use ndslice::Slice;
use ndslice::selection::routing::RoutingFrame;
Expand Down Expand Up @@ -145,6 +150,10 @@ impl CommActorMode {
}
}

// Message header attributes declared by the comm module.
//
// `CAST_ORIGINATING_SENDER` holds the `ActorId` of the actor that originated
// a cast. The comm actor stores it in the headers of messages it posts for
// local delivery and reads it back when such a message is returned as
// undeliverable, so the message can be routed back to that sender.
declare_attrs! {
pub attr CAST_ORIGINATING_SENDER: ActorId;
}

#[async_trait]
impl Actor for CommActor {
type Params = CommActorParams;
Expand All @@ -156,6 +165,47 @@ impl Actor for CommActor {
mode: Default::default(),
})
}

/// Overrides the default actor behavior for undeliverable messages.
///
/// A cast message that could not be delivered is sent back to the actor
/// that originated the cast. The originating sender is recovered in one of
/// two ways:
///
/// 1. Delivery failed at a "forwarding" step: the envelope deserializes as
///    a `ForwardMessage`, whose `CastMessageEnvelope` records the sender.
/// 2. Delivery failed at a "deliver here" step: the envelope headers carry
///    the sender under `CAST_ORIGINATING_SENDER`.
///
/// # Errors
///
/// Returns an error if the envelope cannot be sent back to the originating
/// sender, or if no originating sender can be recovered from the envelope.
/// The latter previously hit `unreachable!()` and panicked the actor.
async fn handle_undeliverable_message(
    &mut self,
    this: &Instance<Self>,
    undelivered: hyperactor::mailbox::Undeliverable<hyperactor::mailbox::MessageEnvelope>,
) -> Result<(), anyhow::Error> {
    let Undeliverable(mut message_envelope) = undelivered;

    // Resolve the actor that originated the cast. Try the "forwarding"
    // shape first; fall back to the header set at the "deliver here" step.
    let originating_sender = match message_envelope.deserialized::<ForwardMessage>() {
        Ok(ForwardMessage {
            message: CastMessageEnvelope { sender, .. },
            ..
        }) => Some(sender),
        Err(_) => message_envelope
            .headers()
            .get(CAST_ORIGINATING_SENDER)
            .cloned(),
    };

    let sender = match originating_sender {
        Some(sender) => sender,
        None => {
            // Neither a forwarded cast nor a locally delivered cast message:
            // report it as an error instead of panicking the comm actor.
            return Err(anyhow::anyhow!(
                "comm actor cannot return undeliverable message from {:?}: \
                 it is not a forwarded cast message and carries no originating sender header",
                message_envelope.sender()
            ));
        }
    };

    // Hand the undeliverable envelope back to the originating sender.
    let return_port = PortRef::attest_message_port(&sender);
    return_port
        .send(this, Undeliverable(message_envelope.clone()))
        .map_err(|err| {
            // Record why the return itself failed before reporting it.
            message_envelope
                .try_set_error(DeliveryError::BrokenLink(format!("send failure: {err}")));
            UndeliverableMessageError::return_failure(&message_envelope)
        })?;
    Ok(())
}
}

impl CommActor {
Expand All @@ -181,6 +231,7 @@ impl CommActor {
seq: usize,
last_seqs: &mut HashMap<usize, usize>,
) -> Result<()> {
tracing::info!("sf: comm handle_message sender {:#?}", sender);
// Split ports, if any, and update message with new ports. In this
// way, children actors will reply to this comm actor's ports, instead
// of to the original ports provided by parent.
Expand All @@ -202,14 +253,21 @@ impl CommActor {
*r = CastRank(mode.self_rank(this.self_id()));
Ok(())
})?;

// Preserve the original sender in the headers so that if
// it turns out the message is returned undeliverable, we
// can recover it.
let mut headers = Attrs::new();
headers.set(CAST_ORIGINATING_SENDER, message.sender.clone());

// TODO(pzhang) split reply ports so children can reply to this comm
// actor instead of parent.
this.post(
this.self_id()
.proc_id()
.actor_id(message.dest_port().actor_name(), 0)
.port_id(message.dest_port().port()),
Attrs::new(),
headers,
Serialized::serialize(message.data())?,
);
}
Expand Down
2 changes: 1 addition & 1 deletion hyperactor_mesh/src/comm/multicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub struct Uslice {
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Named)]
pub struct CastMessageEnvelope {
/// The sender of this message.
sender: ActorId,
pub sender: ActorId,
/// The destination port of the message. It could match multiple actors with
/// rank wildcard.
dest_port: DestinationPort,
Expand Down