Skip to content

[release/6.0] HTTP/3: Support canceling requests that aren't reading a body #35823

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Microsoft.AspNetCore.Connections.Features
public interface IProtocolErrorCodeFeature
{
/// <summary>
/// Gets or sets the error code.
/// Gets or sets the error code. The property returns -1 if the error code hasn't been set.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO IProtocolErrorCodeFeature.Error should have been nullable, but the API ship sailed in .NET 5.

QUIC and HTTP/3 errors are variable-length integers that are always positive so -1 can be used to represent unset.

/// </summary>
long Error { get; set; }
}
Expand Down
28 changes: 27 additions & 1 deletion src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,30 @@ public void Initialize(Http3StreamContext context)
}

_frameWriter.Reset(context.Transport.Output, context.ConnectionId);

// We register to connection closed to handle situation where the client
// aborts after data is already sent. Only its read-side of the stream is aborted
// which means Kestrel is only notified of the abort when it writes to the stream.
// This event immediately notifies Kestrel that the client has aborted the request
// and Kestrel will complete pipes and cancel the RequestAborted token.
//
// TODO: Consider a better way to provide this notification. For perf we want to
// make the ConnectionClosed CTS pay-for-play, and change this event to use
// something that is more lightweight than a CTS.
Comment on lines +134 to +136
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be created lazily from the RequestAborted property?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we still want to stop and complete the output pipes in this situation. So, we always want this event raised, even if someone hasn't subscribed to RequestAborted.

context.StreamContext.ConnectionClosed.Register(static s =>
{
var stream = (Http3Stream)s!;

if (!stream.IsCompleted)
{
// An error code value other than -1 indicates a value was set and the request didn't gracefully complete.
var errorCode = stream._errorCodeFeature.Error;
if (errorCode >= 0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does an error code of 0 indicate?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It means the stream was aborted with error code of 0. That doesn't have any meaning in HTTP/3, but it's a valid error code for QUIC.

{
stream.Abort(new ConnectionAbortedException(CoreStrings.Http2StreamResetByClient), (Http3ErrorCode)errorCode);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be updated to call AbortCore when #35764 is merged.

}
}
}, this);
}

public void InitializeWithExistingContext(IDuplexPipe transport)
Expand Down Expand Up @@ -465,7 +489,9 @@ public async Task ProcessRequestAsync<TContext>(IHttpApplication<TContext> appli
catch (ConnectionResetException ex)
{
error = ex;
Abort(new ConnectionAbortedException(ex.Message, ex), (Http3ErrorCode)_errorCodeFeature.Error);

var resolvedErrorCode = _errorCodeFeature.Error >= 0 ? _errorCodeFeature.Error : 0;
Abort(new ConnectionAbortedException(ex.Message, ex), (Http3ErrorCode)resolvedErrorCode);
}
catch (Exception ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ internal sealed partial class QuicConnectionContext : IProtocolErrorCodeFeature,
{
private X509Certificate2? _clientCert;
private Task<X509Certificate2?>? _clientCertTask;
private long? _error;

public long Error { get; set; }
public long Error
{
get => _error ?? -1;
set => _error = value;
}

public X509Certificate2? ClientCertificate
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,10 @@ public override void Abort(ConnectionAbortedException abortReason)
return;
}

var resolvedErrorCode = _error ?? 0;
_abortReason = ExceptionDispatchInfo.Capture(abortReason);
_log.ConnectionAbort(this, Error, abortReason.Message);
_closeTask = _connection.CloseAsync(errorCode: Error).AsTask();
_log.ConnectionAbort(this, resolvedErrorCode, abortReason.Message);
_closeTask = _connection.CloseAsync(errorCode: resolvedErrorCode).AsTask();
}
}

