Skip to content

Commit bf89fe7

Browse files
author
Stefán Jökull Sigurðarson
committed
Adding pipelines
1 parent 9ccf87a commit bf89fe7

14 files changed

+319
-321
lines changed

projects/RabbitMQ.Client/RabbitMQ.Client.csproj

+1
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
<PackageReference Include="MinVer" Version="3.1.0" PrivateAssets="All" />
6464
<PackageReference Include="System.Memory" Version="4.5.4" />
6565
<PackageReference Include="System.Threading.Channels" Version="6.0.0" />
66+
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="2.2.2" />
6667
</ItemGroup>
6768

6869
</Project>

projects/RabbitMQ.Client/client/api/ConnectionFactoryBase.cs

+5-6
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131

3232
using System;
3333
using System.Net.Sockets;
34+
35+
using Pipelines.Sockets.Unofficial;
36+
3437
using RabbitMQ.Client.Impl;
3538

3639
namespace RabbitMQ.Client
@@ -49,12 +52,8 @@ public class ConnectionFactoryBase
4952
/// <returns>New instance of a <see cref="TcpClient"/>.</returns>
5053
public static ITcpClient DefaultSocketFactory(AddressFamily addressFamily)
5154
{
52-
var socket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp)
53-
{
54-
NoDelay = true,
55-
ReceiveBufferSize = 65536,
56-
SendBufferSize = 65536
57-
};
55+
var socket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp);
56+
SocketConnection.SetRecommendedClientOptions(socket);
5857
return new TcpClientAdapter(socket);
5958
}
6059
}

projects/RabbitMQ.Client/client/framing/Model.cs

+109-107
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System.Collections.Generic;
33+
3334
using RabbitMQ.Client.client.framing;
34-
using RabbitMQ.Client.client.impl;
3535
using RabbitMQ.Client.Impl;
3636

3737
namespace RabbitMQ.Client.Framing.Impl
@@ -297,113 +297,115 @@ public override void TxSelect()
297297

