Skip to content
This repository was archived by the owner on Feb 7, 2024. It is now read-only.

Commit 40ee5fb

Browse files
authored
Merge pull request #547 from beyondcode/feature/remove-obsolete-connections-after-120-seconds
[2.x] Remove obsolete connections older than 120 seconds for local channel manager
2 parents 1f6e714 + 2c57668 commit 40ee5fb

10 files changed

+362
-147
lines changed

composer.json

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,11 @@
3838
"evenement/evenement": "^2.0|^3.0",
3939
"facade/ignition-contracts": "^1.0",
4040
"guzzlehttp/psr7": "^1.5",
41-
"illuminate/broadcasting": "^6.0|^7.0|^8.0",
42-
"illuminate/console": "^6.0|^7.0|^8.0",
43-
"illuminate/http": "^6.0|^7.0|^8.0",
44-
"illuminate/routing": "^6.0|^7.0|^8.0",
45-
"illuminate/support": "^6.0|^7.0|^8.0",
41+
"illuminate/broadcasting": "^6.3|^7.0|^8.0",
42+
"illuminate/console": "^6.3|^7.0|^8.0",
43+
"illuminate/http": "^6.3|^7.0|^8.0",
44+
"illuminate/routing": "^6.3|^7.0|^8.0",
45+
"illuminate/support": "^6.3|^7.0|^8.0",
4646
"pusher/pusher-php-server": "^3.0|^4.0",
4747
"react/promise": "^2.0",
4848
"symfony/http-kernel": "^4.0|^5.0",

src/ChannelManagers/LocalChannelManager.php

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
use BeyondCode\LaravelWebSockets\Channels\PrivateChannel;
88
use BeyondCode\LaravelWebSockets\Contracts\ChannelManager;
99
use BeyondCode\LaravelWebSockets\Helpers;
10+
use Carbon\Carbon;
11+
use Illuminate\Cache\ArrayLock;
12+
use Illuminate\Cache\ArrayStore;
1013
use Illuminate\Support\Str;
1114
use Ratchet\ConnectionInterface;
1215
use React\EventLoop\LoopInterface;
@@ -43,6 +46,21 @@ class LocalChannelManager implements ChannelManager
4346
*/
4447
protected $acceptsNewConnections = true;
4548

49+
/**
50+
* The ArrayStore instance of locks.
51+
*
52+
* @var \Illuminate\Cache\ArrayStore
53+
*/
54+
protected $store;
55+
56+
/**
57+
* The lock name to use on Array to avoid multiple
58+
* actions that might lead to multiple processings.
59+
*
60+
* @var string
61+
*/
62+
protected static $lockName = 'laravel-websockets:channel-manager:lock';
63+
4664
/**
4765
* Create a new channel manager instance.
4866
*
@@ -52,7 +70,7 @@ class LocalChannelManager implements ChannelManager
5270
*/
5371
public function __construct(LoopInterface $loop, $factoryClass = null)
5472
{
55-
//
73+
$this->store = new ArrayStore;
5674
}
5775

5876
/**
@@ -398,7 +416,9 @@ public function getMemberSockets($userId, $appId, $channelName): PromiseInterfac
398416
*/
399417
public function connectionPonged(ConnectionInterface $connection): PromiseInterface
400418
{
401-
return Helpers::createFulfilledPromise(true);
419+
$connection->lastPongedAt = Carbon::now();
420+
421+
return $this->updateConnectionInChannels($connection);
402422
}
403423

404424
/**
@@ -408,7 +428,43 @@ public function connectionPonged(ConnectionInterface $connection): PromiseInterf
408428
*/
409429
public function removeObsoleteConnections(): PromiseInterface
410430
{
411-
return Helpers::createFulfilledPromise(true);
431+
if (! $this->lock()->acquire()) {
432+
return Helpers::createFulfilledPromise(false);
433+
}
434+
435+
$this->getLocalConnections()->then(function ($connections) {
436+
foreach ($connections as $connection) {
437+
$differenceInSeconds = $connection->lastPongedAt->diffInSeconds(Carbon::now());
438+
439+
if ($differenceInSeconds > 120) {
440+
$this->unsubscribeFromAllChannels($connection);
441+
}
442+
}
443+
});
444+
445+
return Helpers::createFulfilledPromise(
446+
$this->lock()->release()
447+
);
448+
}
449+
450+
/**
451+
* Update the connection in all channels.
452+
*
453+
* @param ConnectionInterface $connection
454+
* @return PromiseInterface[bool]
455+
*/
456+
public function updateConnectionInChannels($connection): PromiseInterface
457+
{
458+
return $this->getLocalChannels($connection->app->id)
459+
->then(function ($channels) use ($connection) {
460+
foreach ($channels as $channel) {
461+
if ($channel->hasConnection($connection)) {
462+
$channel->saveConnection($connection);
463+
}
464+
}
465+
466+
return true;
467+
});
412468
}
413469

