Skip to content

Commit f346040

Browse files
author
Shayne Fletcher
committed
[hyperactor-book]: mailboxes: update for headers
Differential Revision: [D77327124](https://our.internmc.facebook.com/intern/diff/D77327124/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D77327124/)! ghstack-source-id: 292688983 Pull Request resolved: #341
1 parent 6067347 commit f346040

File tree

3 files changed

+64
-27
lines changed

3 files changed

+64
-27
lines changed

books/hyperactor-book/src/mailboxes/delivery.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,16 @@ pub struct MessageEnvelope {
2626

2727
/// Error contains a delivery error when message delivery failed.
2828
error: Option<DeliveryError>,
29+
30+
/// Additional context for this message.
31+
headers: Attrs,
2932
}
3033
```
3134

3235
`MessageEnvelope::new` creates a message envelope:
3336
```rust
3437
impl MessageEnvelope {
35-
fn new(sender: ActorId, dest: PortId, data: Serialized) -> Self { ... }
38+
fn new(sender: ActorId, dest: PortId, data: Serialized, headers: Attrs) -> Self { ... }
3639
}
3740
```
3841
`MessageEnvelope::new_unknown` creates a new envelope when we don't know who the sender is:
@@ -47,7 +50,7 @@ If a type `T` implements `Serialize` and `Named`, an envelope can be constructed
4750
```rust
4851
impl MessageEnvelope {
4952
fn serialize<T: Serialize + Named>(
50-
source: ActorId, dest: PortId, value: &T) -> Result<Self, bincode::Error> {
53+
source: ActorId, dest: PortId, value: &T, headers: Attrs) -> Result<Self, bincode::Error> {
5154
Ok(Self {
5255
data: Serialized::serialize(value)?,
5356
sender: source,

books/hyperactor-book/src/mailboxes/mailbox.md

Lines changed: 39 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ impl MailboxSender for Mailbox {
169169
envelope: MessageEnvelope,
170170
return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
171171
) {
172+
tracing::trace!(name = "post", "posting message to {}", envelope.dest);
172173
if envelope.dest().actor_id() != &self.state.actor_id {
173174
return self.state.forwarder.post(envelope, return_handle);
174175
}
@@ -180,13 +181,29 @@ impl MailboxSender for Mailbox {
180181
),
181182
Entry::Occupied(entry) => {
182183
let (metadata, data) = envelope.open();
183-
match entry.get().send_serialized(data) {
184+
let MessageMetadata {
185+
headers,
186+
sender,
187+
dest,
188+
error: metadata_error,
189+
} = metadata;
190+
match entry.get().send_serialized(headers, data) {
184191
Ok(false) => {
185192
entry.remove();
186193
}
187194
Ok(true) => (),
188-
Err(SerializedSenderError { data, error }) => MessageEnvelope::seal(
189-
metadata, data,
195+
Err(SerializedSenderError {
196+
data,
197+
error,
198+
headers,
199+
}) => MessageEnvelope::seal(
200+
MessageMetadata {
201+
headers,
202+
sender,
203+
dest,
204+
error: metadata_error,
205+
},
206+
data,
190207
)
191208
.undeliverable(DeliveryError::Mailbox(format!("{}", error)), return_handle),
192209
}
@@ -213,7 +230,7 @@ The mailbox uses the destination `PortId` to locate the bound port in its intern
213230

214231
3. Deserialization and Delivery Attempt
215232
```rust
216-
match entry.get().send_serialized(data)
233+
match entry.get().send_serialized(headers, data)
217234
```
218235
If the port is found, the message is unsealed and passed to the corresponding `SerializedSender` (e.g., the `UnboundedSender` inserted during binding). This may succeed or fail:
219236
- `Ok(true)`: Message was delivered.
@@ -276,7 +293,7 @@ There are two distinct pathways by which a message can arrive at a `PortReceiver
276293
When you call `.send(msg)` on a `PortHandle<M>`, the message bypasses the `Mailbox` entirely and goes directly into the associated channel:
277294
```text
278295
PortHandle<M>::send(msg)
279-
→ UnboundedPortSender<M>::send(msg)
296+
→ UnboundedPortSender<M>::send(Attrs::new(), msg)
280297
→ underlying channel (mpsc::UnboundedSender<M>)
281298
→ PortReceiver<M>::recv().await
282299
```
@@ -287,8 +304,8 @@ When a message is wrapped in a `MessageEnvelope` and posted via `Mailbox::post`,
287304
```text
288305
Mailbox::post(envelope, return_handle)
289306
→ lookup State::ports[port_index]
290-
→ SerializedSender::send_serialized(bytes)
291-
→ UnboundedSender::send(M) // after deserialization
307+
→ SerializedSender::send_serialized(headers, bytes)
308+
→ UnboundedSender::send(headers, M) // after deserialization
292309
→ mpsc channel
293310
→ PortReceiver<M>::recv().await
294311
```
@@ -320,28 +337,33 @@ impl<T: sealed::CanSend> CanSend for T {}
320337
The sealed version defines the core method:
321338
```rust
322339
pub trait sealed::CanSend: Send + Sync {
323-
fn post(&self, dest: PortId, data: Serialized);
340+
fn post(&self, dest: PortId, headers: Attrs, data: Serialized);
324341
}
325342
```
326343
Only internal types (e.g., `Mailbox`) implement this sealed trait, meaning only trusted components can obtain `CanSend`:
327344
```rust
328345
impl cap::sealed::CanSend for Mailbox {
329-
fn post(&self, dest: PortId, data: Serialized) {
346+
fn post(&self, dest: PortId, headers: Attrs, data: Serialized) {
330347
let return_handle = self
331348
.lookup_sender::<Undeliverable<MessageEnvelope>>()
332349
.map_or_else(
333350
|| {
334-
let bt = std::backtrace::Backtrace::capture();
335-
tracing::warn!(
336-
actor_id = ?self.actor_id(),
337-
backtrace = ?bt,
338-
"Mailbox attempted to post a message without binding Undeliverable<MessageEnvelope>"
339-
);
351+
let actor_id = self.actor_id();
352+
if CAN_SEND_WARNED_MAILBOXES
353+
.get_or_init(DashSet::new)
354+
.insert(actor_id.clone()) {
355+
let bt = std::backtrace::Backtrace::capture();
356+
tracing::warn!(
357+
actor_id = ?actor_id,
358+
backtrace = ?bt,
359+
"mailbox attempted to post a message without binding Undeliverable<MessageEnvelope>"
360+
);
361+
}
340362
monitored_return_handle()
341363
},
342-
|sender| PortHandle::new(self.clone(), u64::MAX, sender),
364+
|sender| PortHandle::new(self.clone(), self.state.allocate_port(), sender),
343365
);
344-
let envelope = MessageEnvelope::new(self.actor_id().clone(), dest, data);
366+
let envelope = MessageEnvelope::new(self.actor_id().clone(), dest, data, headers);
345367
MailboxSender::post(self, envelope, return_handle);
346368
}
347369
}

books/hyperactor-book/src/mailboxes/ports.md

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ enum UnboundedPortSender<M: Message> {
2121
- **`Mpsc`**: Sends messages into a tokio unbounded channel
2222
- **`Func`**: Custom logic, often used to enqueue messages onto actor work queues.
2323

24-
Messages are sent via the `.send(message)` method, which forwards to either the internal channel or the configured function.
24+
Messages are sent via the `.send(headers, message)` method, which forwards to either the internal channel or the configured function.
2525

2626
## `PortHandle<M>`
2727

@@ -385,8 +385,13 @@ Calling `.send_once(message)` on an `OnceSender` consumes the channel, and fails
385385
To enable uniform message routing, both `UnboundedSender` and `OnceSender` implement the `SerializedSender` trait:
386386
```rust
387387
trait SerializedSender: Send + Sync {
388-
fn send_serialized(&self, serialized: Serialized) -> Result<bool, SerializedSenderError>;
389-
}
388+
fn as_any(&self) -> &dyn Any;
389+
fn send_serialized(
390+
&self,
391+
headers: Attrs,
392+
serialized: Serialized,
393+
) -> Result<bool, SerializedSenderError>;
394+
390395
```
391396
This trait lets the mailbox deliver a `Serialized` message (a type-erased, encoded payload) by:
392397
1. Deserializing the payload into a concrete `M` using `RemoteMessage` trait,
@@ -407,18 +412,24 @@ See the (Mailbox) [`State`](./mailbox.md#state) section for details on how the m
407412
Below is the canonical implementation of `SerializedSender` for `UnboundedSender<M>`:
408413
```rust
409414
impl<M: RemoteMessage> SerializedSender for UnboundedSender<M> {
410-
fn send_serialized(&self, serialized: Serialized) -> Result<bool, SerializedSenderError> {
415+
fn send_serialized(
416+
&self,
417+
headers: Attrs,
418+
serialized: Serialized,
419+
) -> Result<bool, SerializedSenderError> {
411420
match serialized.deserialized() {
412421
Ok(message) => {
413-
self.sender
414-
.send(message)
415-
.map_err(|err| SerializedSenderError {
422+
self.sender.send(headers.clone(), message).map_err(|err| {
423+
SerializedSenderError {
416424
data: serialized,
417425
error: MailboxSenderError::new_bound(
418426
self.port_id.clone(),
419427
MailboxSenderErrorKind::Other(err),
420428
),
421-
})?;
429+
headers,
430+
}
431+
})?;
432+
422433
Ok(true)
423434
}
424435
Err(err) => Err(SerializedSenderError {
@@ -427,6 +438,7 @@ impl<M: RemoteMessage> SerializedSender for UnboundedSender<M> {
427438
self.port_id.clone(),
428439
MailboxSenderErrorKind::Deserialize(M::typename(), err),
429440
),
441+
headers,
430442
}),
431443
}
432444
}

0 commit comments

Comments
 (0)