Expand Down Expand Up @@ -127,7 +128,7 @@ public override void Abort(ConnectionAbortedException abortReason)
catch (QuicConnectionAbortedException ex)
{
// Shutdown initiated by peer, abortive.
Error = ex.ErrorCode;
_error = ex.ErrorCode;
_log.ConnectionAborted(this, ex.ErrorCode, ex);

ThreadPool.UnsafeQueueUserWorkItem(state =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,16 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Quic.Internal
internal sealed partial class QuicStreamContext : IPersistentStateFeature, IStreamDirectionFeature, IProtocolErrorCodeFeature, IStreamIdFeature, IStreamAbortFeature
{
private IDictionary<object, object?>? _persistentState;
private long? _error;

public bool CanRead { get; private set; }
public bool CanWrite { get; private set; }

public long Error { get; set; }
public long Error
{
get => _error ?? -1;
set => _error = value;
}

public long StreamId { get; private set; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.IO;
using System.IO.Pipelines;
using System.Net.Quic;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
Expand Down Expand Up @@ -95,7 +96,7 @@ public void Initialize(QuicStream stream)

CanRead = _stream.CanRead;
CanWrite = _stream.CanWrite;
Error = 0;
_error = null;
StreamId = _stream.StreamId;
PoolExpirationTicks = 0;

Expand Down Expand Up @@ -148,6 +149,7 @@ private async Task StartAsync()
// Streams may or may not have reading/writing, so only start tasks accordingly
var receiveTask = Task.CompletedTask;
var sendTask = Task.CompletedTask;
var sendCompletedTask = Task.CompletedTask;

if (_stream.CanRead)
{
Expand All @@ -157,18 +159,44 @@ private async Task StartAsync()
if (_stream.CanWrite)
{
sendTask = DoSend();
sendCompletedTask = WaitForWritesCompleted();
}

// Now wait for both to complete
await receiveTask;
await sendTask;
await sendCompletedTask;

await FireStreamClosedAsync();
}
catch (Exception ex)
{
_log.LogError(0, ex, $"Unexpected exception in {nameof(QuicStreamContext)}.{nameof(StartAsync)}.");
}
}

private async Task WaitForWritesCompleted()
{
Debug.Assert(_stream != null);

try
{
await _stream.WaitForWriteCompletionAsync();
}
catch (Exception ex)
{
// Send error to DoSend loop.
lock (_shutdownLock)
{
_shutdownReason ??= ex;
}
}
finally
{
Output.CancelPendingRead();
}
}

private async Task DoReceive()
{
Debug.Assert(_stream != null);
Expand Down Expand Up @@ -204,6 +232,9 @@ private async Task DoReceive()
if (completeTask.IsCompletedSuccessfully)
{
// Fast path. CompleteAsync completed immediately.
// Most implementations of ValueTask reset state in GetResult.
completeTask.GetAwaiter().GetResult();

flushTask = ValueTask.FromResult(new FlushResult(isCanceled: false, isCompleted: true));
}
else
Expand Down Expand Up @@ -240,7 +271,7 @@ private async Task DoReceive()
catch (QuicStreamAbortedException ex)
{
// Abort from peer.
Error = ex.ErrorCode;
_error = ex.ErrorCode;
_log.StreamAborted(this, ex.ErrorCode, ex);

// This could be ignored if _shutdownReason is already set.
Expand Down Expand Up @@ -268,10 +299,6 @@ private async Task DoReceive()
{
// If Shutdown() has already bee called, assume that was the reason ProcessReceives() exited.
Input.Complete(ResolveCompleteReceiveException(error));

FireStreamClosed();

await _waitForConnectionClosedTcs.Task;
}

async static ValueTask<FlushResult> AwaitCompleteTaskAsync(ValueTask completeTask)
Expand All @@ -286,12 +313,12 @@ async static ValueTask<FlushResult> AwaitCompleteTaskAsync(ValueTask completeTas
return _shutdownReadReason ?? _shutdownReason ?? error;
}

private void FireStreamClosed()
private Task FireStreamClosedAsync()
{
// Guard against scheduling this multiple times
if (_streamClosed)
{
return;
return Task.CompletedTask;
}

_streamClosed = true;
Expand All @@ -304,6 +331,8 @@ private void FireStreamClosed()
},
this,
preferLocal: false);

return _waitForConnectionClosedTcs.Task;
}

private void CancelConnectionClosedToken()
Expand Down Expand Up @@ -335,6 +364,15 @@ private async Task DoSend()

if (result.IsCanceled)
{
// WaitForWritesCompleted provides immediate notification that write-side of stream has completed.
// If the stream or connection is aborted then exception will be available to rethrow.

var ex = _shutdownWriteReason ?? _shutdownReason;
if (ex != null)
{
ExceptionDispatchInfo.Throw(ex);
}

break;
}

Expand All @@ -359,7 +397,7 @@ private async Task DoSend()
catch (QuicStreamAbortedException ex)
{
// Abort from peer.
Error = ex.ErrorCode;
_error = ex.ErrorCode;
_log.StreamAborted(this, ex.ErrorCode, ex);

// This could be ignored if _shutdownReason is already set.
Expand All @@ -370,7 +408,7 @@ private async Task DoSend()
catch (QuicConnectionAbortedException ex)
{
// Abort from peer.
Error = ex.ErrorCode;
_error = ex.ErrorCode;
_log.StreamAborted(this, ex.ErrorCode, ex);

// This could be ignored if _shutdownReason is already set.
Expand All @@ -385,6 +423,10 @@ private async Task DoSend()
// System.Net.Quic exception handling not finalized.
unexpectedError = ex;
}
catch (ConnectionAbortedException ex)
{
unexpectedError = ex;
}
catch (Exception ex)
{
shutdownReason = ex;
Expand Down Expand Up @@ -415,19 +457,20 @@ public override void Abort(ConnectionAbortedException abortReason)
_serverAborted = true;
_shutdownReason = abortReason;

_log.StreamAbort(this, Error, abortReason.Message);
var resolvedErrorCode = _error ?? 0;
_log.StreamAbort(this, resolvedErrorCode, abortReason.Message);

lock (_shutdownLock)
{
if (_stream != null)
{
if (_stream.CanRead)
{
_stream.AbortRead(Error);
_stream.AbortRead(resolvedErrorCode);
}
if (_stream.CanWrite)
{
_stream.AbortWrite(Error);
_stream.AbortWrite(resolvedErrorCode);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ public async Task AcceptAsync_ClientStartsAndStopsBidirectionStream_ServerAccept
read = await serverStream.Transport.Input.ReadAsync().DefaultTimeout();
Assert.True(read.IsCompleted);

await serverStream.Transport.Output.CompleteAsync();

await closedTcs.Task.DefaultTimeout();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,6 @@ public async Task ClientCertificate_Required_NotSent_ConnectionAborted()

var qex = await Assert.ThrowsAsync<QuicException>(async () => await clientConnection.ConnectAsync().DefaultTimeout());
Assert.Equal("Connection has been shutdown by transport. Error Code: 0x80410100", qex.Message);

// https://github.com/dotnet/runtime/issues/57246 The accept still completes even though the connection was rejected, but it's already failed.
var serverContext = await connectionListener.AcceptAndAddFeatureAsync().DefaultTimeout();
await Assert.ThrowsAsync<QuicException>(() => serverContext.ConnectAsync().DefaultTimeout());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ public async Task BidirectionalStream_ServerWritesDataAndDisposes_ClientReadsDat
await serverStream.Transport.Input.CompleteAsync().DefaultTimeout();
await serverStream.Transport.Output.CompleteAsync().DefaultTimeout();

Logger.LogInformation("Client reading until end of stream.");
var data = await clientStream.ReadUntilEndAsync().DefaultTimeout();
Assert.Equal(testData.Length, data.Length);
Assert.Equal(testData, data);

var quicStreamContext = Assert.IsType<QuicStreamContext>(serverStream);

Logger.LogInformation("Server waiting for send and receiving loops to complete.");
Expand All @@ -150,11 +155,6 @@ public async Task BidirectionalStream_ServerWritesDataAndDisposes_ClientReadsDat
await quicStreamContext.DisposeAsync().DefaultTimeout();
quicStreamContext.Dispose();

Logger.LogInformation("Client reading until end of stream.");
var data = await clientStream.ReadUntilEndAsync().DefaultTimeout();
Assert.Equal(testData.Length, data.Length);
Assert.Equal(testData, data);

var quicConnectionContext = Assert.IsType<QuicConnectionContext>(serverConnection);

Assert.Equal(1, quicConnectionContext.StreamPool.Count);
Expand Down Expand Up @@ -402,6 +402,7 @@ public async Task ServerToClientUnidirectionalStream_ServerAborts_ClientGetsAbor

Assert.Equal(TestData, data);

Logger.LogInformation("Server aborting stream");
((IProtocolErrorCodeFeature)serverStream).Error = (long)Http3ErrorCode.InternalError;
serverStream.Abort(new ConnectionAbortedException("Test message"));

Expand Down
12 changes: 9 additions & 3 deletions src/Servers/Kestrel/shared/test/Http3/Http3InMemory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,7 @@ internal class TestMultiplexedConnectionContext : MultiplexedConnectionContext,
});

private readonly Http3InMemory _testBase;
private long _error;
private long? _error;

public TestMultiplexedConnectionContext(Http3InMemory testBase)
{
Expand All @@ -946,7 +946,7 @@ public TestMultiplexedConnectionContext(Http3InMemory testBase)

public long Error
{
get => _error;
get => _error ?? -1;
set => _error = value;
}

Expand Down Expand Up @@ -1019,6 +1019,7 @@ internal class TestStreamContext : ConnectionContext, IStreamDirectionFeature, I

private TaskCompletionSource _disposingTcs;
private TaskCompletionSource _disposedTcs;
internal long? _error;

public TestStreamContext(bool canRead, bool canWrite, Http3InMemory testBase)
{
Expand Down Expand Up @@ -1072,6 +1073,7 @@ public void Initialize(long streamId)
ConnectionId = "TEST:" + streamId.ToString();
AbortReadException = null;
AbortWriteException = null;
_error = null;

_disposedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
_disposingTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
Expand Down Expand Up @@ -1112,7 +1114,11 @@ public override IDuplexPipe Transport

public bool CanWrite { get; }

public long Error { get; set; }
public long Error
{
get => _error ?? -1;
set => _error = value;
}

public override void Abort(ConnectionAbortedException abortReason)
{
Expand Down
Loading