Skip to content

Commit 32b438c

Browse files
: mailboxes: update for headers
Summary: headers were introduced in D76532974 so we need to touch up a few places Differential Revision: D77327124
1 parent 09aeb5d commit 32b438c

File tree

3 files changed

+53
-27
lines changed

3 files changed

+53
-27
lines changed

books/hyperactor-book/src/mailboxes/delivery.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,16 @@ pub struct MessageEnvelope {
2626

2727
/// Error contains a delivery error when message delivery failed.
2828
error: Option<DeliveryError>,
29+
30+
/// Additional context for this message.
31+
headers: Attrs,
2932
}
3033
```
3134

3235
`MessageEnvelope::new` creates a message envelope:
3336
```rust
3437
impl MessageEnvelope {
35-
fn new(sender: ActorId, dest: PortId, data: Serialized) -> Self { ... }
38+
fn new(sender: ActorId, dest: PortId, data: Serialized, headers: Attrs) -> Self { ... }
3639
}
3740
```
3841
`MessageEnvelope::new_unknown` creates a new envelope when we don't know who the sender is:
@@ -47,7 +50,7 @@ If a type `T` implements `Serialize` and `Named`, an envelope can be constructed
4750
```rust
4851
impl MessageEnvelope {
4952
fn serialize<T: Serialize + Named>(
50-
source: ActorId, dest: PortId, value: &T) -> Result<Self, bincode::Error> {
53+
source: ActorId, dest: PortId, value: &T, headers: Attrs) -> Result<Self, bincode::Error> {
5154
Ok(Self {
5255
data: Serialized::serialize(value)?,
5356
sender: source,

books/hyperactor-book/src/mailboxes/mailbox.md

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -180,13 +180,19 @@ impl MailboxSender for Mailbox {
180180
),
181181
Entry::Occupied(entry) => {
182182
let (metadata, data) = envelope.open();
183-
match entry.get().send_serialized(data) {
183+
let MessageMetadata {headers, sender, dest, error: metadata_error } = metadata;
184+
match entry.get().send_serialized(headers, data) {
184185
Ok(false) => {
185186
entry.remove();
186187
}
187188
Ok(true) => (),
188-
Err(SerializedSenderError { data, error }) => MessageEnvelope::seal(
189-
metadata, data,
189+
Err(SerializedSenderError {
190+
data,
191+
error,
192+
headers,
193+
}) => MessageEnvelope::seal(
194+
MessageMetadata { headers, sender, dest, error: metadata_error },
195+
data,
190196
)
191197
.undeliverable(DeliveryError::Mailbox(format!("{}", error)), return_handle),
192198
}
@@ -213,7 +219,7 @@ The mailbox uses the destination `PortId` to locate the bound port in its intern
213219

214220
3. Deserialization and Delivery Attempt
215221
```rust
216-
match entry.get().send_serialized(data)
222+
match entry.get().send_serialized(headers, data)
217223
```
218224
If the port is found, the message is unsealed and passed to the corresponding `SerializedSender` (e.g., the `UnboundedSender` inserted during binding). This may succeed or fail:
219225
- `Ok(true)`: Message was delivered.
@@ -276,7 +282,7 @@ There are two distinct pathways by which a message can arrive at a `PortReceiver
276282
When you call `.send(msg)` on a `PortHandle<M>`, the message bypasses the `Mailbox` entirely and goes directly into the associated channel:
277283
```text
278284
PortHandle<M>::send(msg)
279-
→ UnboundedPortSender<M>::send(msg)
285+
→ UnboundedPortSender<M>::send(Attrs::new(), msg)
280286
→ underlying channel (mpsc::UnboundedSender<M>)
281287
→ PortReceiver<M>::recv().await
282288
```
@@ -287,8 +293,8 @@ When a message is wrapped in a `MessageEnvelope` and posted via `Mailbox::post`,
287293
```text
288294
Mailbox::post(envelope, return_handle)
289295
→ lookup State::ports[port_index]
290-
→ SerializedSender::send_serialized(bytes)
291-
→ UnboundedSender::send(M) // after deserialization
296+
→ SerializedSender::send_serialized(headers, bytes)
297+
→ UnboundedSender::send(headers, M) // after deserialization
292298
→ mpsc channel
293299
→ PortReceiver<M>::recv().await
294300
```
@@ -320,28 +326,33 @@ impl<T: sealed::CanSend> CanSend for T {}
320326
The sealed version defines the core method:
321327
```rust
322328
pub trait sealed::CanSend: Send + Sync {
323-
fn post(&self, dest: PortId, data: Serialized);
329+
fn post(&self, dest: PortId, headers: Attrs, data: Serialized);
324330
}
325331
```
326332
Only internal types (e.g., `Mailbox`) implement this sealed trait, meaning only trusted components can obtain `CanSend`:
327333
```rust
328334
impl cap::sealed::CanSend for Mailbox {
329-
fn post(&self, dest: PortId, data: Serialized) {
335+
fn post(&self, dest: PortId, headers: Attrs, data: Serialized) {
330336
let return_handle = self
331337
.lookup_sender::<Undeliverable<MessageEnvelope>>()
332338
.map_or_else(
333339
|| {
334-
let bt = std::backtrace::Backtrace::capture();
335-
tracing::warn!(
336-
actor_id = ?self.actor_id(),
337-
backtrace = ?bt,
338-
"Mailbox attempted to post a message without binding Undeliverable<MessageEnvelope>"
339-
);
340+
let actor_id = self.actor_id();
341+
if CAN_SEND_WARNED_MAILBOXES
342+
.get_or_init(DashSet::new)
343+
.insert(actor_id.clone()) {
344+
let bt = std::backtrace::Backtrace::capture();
345+
tracing::warn!(
346+
actor_id = ?actor_id,
347+
backtrace = ?bt,
348+
"mailbox attempted to post a message without binding Undeliverable<MessageEnvelope>"
349+
);
350+
}
340351
monitored_return_handle()
341352
},
342-
|sender| PortHandle::new(self.clone(), u64::MAX, sender),
353+
|sender| PortHandle::new(self.clone(), self.state.allocate_port(), sender),
343354
);
344-
let envelope = MessageEnvelope::new(self.actor_id().clone(), dest, data);
355+
let envelope = MessageEnvelope::new(self.actor_id().clone(), dest, data, headers);
345356
MailboxSender::post(self, envelope, return_handle);
346357
}
347358
}

books/hyperactor-book/src/mailboxes/ports.md

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ enum UnboundedPortSender<M: Message> {
2121
- **`Mpsc`**: Sends messages into a tokio unbounded channel
2222
- **`Func`**: Custom logic, often used to enqueue messages onto actor work queues.
2323

24-
Messages are sent via the `.send(message)` method, which forwards to either the internal channel or the configured function.
24+
Messages are sent via the `.send(headers, message)` method, which forwards to either the internal channel or the configured function.
2525

2626
## `PortHandle<M>`
2727

@@ -385,8 +385,13 @@ Calling `.send_once(message)` on an `OnceSender` consumes the channel, and fails
385385
To enable uniform message routing, both `UnboundedSender` and `OnceSender` implement the `SerializedSender` trait:
386386
```rust
387387
trait SerializedSender: Send + Sync {
388-
fn send_serialized(&self, serialized: Serialized) -> Result<bool, SerializedSenderError>;
389-
}
388+
fn as_any(&self) -> &dyn Any;
389+
fn send_serialized(
390+
&self,
391+
headers: Attrs,
392+
serialized: Serialized,
393+
) -> Result<bool, SerializedSenderError>;
394+
390395
```
391396
This trait lets the mailbox deliver a `Serialized` message (a type-erased, encoded payload) by:
392397
1. Deserializing the payload into a concrete `M` using `RemoteMessage` trait,
@@ -407,18 +412,24 @@ See the (Mailbox) [`State`](./mailbox.md#state) section for details on how the m
407412
Below is the canonical implementation of `SerializedSender` for `UnboundedSender<M>`:
408413
```rust
409414
impl<M: RemoteMessage> SerializedSender for UnboundedSender<M> {
410-
fn send_serialized(&self, serialized: Serialized) -> Result<bool, SerializedSenderError> {
415+
fn send_serialized(
416+
&self,
417+
headers: Attrs,
418+
serialized: Serialized,
419+
) -> Result<bool, SerializedSenderError> {
411420
match serialized.deserialized() {
412421
Ok(message) => {
413-
self.sender
414-
.send(message)
415-
.map_err(|err| SerializedSenderError {
422+
self.sender.send(headers.clone(), message).map_err(|err| {
423+
SerializedSenderError {
416424
data: serialized,
417425
error: MailboxSenderError::new_bound(
418426
self.port_id.clone(),
419427
MailboxSenderErrorKind::Other(err),
420428
),
421-
})?;
429+
headers,
430+
}
431+
})?;
432+
422433
Ok(true)
423434
}
424435
Err(err) => Err(SerializedSenderError {
@@ -427,6 +438,7 @@ impl<M: RemoteMessage> SerializedSender for UnboundedSender<M> {
427438
self.port_id.clone(),
428439
MailboxSenderErrorKind::Deserialize(M::typename(), err),
429440
),
441+
headers,
430442
}),
431443
}
432444
}

0 commit comments

Comments
 (0)