Description
Is your feature request related to a problem? Please describe it.
Currently, there is no way to propagate backpressure from a client-streaming RPC. For example:
service TestService {
  rpc CreateStream(stream Input) returns (google.protobuf.Empty);
}
is used with generated code that looks like the following:
let stream = client.createStream(handler {_ in})
_ = stream.sendMessage(input)
If the server is not processing messages fast enough, messages will (if I've read the implementation correctly) buffer in memory without bound.
For use cases where the data to be transmitted does not necessarily fit in memory, this becomes difficult to work with since no backpressure is propagated to the caller.
Describe the solution you'd like
Backpressure is a first-class feature of Apple's Combine framework for Swift. A Subscriber propagates a "demand" size to its Publisher, and the Publisher uses that demand to decide whether to generate or retrieve more data. Example of a possible interface:
let stream = client.createStream(handler {_ in})
publisher.subscribe(stream.createSubscriber())
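To make the idea concrete, here is a rough sketch of what the subscriber returned by the proposed createStream subscriber hook might do internally. Everything in it is an assumption for illustration: `BackpressuredSubscriber`, `send`, and `finish` are hypothetical names, and `send` stands in for a call that invokes its completion once the message has actually been written (for example, when the future returned from sendMessage completes). It is not thread-safe and is not how grpc-swift works today.

```swift
import Combine

// Hypothetical sketch: requests one message at a time from upstream and only
// asks for the next one after the previous send has been acknowledged.
// Not thread-safe; a real version would need to confine state to one event loop.
final class BackpressuredSubscriber<Message>: Subscriber {
    typealias Input = Message
    typealias Failure = Never

    // Assumed stand-ins for the RPC: `send` calls its completion when the
    // message has been written, `finish` closes the request stream.
    private let send: (Message, @escaping () -> Void) -> Void
    private let finish: () -> Void

    private var subscription: Subscription?
    private var inFlight = 0
    private var upstreamDone = false

    init(send: @escaping (Message, @escaping () -> Void) -> Void,
         finish: @escaping () -> Void) {
        self.send = send
        self.finish = finish
    }

    func receive(subscription: Subscription) {
        self.subscription = subscription
        subscription.request(.max(1)) // initial demand: a single message
    }

    func receive(_ input: Message) -> Subscribers.Demand {
        inFlight += 1
        send(input) { [weak self] in
            guard let self = self else { return }
            self.inFlight -= 1
            if self.upstreamDone {
                // Upstream already completed; close once nothing is pending.
                if self.inFlight == 0 { self.finish() }
            } else {
                // The write was acknowledged, so there is room for one more.
                self.subscription?.request(.max(1))
            }
        }
        return .none // no extra demand until the send is acknowledged
    }

    func receive(completion: Subscribers.Completion<Never>) {
        upstreamDone = true
        if inFlight == 0 { finish() }
    }
}
```

With something like this, a publisher that respects demand would never produce more than one unacknowledged message. A real implementation would probably request a larger window than 1 to keep the connection busy.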
Describe alternatives you've considered
The caller can track the number of acks received via the EventLoopFuture returned from sendMessage and react to the buffer size measured in this way. This approach works, but it's likely that the tuned buffer size will be suboptimal.
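For reference, the workaround described above could look roughly like the following. This is only a sketch under assumptions: `InFlightLimiter` and its members are hypothetical names, and the `acked` callback is expected to be wired to the completion of the future returned from sendMessage. It blocks the calling thread, so it must not be used from an event loop thread.

```swift
import Foundation

// Hypothetical sketch: caps the number of sends that have been started but
// not yet acknowledged. `submit` blocks the caller while the window is full.
final class InFlightLimiter {
    private let condition = NSCondition()
    private let maxInFlight: Int
    private var inFlight = 0

    init(maxInFlight: Int) {
        self.maxInFlight = maxInFlight
    }

    // Number of sends currently awaiting acknowledgement.
    var pending: Int {
        condition.lock()
        defer { condition.unlock() }
        return inFlight
    }

    // Waits for a free slot, then runs `start`. The caller must invoke
    // `acked` exactly once when the send completes (success or failure).
    func submit(_ start: (_ acked: @escaping () -> Void) -> Void) {
        condition.lock()
        while inFlight >= maxInFlight {
            condition.wait()
        }
        inFlight += 1
        condition.unlock()

        start {
            self.condition.lock()
            self.inFlight -= 1
            self.condition.signal()
            self.condition.unlock()
        }
    }
}
```

Usage would be along the lines of calling `limiter.submit { acked in ... }` from a producer thread, sending the message inside the closure, and calling `acked()` when the returned future completes. The value of `maxInFlight` is the tuning knob mentioned above.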
If client code is already using Combine in other parts of the application, the extra hop needed to propagate the backpressure signal adds significant overhead.
Additional Context
Thanks for the consideration! Please let me know if I've missed something above or if there is already an established solution for this.