Skip to content

Commit 48ec4cc

Browse files
author
Stefán J. Sigurðarson
committed
Adding pipelines.
1 parent 599020b commit 48ec4cc

16 files changed

+594
-288
lines changed

projects/RabbitMQ.Client/RabbitMQ.Client.csproj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@
6060
<PackageReference Include="Microsoft.CodeAnalysis.FxCopAnalyzers" Version="3.3.0" PrivateAssets="All" />
6161
<PackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.0" PrivateAssets="All" />
6262
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" />
63-
<PackageReference Include="MinVer" Version="2.3.0" PrivateAssets="All" />
64-
<PackageReference Include="System.Memory" Version="4.5.4" />
63+
<PackageReference Include="MinVer" Version="2.3.1" PrivateAssets="All" />
64+
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="2.1.16" />
6565
<PackageReference Include="System.Threading.Channels" Version="4.7.1" />
6666
</ItemGroup>
6767

projects/RabbitMQ.Client/client/api/ConnectionFactoryBase.cs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131

3232
using System;
3333
using System.Net.Sockets;
34+
35+
using Pipelines.Sockets.Unofficial;
36+
3437
using RabbitMQ.Client.Impl;
3538

3639
namespace RabbitMQ.Client
@@ -49,12 +52,8 @@ public class ConnectionFactoryBase
4952
/// <returns>New instance of a <see cref="TcpClient"/>.</returns>
5053
public static ITcpClient DefaultSocketFactory(AddressFamily addressFamily)
5154
{
52-
var socket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp)
53-
{
54-
NoDelay = true,
55-
ReceiveBufferSize = 65536,
56-
SendBufferSize = 65536
57-
};
55+
var socket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp);
56+
SocketConnection.SetRecommendedClientOptions(socket);
5857
return new TcpClientAdapter(socket);
5958
}
6059
}

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 105 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
using System.Buffers;
3434
using System.Collections.Generic;
3535
using System.IO;
36+
using System.IO.Pipelines;
3637
using System.Net;
3738
using System.Net.Sockets;
3839
using System.Reflection;
@@ -153,31 +154,35 @@ public event EventHandler<ShutdownEventArgs> ConnectionShutdown
153154
/// <summary>
154155
/// This event is never fired by non-recovering connections but it is a part of the <see cref="IConnection"/> interface.
155156
/// </summary>
156-
public event EventHandler<EventArgs> RecoverySucceeded {
157+
public event EventHandler<EventArgs> RecoverySucceeded
158+
{
157159
add { }
158160
remove { }
159161
}
160162

161163
/// <summary>
162164
/// This event is never fired by non-recovering connections but it is a part of the <see cref="IConnection"/> interface.
163165
/// </summary>
164-
public event EventHandler<ConnectionRecoveryErrorEventArgs> ConnectionRecoveryError {
166+
public event EventHandler<ConnectionRecoveryErrorEventArgs> ConnectionRecoveryError
167+
{
165168
add { }
166169
remove { }
167170
}
168171

169172
/// <summary>
170173
/// This event is never fired by non-recovering connections but it is a part of the <see cref="IConnection"/> interface.
171174
/// </summary>
172-
public event EventHandler<ConsumerTagChangedAfterRecoveryEventArgs> ConsumerTagChangeAfterRecovery {
175+
public event EventHandler<ConsumerTagChangedAfterRecoveryEventArgs> ConsumerTagChangeAfterRecovery
176+
{
173177
add { }
174178
remove { }
175179
}
176180

177181
/// <summary>
178182
/// This event is never fired by non-recovering connections but it is a part of the <see cref="IConnection"/> interface.
179183
/// </summary>
180-
public event EventHandler<QueueNameChangedAfterRecoveryEventArgs> QueueNameChangeAfterRecovery {
184+
public event EventHandler<QueueNameChangedAfterRecoveryEventArgs> QueueNameChangeAfterRecovery
185+
{
181186
add { }
182187
remove { }
183188
}
@@ -332,15 +337,35 @@ public void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout)
332337
///<remarks>
333338
/// Loop only used while quiescing. Use only to cleanly close connection
334339
///</remarks>
335-
public void ClosingLoop()
340+
public async ValueTask ClosingLoop()
336341
{
337342
try
338343
{
339344
_frameHandler.ReadTimeout = TimeSpan.Zero;
340345
// Wait for response/socket closure or timeout
346+
bool allowSync = true;
341347
while (!_closed)
342348
{
343-
MainLoopIteration();
349+
// Let's read some bytes
350+
if (!(allowSync && _frameHandler.PipeReader.TryRead(out ReadResult readResult)))
351+
{
352+
readResult = await _frameHandler.PipeReader.ReadAsync().ConfigureAwait(false);
353+
}
354+
355+
int handled = 0;
356+
ReadOnlySequence<byte> buffer = readResult.Buffer;
357+
if (!buffer.IsEmpty)
358+
{
359+
handled = MainLoopIteration(ref buffer);
360+
}
361+
362+
allowSync = handled != 0;
363+
_frameHandler.PipeReader.AdvanceTo(buffer.Start, buffer.End);
364+
365+
if (handled == 0 && readResult.IsCompleted)
366+
{
367+
throw new EndOfStreamException();
368+
}
344369
}
345370
}
346371
catch (ObjectDisposedException ode)
@@ -472,18 +497,38 @@ public void LogCloseError(string error, Exception ex)
472497
ShutdownReport.Add(new ShutdownReportEntry(error, ex));
473498
}
474499

475-
public void MainLoop()
500+
public async Task MainLoop()
476501
{
477502
try
478503
{
479504
bool shutdownCleanly = false;
480505
try
481506
{
507+
bool allowSync = true;
482508
while (_running)
483509
{
484510
try
485511
{
486-
MainLoopIteration();
512+
// Let's read some bytes
513+
if (!(allowSync && _frameHandler.PipeReader.TryRead(out ReadResult readResult)))
514+
{
515+
readResult = await _frameHandler.PipeReader.ReadAsync().ConfigureAwait(false);
516+
}
517+
518+
int handled = 0;
519+
ReadOnlySequence<byte> buffer = readResult.Buffer;
520+
if (!buffer.IsEmpty)
521+
{
522+
handled = MainLoopIteration(ref buffer);
523+
}
524+
525+
allowSync = handled != 0;
526+
_frameHandler.PipeReader.AdvanceTo(buffer.Start, buffer.End);
527+
528+
if (handled == 0 && readResult.IsCompleted)
529+
{
530+
throw new EndOfStreamException();
531+
}
487532
}
488533
catch (SoftProtocolException spe)
489534
{
@@ -520,7 +565,7 @@ public void MainLoop()
520565
#pragma warning disable 0168
521566
try
522567
{
523-
ClosingLoop();
568+
await ClosingLoop().ConfigureAwait(false);
524569
}
525570
catch (SocketException)
526571
{
@@ -539,52 +584,59 @@ public void MainLoop()
539584
}
540585
}
541586

542-
public void MainLoopIteration()
587+
public int MainLoopIteration(ref ReadOnlySequence<byte> buffer)
543588
{
544-
InboundFrame frame = _frameHandler.ReadFrame();
545-
NotifyHeartbeatListener();
546-
547-
bool shallReturn = true;
548-
// We have received an actual frame.
549-
if (frame.Type == FrameType.FrameHeartbeat)
589+
int handled = 0;
590+
while (InboundFrame.TryReadFrame(ref buffer, out InboundFrame frame))
550591
{
551-
// Ignore it: we've already just reset the heartbeat
552-
}
553-
else if (frame.Channel == 0)
554-
{
555-
// In theory, we could get non-connection.close-ok
556-
// frames here while we're quiescing (m_closeReason !=
557-
// null). In practice, there's a limited number of
558-
// things the server can ask of us on channel 0 -
559-
// essentially, just connection.close. That, combined
560-
// with the restrictions on pipelining, mean that
561-
// we're OK here to handle channel 0 traffic in a
562-
// quiescing situation, even though technically we
563-
// should be ignoring everything except
564-
// connection.close-ok.
565-
shallReturn = _session0.HandleFrame(in frame);
566-
}
567-
else
568-
{
569-
// If we're still m_running, but have a m_closeReason,
570-
// then we must be quiescing, which means any inbound
571-
// frames for non-zero channels (and any inbound
572-
// commands on channel zero that aren't
573-
// Connection.CloseOk) must be discarded.
574-
if (_closeReason is null)
592+
NotifyHeartbeatListener();
593+
594+
bool shallReturn = true;
595+
// We have received an actual frame.
596+
if (frame.Type == FrameType.FrameHeartbeat)
575597
{
576-
// No close reason, not quiescing the
577-
// connection. Handle the frame. (Of course, the
578-
// Session itself may be quiescing this particular
579-
// channel, but that's none of our concern.)
580-
shallReturn = _sessionManager.Lookup(frame.Channel).HandleFrame(in frame);
598+
// Ignore it: we've already just reset the heartbeat
599+
}
600+
else if (frame.Channel == 0)
601+
{
602+
// In theory, we could get non-connection.close-ok
603+
// frames here while we're quiescing (m_closeReason !=
604+
// null). In practice, there's a limited number of
605+
// things the server can ask of us on channel 0 -
606+
// essentially, just connection.close. That, combined
607+
// with the restrictions on pipelining, mean that
608+
// we're OK here to handle channel 0 traffic in a
609+
// quiescing situation, even though technically we
610+
// should be ignoring everything except
611+
// connection.close-ok.
612+
shallReturn = _session0.HandleFrame(in frame);
613+
}
614+
else
615+
{
616+
// If we're still m_running, but have a m_closeReason,
617+
// then we must be quiescing, which means any inbound
618+
// frames for non-zero channels (and any inbound
619+
// commands on channel zero that aren't
620+
// Connection.CloseOk) must be discarded.
621+
if (_closeReason is null)
622+
{
623+
// No close reason, not quiescing the
624+
// connection. Handle the frame. (Of course, the
625+
// Session itself may be quiescing this particular
626+
// channel, but that's none of our concern.)
627+
shallReturn = _sessionManager.Lookup(frame.Channel).HandleFrame(in frame);
628+
}
581629
}
582-
}
583630

584-
if (shallReturn)
585-
{
586-
frame.ReturnPayload();
631+
if (shallReturn)
632+
{
633+
frame.ReturnPayload();
634+
}
635+
636+
handled++;
587637
}
638+
639+
return handled;
588640
}
589641

590642
private void NotifyHeartbeatListener()
@@ -796,7 +848,7 @@ public void MaybeStartHeartbeatTimers()
796848

797849
public void StartMainLoop()
798850
{
799-
_mainLoopTask = Task.Run((Action)MainLoop);
851+
_mainLoopTask = Task.Run(MainLoop);
800852
}
801853

802854
public void HeartbeatReadTimerCallback(object state)
@@ -871,7 +923,7 @@ public void HeartbeatWriteTimerCallback(object state)
871923
{
872924
if (!_closed)
873925
{
874-
Write(Client.Impl.Framing.Heartbeat.GetHeartbeatFrame());
926+
Write(OutgoingFrame.CreateHeartbeat());
875927
_heartbeatWriteTimer?.Change((int)_heartbeatTimeSpan.TotalMilliseconds, Timeout.Infinite);
876928
}
877929
}
@@ -908,9 +960,9 @@ public override string ToString()
908960
return string.Format("Connection({0},{1})", _id, Endpoint);
909961
}
910962

911-
public void Write(ReadOnlyMemory<byte> memory)
963+
public void Write(OutgoingFrame outgoingFrame)
912964
{
913-
_frameHandler.Write(memory);
965+
_frameHandler.Write(outgoingFrame);
914966
}
915967

916968
public void UpdateSecret(string newSecret, string reason)

0 commit comments

Comments
 (0)