414470
/**
@@ -452,4 +508,14 @@ protected function getChannelClassName(string $channelName): string
452508

453509
return Channel::class;
454510
}
511+
512+
/**
513+
* Get a new ArrayLock instance to avoid race conditions.
514+
*
515+
* @return \Illuminate\Cache\CacheLock
516+
*/
517+
protected function lock()
518+
{
519+
return new ArrayLock($this->store, static::$lockName, 0);
520+
}
455521
}

src/ChannelManagers/RedisChannelManager.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class RedisChannelManager extends LocalChannelManager
5959
*
6060
* @var string
6161
*/
62-
protected static $redisLockName = 'laravel-websockets:channel-manager:lock';
62+
protected static $lockName = 'laravel-websockets:channel-manager:lock';
6363

6464
/**
6565
* Create a new channel manager instance.
@@ -768,7 +768,7 @@ public function getRedisKey($appId = null, string $channel = null, array $suffix
768768
*/
769769
protected function lock()
770770
{
771-
return new RedisLock($this->redis, static::$redisLockName, 0);
771+
return new RedisLock($this->redis, static::$lockName, 0);
772772
}
773773

774774
/**

src/Channels/Channel.php

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public function subscribe(ConnectionInterface $connection, stdClass $payload): b
100100
*/
101101
public function unsubscribe(ConnectionInterface $connection): bool
102102
{
103-
if (! isset($this->connections[$connection->socketId])) {
103+
if (! $this->hasConnection($connection)) {
104104
return false;
105105
}
106106

@@ -109,13 +109,24 @@ public function unsubscribe(ConnectionInterface $connection): bool
109109
return true;
110110
}
111111

112+
/**
113+
* Check if the given connection exists.
114+
*
115+
* @param \Ratchet\ConnectionInterface $connection
116+
* @return bool
117+
*/
118+
public function hasConnection(ConnectionInterface $connection): bool
119+
{
120+
return isset($this->connections[$connection->socketId]);
121+
}
122+
112123
/**
113124
* Store the connection to the subscribers list.
114125
*
115126
* @param \Ratchet\ConnectionInterface $connection
116127
* @return void
117128
*/
118-
protected function saveConnection(ConnectionInterface $connection)
129+
public function saveConnection(ConnectionInterface $connection)
119130
{
120131
$this->connections[$connection->socketId] = $connection;
121132
}

src/Server/WebSocketHandler.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ public function onOpen(ConnectionInterface $connection)
5757

5858
$this->channelManager->subscribeToApp($connection->app->id);
5959

60+
$this->channelManager->connectionPonged($connection);
61+
6062
DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_CONNECTED, [
6163
'origin' => "{$request->getUri()->getScheme()}://{$request->getUri()->getHost()}",
6264
'socketId' => $connection->socketId,

tests/LocalPongRemovalTest.php

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
<?php
2+
3+
namespace BeyondCode\LaravelWebSockets\Test;
4+
5+
use Carbon\Carbon;
6+
7+
class LocalPongRemovalTest extends TestCase
8+
{
9+
public function test_not_ponged_connections_do_get_removed_on_local_for_public_channels()
10+
{
11+
$this->runOnlyOnLocalReplication();
12+
13+
$activeConnection = $this->newActiveConnection(['public-channel']);
14+
$obsoleteConnection = $this->newActiveConnection(['public-channel']);
15+
16+
// The active connection just pinged, it should not be closed.
17+
$activeConnection->lastPongedAt = Carbon::now();
18+
$obsoleteConnection->lastPongedAt = Carbon::now()->subDays(1);
19+
20+
$this->channelManager->updateConnectionInChannels($activeConnection);
21+
$this->channelManager->updateConnectionInChannels($obsoleteConnection);
22+
23+
$this->channelManager
24+
->getGlobalConnectionsCount('1234', 'public-channel')
25+
->then(function ($count) {
26+
$this->assertEquals(2, $count);
27+
});
28+
29+
$this->channelManager->removeObsoleteConnections();
30+
31+
$this->channelManager
32+
->getGlobalConnectionsCount('1234', 'public-channel')
33+
->then(function ($count) {
34+
$this->assertEquals(1, $count);
35+
});
36+
37+
$this->channelManager
38+
->getLocalConnections()
39+
->then(function ($connections) use ($activeConnection) {
40+
$connection = $connections[$activeConnection->socketId];
41+
42+
$this->assertEquals($activeConnection->socketId, $connection->socketId);
43+
});
44+
}
45+
46+
public function test_not_ponged_connections_do_get_removed_on_local_for_private_channels()
47+
{
48+
$this->runOnlyOnLocalReplication();
49+
50+
$activeConnection = $this->newPrivateConnection('private-channel');
51+
$obsoleteConnection = $this->newPrivateConnection('private-channel');
52+
53+
// The active connection just pinged, it should not be closed.
54+
$activeConnection->lastPongedAt = Carbon::now();
55+
$obsoleteConnection->lastPongedAt = Carbon::now()->subDays(1);
56+
57+
$this->channelManager->updateConnectionInChannels($activeConnection);
58+
$this->channelManager->updateConnectionInChannels($obsoleteConnection);
59+
60+
$this->channelManager
61+
->getGlobalConnectionsCount('1234', 'private-channel')
62+
->then(function ($count) {
63+
$this->assertEquals(2, $count);
64+
});
65+
66+
$this->channelManager->removeObsoleteConnections();
67+
68+
$this->channelManager
69+
->getGlobalConnectionsCount('1234', 'private-channel')
70+
->then(function ($count) {
71+
$this->assertEquals(1, $count);
72+
});
73+
74+
$this->channelManager
75+
->getLocalConnections()
76+
->then(function ($connections) use ($activeConnection) {
77+
$connection = $connections[$activeConnection->socketId];
78+
79+
$this->assertEquals($activeConnection->socketId, $connection->socketId);
80+
});
81+
}
82+
83+
public function test_not_ponged_connections_do_get_removed_on_local_for_presence_channels()
84+
{
85+
$this->runOnlyOnLocalReplication();
86+
87+
$activeConnection = $this->newPresenceConnection('presence-channel', ['user_id' => 1]);
88+
$obsoleteConnection = $this->newPresenceConnection('presence-channel', ['user_id' => 2]);
89+
90+
// The active connection just pinged, it should not be closed.
91+
$activeConnection->lastPongedAt = Carbon::now();
92+
$obsoleteConnection->lastPongedAt = Carbon::now()->subDays(1);
93+
94+
$this->channelManager->updateConnectionInChannels($activeConnection);
95+
$this->channelManager->updateConnectionInChannels($obsoleteConnection);
96+
97+
$this->channelManager
98+
->getGlobalConnectionsCount('1234', 'presence-channel')
99+
->then(function ($count) {
100+
$this->assertEquals(2, $count);
101+
});
102+
103+
$this->channelManager
104+
->getChannelMembers('1234', 'presence-channel')
105+
->then(function ($members) {
106+
$this->assertCount(2, $members);
107+
});
108+
109+
$this->channelManager->removeObsoleteConnections();
110+
111+
$this->channelManager
112+
->getGlobalConnectionsCount('1234', 'presence-channel')
113+
->then(function ($count) {
114+
$this->assertEquals(1, $count);
115+
});
116+
117+
$this->channelManager
118+
->getLocalConnections()
119+
->then(function ($connections) use ($activeConnection) {
120+
$connection = $connections[$activeConnection->socketId];
121+
122+
$this->assertEquals($activeConnection->socketId, $connection->socketId);
123+
});
124+
125+
$this->channelManager
126+
->getChannelMembers('1234', 'presence-channel')
127+
->then(function ($members) {
128+
$this->assertCount(1, $members);
129+
});
130+
}
131+
}

tests/PresenceChannelTest.php

Lines changed: 0 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
use BeyondCode\LaravelWebSockets\API\TriggerEvent;
66
use BeyondCode\LaravelWebSockets\Server\Exceptions\InvalidSignature;
7-
use Carbon\Carbon;
87
use GuzzleHttp\Psr7\Request;
98
use Illuminate\Http\JsonResponse;
109
use Pusher\Pusher;
@@ -318,58 +317,6 @@ public function test_multiple_clients_with_same_user_id_trigger_member_added_and
318317
});
319318
}
320319

