Skip to content

Commit 612e942

Browse files
committed
AioHTTP and Async HTTPX teardown improvments pt.1
1 parent 0656e6a commit 612e942

File tree

4 files changed

+105
-20
lines changed

4 files changed

+105
-20
lines changed

pubnub/pubnub_core.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ def list_push_channels(self, device_id: str = None, push_type: PNPushType = None
250250
environment: PNPushEnvironment = None) -> ListPushProvisions:
251251
return ListPushProvisions(self, device_id=device_id, push_type=push_type, topic=topic, environment=environment)
252252

253-
def add_channels_to_push(self, channels: Union[str, List[str]], device_id: str = None, push_type: PNPushType = None,
253+
def add_channels_to_push(self, channels: Union[str, List[str]] = None, device_id: str = None, push_type: PNPushType = None,
254254
topic: str = None, environment: PNPushEnvironment = None) -> AddChannelsToPush:
255255
return AddChannelsToPush(self, channels=channels, device_id=device_id, push_type=push_type, topic=topic,
256256
environment=environment)

pubnub/request_handlers/async_aiohttp.py

+36-6
Original file line numberDiff line numberDiff line change
@@ -28,25 +28,55 @@ class AsyncAiohttpRequestHandler(BaseRequestHandler):
2828
ENDPOINT_THREAD_COUNTER: int = 0
2929
_connector: aiohttp.TCPConnector = None
3030
_session: aiohttp.ClientSession = None
31+
_is_closing: bool = False
32+
_max_connections: int = 100
33+
_connection_timeout: float = 30.0
3134

3235
def __init__(self, pubnub):
3336
self.pubnub = pubnub
34-
self._connector = aiohttp.TCPConnector(verify_ssl=True, loop=self.pubnub.event_loop)
37+
self._connector = aiohttp.TCPConnector(
38+
verify_ssl=True,
39+
loop=self.pubnub.event_loop,
40+
limit=self._max_connections,
41+
ttl_dns_cache=300
42+
)
43+
self._is_closing = False
3544

3645
async def create_session(self):
37-
if not self._session:
46+
if not self._session and not self._is_closing:
3847
self._session = aiohttp.ClientSession(
3948
loop=self.pubnub.event_loop,
40-
timeout=aiohttp.ClientTimeout(connect=self.pubnub.config.connect_timeout),
49+
timeout=aiohttp.ClientTimeout(
50+
total=self._connection_timeout,
51+
connect=self.pubnub.config.connect_timeout,
52+
sock_read=self.pubnub.config.connect_timeout,
53+
sock_connect=self.pubnub.config.connect_timeout
54+
),
4155
connector=self._connector
4256
)
4357

4458
async def close_session(self):
45-
if self._session is not None:
46-
await self._session.close()
59+
if self._session is not None and not self._is_closing:
60+
self._is_closing = True
61+
try:
62+
# Cancel any pending requests
63+
if hasattr(self._session, '_connector'):
64+
for task in asyncio.all_tasks():
65+
if not task.done() and task is not asyncio.current_task():
66+
task.cancel()
67+
68+
# Close session and connector
69+
await self._session.close()
70+
await self._connector.close()
71+
except Exception as e:
72+
logger.error(f"Error during session cleanup: {str(e)}")
73+
finally:
74+
self._session = None
75+
self._is_closing = False
4776

4877
async def set_connector(self, connector):
49-
await self._session.aclose()
78+
if self._session is not None:
79+
await self.close_session()
5080
self._connector = connector
5181
await self.create_session()
5282

pubnub/request_handlers/async_httpx.py

+67-12
Original file line numberDiff line numberDiff line change
@@ -24,33 +24,85 @@ def close(self):
2424
self.is_closed = True
2525
asyncio.create_task(super().aclose())
2626

27+
async def aclose(self):
28+
self.is_closed = True
29+
await super().aclose()
30+
31+
async def __aenter__(self):
32+
return self
33+
34+
async def __aexit__(self, exc_type, exc_val, exc_tb):
35+
await self.aclose()
36+
2737

2838
class AsyncHttpxRequestHandler(BaseRequestHandler):
2939
""" PubNub Python SDK asychronous requests handler based on the `httpx` HTTP library. """
3040
ENDPOINT_THREAD_COUNTER: int = 0
3141
_connector: httpx.AsyncHTTPTransport = None
3242
_session: httpx.AsyncClient = None
43+
_is_closing: bool = False
44+
_max_connections: int = 100
45+
_connection_timeout: float = 30.0
3346

3447
def __init__(self, pubnub):
3548
self.pubnub = pubnub
36-
self._connector = PubNubAsyncHTTPTransport(verify=True, http2=True)
49+
self._connector = PubNubAsyncHTTPTransport(
50+
verify=True,
51+
http2=True,
52+
limits=httpx.Limits(max_connections=self._max_connections)
53+
)
54+
self._is_closing = False
3755

3856
async def create_session(self):
39-
self._session = httpx.AsyncClient(
40-
timeout=httpx.Timeout(self.pubnub.config.connect_timeout),
41-
transport=self._connector
42-
)
57+
if self._session is None and not self._is_closing:
58+
self._session = httpx.AsyncClient(
59+
timeout=httpx.Timeout(
60+
connect=self._connection_timeout,
61+
read=self.pubnub.config.connect_timeout,
62+
write=self.pubnub.config.connect_timeout,
63+
pool=self._connection_timeout
64+
),
65+
transport=self._connector,
66+
http2=True
67+
)
4368

4469
async def close_session(self):
45-
if self._session is not None:
46-
self._connector.close()
47-
await self._session.aclose()
70+
if self._session is not None and not self._is_closing:
71+
self._is_closing = True
72+
try:
73+
# Cancel any pending requests
74+
if hasattr(self._session, '_transport'):
75+
for task in asyncio.all_tasks():
76+
if not task.done() and task is not asyncio.current_task():
77+
task.cancel()
78+
79+
# Close transport and session
80+
await self._connector.aclose()
81+
await self._session.aclose()
82+
except Exception as e:
83+
logger.error(f"Error during session cleanup: {str(e)}")
84+
finally:
85+
self._session = None
86+
self._is_closing = False
4887

4988
async def set_connector(self, connector):
50-
await self._session.aclose()
89+
if self._session is not None:
90+
await self.close_session()
5191
self._connector = connector
5292
await self.create_session()
5393

94+
async def __aenter__(self):
95+
await self.create_session()
96+
return self
97+
98+
async def __aexit__(self, exc_type, exc_val, exc_tb):
99+
await self.close_session()
100+
101+
def __del__(self):
102+
...
103+
# if self._session is not None and not self._is_closing:
104+
# asyncio.create_task(self.close_session())
105+
54106
def sync_request(self, **_):
55107
raise NotImplementedError("sync_request is not implemented for asyncio handler")
56108

@@ -123,13 +175,17 @@ async def async_request(self, options_func, cancellation_event):
123175
except Exception as e:
124176
logger.error("session.request exception: %s" % str(e))
125177
raise
178+
try:
179+
response_body = await response.aread()
180+
except Exception as e:
181+
logger.error(f"Error reading response body: {str(e)}")
182+
response_body = None
126183

127-
response_body = response.read()
128184
if not options.non_json_response:
129185
body = response_body
130186
else:
131187
if isinstance(response.content, bytes):
132-
body = response.content # TODO: simplify this logic within the v5 release
188+
body = response.content
133189
else:
134190
body = response_body
135191

@@ -192,7 +248,6 @@ async def async_request(self, options_func, cancellation_event):
192248
logger.debug(data)
193249

194250
if response.status_code not in (200, 307, 204):
195-
196251
if response.status_code >= 500:
197252
err = PNERR_SERVER_ERROR
198253
else:

pubnub/request_handlers/httpx.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class HttpxRequestHandler(BaseRequestHandler):
2828
ENDPOINT_THREAD_COUNTER: int = 0
2929

3030
def __init__(self, pubnub):
31-
self.session = httpx.Client()
31+
self.session = httpx.Client(http2=True)
3232

3333
self.pubnub = pubnub
3434

0 commit comments

Comments
 (0)