Skip to content

Actors: bi-directional streaming #72

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 182 additions & 0 deletions 20250115-actors-bi-di-stream.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
# Actors gRPC Bi-Directional Streaming

* Author(s): @joshvanl

## Overview

This proposal introduces support for the Actor runtime API to enable bi-directional streaming in gRPC.
Client-initiated connections will make Actor requests to the Dapr sidecar, and the Dapr sidecar will stream app invocations back to the client over the same connection.
Each stream will accommodate a single Actor Type.

## Background

Currently, app Actor API calls are made over unary HTTP or gRPC calls.
All Daprd Actor invocations of the app occur over unary HTTP requests, including discovering hosted Actor Types and configuration.
This requires the app to both have an open port, and be routable by Daprd to receive these requests.
Additionally, there is no current mechanism for dynamic Actor Type discovery.

By implementing the Actor API over a gRPC bi-directional stream, the app does not need to open an HTTP port to receive Actor invocations.
This approach also enables dynamic Actor Type registration.

## Implementation Details

The protos that implement the new Actor stream runtime reside in the new proto package `dapr.proto.actors.v1`.

```proto
// Actors service provides APIs for user applications to interact with the Dapr Actor
// runtime and receive actor messages.
service Actors {
// Stream is the bi-directional streaming RPC for actors to receive messages
// and send responses.
rpc Stream(stream Request) returns (stream Response) {}
}
```

### Initial Message

Each gRPC stream connected to Daprd will first register its Actor Type along with that type’s entity configuration.
Specifically, the following fields for that type will be included:

```go
type EntityConfig struct {
ActorIdleTimeout string
DrainOngoingCallTimeout string
DrainRebalancedActors bool
Reentrancy ReentrancyConfig
RemindersStoragePartitions int
}
```

Upon successful registration, which involves Daprd registering the Actor Type with the Actor runtime, the stream will be used to send and receive Actor invocations.
Registration involves adding that type to the internal actor table factories, targeting the gRPC app stream, and broadcasting the hosted actor type to placement.

Similar to the `SubscribeTopicEventsAlpha1` RPC, the Actors runtime will take a `oneof` request message, with an initial message resembling `[SubscribeTopicEventsRequestAlpha1](https://github.com/dapr/dapr/blob/776209e56e12fbff4bac68fad0c7f28e0eaf6ec2/dapr/proto/runtime/v1/dapr.proto#L450)`.

If multiple initial streams of the same Actor Type are registered, they must all contain the same initial message; otherwise, subsequent streams will be rejected.
The first message from a client on a stream must be the initial message.
The initial request will contain the following message:

```proto
// RequestInitial is the initial message containing details for
// configuring actor runtime APIs.
message RequestInitial {
string type = 1; // Required: Actor Type
optional google.protobuf.Duration idle_timeout = 2;
optional bool drain_rebalanced = 3;
optional google.protobuf.Duration drain_ongoing_call_timeout = 4;
optional RequestInitialReentrancyConfig reentrancy_config = 5;
}

message RequestInitialReentrancyConfig {
optional int32 max_stack_depth = 1;
}
```

### Actor API Requests

After the initial message, the client sends API requests to Daprd or processes received message events.

```proto
message Request {
oneof actor_request_type {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be an error channel exposed here somewhere so that if the client requests the creation of an actor and some prerequisite on the runtime isn't met (e.g. cannot connect to the specified actor state store), such a message can be sent back to the SDK so an exception can be raised.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sending the error response on initialization should be covered in the ResponseInitial message sent back to the client.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if an error is encountered later on after initialization?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be covered by the ResponeResponse message. I'm not sure whether the error should be a generic error which lives outside the oneofs, or needs to be specific to each oneof API type. I'm assuming it needs to be inside each oneof message so that it can be fully typed.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Especially as this channel is potentially open for a little while, it'd be better to have a oneof in place so the SDK can understand if the error is transient and something that just needs the channel re-opened for or if it's more of a fatal error (and should just be logged and given up on).

I'll have to find the issue, but it's a complaint on one of the recent building blocks that when there's a failure, the SDK doesn't handle it really well because there's no real information from the runtime what to do about it except that something went wrong. It'd be great to correct that scenario here.

RequestInitial initial = 1;
RequestRequest request = 2;
RequestProcessed processed = 3;
}
}

message RequestRequest {
uint64 uid = 1; // Unique identifier per request
RequestAPI request = 2;
}

message RequestAPI {
oneof api {
RequestInvokeActor invoke_actor = 1;
RequestRegisterTimer register_timer = 2;
RequestUnregisterTimer unregister_timer = 3;
RequestRegisterReminder register_reminder = 4;
RequestUnregisterReminder unregister_reminder = 5;
RequestGetState get_state = 6;
RequestExecuteStateTransaction execute_state_transaction = 7;
RequestPublishMessage publish_message = 8; // Actor pubsub not yet implemented.
}
}
```

Each outbound `RequestRequest` message includes a `uid` field, which is a simple incrementing `uint64`.
This field must start at `1` and increment by `1` for each new request.
If a message is received by Daprd from the client with a `uid` that is not `+1` of the previous message, the stream will be terminated.

For all inbound messages, the client must send a corresponding processed event with the same `uid`.

```proto
message RequestProcessed {
uint64 uid = 1;
RequestProcessedResponse response = 2;
}

message RequestProcessedResponse {
oneof response {
RequestProcessedResponseInvokeActor invoke_actor = 1;
RequestProcessedResponseTimer timer = 2;
RequestProcessedResponseReminder reminder = 3;
RequestProcessedResponsePubSubMessage pubsub_message = 4;
}
}
```

### Actor Inbound Messages

Daprd sends messages containing responses to API requests and event messages over the bi-directional stream.
These similarly contain a `uid` to track the request-response pair.

```proto
message Response {
oneof actor_response_type {
ResponseInitial initial = 1;
ResponseResponse response = 2;
ResponseProcessed processed = 3;
}
}

message ResponseResponse {
uint64 uid = 1;
ResponseAPI api = 2;
}

message ResponseAPI {
oneof api {
ResponseInvokeActor invoke_actor = 1;
ResponseRegisterTimer register_timer = 2;
ResponseUnregisterTimer unregister_timer = 3;
ResponseRegisterReminder register_reminder = 4;
ResponseUnregisterReminder unregister_reminder = 5;
ResponseGetState get_state = 6;
ResponseExecuteStateTransaction execute_state_transaction = 7;
ResponseSubscribePubSub subscribe_pubsub = 8; // Actor pubsub not yet implemented.
ResponsePublishPubSubMessage publish_pubsub_message = 9; // Actor pubsub not yet implemented.
}
}

message ResponseProcessed {
uint64 uid = 1;
ResponseProcessedResponse response = 2;
}

message ResponseProcessedResponse {
oneof response {
ResponseProcessedResponseInvokeActor invoke_actor = 1;
ResponseProcessedResponseTimer timer = 2;
ResponseProcessedResponseReminder reminder = 3;
ResponseProcessedResponsePubSubMessage pubsub_message = 4;
}
}
```

### Feature Lifecycle Outline

The runtime needs to be updated to implement these APIs and integrate them into the existing Actor machinery.
All Actor-supported SDKs must also be updated to track request-response pairs using the `uid`.

Once implemented, the existing Actor API should be deprecated and eventually removed in favor of this new approach.