Skip to content

[hyperactor-book]: mailboxes: update for headers #342

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: gh/shayne-fletcher/27/base
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions books/hyperactor-book/src/mailboxes/delivery.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@ pub struct MessageEnvelope {

/// Error contains a delivery error when message delivery failed.
error: Option<DeliveryError>,

/// Additional context for this message.
headers: Attrs,
}
```

`MessageEnvelope::new` creates a message envelope:
```rust
impl MessageEnvelope {
fn new(sender: ActorId, dest: PortId, data: Serialized) -> Self { ... }
fn new(sender: ActorId, dest: PortId, data: Serialized, headers: Attrs) -> Self { ... }
}
```
`MessageEnvelope::new_unknown` creates a new envelope when we don't know who the sender is:
Expand All @@ -47,7 +50,7 @@ If a type `T` implements `Serialize` and `Named`, an envelope can be constructed
```rust
impl MessageEnvelope {
fn serialize<T: Serialize + Named>(
source: ActorId, dest: PortId, value: &T) -> Result<Self, bincode::Error> {
source: ActorId, dest: PortId, value: &T, headers: Attrs) -> Result<Self, bincode::Error> {
Ok(Self {
data: Serialized::serialize(value)?,
sender: source,
Expand Down
45 changes: 28 additions & 17 deletions books/hyperactor-book/src/mailboxes/mailbox.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,13 +180,19 @@ impl MailboxSender for Mailbox {
),
Entry::Occupied(entry) => {
let (metadata, data) = envelope.open();
match entry.get().send_serialized(data) {
let MessageMetadata {headers, sender, dest, error: metadata_error } = metadata;
match entry.get().send_serialized(headers, data) {
Ok(false) => {
entry.remove();
}
Ok(true) => (),
Err(SerializedSenderError { data, error }) => MessageEnvelope::seal(
metadata, data,
Err(SerializedSenderError {
data,
error,
headers,
}) => MessageEnvelope::seal(
MessageMetadata { headers, sender, dest, error: metadata_error },
data,
)
.undeliverable(DeliveryError::Mailbox(format!("{}", error)), return_handle),
}
Expand All @@ -213,7 +219,7 @@ The mailbox uses the destination `PortId` to locate the bound port in its intern

3. Deserialization and Delivery Attempt
```rust
match entry.get().send_serialized(data)
match entry.get().send_serialized(headers, data)
```
If the port is found, the message is unsealed and passed to the corresponding `SerializedSender` (e.g., the `UnboundedSender` inserted during binding). This may succeed or fail:
- `Ok(true)`: Message was delivered.
Expand Down Expand Up @@ -276,7 +282,7 @@ There are two distinct pathways by which a message can arrive at a `PortReceiver
When you call `.send(msg)` on a `PortHandle<M>`, the message bypasses the `Mailbox` entirely and goes directly into the associated channel:
```text
PortHandle<M>::send(msg)
→ UnboundedPortSender<M>::send(msg)
→ UnboundedPortSender<M>::send(Attrs::new(), msg)
→ underlying channel (mpsc::UnboundedSender<M>)
→ PortReceiver<M>::recv().await
```
Expand All @@ -287,8 +293,8 @@ When a message is wrapped in a `MessageEnvelope` and posted via `Mailbox::post`,
```text
Mailbox::post(envelope, return_handle)
→ lookup State::ports[port_index]
→ SerializedSender::send_serialized(bytes)
→ UnboundedSender::send(M) // after deserialization
→ SerializedSender::send_serialized(headers, bytes)
→ UnboundedSender::send(headers, M) // after deserialization
→ mpsc channel
→ PortReceiver<M>::recv().await
```
Expand Down Expand Up @@ -320,28 +326,33 @@ impl<T: sealed::CanSend> CanSend for T {}
The sealed version defines the core method:
```rust
pub trait sealed::CanSend: Send + Sync {
fn post(&self, dest: PortId, data: Serialized);
fn post(&self, dest: PortId, headers: Attrs, data: Serialized);
}
```
Only internal types (e.g., `Mailbox`) implement this sealed trait, meaning only trusted components can obtain `CanSend`:
```rust
impl cap::sealed::CanSend for Mailbox {
fn post(&self, dest: PortId, data: Serialized) {
fn post(&self, dest: PortId, headers: Attrs, data: Serialized) {
let return_handle = self
.lookup_sender::<Undeliverable<MessageEnvelope>>()
.map_or_else(
|| {
let bt = std::backtrace::Backtrace::capture();
tracing::warn!(
actor_id = ?self.actor_id(),
backtrace = ?bt,
"Mailbox attempted to post a message without binding Undeliverable<MessageEnvelope>"
);
let actor_id = self.actor_id();
if CAN_SEND_WARNED_MAILBOXES
.get_or_init(DashSet::new)
.insert(actor_id.clone()) {
let bt = std::backtrace::Backtrace::capture();
tracing::warn!(
actor_id = ?actor_id,
backtrace = ?bt,
"mailbox attempted to post a message without binding Undeliverable<MessageEnvelope>"
);
}
monitored_return_handle()
},
|sender| PortHandle::new(self.clone(), u64::MAX, sender),
|sender| PortHandle::new(self.clone(), self.state.allocate_port(), sender),
);
let envelope = MessageEnvelope::new(self.actor_id().clone(), dest, data);
let envelope = MessageEnvelope::new(self.actor_id().clone(), dest, data, headers);
MailboxSender::post(self, envelope, return_handle);
}
}
Expand Down
28 changes: 20 additions & 8 deletions books/hyperactor-book/src/mailboxes/ports.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ enum UnboundedPortSender<M: Message> {
- **`Mpsc`**: Sends messages into a tokio unbounded channel
- **`Func`**: Custom logic, often used to enqueue messages onto actor work queues.

Messages are sent via the `.send(message)` method, which forwards to either the internal channel or the configured function.
Messages are sent via the `.send(headers, message)` method, which forwards to either the internal channel or the configured function.

## `PortHandle<M>`

Expand Down Expand Up @@ -385,8 +385,13 @@ Calling `.send_once(message)` on an `OnceSender` consumes the channel, and fails
To enable uniform message routing, both `UnboundedSender` and `OnceSender` implement the `SerializedSender` trait:
```rust
trait SerializedSender: Send + Sync {
fn send_serialized(&self, serialized: Serialized) -> Result<bool, SerializedSenderError>;
}
fn as_any(&self) -> &dyn Any;
fn send_serialized(
&self,
headers: Attrs,
serialized: Serialized,
) -> Result<bool, SerializedSenderError>;

```
This trait lets the mailbox deliver a `Serialized` message (a type-erased, encoded payload) by:
1. Deserializing the payload into a concrete `M` using `RemoteMessage` trait,
Expand All @@ -407,18 +412,24 @@ See the (Mailbox) [`State`](./mailbox.md#state) section for details on how the m
Below is the canonical implementation of `SerializedSender` for `UnboundedSender<M>`:
```rust
impl<M: RemoteMessage> SerializedSender for UnboundedSender<M> {
fn send_serialized(&self, serialized: Serialized) -> Result<bool, SerializedSenderError> {
fn send_serialized(
&self,
headers: Attrs,
serialized: Serialized,
) -> Result<bool, SerializedSenderError> {
match serialized.deserialized() {
Ok(message) => {
self.sender
.send(message)
.map_err(|err| SerializedSenderError {
self.sender.send(headers.clone(), message).map_err(|err| {
SerializedSenderError {
data: serialized,
error: MailboxSenderError::new_bound(
self.port_id.clone(),
MailboxSenderErrorKind::Other(err),
),
})?;
headers,
}
})?;

Ok(true)
}
Err(err) => Err(SerializedSenderError {
Expand All @@ -427,6 +438,7 @@ impl<M: RemoteMessage> SerializedSender for UnboundedSender<M> {
self.port_id.clone(),
MailboxSenderErrorKind::Deserialize(M::typename(), err),
),
headers,
}),
}
}
Expand Down
Loading