Skip to content

[hyperactor-book]: mailboxes: muxers and routers #358

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: gh/shayne-fletcher/30/base
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions books/hyperactor-book/src/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
- [Mailboxes and Routers](mailboxes/index.md)
- [Ports](mailboxes/ports.md)
- [MailboxSender](mailboxes/mailbox_sender.md)
- [Reconfigurable Senders](mailboxes/reconfigurable_sender.md)
- [MailboxServer](mailboxes/mailbox_server.md)
- [MailboxClient](mailboxes/mailbox_client.md)
- [Mailbox](mailboxes/mailbox.md)
- [Delivery Semantics](mailboxes/delivery.md)
- [Multiplexers](mailboxes/multiplexer.md)
- [Routers](mailboxes/routers.md)
3 changes: 3 additions & 0 deletions books/hyperactor-book/src/mailboxes/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ This chapter introduces the components of the mailbox subsystem:

- [Ports](ports.md): typed channels for local message delivery
- [MailboxSender](mailbox_sender.md): trait-based abstraction for message posting
- [Reconfigurable Senders](reconfigurable_sender.md): deferred wiring and dynamic configuration
- [MailboxServer](mailbox_server.md): bridging incoming message streams into mailboxes
- [MailboxClient](mailbox_client.md): buffering, forwarding, and failure reporting
- [Mailbox](mailbox.md): port registration, binding, and routing
- [Delivery Semantics](delivery.md): envelopes, delivery errors, and failure handling
- [Multiplexers](multiplexer.md): port-level dispatch to local mailboxes
- [Routers](routers.md): prefix-based routing to local or remote destinations
73 changes: 73 additions & 0 deletions books/hyperactor-book/src/mailboxes/multiplexer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Multiplexers

**Muxers** (short for multiplexers) form the first level of indirection in the mailbox subsystem. While a `Mailbox` delivers messages to typed ports within a single actor, a `MailboxMuxer` delivers messages to the correct mailbox instance given an `ActorId`.

It acts as a dynamic registry, allowing multiple mailboxes to be addressed through a single posting interface.

This page introduces the `MailboxMuxer` and its role in:
- Aggregating multiple mailbox instances
- Dispatching incoming messages to the appropriate `MailboxSender`
- Supporting dynamic binding and unbinding of mailboxes

Let's begin by looking at the core structure of `MailboxMuxer`:
```rust
pub struct MailboxMuxer {
mailboxes: Arc<DashMap<ActorId, Box<dyn MailboxSender + Send + Sync>>>,
}
```
The `MailboxMuxer` maintains a thread-safe, concurrent map from `ActorId` to `MailboxSender` trait objects. Each entry represents a live binding to a mailbox capable of receiving messages for a specific actor. This allows the muxer to act as a single dispatch point for delivering messages to any number of registered actors, abstracting over the details of how and where each mailbox is implemented.

To register a mailbox with the muxer, callers use the `bind` method:
```rust
impl MailboxMuxer {
pub fn bind(&self, actor_id: ActorId, sender: impl MailboxSender + 'static) -> bool {
match self.mailboxes.entry(actor_id) {
Entry::Occupied(_) => false,
Entry::Vacant(entry) => {
entry.insert(Box::new(sender));
true
}
}
}

}
```
This function installs a new mapping from the given `ActorId` to a boxed `MailboxSender`. If the `ActorId` is already registered, the bind fails (returns `false`), and the existing sender is left unchanged. This ensures that actors cannot be accidentally rebound without first explicitly unbinding them—enforcing a clear handoff protocol. To rebind, the caller must invoke `unbind` first.

It's crucial to recall that `Mailbox` itself implements the `MailboxSender` trait. This is what allows it to be registered directly into a `MailboxMuxer`. The `post` method of a `Mailbox` inspects the incoming `MessageEnvelope` to determine whether it is the intended recipient. If the `ActorId` in the envelope matches the mailbox's own ID, the mailbox delivers the message locally: it looks up the appropriate port by index and invokes `send_serialized` on the matching channel. If the `ActorId` does *not* match, the mailbox delegates the message to its internal forwarder by calling `self.state.forwarder.post(envelope)`.