321-
public function test_not_ponged_connections_do_get_removed_for_presence_channels()
322-
{
323-
$this->runOnlyOnRedisReplication();
324-
325-
$activeConnection = $this->newPresenceConnection('presence-channel', ['user_id' => 1]);
326-
$obsoleteConnection = $this->newPresenceConnection('presence-channel', ['user_id' => 2]);
327-
328-
// The active connection just pinged, it should not be closed.
329-
$this->channelManager->addConnectionToSet($activeConnection, Carbon::now());
330-
331-
// Make the connection look like it was lost 1 day ago.
332-
$this->channelManager->addConnectionToSet($obsoleteConnection, Carbon::now()->subDays(1));
333-
334-
$this->channelManager
335-
->getGlobalConnectionsCount('1234', 'presence-channel')
336-
->then(function ($count) {
337-
$this->assertEquals(2, $count);
338-
});
339-
340-
$this->channelManager
341-
->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))
342-
->then(function ($expiredConnections) {
343-
$this->assertCount(1, $expiredConnections);
344-
});
345-
346-
$this->channelManager
347-
->getChannelMembers('1234', 'presence-channel')
348-
->then(function ($members) {
349-
$this->assertCount(2, $members);
350-
});
351-
352-
$this->channelManager->removeObsoleteConnections();
353-
354-
$this->channelManager
355-
->getGlobalConnectionsCount('1234', 'presence-channel')
356-
->then(function ($count) {
357-
$this->assertEquals(1, $count);
358-
});
359-
360-
$this->channelManager
361-
->getConnectionsFromSet(0, Carbon::now()->subMinutes(2)->format('U'))
362-
->then(function ($expiredConnections) {
363-
$this->assertCount(0, $expiredConnections);
364-
});
365-
366-
$this->channelManager
367-
->getChannelMembers('1234', 'presence-channel')
368-
->then(function ($members) {
369-
$this->assertCount(1, $members);
370-
});
371-
}
372-
373320
public function test_events_are_processed_by_on_message_on_presence_channels()
374321
{
375322
$this->runOnlyOnRedisReplication();

0 commit comments

Comments
 (0)