Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions src/IceRpc/Internal/IceProtocolConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ internal IceProtocolConnection(
duplexConnection,
_memoryPool,
_minSegmentSize,
connectionLostAction: exception => Close("The connection was lost.", exception));
connectionLostAction: exception => DisposeTransport("The connection was lost.", exception));

_payloadWriter = new IcePayloadPipeWriter(_duplexConnectionWriter);

Expand Down Expand Up @@ -123,11 +123,11 @@ async Task PingAsync(CancellationToken cancellationToken)
}
catch (IceRpcException exception)
{
Close("The connection was lost.", exception);
DisposeTransport("The connection was lost.", exception);
}
catch (Exception exception)
{
Close("The connection failed due to an unhandled exception.", exception);
DisposeTransport("The connection failed due to an unhandled exception.", exception);
Debug.Assert(false, $"The ping task completed due to an unhandled exception: {exception}");
throw;
}
Expand Down Expand Up @@ -221,27 +221,27 @@ private protected override async Task<TransportConnectionInformation> ConnectAsy
await ReadFramesAsync(CancellationToken.None).ConfigureAwait(false);

// The peer expects the connection to be closed once the CloseConnection frame is received.
Close("The connection was closed by the peer.");
DisposeTransport("The connection was closed by the peer.");
}
catch (InvalidDataException exception)
{
Close("Invalid data was received from the peer.", exception);
DisposeTransport("Invalid data was received from the peer.", exception);
}
catch (NotSupportedException exception)
{
Close("Frame with unsupported feature was received from the peer.", exception);
DisposeTransport("Frame with unsupported feature was received from the peer.", exception);
}
catch (IceRpcException exception)
{
Close("The connection was lost.", exception);
DisposeTransport("The connection was lost.", exception);
}
catch (ObjectDisposedException exception)
{
Close("The connection was disposed.", exception);
DisposeTransport("The connection was disposed.", exception);
}
catch (Exception exception)
{
Close("The connection failed due to an unhandled exception.", exception);
DisposeTransport("The connection failed due to an unhandled exception.", exception);
Debug.Assert(false, $"The read frames task completed due to an unhandled exception: {exception}");
throw;
}
Expand All @@ -265,7 +265,7 @@ static void EncodeValidateConnectionFrame(DuplexConnectionWriter writer)