With this behavior in mind, we can now define a convenience method for registering a full `Mailbox`:

```rust
impl MailboxMuxer {
fn bind_mailbox(&self, mailbox: Mailbox) -> bool {
self.bind(mailbox.actor_id().clone(), mailbox)
}
}
```
To support rebinding or teardown, the muxer also provides a symmetric `unbind` function, which removes the sender associated with a given `ActorId`:
```rust
pub(crate) fn unbind(&self, actor_id: &ActorId) {
self.mailboxes.remove(actor_id);
}
```
And of course, we can implement `MailboxSender` for `MailboxMuxer` itself—allowing it to act as a unified dispatcher for all registered mailboxes:
```rust
impl MailboxSender for MailboxMuxer {
fn post(
&self,
envelope: MessageEnvelope,
return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
) {
let dest_actor_id = envelope.dest().actor_id();
match self.mailboxes.get(envelope.dest().actor_id()) {
None => {
let err = format!("no mailbox for actor {} registered in muxer", dest_actor_id);
envelope.undeliverable(DeliveryError::Unroutable(err), return_handle)
}
Some(sender) => sender.post(envelope, return_handle),
}
}
}
```
This makes `MailboxMuxer` composable: it can be nested within other routers, shared across components, or substituted for a standalone mailbox in generic code. If the destination `ActorId` is found in the internal map, the message is forwarded to the corresponding sender. Otherwise, it is marked as undeliverable with an appropriate `DeliveryError`.
63 changes: 63 additions & 0 deletions books/hyperactor-book/src/mailboxes/reconfigurable_sender.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Reconfigurable Senders

Some actors are constructed before the full messaging graph is available.
For example, the `ReconfigurableMailboxSender` is used during `MeshAgent::bootstrap` to allow early creation of the `Proc` and agent before outbound routing is available.
The `.configure(...)` method installs the actual router later, once mesh wiring is complete.

## Motivation

Actors like `mesh_agent` are created before remote routing infrastructure is established. These actors need to send messages during setup, but the concrete `MailboxSender` they will use hasn't been determined yet.

To solve this, `ReconfigurableMailboxSender` implements [`MailboxSender`] and supports **deferred configuration**: it starts by queueing messages in memory, then later transitions to forwarding once a real sender is available.

## Internal Structure

The sender wraps a state machine:

```rust
struct ReconfigurableMailboxSender {
state: Arc<RwLock<ReconfigurableMailboxSenderState>>,
}
```
There are two possible states:
```rust
type Post = (MessageEnvelope, PortHandle<Undeliverable<MessageEnvelope>>);

enum ReconfigurableMailboxSenderState {
Queueing(Mutex<Vec<Post>>),
Configured(BoxedMailboxSender),
}
```
- In the `Queueing` state, messages are buffered.
- When `.configure(...)` is called, the queue is flushed into the new sender, and the state is replaced with `Configured(...)`.

### Configuration

The `.configure(...)` method installs the actual sender. If called while in the `Queueing state`, it:
- Drains all buffered messages to the given sender
- Transitions to the `Configured` state
- Returns `true` if this was the first successful configuration

Subsequent calls are ignored and return `false`.
```rust
fn configure(&self, sender: BoxedMailboxSender) -> bool {
let mut state = self.state.write().unwrap();
if state.is_configured() {
return false;
}

let queued = std::mem::replace(
&mut *state,
ReconfigurableMailboxSenderState::Configured(sender.clone()),
);

for (envelope, return_handle) in queued.into_queueing().unwrap().into_inner().unwrap() {
sender.post(envelope, return_handle);
}

*state = ReconfigurableMailboxSenderState::Configured(sender);
true
}
```

This guarantees that messages posted before configuration are not dropped - they are delivered in-order once the sender becomes available.
Loading