@@ -67,7 +67,7 @@ internal sealed class SocketFrameHandler : IFrameHandler
67
67
private readonly ITcpClient _socket ;
68
68
69
69
// Channels
70
- private readonly Channel < ReadOnlyMemory < byte > > _outboundChannel = Channel . CreateUnbounded < ReadOnlyMemory < byte > > ( new UnboundedChannelOptions { AllowSynchronousContinuations = false , SingleReader = true , SingleWriter = false } ) ;
70
+ private readonly Channel < ReadOnlyMemory < byte > > _outboundChannel = Channel . CreateUnbounded < ReadOnlyMemory < byte > > ( new UnboundedChannelOptions { AllowSynchronousContinuations = true , SingleReader = true , SingleWriter = false } ) ;
71
71
private ChannelWriter < ReadOnlyMemory < byte > > OutboundChannelWriter => _outboundChannel . Writer ;
72
72
private ChannelReader < ReadOnlyMemory < byte > > OutboundChannelReader => _outboundChannel . Reader ;
73
73
@@ -78,13 +78,10 @@ internal sealed class SocketFrameHandler : IFrameHandler
78
78
private PipeWriter PipeWriter => _pipe . Output ;
79
79
80
80
// Pipe tasks
81
- // private Task _writerTask;
81
+ private readonly Task _writerTask ;
82
82
83
83
private readonly object _semaphore = new object ( ) ;
84
84
private bool _closed ;
85
- private readonly object _writeLock = new object ( ) ;
86
-
87
- private readonly Task _writerTask ;
88
85
89
86
public SocketFrameHandler ( AmqpTcpEndpoint endpoint , Func < AddressFamily , ITcpClient > socketFactory , TimeSpan connectionTimeout , TimeSpan readTimeout , TimeSpan writeTimeout )
90
87
{
@@ -125,7 +122,7 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint, Func<AddressFamily, ITcpClie
125
122
}
126
123
127
124
_socket . ReceiveTimeout = readTimeout ;
128
- var pipeOptions = new PipeOptions ( useSynchronizationContext : false ) ;
125
+ var pipeOptions = new PipeOptions ( useSynchronizationContext : true ) ;
129
126
130
127
if ( endpoint . Ssl . Enabled )
131
128
{
@@ -146,7 +143,6 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint, Func<AddressFamily, ITcpClie
146
143
147
144
WriteTimeout = writeTimeout ;
148
145
_writerTask = Task . Run ( WriteLoop ) ;
149
- //_readerTask = Task.Run(ReadLoop);
150
146
}
151
147
public AmqpTcpEndpoint Endpoint { get ; set ; }
152
148
0 commit comments