private protected override async ValueTask DisposeAsyncCore()
{
Close();
DisposeTransport();

// Wait for the read frames and ping tasks to complete.
await Task.WhenAll(_readFramesTask ?? Task.CompletedTask, _pingTask).ConfigureAwait(false);
Expand Down Expand Up @@ -676,9 +676,9 @@ private static async ValueTask<ReadOnlySequence<byte>> ReadFullPayloadAsync(
throw new ArgumentException("The payload size is greater than int.MaxValue.", nameof(payload));
}

/// <summary>Closes the protocol connection. It closes the transport connection and cancels pending dispatches and
/// invocations.</summary>
private void Close(string? message = null, Exception? exception = null)
/// <summary>Marks the protocol connection as closed, disposes the transport connection and cancels pending
/// dispatches and invocations.</summary>
private void DisposeTransport(string? message = null, Exception? exception = null)
{
// ConnectionClosedException might already be set if the connection is being shutdown or disposed. In this
// case the connection shutdown or disposal is responsible for calling the connection closed callback.
Expand Down
70 changes: 38 additions & 32 deletions src/IceRpc/Internal/IceRpcProtocolConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,12 @@ await ReceiveControlFrameHeaderAsync(
}
catch (IceRpcException exception)
{
await CloseAsync("The connection was lost.", exception).ConfigureAwait(false);
await DisposeTransportAsync("The connection was lost.", exception).ConfigureAwait(false);
throw;
}
catch (Exception exception)
{
await CloseAsync("The connection failed due to an unhandled exception.", exception)
await DisposeTransportAsync("The connection failed due to an unhandled exception.", exception)
.ConfigureAwait(false);
Debug.Assert(false, $"The read go away task completed due to an unhandled exception: {exception}");
throw;
Expand Down Expand Up @@ -346,15 +346,15 @@ IceRpcError.OperationAborted or
}
catch (IceRpcException exception)
{
await CloseAsync("The connection was lost.", exception).ConfigureAwait(false);
await DisposeTransportAsync("The connection was lost.", exception).ConfigureAwait(false);
}
catch (ObjectDisposedException exception)
{
await CloseAsync("The connection was disposed.", exception).ConfigureAwait(false);
await DisposeTransportAsync("The connection was disposed.", exception).ConfigureAwait(false);
}
catch (Exception exception)
{
await CloseAsync("The connection failed due to an unhandled exception.", exception)
await DisposeTransportAsync("The connection failed due to an unhandled exception.", exception)
.ConfigureAwait(false);
Debug.Assert(false, $"The accept stream task completed due to an unhandled exception: {exception}");
throw;
Expand All @@ -367,7 +367,7 @@ await CloseAsync("The connection failed due to an unhandled exception.", excepti

private protected override async ValueTask DisposeAsyncCore()
{
await CloseAsync().ConfigureAwait(false);
await DisposeTransportAsync().ConfigureAwait(false);

try
{
Expand Down Expand Up @@ -692,9 +692,15 @@ await _transportConnection.CloseAsync(
MultiplexedConnectionCloseError.NoError,
cancellationToken).ConfigureAwait(false);
}
catch (ObjectDisposedException exception)
catch (IceRpcException exception) when (exception.IceRpcError == IceRpcError.OperationAborted)
{
throw new IceRpcException(IceRpcError.OperationAborted, exception);
// Expected if the peer closed the connection first and the accept request loop closed the transport
// connection.
}
catch (ObjectDisposedException)
{
// Expected if the peer closed the connection first and the accept request loop closed the transport
// connection.
}

// We wait for the completion of the dispatches that we created.
Expand Down Expand Up @@ -941,30 +947,6 @@ private void CheckPeerHeaderSize(int headerSize)
}
}

/// <summary>Closes the protocol connection. It closes the transport connection and cancels pending dispatches and
/// invocations.</summary>
private async Task CloseAsync(string? message = null, Exception? exception = null)
{
// ConnectionClosedException might already be set if the connection is being shutdown or disposed. In this
// case the connection shutdown or disposal is responsible for calling the connection closed callback.
if (ConnectionClosedException is null)
{
ConnectionClosedException = new IceRpcException(IceRpcError.ConnectionClosed, message, exception);
var rpcException = exception as IceRpcException;
if (exception is not null && rpcException is null)
{
rpcException = new IceRpcException(IceRpcError.IceRpcError, exception);
}
ConnectionClosed(rpcException);
}

// Dispose the transport connection. This will trigger the failure of tasks waiting on transport operations.
await _transportConnection.DisposeAsync().ConfigureAwait(false);

// Cancel dispatches and invocations, there's no point in letting them continue once the connection is closed.
CancelDispatchesAndInvocations();
}

private async Task DispatchRequestAsync(IMultiplexedStream stream, CancellationToken cancellationToken)
{
PipeReader? fieldsPipeReader = null;
Expand Down Expand Up @@ -1124,6 +1106,30 @@ void EncodeHeader()
}
}

/// <summary>Marks the protocol connection as closed, disposes the transport connection and cancels pending
/// dispatches and invocations.</summary>
private async Task DisposeTransportAsync(string? message = null, Exception? exception = null)
{
// ConnectionClosedException might already be set if the connection is being shutdown or disposed. In this
// case the connection shutdown or disposal is responsible for calling the connection closed callback.
if (ConnectionClosedException is null)
{
ConnectionClosedException = new IceRpcException(IceRpcError.ConnectionClosed, message, exception);
var rpcException = exception as IceRpcException;
if (exception is not null && rpcException is null)
{
rpcException = new IceRpcException(IceRpcError.IceRpcError, exception);
}
ConnectionClosed(rpcException);
}

// Dispose the transport connection. This will trigger the failure of tasks waiting on transport operations.
await _transportConnection.DisposeAsync().ConfigureAwait(false);

// Cancel dispatches and invocations, there's no point in letting them continue once the connection is closed.
CancelDispatchesAndInvocations();
}

private async ValueTask ReceiveControlFrameHeaderAsync(
IceRpcControlFrameType expectedFrameType,
CancellationToken cancellationToken)
Expand Down
4 changes: 1 addition & 3 deletions src/IceRpc/Internal/ProtocolConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,7 @@ public ValueTask DisposeAsync()

async Task PerformDisposeAsync()
{
ConnectionClosedException = new(
IceRpcError.ConnectionClosed,
"The connection was disposed.");
ConnectionClosedException = new(IceRpcError.ConnectionClosed, "The connection was disposed.");

// Make sure we execute the code below without holding the mutex lock.
await Task.Yield();
Expand Down