Skip to content

Commit b775a32

Browse files
author
Shayne Fletcher
committed
[hyperactor-book]: mailboxes: muxers and routers
new sections in the mailboxes chapter of "hyperactor book" detailing reconfigurable senders, muxers and routers. Differential Revision: [D77398172](https://our.internmc.facebook.com/intern/diff/D77398172/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D77398172/)! ghstack-source-id: 292940290 Pull Request resolved: #359
1 parent 503d8b9 commit b775a32

File tree

5 files changed

+428
-0
lines changed

5 files changed

+428
-0
lines changed

books/hyperactor-book/src/SUMMARY.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@
44
- [Mailboxes and Routers](mailboxes/index.md)
55
- [Ports](mailboxes/ports.md)
66
- [MailboxSender](mailboxes/mailbox_sender.md)
7+
- [Reconfigurable Senders](mailboxes/reconfigurable_sender.md)
78
- [MailboxServer](mailboxes/mailbox_server.md)
89
- [MailboxClient](mailboxes/mailbox_client.md)
910
- [Mailbox](mailboxes/mailbox.md)
1011
- [Delivery Semantics](mailboxes/delivery.md)
12+
- [Multiplexers](mailboxes/multiplexer.md)
13+
- [Routers](mailboxes/routers.md)

books/hyperactor-book/src/mailboxes/index.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@ This chapter introduces the components of the mailbox subsystem:
66

77
- [Ports](ports.md): typed channels for local message delivery
88
- [MailboxSender](mailbox_sender.md): trait-based abstraction for message posting
9+
- [Reconfigurable Senders](reconfigurable_sender.md): deferred wiring and dynamic configuration
910
- [MailboxServer](mailbox_server.md): bridging incoming message streams into mailboxes
1011
- [MailboxClient](mailbox_client.md): buffering, forwarding, and failure reporting
1112
- [Mailbox](mailbox.md): port registration, binding, and routing
1213
- [Delivery Semantics](delivery.md): envelopes, delivery errors, and failure handling
14+
- [Multiplexers](multiplexer.md): port-level dispatch to local mailboxes
15+
- [Routers](routers.md): prefix-based routing to local or remote destinations
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
# Multiplexers
2+
3+
**Muxers** (short for multiplexers) form the first level of indirection in the mailbox subsystem. While a `Mailbox` delivers messages to typed ports within a single actor, a `MailboxMuxer` delivers messages to the correct mailbox instance given an `ActorId`.
4+
5+
It acts as a dynamic registry, allowing multiple mailboxes to be addressed through a single posting interface.
6+
7+
This page introduces the `MailboxMuxer` and its role in:
8+
- Aggregating multiple mailbox instances
9+
- Dispatching incoming messages to the appropriate `MailboxSender`
10+
- Supporting dynamic binding and unbinding of mailboxes
11+
12+
Let's begin by looking at the core structure of `MailboxMuxer`:
13+
```rust
14+
pub struct MailboxMuxer {
15+
mailboxes: Arc<DashMap<ActorId, Box<dyn MailboxSender + Send + Sync>>>,
16+
}
17+
```
18+
The `MailboxMuxer` maintains a thread-safe, concurrent map from `ActorId` to `MailboxSender` trait objects. Each entry represents a live binding to a mailbox capable of receiving messages for a specific actor. This allows the muxer to act as a single dispatch point for delivering messages to any number of registered actors, abstracting over the details of how and where each mailbox is implemented.
19+
20+
To register a mailbox with the muxer, callers use the `bind` method:
21+
```rust
22+
impl MailboxMuxer {
23+
pub fn bind(&self, actor_id: ActorId, sender: impl MailboxSender + 'static) -> bool {
24+
match self.mailboxes.entry(actor_id) {
25+
Entry::Occupied(_) => false,
26+
Entry::Vacant(entry) => {
27+
entry.insert(Box::new(sender));
28+
true
29+
}
30+
}
31+
}
32+
33+
}
34+
```
35+
This function installs a new mapping from the given `ActorId` to a boxed `MailboxSender`. If the `ActorId` is already registered, the bind fails (returns `false`), and the existing sender is left unchanged. This ensures that actors cannot be accidentally rebound without first explicitly unbinding them—enforcing a clear handoff protocol. To rebind, the caller must invoke `unbind` first.
36+
37+
It's crucial to recall that `Mailbox` itself implements the `MailboxSender` trait. This is what allows it to be registered directly into a `MailboxMuxer`. The `post` method of a `Mailbox` inspects the incoming `MessageEnvelope` to determine whether it is the intended recipient. If the `ActorId` in the envelope matches the mailbox's own ID, the mailbox delivers the message locally: it looks up the appropriate port by index and invokes `send_serialized` on the matching channel. If the `ActorId` does *not* match, the mailbox delegates the message to its internal forwarder by calling `self.state.forwarder.post(envelope)`.
38+
39+
With this behavior in mind, we can now define a convenience method for registering a full `Mailbox`:
40+
41+
```rust
42+
impl MailboxMuxer {
43+
fn bind_mailbox(&self, mailbox: Mailbox) -> bool {
44+
self.bind(mailbox.actor_id().clone(), mailbox)
45+
}
46+
}
47+
```
48+
To support rebinding or teardown, the muxer also provides a symmetric `unbind` function, which removes the sender associated with a given `ActorId`:
49+
```rust
50+
pub(crate) fn unbind(&self, actor_id: &ActorId) {
51+
self.mailboxes.remove(actor_id);
52+
}
53+
```
54+
And of course, we can implement `MailboxSender` for `MailboxMuxer` itself—allowing it to act as a unified dispatcher for all registered mailboxes:
55+
```rust
56+
impl MailboxSender for MailboxMuxer {
57+
fn post(
58+
&self,
59+
envelope: MessageEnvelope,
60+
return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
61+
) {
62+
let dest_actor_id = envelope.dest().actor_id();
63+
match self.mailboxes.get(envelope.dest().actor_id()) {
64+
None => {
65+
let err = format!("no mailbox for actor {} registered in muxer", dest_actor_id);
66+
envelope.undeliverable(DeliveryError::Unroutable(err), return_handle)
67+
}
68+
Some(sender) => sender.post(envelope, return_handle),
69+
}
70+
}
71+
}
72+
```
73+
This makes `MailboxMuxer` composable: it can be nested within other routers, shared across components, or substituted for a standalone mailbox in generic code. If the destination `ActorId` is found in the internal map, the message is forwarded to the corresponding sender. Otherwise, it is marked as undeliverable with an appropriate `DeliveryError`.
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
# Reconfigurable Senders
2+
3+
Some actors are constructed before the full messaging graph is available.
4+
For example, the `ReconfigurableMailboxSender` is used during `MeshAgent::bootstrap` to allow early creation of the `Proc` and agent before outbound routing is available.
5+
The `.configure(...)` method installs the actual router later, once mesh wiring is complete.
6+
7+
## Motivation
8+
9+
Actors like `mesh_agent` are created before remote routing infrastructure is established. These actors need to send messages during setup, but the concrete `MailboxSender` they will use hasn't been determined yet.
10+
11+
To solve this, `ReconfigurableMailboxSender` implements [`MailboxSender`] and supports **deferred configuration**: it starts by queueing messages in memory, then later transitions to forwarding once a real sender is available.
12+
13+
## Internal Structure
14+
15+
The sender wraps a state machine:
16+
17+
```rust
18+
struct ReconfigurableMailboxSender {
19+
state: Arc<RwLock<ReconfigurableMailboxSenderState>>,
20+
}
21+
```
22+
There are two possible states:
23+
```rust
24+
type Post = (MessageEnvelope, PortHandle<Undeliverable<MessageEnvelope>>);
25+
26+
enum ReconfigurableMailboxSenderState {
27+
Queueing(Mutex<Vec<Post>>),
28+
Configured(BoxedMailboxSender),
29+
}
30+
```
31+
- In the `Queueing` state, messages are buffered.
32+
- When `.configure(...)` is called, the queue is flushed into the new sender, and the state is replaced with `Configured(...)`.
33+
34+
### Configuration
35+
36+
The `.configure(...)` method installs the actual sender. If called while in the `Queueing state`, it:
37+
- Drains all buffered messages to the given sender
38+
- Transitions to the `Configured` state
39+
- Returns `true` if this was the first successful configuration
40+
41+
Subsequent calls are ignored and return `false`.
42+
```rust
43+
fn configure(&self, sender: BoxedMailboxSender) -> bool {
44+
let mut state = self.state.write().unwrap();
45+
if state.is_configured() {
46+
return false;
47+
}
48+
49+
let queued = std::mem::replace(
50+
&mut *state,
51+
ReconfigurableMailboxSenderState::Configured(sender.clone()),
52+
);
53+
54+
for (envelope, return_handle) in queued.into_queueing().unwrap().into_inner().unwrap() {
55+
sender.post(envelope, return_handle);
56+
}
57+
58+
*state = ReconfigurableMailboxSenderState::Configured(sender);
59+
true
60+
}
61+
```
62+
63+
This guarantees that messages posted before configuration are not dropped - they are delivered in-order once the sender becomes available.

0 commit comments

Comments
 (0)