Bedrock ConverseStreamResponse provides no easy way to read the streamed response without blocking the calling .NET thread #3542
Comments
This is in principle how it could be implemented, using an extension method and System.Threading.Channels:
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
// (plus the AWS SDK namespaces that define the Bedrock and event stream types used below)

public static class ConverseStreamOutputExtensions
{
    // Exposes the event-based stream as an IAsyncEnumerable without blocking the calling thread.
    public static async IAsyncEnumerable<IEventStreamEvent> AsAsyncEnumerable(
        this ConverseStreamOutput output,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var channel = Channel.CreateUnbounded<IEventStreamEvent>();

        void OnEventReceived(object? sender, EventStreamEventReceivedArgs<IEventStreamEvent> args)
        {
            var _ = channel.Writer.TryWrite(args.EventStreamEvent);
            // the metadata event is the last one of the event stream
            if (args.EventStreamEvent is ConverseStreamMetadataEvent)
            {
                // TryComplete, because OnExceptionReceived may already have completed the channel
                channel.Writer.TryComplete();
            }
        }

        void OnExceptionReceived(object? sender, EventStreamExceptionReceivedArgs<BedrockRuntimeEventStreamException> args)
        {
            // completing with an exception makes the reader loop below throw it
            var _ = channel.Writer.TryComplete(args.EventStreamException);
        }

        Task processingTask;
        try
        {
            output.EventReceived += OnEventReceived;
            output.ExceptionReceived += OnExceptionReceived;
            processingTask = output.StartProcessingAsync();
            await foreach (var e in channel.Reader.ReadAllAsync(cancellationToken))
            {
                yield return e;
            }
        }
        finally
        {
            output.EventReceived -= OnEventReceived;
            output.ExceptionReceived -= OnExceptionReceived;
        }
        await processingTask.ConfigureAwait(false);
    }
}
Not completely sure how such code could be added to the SDK, since all the files say that they are auto-generated from the service model JSON...
Needs review with the team.
I've submitted a pull request to add support for IAsyncEnumerable in EnumerableEventStream, which makes it available for Bedrock: #3543. Fortunately, it's much easier than with the extension above.
Hi @ashishdhingra, which SDK version will get this change?
@shethaadit The issue is marked with CC @normj
The improvement we talked about is available in the latest preview of
Describe the bug
The common way to read from Bedrock ConverseStreamResponse is apparently to call response.Stream.AsEnumerable(). This streams the response completions as they are received, but at the cost of keeping the calling .NET thread busy, so it can lead to thread pool exhaustion.
Ideally we need a new variant that does not block the thread, perhaps an async enumerator exposed as response.Stream.AsAsyncEnumerable().
Regression Issue
Expected Behavior
I expect to have an easy method that gives an IAsyncEnumerable from ConverseStreamResponse, and which is implemented without blocking the calling thread.
Current Behavior
The current API surface in ConverseStreamResponse encourages the use of response.Stream.AsEnumerable(), which blocks the calling thread.
Reproduction Steps
N/A
Possible Solution
This problem was mentioned in #3360 (comment) for a further improvement, with a suggested implementation. I believe the suggested implementation is imperfect, but it can be a basis for a solution.
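To make the requested shape concrete without depending on the SDK, here is a minimal, self-contained sketch of bridging an event-based stream to IAsyncEnumerable through System.Threading.Channels. FakeEventStream, its EventReceived and Completed events, and the string payload are hypothetical stand-ins invented for illustration; they are not SDK types, and the real event stream may behave differently (for example in how it signals errors or the end of the stream).

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for an event-based stream (NOT an SDK type).
public sealed class FakeEventStream
{
    public event Action<string>? EventReceived;
    public event Action? Completed;

    // Raises three events asynchronously, then signals completion.
    public async Task StartProcessingAsync()
    {
        foreach (var chunk in new[] { "Hello", ", ", "world" })
        {
            await Task.Delay(10);
            EventReceived?.Invoke(chunk);
        }
        Completed?.Invoke();
    }
}

public static class FakeEventStreamExtensions
{
    // Bridges the events to an async sequence; the consumer awaits instead of blocking a thread.
    public static async IAsyncEnumerable<string> AsAsyncEnumerable(
        this FakeEventStream stream,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var channel = Channel.CreateUnbounded<string>();
        void OnEvent(string e) => channel.Writer.TryWrite(e);
        void OnCompleted() => channel.Writer.TryComplete();

        stream.EventReceived += OnEvent;
        stream.Completed += OnCompleted;
        try
        {
            var processing = stream.StartProcessingAsync();
            await foreach (var e in channel.Reader.ReadAllAsync(cancellationToken))
            {
                yield return e;
            }
            await processing; // surface any failure from the producer
        }
        finally
        {
            stream.EventReceived -= OnEvent;
            stream.Completed -= OnCompleted;
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        // The thread is released at each await while waiting for the next chunk.
        await foreach (var chunk in new FakeEventStream().AsAsyncEnumerable())
        {
            Console.Write(chunk);
        }
        Console.WriteLine(); // prints "Hello, world"
    }
}
```

The unbounded channel keeps the producer from ever waiting on a slow consumer, at the cost of unbounded buffering; a bounded channel would add backpressure if the underlying stream supported it.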
Additional Information/Context
No response
AWS .NET SDK and/or Package version used
AWSSDK.BedrockRuntime 3.7.404.8
Targeted .NET Platform
.NET 8
Operating System and version
Docker