Skip to content

Commit 5f8e61a

Browse files
author
Edoardo Gallo
authored
Fix half-open socket (#54)
* add blade.ping and test it * use blade.ping to detect half-open socket connection * minor refactoring and tests * changelog entry * increase keepalive delay * use const for ping_delay
1 parent 6e02a0f commit 5f8e61a

File tree

4 files changed

+49
-2
lines changed

4 files changed

+49
-2
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
99

1010
### Fixed
1111
- Check signals supported by the environment. On Windows there is no `SIGHUP`.
12+
- Detect half-open connection and force close connection to update Client/Consumer properly.
1213

1314
## [2.0.0rc1] - 2019-10-28
1415
### Added

signalwire/blade/messages/ping.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
from signalwire.blade.messages.message import Message
2+
3+
class Ping(Message):
4+
5+
def __init__(self, timestamp=None):
6+
self.method = 'blade.ping'
7+
params = { 'timestamp': timestamp } if timestamp else {}
8+
super().__init__(params=params)

signalwire/relay/client.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from uuid import uuid4
77
from signalwire.blade.connection import Connection
88
from signalwire.blade.messages.connect import Connect
9+
from signalwire.blade.messages.ping import Ping
910
from signalwire.blade.handler import register, unregister, trigger
1011
from .helpers import setup_protocol
1112
from .calling import Calling
@@ -15,6 +16,8 @@
1516
from .constants import Constants, WebSocketEvents
1617

1718
class Client:
19+
PING_DELAY = 10
20+
1821
def __init__(self, project, token, host=Constants.HOST, connection=Connection):
1922
self.loop = asyncio.get_event_loop()
2023
self.host = host
@@ -34,6 +37,7 @@ def __init__(self, project, token, host=Constants.HOST, connection=Connection):
3437
self._requests = {}
3538
self._idle = False
3639
self._executeQueue = asyncio.Queue()
40+
self._pingInterval = None
3741
log_level = os.getenv('LOG_LEVEL', 'INFO').upper()
3842
logging.basicConfig(level=log_level)
3943

@@ -79,7 +83,7 @@ async def _connect(self):
7983
await self.connection.connect()
8084
asyncio.create_task(self.on_socket_open())
8185
await self.connection.read()
82-
trigger(WebSocketEvents.CLOSE, suffix=self.uuid)
86+
self.on_socket_close()
8387
except aiohttp.client_exceptions.ClientConnectorError as error:
8488
trigger(WebSocketEvents.ERROR, error, suffix=self.uuid)
8589
logging.warn(f"{self.host} seems down..")
@@ -122,6 +126,12 @@ def attach_signals(self):
122126
except:
123127
pass
124128

129+
def on_socket_close(self):
130+
if self._pingInterval:
131+
self._pingInterval.cancel()
132+
self.contexts = []
133+
trigger(WebSocketEvents.CLOSE, suffix=self.uuid)
134+
125135
async def on_socket_open(self):
126136
try:
127137
self._idle = False
@@ -131,12 +141,24 @@ async def on_socket_open(self):
131141
self.signature = result['authorization']['signature']
132142
self.protocol = await setup_protocol(self)
133143
await self._clearExecuteQueue()
144+
self._pong = True
145+
self.keepalive()
134146
logging.info('Client connected!')
135147
trigger(Constants.READY, self, suffix=self.uuid)
136148
except Exception as error:
137149
logging.error('Client setup error: {0}'.format(str(error)))
138150
await self.connection.close()
139151

152+
def keepalive(self):
153+
async def send_ping():
154+
if self._pong is False:
155+
return await self.connection.close()
156+
self._pong = False
157+
await self.execute(Ping())
158+
self._pong = True
159+
asyncio.create_task(send_ping())
160+
self._pingInterval = self.loop.call_later(self.PING_DELAY, self.keepalive)
161+
140162
async def _clearExecuteQueue(self):
141163
while True:
142164
if self._executeQueue.empty():

signalwire/tests/blade/test_blade_messages.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1+
from time import time
12
from unittest import TestCase
2-
from unittest.mock import patch
33
from signalwire import __version__
44
from signalwire.blade.messages.message import Message
55
from signalwire.blade.messages.connect import Connect
66
from signalwire.blade.messages.execute import Execute
77
from signalwire.blade.messages.subscription import Subscription
8+
from signalwire.blade.messages.ping import Ping
89

910
class TestBladeMessages(TestCase):
1011
def test_from_json_with_result(self):
@@ -74,3 +75,18 @@ def test_subscription(self):
7475

7576
self.assertEqual(msg.method, 'blade.subscription')
7677
self.assertEqual(msg.to_json(), '{"method":"blade.subscription","jsonrpc":"2.0","id":"mocked","params":{"protocol":"proto","command":"add","channels":["notif"]}}')
78+
79+
def test_ping_without_ts(self):
80+
msg = Ping()
81+
msg.id = 'mocked'
82+
83+
self.assertEqual(msg.method, 'blade.ping')
84+
self.assertEqual(msg.to_json(), '{"method":"blade.ping","jsonrpc":"2.0","id":"mocked","params":{}}')
85+
86+
def test_ping_with_ts(self):
87+
ts = time()
88+
msg = Ping(ts)
89+
msg.id = 'mocked'
90+
91+
self.assertEqual(msg.method, 'blade.ping')
92+
self.assertEqual(msg.to_json(), f'{{"method":"blade.ping","jsonrpc":"2.0","id":"mocked","params":{{"timestamp":{ts}}}}}')

0 commit comments

Comments
 (0)