1
1
using Microsoft . AspNetCore . SignalR ;
2
2
using Microsoft . AspNetCore . SignalR . Protocol ;
3
3
using Microsoft . Extensions . Logging ;
4
- using Orleans ;
5
4
using Orleans . Concurrency ;
6
5
using Orleans . Streams ;
7
6
using SignalR . Orleans . Clients ;
8
7
using SignalR . Orleans . Core ;
9
8
10
9
namespace SignalR . Orleans ;
11
10
12
- public class OrleansHubLifetimeManager < THub > : HubLifetimeManager < THub > , IDisposable where THub : Hub
11
+ public class OrleansHubLifetimeManager < THub > : HubLifetimeManager < THub > , IDisposable
12
+ where THub : Hub
13
13
{
14
14
private Timer _timer ;
15
15
private readonly HubConnectionStore _connections = new HubConnectionStore ( ) ;
16
16
private readonly ILogger _logger ;
17
- private readonly IClusterClientProvider _clusterClientProvider ;
17
+ private readonly IClusterClient _clusterClient ;
18
18
private readonly Guid _serverId ;
19
19
private IStreamProvider _streamProvider ;
20
20
private IAsyncStream < AllMessage > _allStream ;
@@ -24,22 +24,22 @@ public class OrleansHubLifetimeManager<THub> : HubLifetimeManager<THub>, IDispos
24
24
25
25
public OrleansHubLifetimeManager (
26
26
ILogger < OrleansHubLifetimeManager < THub > > logger ,
27
- IClusterClientProvider clusterClientProvider
27
+ IClusterClient clusterClient
28
28
)
29
29
{
30
- var hubType = typeof ( THub ) . BaseType . GenericTypeArguments . FirstOrDefault ( ) ?? typeof ( THub ) ;
30
+ var hubType = typeof ( THub ) . BaseType ? . GenericTypeArguments . FirstOrDefault ( ) ?? typeof ( THub ) ;
31
31
_hubName = hubType . IsInterface && hubType . Name . StartsWith ( "I" )
32
32
? hubType . Name . Substring ( 1 )
33
33
: hubType . Name ;
34
34
_serverId = Guid . NewGuid ( ) ; // todo: include machine name
35
35
_logger = logger ;
36
- _clusterClientProvider = clusterClientProvider ;
36
+ _clusterClient = clusterClient ;
37
37
_ = EnsureStreamSetup ( ) ;
38
38
}
39
39
40
40
private Task HeartbeatCheck ( )
41
41
{
42
- var client = _clusterClientProvider . GetClient ( ) . GetServerDirectoryGrain ( ) ;
42
+ var client = _clusterClient . GetServerDirectoryGrain ( ) ;
43
43
return client . Heartbeat ( _serverId ) ;
44
44
}
45
45
@@ -66,10 +66,10 @@ private async Task SetupStreams()
66
66
{
67
67
_logger . LogInformation ( "Initializing: Orleans HubLifetimeManager {hubName} (serverId: {serverId})..." , _hubName , _serverId ) ;
68
68
69
- _streamProvider = _clusterClientProvider . GetClient ( ) . GetStreamProvider ( Constants . STREAM_PROVIDER ) ;
69
+ _streamProvider = _clusterClient . GetStreamProvider ( Constants . STREAM_PROVIDER ) ;
70
70
_serverStreamsReplicaContainer = new StreamReplicaContainer < ClientMessage > ( _streamProvider , _serverId , Constants . SERVERS_STREAM , Constants . STREAM_SEND_REPLICAS ) ;
71
71
72
- _allStream = _streamProvider . GetStream < AllMessage > ( Constants . ALL_STREAM_ID , Utils . BuildStreamHubName ( _hubName ) ) ;
72
+ _allStream = _streamProvider . GetStream < AllMessage > ( Utils . BuildStreamHubName ( _hubName ) , Constants . ALL_STREAM_ID ) ;
73
73
_timer = new Timer ( _ => Task . Run ( HeartbeatCheck ) , null , TimeSpan . FromSeconds ( 0 ) , TimeSpan . FromMinutes ( Constants . HEARTBEAT_PULSE_IN_MINUTES ) ) ;
74
74
75
75
var subscribeTasks = new List < Task >
@@ -120,15 +120,15 @@ public override async Task OnConnectedAsync(HubConnectionContext connection)
120
120
{
121
121
_connections . Add ( connection ) ;
122
122
123
- var client = _clusterClientProvider . GetClient ( ) . GetClientGrain ( _hubName , connection . ConnectionId ) ;
123
+ var client = _clusterClient . GetClientGrain ( _hubName , connection . ConnectionId ) ;
124
124
await client . OnConnect ( _serverId ) ;
125
125
126
126
_logger . LogInformation ( "Connected {connectionId} on hub {hubName} with userId {userId} (serverId: {serverId})" ,
127
127
connection . ConnectionId , _hubName , connection . UserIdentifier , _serverId ) ;
128
128
129
129
if ( connection . User . Identity . IsAuthenticated )
130
130
{
131
- var user = _clusterClientProvider . GetClient ( ) . GetUserGrain ( _hubName , connection . UserIdentifier ) ;
131
+ var user = _clusterClient . GetUserGrain ( _hubName , connection . UserIdentifier ) ;
132
132
await user . Add ( connection . ConnectionId ) ;
133
133
}
134
134
}
@@ -147,7 +147,7 @@ public override async Task OnDisconnectedAsync(HubConnectionContext connection)
147
147
{
148
148
_logger . LogInformation ( "Disconnection {connectionId} on hub {hubName} with userId {userId} (serverId: {serverId})" ,
149
149
connection . ConnectionId , _hubName , connection . UserIdentifier , _serverId ) ;
150
- var client = _clusterClientProvider . GetClient ( ) . GetClientGrain ( _hubName , connection . ConnectionId ) ;
150
+ var client = _clusterClient . GetClientGrain ( _hubName , connection . ConnectionId ) ;
151
151
await client . OnDisconnect ( ClientDisconnectReasons . HubDisconnect ) ;
152
152
}
153
153
finally
@@ -196,7 +196,7 @@ public override Task SendGroupAsync(string groupName, string methodName, object[
196
196
if ( string . IsNullOrWhiteSpace ( groupName ) ) throw new ArgumentNullException ( nameof ( groupName ) ) ;
197
197
if ( string . IsNullOrWhiteSpace ( methodName ) ) throw new ArgumentNullException ( nameof ( methodName ) ) ;
198
198
199
- var group = _clusterClientProvider . GetClient ( ) . GetGroupGrain ( _hubName , groupName ) ;
199
+ var group = _clusterClient . GetGroupGrain ( _hubName , groupName ) ;
200
200
return group . Send ( methodName , args ) ;
201
201
}
202
202
@@ -213,7 +213,7 @@ public override Task SendGroupExceptAsync(string groupName, string methodName, o
213
213
if ( string . IsNullOrWhiteSpace ( groupName ) ) throw new ArgumentNullException ( nameof ( groupName ) ) ;
214
214
if ( string . IsNullOrWhiteSpace ( methodName ) ) throw new ArgumentNullException ( nameof ( methodName ) ) ;
215
215
216
- var group = _clusterClientProvider . GetClient ( ) . GetGroupGrain ( _hubName , groupName ) ;
216
+ var group = _clusterClient . GetGroupGrain ( _hubName , groupName ) ;
217
217
return group . SendExcept ( methodName , args , excludedConnectionIds ) ;
218
218
}
219
219
@@ -223,7 +223,7 @@ public override Task SendUserAsync(string userId, string methodName, object[] ar
223
223
if ( string . IsNullOrWhiteSpace ( userId ) ) throw new ArgumentNullException ( nameof ( userId ) ) ;
224
224
if ( string . IsNullOrWhiteSpace ( methodName ) ) throw new ArgumentNullException ( nameof ( methodName ) ) ;
225
225
226
- var user = _clusterClientProvider . GetClient ( ) . GetUserGrain ( _hubName , userId ) ;
226
+ var user = _clusterClient . GetUserGrain ( _hubName , userId ) ;
227
227
return user . Send ( methodName , args ) ;
228
228
}
229
229
@@ -237,14 +237,14 @@ public override Task SendUsersAsync(IReadOnlyList<string> userIds, string method
237
237
public override Task AddToGroupAsync ( string connectionId , string groupName ,
238
238
CancellationToken cancellationToken = new CancellationToken ( ) )
239
239
{
240
- var group = _clusterClientProvider . GetClient ( ) . GetGroupGrain ( _hubName , groupName ) ;
240
+ var group = _clusterClient . GetGroupGrain ( _hubName , groupName ) ;
241
241
return group . Add ( connectionId ) ;
242
242
}
243
243
244
244
public override Task RemoveFromGroupAsync ( string connectionId , string groupName ,
245
245
CancellationToken cancellationToken = new CancellationToken ( ) )
246
246
{
247
- var group = _clusterClientProvider . GetClient ( ) . GetGroupGrain ( _hubName , groupName ) ;
247
+ var group = _clusterClient . GetGroupGrain ( _hubName , groupName ) ;
248
248
return group . Remove ( connectionId ) ;
249
249
}
250
250
@@ -257,7 +257,7 @@ private Task SendLocal(HubConnectionContext connection, HubInvocationMessage hub
257
257
258
258
private Task SendExternal ( string connectionId , InvocationMessage hubMessage )
259
259
{
260
- var client = _clusterClientProvider . GetClient ( ) . GetClientGrain ( _hubName , connectionId ) ;
260
+ var client = _clusterClient . GetClientGrain ( _hubName , connectionId ) ;
261
261
return client . Send ( hubMessage . AsImmutable ( ) ) ;
262
262
}
263
263
@@ -276,17 +276,11 @@ public void Dispose()
276
276
toUnsubscribe . AddRange ( subscriptions . Select ( s => s . UnsubscribeAsync ( ) ) ) ;
277
277
}
278
278
279
- var serverDirectoryGrain = _clusterClientProvider . GetClient ( ) . GetServerDirectoryGrain ( ) ;
279
+ var serverDirectoryGrain = _clusterClient . GetServerDirectoryGrain ( ) ;
280
280
toUnsubscribe . Add ( serverDirectoryGrain . Unregister ( _serverId ) ) ;
281
281
282
282
Task . WaitAll ( toUnsubscribe . ToArray ( ) ) ;
283
283
284
284
_timer ? . Dispose ( ) ;
285
285
}
286
286
}
287
-
288
- public class AllMessage
289
- {
290
- public IReadOnlyList < string > ExcludedIds { get ; set ; }
291
- public InvocationMessage Payload { get ; set ; }
292
- }
0 commit comments