298298
protected override bool DispatchAsynchronous(in IncomingCommand cmd)
299299
{
300-
switch (cmd.CommandId)
300+
int classType = (int)cmd.CommandId >> 16;
301+
int methodType = (int)cmd.CommandId & 0x00FF;
302+
return classType switch
303+
{
304+
ClassConstants.Basic => DispatchBasicCommand(in cmd, methodType),
305+
ClassConstants.Channel => DispatchChannelCommand(in cmd, methodType),
306+
ClassConstants.Connection => DispatchConnectionCommand(in cmd, methodType),
307+
ClassConstants.Queue => DispatchQueueCommand(in cmd, methodType),
308+
_ => false,
309+
};
310+
}
311+
312+
private bool DispatchQueueCommand(in IncomingCommand cmd, int methodType)
313+
{
314+
switch (methodType)
315+
{
316+
case QueueMethodConstants.DeclareOk:
317+
HandleQueueDeclareOk(in cmd);
318+
return true;
319+
default:
320+
return false;
321+
}
322+
}
323+
324+
private bool DispatchConnectionCommand(in IncomingCommand cmd, int methodType)
325+
{
326+
switch (methodType)
327+
{
328+
case ConnectionMethodConstants.Start:
329+
HandleConnectionStart(in cmd);
330+
return true;
331+
case ConnectionMethodConstants.Secure:
332+
HandleConnectionSecure(in cmd);
333+
return true;
334+
case ConnectionMethodConstants.Tune:
335+
HandleConnectionTune(in cmd);
336+
return true;
337+
case ConnectionMethodConstants.Close:
338+
HandleConnectionClose(in cmd);
339+
return true;
340+
case ConnectionMethodConstants.Blocked:
341+
HandleConnectionBlocked(in cmd);
342+
return true;
343+
case ConnectionMethodConstants.Unblocked:
344+
cmd.ReturnMethodBuffer();
345+
HandleConnectionUnblocked();
346+
return true;
347+
default:
348+
return false;
349+
}
350+
}
351+
352+
private bool DispatchChannelCommand(in IncomingCommand cmd, int methodType)
353+
{
354+
switch (methodType)
355+
{
356+
case ChannelMethodConstants.Flow:
357+
HandleChannelFlow(in cmd);
358+
return true;
359+
case ChannelMethodConstants.Close:
360+
HandleChannelClose(in cmd);
361+
return true;
362+
case ChannelMethodConstants.CloseOk:
363+
cmd.ReturnMethodBuffer();
364+
HandleChannelCloseOk();
365+
return true;
366+
default:
367+
return false;
368+
}
369+
}
370+
371+
private bool DispatchBasicCommand(in IncomingCommand cmd, int methodType)
372+
{
373+
switch (methodType)
301374
{
302-
case ProtocolCommandId.BasicDeliver:
303-
{
304-
HandleBasicDeliver(in cmd);
305-
return true;
306-
}
307-
case ProtocolCommandId.BasicAck:
308-
{
309-
HandleBasicAck(in cmd);
310-
return true;
311-
}
312-
case ProtocolCommandId.BasicCancel:
313-
{
314-
HandleBasicCancel(in cmd);
315-
return true;
316-
}
317-
case ProtocolCommandId.BasicCancelOk:
318-
{
319-
HandleBasicCancelOk(in cmd);
320-
return true;
321-
}
322-
case ProtocolCommandId.BasicConsumeOk:
323-
{
324-
HandleBasicConsumeOk(in cmd);
325-
return true;
326-
}
327-
case ProtocolCommandId.BasicGetEmpty:
328-
{
329-
cmd.ReturnMethodBuffer();
330-
HandleBasicGetEmpty();
331-
return true;
332-
}
333-
case ProtocolCommandId.BasicGetOk:
334-
{
335-
HandleBasicGetOk(in cmd);
336-
return true;
337-
}
338-
case ProtocolCommandId.BasicNack:
339-
{
340-
HandleBasicNack(in cmd);
341-
return true;
342-
}
343-
case ProtocolCommandId.BasicRecoverOk:
344-
{
345-
cmd.ReturnMethodBuffer();
346-
HandleBasicRecoverOk();
347-
return true;
348-
}
349-
case ProtocolCommandId.BasicReturn:
350-
{
351-
HandleBasicReturn(in cmd);
352-
return true;
353-
}
354-
case ProtocolCommandId.ChannelClose:
355-
{
356-
HandleChannelClose(in cmd);
357-
return true;
358-
}
359-
case ProtocolCommandId.ChannelCloseOk:
360-
{
361-
cmd.ReturnMethodBuffer();
362-
HandleChannelCloseOk();
363-
return true;
364-
}
365-
case ProtocolCommandId.ChannelFlow:
366-
{
367-
HandleChannelFlow(in cmd);
368-
return true;
369-
}
370-
case ProtocolCommandId.ConnectionBlocked:
371-
{
372-
HandleConnectionBlocked(in cmd);
373-
return true;
374-
}
375-
case ProtocolCommandId.ConnectionClose:
376-
{
377-
HandleConnectionClose(in cmd);
378-
return true;
379-
}
380-
case ProtocolCommandId.ConnectionSecure:
381-
{
382-
HandleConnectionSecure(in cmd);
383-
return true;
384-
}
385-
case ProtocolCommandId.ConnectionStart:
386-
{
387-
HandleConnectionStart(in cmd);
388-
return true;
389-
}
390-
case ProtocolCommandId.ConnectionTune:
391-
{
392-
HandleConnectionTune(in cmd);
393-
return true;
394-
}
395-
case ProtocolCommandId.ConnectionUnblocked:
396-
{
397-
cmd.ReturnMethodBuffer();
398-
HandleConnectionUnblocked();
399-
return true;
400-
}
401-
case ProtocolCommandId.QueueDeclareOk:
402-
{
403-
HandleQueueDeclareOk(in cmd);
404-
return true;
405-
}
406-
default: return false;
375+
case BasicMethodConstants.ConsumeOk:
376+
HandleBasicConsumeOk(in cmd);
377+
return true;
378+
case BasicMethodConstants.Cancel:
379+
HandleBasicCancel(in cmd);
380+
return true;
381+
case BasicMethodConstants.CancelOk:
382+
HandleBasicCancelOk(in cmd);
383+
return true;
384+
case BasicMethodConstants.Return:
385+
HandleBasicReturn(in cmd);
386+
return true;
387+
case BasicMethodConstants.Deliver:
388+
HandleBasicDeliver(in cmd);
389+
return true;
390+
case BasicMethodConstants.GetOk:
391+
HandleBasicGetOk(in cmd);
392+
return true;
393+
case BasicMethodConstants.GetEmpty:
394+
cmd.ReturnMethodBuffer();
395+
HandleBasicGetEmpty();
396+
return true;
397+
case BasicMethodConstants.Ack:
398+
HandleBasicAck(in cmd);
399+
return true;
400+
case BasicMethodConstants.RecoverOk:
401+
cmd.ReturnMethodBuffer();
402+
HandleBasicRecoverOk();
403+
return true;
404+
case BasicMethodConstants.Nack:
405+
HandleBasicNack(in cmd);
406+
return true;
407+
default:
408+
return false;
407409
}
408410
}
409411
}

projects/RabbitMQ.Client/client/impl/Connection.Commands.cs

+1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
using System;
3333
using System.IO;
3434
using System.Text;
35+
3536
using RabbitMQ.Client.Events;
3637
using RabbitMQ.Client.Exceptions;
3738
using RabbitMQ.Client.Impl;

0 commit comments

Comments
 (0)