3
3
namespace BeyondCode \LaravelWebSockets \ChannelManagers ;
4
4
5
5
use BeyondCode \LaravelWebSockets \Channels \Channel ;
6
+ use BeyondCode \LaravelWebSockets \Helpers ;
7
+ use BeyondCode \LaravelWebSockets \Server \MockableConnection ;
8
+ use Carbon \Carbon ;
6
9
use Clue \React \Redis \Client ;
7
10
use Clue \React \Redis \Factory ;
11
+ use Illuminate \Cache \RedisLock ;
12
+ use Illuminate \Support \Facades \Redis ;
8
13
use Illuminate \Support \Str ;
9
14
use Ratchet \ConnectionInterface ;
10
15
use React \EventLoop \LoopInterface ;
@@ -41,6 +46,21 @@ class RedisChannelManager extends LocalChannelManager
41
46
*/
42
47
protected $ subscribeClient ;
43
48
49
+ /**
50
+ * The Redis manager instance.
51
+ *
52
+ * @var \Illuminate\Redis\RedisManager
53
+ */
54
+ protected $ redis ;
55
+
56
+ /**
57
+ * The lock name to use on Redis to avoid multiple
58
+ * actions that might lead to multiple processings.
59
+ *
60
+ * @var string
61
+ */
62
+ protected static $ redisLockName = 'laravel-websockets:channel-manager:lock ' ;
63
+
44
64
/**
45
65
* Create a new channel manager instance.
46
66
*
@@ -52,6 +72,10 @@ public function __construct(LoopInterface $loop, $factoryClass = null)
52
72
{
53
73
$ this ->loop = $ loop ;
54
74
75
+ $ this ->redis = Redis::connection (
76
+ config ('websockets.replication.modes.redis.connection ' , 'default ' )
77
+ );
78
+
55
79
$ connectionUri = $ this ->getConnectionUri ();
56
80
57
81
$ factoryClass = $ factoryClass ?: Factory::class;
@@ -67,6 +91,17 @@ public function __construct(LoopInterface $loop, $factoryClass = null)
67
91
$ this ->serverId = Str::uuid ()->toString ();
68
92
}
69
93
94
+ /**
95
+ * Get the local connections, regardless of the channel
96
+ * they are connected to.
97
+ *
98
+ * @return \React\Promise\PromiseInterface
99
+ */
100
+ public function getLocalConnections (): PromiseInterface
101
+ {
102
+ return parent ::getLocalConnections ();
103
+ }
104
+
70
105
/**
71
106
* Get all channels for a specific app
72
107
* for the current instance.
@@ -108,9 +143,9 @@ public function unsubscribeFromAllChannels(ConnectionInterface $connection)
108
143
$ connection , $ channel , new stdClass
109
144
);
110
145
}
146
+ })->then (function () use ($ connection ) {
147
+ parent ::unsubscribeFromAllChannels ($ connection );
111
148
});
112
-
113
- parent ::unsubscribeFromAllChannels ($ connection );
114
149
}
115
150
116
151
/**
@@ -130,6 +165,8 @@ public function subscribeToChannel(ConnectionInterface $connection, string $chan
130
165
}
131
166
});
132
167
168
+ $ this ->addConnectionToSet ($ connection );
169
+
133
170
$ this ->addChannelToSet (
134
171
$ connection ->app ->id , $ channelName
135
172
);
@@ -156,8 +193,14 @@ public function unsubscribeFromChannel(ConnectionInterface $connection, string $
156
193
if ($ count === 0 ) {
157
194
$ this ->unsubscribeFromTopic ($ connection ->app ->id , $ channelName );
158
195
196
+ $ this ->removeUserData (
197
+ $ connection ->app ->id , $ channelName , $ connection ->socketId
198
+ );
199
+
159
200
$ this ->removeChannelFromSet ($ connection ->app ->id , $ channelName );
160
201
202
+ $ this ->removeConnectionFromSet ($ connection );
203
+
161
204
return ;
162
205
}
163
206
@@ -168,7 +211,13 @@ public function unsubscribeFromChannel(ConnectionInterface $connection, string $
168
211
if ($ count < 1 ) {
169
212
$ this ->unsubscribeFromTopic ($ connection ->app ->id , $ channelName );
170
213
214
+ $ this ->removeUserData (
215
+ $ connection ->app ->id , $ channelName , $ connection ->socketId
216
+ );
217
+
171
218
$ this ->removeChannelFromSet ($ connection ->app ->id , $ channelName );
219
+
220
+ $ this ->removeConnectionFromSet ($ connection );
172
221
}
173
222
});
174
223
});
@@ -293,12 +342,8 @@ public function getChannelMembers($appId, string $channel): PromiseInterface
293
342
{
294
343
return $ this ->publishClient
295
344
->hgetall ($ this ->getRedisKey ($ appId , $ channel , ['users ' ]))
296
- ->then (function ($ members ) {
297
- [$ keys , $ values ] = collect ($ members )->partition (function ($ value , $ key ) {
298
- return $ key % 2 === 0 ;
299
- });
300
-
301
- return collect (array_combine ($ keys ->all (), $ values ->all ()))
345
+ ->then (function ($ list ) {
346
+ return collect (Helpers::redisListToArray ($ list ))
302
347
->map (function ($ user ) {
303
348
return json_decode ($ user );
304
349
})
@@ -344,6 +389,43 @@ public function getChannelsMembersCount($appId, array $channelNames): PromiseInt
344
389
});
345
390
}
346
391
392
+ /**
393
+ * Keep tracking the connections availability when they pong.
394
+ *
395
+ * @param \Ratchet\ConnectionInterface $connection
396
+ * @return bool
397
+ */
398
+ public function connectionPonged (ConnectionInterface $ connection ): bool
399
+ {
400
+ // This will update the score with the current timestamp.
401
+ $ this ->addConnectionToSet ($ connection );
402
+
403
+ return parent ::connectionPonged ($ connection );
404
+ }
405
+
406
+ /**
407
+ * Remove the obsolete connections that didn't ponged in a while.
408
+ *
409
+ * @return bool
410
+ */
411
+ public function removeObsoleteConnections (): bool
412
+ {
413
+ $ this ->lock ()->get (function () {
414
+ $ this ->getConnectionsFromSet (0 , now ()->subMinutes (2 )->format ('U ' ))
415
+ ->then (function ($ connections ) {
416
+ foreach ($ connections as $ connection => $ score ) {
417
+ [$ appId , $ socketId ] = explode (': ' , $ connection );
418
+
419
+ $ this ->unsubscribeFromAllChannels (
420
+ $ this ->fakeConnectionForApp ($ appId , $ socketId )
421
+ );
422
+ }
423
+ });
424
+ });
425
+
426
+ return parent ::removeObsoleteConnections ();
427
+ }
428
+
347
429
/**
348
430
* Handle a message received from Redis on a specific channel.
349
431
*
@@ -462,6 +544,57 @@ public function decrementSubscriptionsCount($appId, string $channel = null, int
462
544
return $ this ->incrementSubscriptionsCount ($ appId , $ channel , $ increment * -1 );
463
545
}
464
546
547
+ /**
548
+ * Add the connection to the sorted list.
549
+ *
550
+ * @param \Ratchet\ConnectionInterface $connection
551
+ * @param \DateTime|string|null $moment
552
+ * @return void
553
+ */
554
+ public function addConnectionToSet (ConnectionInterface $ connection , $ moment = null )
555
+ {
556
+ $ this ->getPublishClient ()
557
+ ->zadd (
558
+ $ this ->getRedisKey (null , null , ['sockets ' ]),
559
+ Carbon::parse ($ moment )->format ('U ' ), "{$ connection ->app ->id }: {$ connection ->socketId }"
560
+ );
561
+ }
562
+
563
+ /**
564
+ * Remove the connection from the sorted list.
565
+ *
566
+ * @param \Ratchet\ConnectionInterface $connection
567
+ * @return void
568
+ */
569
+ public function removeConnectionFromSet (ConnectionInterface $ connection )
570
+ {
571
+ $ this ->getPublishClient ()
572
+ ->zrem (
573
+ $ this ->getRedisKey (null , null , ['sockets ' ]),
574
+ "{$ connection ->app ->id }: {$ connection ->socketId }"
575
+ );
576
+ }
577
+
578
+ /**
579
+ * Get the connections from the sorted list, with last
580
+ * connection between certain timestamps.
581
+ *
582
+ * @param int $start
583
+ * @param int $stop
584
+ * @return PromiseInterface
585
+ */
586
+ public function getConnectionsFromSet (int $ start = 0 , int $ stop = 0 )
587
+ {
588
+ return $ this ->getPublishClient ()
589
+ ->zrange (
590
+ $ this ->getRedisKey (null , null , ['sockets ' ]),
591
+ $ start , $ stop , 'withscores '
592
+ )
593
+ ->then (function ($ list ) {
594
+ return Helpers::redisListToArray ($ list );
595
+ });
596
+ }
597
+
465
598
/**
466
599
* Add a channel to the set list.
467
600
*
@@ -555,11 +688,11 @@ public function unsubscribeFromTopic($appId, string $channel = null)
555
688
* Get the Redis Keyspace name to handle subscriptions
556
689
* and other key-value sets.
557
690
*
558
- * @param mixed $appId
691
+ * @param string|int|null $appId
559
692
* @param string|null $channel
560
693
* @return string
561
694
*/
562
- public function getRedisKey ($ appId , string $ channel = null , array $ suffixes = []): string
695
+ public function getRedisKey ($ appId = null , string $ channel = null , array $ suffixes = []): string
563
696
{
564
697
$ prefix = config ('database.redis.options.prefix ' , null );
565
698
@@ -577,4 +710,28 @@ public function getRedisKey($appId, string $channel = null, array $suffixes = []
577
710
578
711
return $ hash ;
579
712
}
713
+
714
+ /**
715
+ * Get a new RedisLock instance to avoid race conditions.
716
+ *
717
+ * @return \Illuminate\Cache\CacheLock
718
+ */
719
+ protected function lock ()
720
+ {
721
+ return new RedisLock ($ this ->redis , static ::$ redisLockName , 0 );
722
+ }
723
+
724
+ /**
725
+ * Create a fake connection for app that will mimick a connection
726
+ * by app ID and Socket ID to be able to be passed to the methods
727
+ * that accepts a connection class.
728
+ *
729
+ * @param string|int $appId
730
+ * @param string $socketId
731
+ * @return ConnectionInterface
732
+ */
733
+ public function fakeConnectionForApp ($ appId , string $ socketId )
734
+ {
735
+ return new MockableConnection ($ appId , $ socketId );
736
+ }
580
737
}
0 commit comments