Skip to content

: comm: handle undeliverable messages in casts #364

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions hyperactor/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,11 @@ impl MessageEnvelope {
&self.dest
}

/// The message headers.
pub fn headers(&self) -> &Attrs {
&self.headers
}

/// Tells whether this is a signal message.
pub fn is_signal(&self) -> bool {
self.dest.index() == Signal::port()
Expand Down Expand Up @@ -2131,8 +2136,15 @@ impl MailboxSender for WeakMailboxRouter {
}
}

/// A serializable [`MailboxRouter`]. It keeps a serializable address book so that
/// the mailbox sender can be recovered.
/// A dynamic mailbox router that supports remote delivery.
///
/// `DialMailboxRouter` maintains a runtime address book mapping
/// references to `ChannelAddr`s. It holds a cache of active
/// connections and forwards messages to the appropriate
/// `MailboxClient`.
///
/// Messages sent to unknown destinations are routed to the `default`
/// sender, if present.
#[derive(Debug, Clone)]
pub struct DialMailboxRouter {
address_book: Arc<RwLock<BTreeMap<Reference, ChannelAddr>>>,
Expand Down
1 change: 0 additions & 1 deletion hyperactor_mesh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ tracing-subscriber = { version = "0.3.19", features = ["chrono", "env-filter", "
dir-diff = "0.3"
maplit = "1.0"
timed_test = { version = "0.0.0", path = "../timed_test" }
tracing-test = { version = "0.2.3", features = ["no-env-filter"] }

[lints]
rust = { unexpected_cfgs = { check-cfg = ["cfg(fbcode_build)"], level = "warn" } }
47 changes: 30 additions & 17 deletions hyperactor_mesh/src/actor_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,12 @@ impl<'a, A: RemoteActor> RootActorMesh<'a, A> {
for ref slice in sel {
for rank in slice.iter() {
let mut headers = Attrs::new();
set_cast_info_on_headers(&mut headers, rank, self.shape().clone());
set_cast_info_on_headers(
&mut headers,
rank,
self.shape().clone(),
self.proc_mesh.client().actor_id().clone(),
);
self.ranks[rank]
.send_with_headers(self.proc_mesh.client(), headers, message.clone())
.map_err(|err| CastError::MailboxSenderError(rank, err))?;
Expand Down Expand Up @@ -782,17 +787,17 @@ mod tests {
let actor_mesh: RootActorMesh<TestActor> = mesh.spawn("test", &()).await.unwrap();
let actor_ref = actor_mesh.get(0).unwrap();
let mut headers = Attrs::new();
set_cast_info_on_headers(&mut headers, 0, Shape::unity());
set_cast_info_on_headers(&mut headers, 0, Shape::unity(), mesh.client().actor_id().clone());
actor_ref.send_with_headers(mesh.client(), headers.clone(), GetRank(true, reply_port.clone())).unwrap();
assert_eq!(0, reply_port_receiver.recv().await.unwrap());

set_cast_info_on_headers(&mut headers, 1, Shape::unity());
set_cast_info_on_headers(&mut headers, 1, Shape::unity(), mesh.client().actor_id().clone());
actor_ref.port()
.send_with_headers(mesh.client(), headers.clone(), GetRank(true, reply_port.clone()))
.unwrap();
assert_eq!(1, reply_port_receiver.recv().await.unwrap());

set_cast_info_on_headers(&mut headers, 2, Shape::unity());
set_cast_info_on_headers(&mut headers, 2, Shape::unity(), mesh.client().actor_id().clone());
actor_ref.actor_id()
.port_id(GetRank::port())
.send_with_headers(
Expand Down Expand Up @@ -888,12 +893,8 @@ mod tests {
);
}

// The intent is to emulate the behaviors of the Python
// interaction of T225230867 "process hangs when i send
// messages to a dead actor".
#[tracing_test::traced_test]
#[tokio::test]
async fn test_behaviors_on_actor_error() {
async fn test_cast_failure() {
use crate::alloc::ProcStopReason;
use crate::proc_mesh::ProcEvent;
use crate::sel;
Expand All @@ -908,6 +909,10 @@ mod tests {

let stop = alloc.stopper();
let mut mesh = ProcMesh::allocate(alloc).await.unwrap();
let mut undeliverable_rx = mesh
.client_undeliverable_receiver()
.take()
.expect("client_undeliverable_receiver should be available");
let mut events = mesh.events().unwrap();

let actor_mesh = mesh
Expand All @@ -930,14 +935,22 @@ mod tests {
ProcEvent::Crashed(0, reason) if reason.contains("intentional error!")
);

// Uncomment this to cause an infinite hang.
/*
let (reply_handle, mut reply_receiver) = actor_mesh.open_port();
actor_mesh
.cast(sel!(*), GetRank(false, reply_handle.bind()))
.unwrap();
let rank = reply_receiver.recv().await.unwrap();
*/
// Cast the message.
let (reply_handle, _) = actor_mesh.open_port();
actor_mesh
.cast(sel!(*), GetRank(false, reply_handle.bind()))
.unwrap();

// The message will be returned.
let Undeliverable(msg) = undeliverable_rx.recv().await.unwrap();
assert_eq!(
msg.sender(),
&ActorId(
ProcId(actor_mesh.world_id().clone(), 0),
"comm".to_owned(),
0
)
);

// Stop the mesh.
stop();
Expand Down
46 changes: 46 additions & 0 deletions hyperactor_mesh/src/comm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* LICENSE file in the root directory of this source tree.
*/

use crate::comm::multicast::CAST_ORIGINATING_SENDER;
pub mod multicast;

use std::cmp::Ordering;
Expand All @@ -20,9 +21,13 @@ use hyperactor::ActorRef;
use hyperactor::Handler;
use hyperactor::Instance;
use hyperactor::Named;
use hyperactor::PortRef;
use hyperactor::WorldId;
use hyperactor::attrs::Attrs;
use hyperactor::data::Serialized;
use hyperactor::mailbox::DeliveryError;
use hyperactor::mailbox::Undeliverable;
use hyperactor::mailbox::UndeliverableMessageError;
use hyperactor::reference::UnboundPort;
use ndslice::Slice;
use ndslice::selection::routing::RoutingFrame;
Expand Down Expand Up @@ -156,6 +161,46 @@ impl Actor for CommActor {
mode: Default::default(),
})
}

// This is an override of the default actor behavior.
async fn handle_undeliverable_message(
&mut self,
this: &Instance<Self>,
undelivered: hyperactor::mailbox::Undeliverable<hyperactor::mailbox::MessageEnvelope>,
) -> Result<(), anyhow::Error> {
let Undeliverable(mut message_envelope) = undelivered;

// 1. Case delivery failure at a "forwarding" step.
if let Ok(ForwardMessage { message, .. }) =
message_envelope.deserialized::<ForwardMessage>()
{
let sender = message.sender();
let return_port = PortRef::attest_message_port(sender);
return_port
.send(this, Undeliverable(message_envelope.clone()))
.map_err(|err| {
message_envelope
.try_set_error(DeliveryError::BrokenLink(format!("send failure: {err}")));
UndeliverableMessageError::return_failure(&message_envelope)
})?;
return Ok(());
}

// 2. Case delivery failure at a "deliver here" step.
if let Some(sender) = message_envelope.headers().get(CAST_ORIGINATING_SENDER) {
let return_port = PortRef::attest_message_port(sender);
return_port
.send(this, Undeliverable(message_envelope.clone()))
.map_err(|err| {
message_envelope
.try_set_error(DeliveryError::BrokenLink(format!("send failure: {err}")));
UndeliverableMessageError::return_failure(&message_envelope)
})?;
return Ok(());
}

unreachable!()
}
}

impl CommActor {
Expand Down Expand Up @@ -203,6 +248,7 @@ impl CommActor {
&mut headers,
mode.self_rank(this.self_id()),
message.shape().clone(),
message.sender().clone(),
);
// TODO(pzhang) split reply ports so children can reply to this comm
// actor instead of parent.
Expand Down
9 changes: 8 additions & 1 deletion hyperactor_mesh/src/comm/multicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ impl CastMessageEnvelope {
}
}

pub(crate) fn sender(&self) -> &ActorId {
&self.sender
}

pub(crate) fn dest_port(&self) -> &DestinationPort {
&self.dest_port
}
Expand Down Expand Up @@ -179,11 +183,14 @@ declare_attrs! {
/// Used inside headers to store the shape of the
/// actor mesh that a message was cast to.
attr CAST_SHAPE: Shape;
/// Used inside headers to store the originating sender of a cast.
pub attr CAST_ORIGINATING_SENDER: ActorId;
}

pub fn set_cast_info_on_headers(headers: &mut Attrs, rank: usize, shape: Shape) {
pub fn set_cast_info_on_headers(headers: &mut Attrs, rank: usize, shape: Shape, sender: ActorId) {
headers.set(CAST_RANK, rank);
headers.set(CAST_SHAPE, shape);
headers.set(CAST_ORIGINATING_SENDER, sender);
}

pub fn get_cast_info_from_headers(headers: &Attrs) -> Option<(usize, Shape)> {
Expand Down
7 changes: 6 additions & 1 deletion monarch_hyperactor/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,12 @@ impl PyMailbox {
) -> PyResult<()> {
let port_id = dest.inner.port_id(PythonMessage::port());
let mut headers = Attrs::new();
set_cast_info_on_headers(&mut headers, rank, shape.inner.clone());
set_cast_info_on_headers(
&mut headers,
rank,
shape.inner.clone(),
self.inner.actor_id().clone(),
);
let message = Serialized::serialize(message).map_err(|err| {
PyRuntimeError::new_err(format!(
"failed to serialize message ({:?}) to Serialized: {}",
Expand Down