Skip to content

Commit 0df331a

Browse files
committed
Code cleanups
Running NATS as a Cluster
1 parent 074fa80 commit 0df331a

10 files changed

+111
-85
lines changed

docker-compose-nats.yml

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,63 @@
11
services:
2-
nats:
3-
image: "nats:latest"
2+
nats1:
3+
image: nats:latest
4+
command: "--cluster_name nats-cluster --cluster nats://0.0.0.0:6222 -D"
5+
networks:
6+
- backend
7+
nats2:
8+
image: nats:latest
9+
command: "--cluster_name nats-cluster --cluster nats://0.0.0.0:6222 --routes=nats://nats1:6222 -D"
10+
networks:
11+
- backend
12+
nats3:
13+
image: nats:latest
14+
command: "--cluster_name nats-cluster --cluster nats://0.0.0.0:6222 --routes=nats://nats1:6222 -D"
415
networks:
516
- backend
617
server1:
718
image: "stebet-signalr-nats-testserver"
819
environment:
920
- NATS_ENABLED=1
10-
- NATS_HOST=nats:4222
21+
- NATS_HOST=nats1:4222,nats2:4222,nats3:4222
1122
networks:
1223
- backend
24+
depends_on:
25+
- nats1
26+
- nats2
27+
- nats3
1328
server2:
1429
image: "stebet-signalr-nats-testserver"
1530
environment:
1631
- NATS_ENABLED=1
17-
- NATS_HOST=nats:4222
32+
- NATS_HOST=nats1:4222,nats2:4222,nats3:4222
1833
networks:
1934
- backend
35+
depends_on:
36+
- nats1
37+
- nats2
38+
- nats3
2039
server3:
2140
image: "stebet-signalr-nats-testserver"
2241
environment:
2342
- NATS_ENABLED=1
24-
- NATS_HOST=nats:4222
43+
- NATS_HOST=nats1:4222,nats2:4222,nats3:4222
2544
networks:
2645
- backend
46+
depends_on:
47+
- nats1
48+
- nats2
49+
- nats3
2750
yarp:
2851
image: "stebet-signalr-nats-yarp"
2952
ports:
3053
- 5000:8080
3154
networks:
3255
- backend
3356
- default
57+
depends_on:
58+
- server1
59+
- server2
60+
- server3
3461

3562
networks:
3663
backend:

perf/Stebet.SignalR.NATS.TestServer/Program.cs

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,8 @@
77
if (Environment.GetEnvironmentVariable("NATS_ENABLED") == "1")
88
{
99
builder.Services
10-
.AddTransient(provider => NatsOpts.Default with
11-
{
12-
LoggerFactory = provider.GetRequiredService<ILoggerFactory>(),
13-
Url = Environment.GetEnvironmentVariable("NATS_HOST") ?? $"localhost:4222",
14-
RequestTimeout = TimeSpan.FromSeconds(30),
15-
SubPendingChannelFullMode = BoundedChannelFullMode.Wait,
16-
ConnectTimeout = TimeSpan.FromSeconds(10),
17-
Name = $"Stebet.SignalR.NATS.Tests"
18-
})
19-
.AddSingleton<INatsConnectionPool, NatsConnectionPool>()
20-
.AddTransient(provider => provider.GetRequiredService<INatsConnectionPool>().GetConnection())
2110
.AddSignalR()
22-
.AddNats();
11+
.AddNats(Environment.GetEnvironmentVariable("NATS_HOST") ?? $"localhost:4222");
2312
}
2413
else if (Environment.GetEnvironmentVariable("REDIS_ENABLED") == "1")
2514
{

perf/Stebet.SignalR.NATS.TestServer/appsettings.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
{
22
"Logging": {
33
"LogLevel": {
4-
"Default": "Warning"
4+
"Default": "Warning",
5+
"NATS": "Information"
56
}
67
},
78
"AllowedHosts": "*"

src/Stebet.SignalR.NATS/HubMessageExtensions.cs

Lines changed: 0 additions & 18 deletions
This file was deleted.

src/Stebet.SignalR.NATS/ISignalRBuilderExtensions.cs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,33 @@
1+
using System.Threading.Channels;
2+
13
using Microsoft.Extensions.DependencyInjection;
4+
using Microsoft.Extensions.Logging;
5+
6+
using NATS.Client.Core;
7+
28
using Stebet.SignalR.NATS;
39

410
namespace Microsoft.AspNetCore.SignalR;
511

612
public static class ISignalRBuilderExtensions
713
{
8-
public static ISignalRServerBuilder AddNats(this ISignalRServerBuilder builder, string natsSubjectPrefix = "signalr.nats")
14+
public static ISignalRServerBuilder AddNats(this ISignalRServerBuilder builder, string natsUrl, string natsSubjectPrefix = "signalr.nats")
915
{
1016
NatsSubject.Prefix = natsSubjectPrefix;
17+
builder.Services.AddKeyedSingleton<INatsConnectionPool>("Stebet.SignalR.NATS", (provider, key) =>
18+
{
19+
var pool = new NatsConnectionPool(NatsOpts.Default with
20+
{
21+
LoggerFactory = provider.GetRequiredService<ILoggerFactory>(),
22+
Url = natsUrl,
23+
RequestTimeout = TimeSpan.FromSeconds(30),
24+
SubPendingChannelFullMode = BoundedChannelFullMode.Wait,
25+
ConnectTimeout = TimeSpan.FromSeconds(10),
26+
Name = $"Stebet.SignalR.NATS.Tests"
27+
});
28+
29+
return pool;
30+
});
1131
builder.Services.AddSingleton(typeof(ClientResultsManager<>), typeof(ClientResultsManager<>));
1232
builder.Services.AddSingleton(typeof(HubLifetimeManager<>), typeof(NatsHubLifetimeManager<>));
1333
return builder;

src/Stebet.SignalR.NATS/LoggerMessages.cs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using Microsoft.Extensions.Logging;
1+
using Microsoft.AspNetCore.SignalR.Protocol;
2+
using Microsoft.Extensions.Logging;
23

34
namespace Stebet.SignalR.NATS
45
{
@@ -37,6 +38,9 @@ internal partial class LoggerMessages
3738
[LoggerMessage(LogLevel.Debug, "Invoking {MethodName} on local connection {ConnectionId}")]
3839
internal static partial void InvokeLocalConnection(ILogger logger, string methodName, string connectionId);
3940

41+
[LoggerMessage(LogLevel.Debug, "Invoking local connection {ConnectionId}")]
42+
internal static partial void InvokeLocalConnection(ILogger logger, string connectionId);
43+
4044
[LoggerMessage(LogLevel.Debug, "Invoking {MethodName} on remote connection {ConnectionId}")]
4145
internal static partial void InvokeRemoteConnection(ILogger logger, string methodName, string connectionId);
4246

@@ -46,13 +50,25 @@ internal partial class LoggerMessages
4650
[LoggerMessage(LogLevel.Debug, "Setting result for remote connection {ConnectionId}")]
4751
internal static partial void SetResultForRemoteConnection(ILogger logger, string connectionId);
4852

49-
[LoggerMessage(LogLevel.Warning, "Can't deserialize message with protocol {ProtocolName}")]
50-
internal static partial void CantDeserializeMessage(ILogger logger, string protocolName, Exception ex);
53+
[LoggerMessage(LogLevel.Warning, "Can't deserialize message with protocols {Protocols}")]
54+
internal static partial void CantDeserializeMessage(ILogger logger, IReadOnlyList<IHubProtocol> protocols);
5155

5256
[LoggerMessage(LogLevel.Debug, "Handling Invoke Result from {ConnectionId}")]
5357
internal static partial void HandleInvokeResult(ILogger logger, string connectionId);
5458

5559
[LoggerMessage(LogLevel.Debug, "Successfully deserialized CompletionMessage {InvocationId} with result {Result}")]
5660
internal static partial void SuccessfullyDeserializedCompletionMessage(ILogger logger, string invocationId, object? result);
61+
62+
[LoggerMessage(LogLevel.Debug, "Received message {Subject}")]
63+
internal static partial void ReceivedMessage(ILogger logger, string subject);
64+
65+
[LoggerMessage(LogLevel.Error, "Error processing message {Subject}")]
66+
internal static partial void ErrorProcessingMessage(ILogger logger, string subject, Exception e);
67+
[LoggerMessage(LogLevel.Debug, "Sending message on connection {ConnectionId}")]
68+
internal static partial void SendConnection(ILogger logger, string connectionId);
69+
[LoggerMessage(LogLevel.Debug, "Sending message for group {GroupName} on connection {ConnectionId}")]
70+
internal static partial void SendGroup(ILogger logger, string groupName, string connectionId);
71+
[LoggerMessage(LogLevel.Debug, "Sending invocation result to {ReplyTo}")]
72+
internal static partial void SendInvocationResult(ILogger logger, string replyTo);
5773
}
5874
}

src/Stebet.SignalR.NATS/NatsHubConnectionHandler.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,12 @@ await Parallel.ForEachAsync(channel.ReadAllAsync(CancellationToken.None),
8383
{
8484
try
8585
{
86-
logger.LogDebug("Writing invocation on connection {ConnectionId}", connection.ConnectionId);
86+
LoggerMessages.InvokeLocalConnection(logger, connection.ConnectionId);
8787
await SendInvocationMessage(message).ConfigureAwait(false);
8888
}
8989
catch (Exception e)
9090
{
91-
logger.LogError(e, "Error processing message");
91+
LoggerMessages.ErrorProcessingMessage(logger, message.Subject, e);
9292
}
9393

9494
}).ConfigureAwait(false);
@@ -101,12 +101,12 @@ await Parallel.ForEachAsync(channel.ReadAllAsync(CancellationToken.None),
101101
{
102102
try
103103
{
104-
logger.LogDebug("Sending message on connection {ConnectionId}", connection.ConnectionId);
104+
LoggerMessages.SendConnection(logger, connection.ConnectionId);
105105
await SendHubMessage(message).ConfigureAwait(false);
106106
}
107107
catch (Exception e)
108108
{
109-
logger.LogError(e, "Error processing message");
109+
LoggerMessages.ErrorProcessingMessage(logger, message.Subject, e);
110110
}
111111
}).ConfigureAwait(false);
112112
}
@@ -133,7 +133,7 @@ await Parallel.ForEachAsync(channel.ReadAllAsync(CancellationToken.None),
133133
}
134134
catch (Exception e)
135135
{
136-
logger.LogError(e, "Error processing message");
136+
LoggerMessages.ErrorProcessingMessage(logger, message.Subject, e);
137137
}
138138
}).ConfigureAwait(false);
139139
}
@@ -149,13 +149,13 @@ await Parallel.ForEachAsync(channel.ReadAllAsync(CancellationToken.None),
149149
(string groupName, SortedSet<string> excludedConnections, SerializedHubMessage serializedHubMessage) = buffer.ReadSerializedHubMessageWithExcludedConnectionIdsAndGroupName();
150150
if (connection.IsInGroup(groupName) && !excludedConnections.Contains(connection.ConnectionId))
151151
{
152-
logger.LogDebug("Sending message for group {GroupName} on connection {ConnectionId}", groupName, connection.ConnectionId);
152+
LoggerMessages.SendGroup(logger, groupName, connection.ConnectionId);
153153
await connection.WriteAsync(serializedHubMessage, CancellationToken.None).ConfigureAwait(false);
154154
}
155155
}
156156
catch (Exception e)
157157
{
158-
logger.LogError(e, "Error processing message");
158+
LoggerMessages.ErrorProcessingMessage(logger, message.Subject, e);
159159
}
160160
}).ConfigureAwait(false);
161161
}
@@ -177,7 +177,7 @@ private async Task SendInvocationMessage(NatsMsg<NatsMemoryOwner<byte>> message)
177177
(string invocationId, SerializedHubMessage serializedHubMessage) = buffer.ReadSerializedHubMessageWithInvocationId();
178178
resultsManager.AddInvocation(invocationId, (typeof(RawResult), connection.ConnectionId, null!, async (_, completionMessage) =>
179179
{
180-
logger.LogDebug("Sending invocation result to {ReplyTo}", message.ReplyTo);
180+
LoggerMessages.SendInvocationResult(logger, message.ReplyTo!);
181181
var buffer = new NatsBufferWriter<byte>();
182182
buffer.WriteMessageWithConnectionId(completionMessage, [connection.Protocol], connection.ConnectionId);
183183
await message.ReplyAsync(buffer).ConfigureAwait(false);

src/Stebet.SignalR.NATS/NatsHubLifetimeManager.Subscription.cs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,15 @@ internal partial class NatsHubLifetimeManager<THub> : HubLifetimeManager<THub> w
99
{
1010
private async Task StartSubscriptions()
1111
{
12-
INatsSub<NatsMemoryOwner<byte>> sub = await natsConnection.SubscribeCoreAsync<NatsMemoryOwner<byte>>($"{NatsSubject.GlobalSubjectPrefix}.>").ConfigureAwait(false);
13-
await natsConnection.PingAsync().ConfigureAwait(false);
12+
INatsSub<NatsMemoryOwner<byte>> sub = await _natsConnection.SubscribeCoreAsync<NatsMemoryOwner<byte>>($"{NatsSubject.GlobalSubjectPrefix}.>").ConfigureAwait(false);
13+
await _natsConnection.PingAsync().ConfigureAwait(false);
1414
await Parallel.ForEachAsync(sub.Msgs.ReadAllAsync(), Helpers.DefaultParallelOptions,
1515
async (message, token) =>
1616
{
1717
using NatsMemoryOwner<byte> messageDataOwner = message.Data;
1818
try
1919
{
20-
logger.LogDebug("Received message {Subject}", message.Subject);
20+
LoggerMessages.ReceivedMessage(logger, message.Subject);
2121
if (message.Subject == NatsSubject.ConnectionDisconnectedSubject)
2222
{
2323
HandleConnectionDisconnected(messageDataOwner);
@@ -33,36 +33,33 @@ await Parallel.ForEachAsync(sub.Msgs.ReadAllAsync(), Helpers.DefaultParallelOpti
3333
(SortedSet<string> excludedConnections, SerializedHubMessage serializedHubMessage) = messageDataOwner.ReadSerializedHubMessageWithExcludedConnectionIds();
3434
foreach (var connection in _connections)
3535
{
36-
if (!excludedConnections.Contains(connection.ConnectionId))
36+
if (excludedConnections.Count > 0 && excludedConnections.Contains(connection.ConnectionId))
3737
{
38-
tasks.Add(connection.WriteAsync(serializedHubMessage, CancellationToken.None).AsTask());
38+
continue;
3939
}
40+
41+
tasks.Add(connection.WriteAsync(serializedHubMessage, CancellationToken.None).AsTask());
4042
}
4143

4244
await Task.WhenAll(tasks).ConfigureAwait(false);
4345
}
4446
}
4547
catch (Exception e)
4648
{
47-
logger.LogError(e, "Error processing message {Subject}", message.Subject);
49+
LoggerMessages.ErrorProcessingMessage(logger, message.Subject, e);
4850
}
4951
}).ConfigureAwait(false);
5052
}
5153

5254
private void HandleInvokeResult(NatsMemoryOwner<byte> memory)
5355
{
5456
(string connectionId, SerializedHubMessage serializedHubMessage) = memory.ReadSerializedHubMessageWithConnectionId();
55-
logger.LogDebug("Handling Invoke Result from {ConnectionId}", connectionId);
57+
LoggerMessages.HandleInvokeResult(logger, connectionId);
5658
if (TryDeserializeMessage(serializedHubMessage, out HubMessage? hubMessage) && hubMessage is CompletionMessage completionMessage)
5759
{
58-
logger.LogDebug("Successfully deserialized CompletionMessage {InvocationId} with result {Result}", completionMessage.InvocationId, completionMessage.Result);
59-
60+
LoggerMessages.SuccessfullyDeserializedCompletionMessage(logger, completionMessage.InvocationId!, completionMessage.Result);
6061
clientResultsManager.TryCompleteResult(connectionId, completionMessage);
6162
}
62-
else
63-
{
64-
logger.LogWarning("Failed to deserialize completion message");
65-
}
6663
}
6764

6865
private void HandleConnectionDisconnected(NatsMemoryOwner<byte> memory)

0 commit comments

Comments
 (0)