Skip to content

Commit 1d8db66

Browse files
authored
Respect channel layer capacity in RedisChannelLayer.receive_buffer (#219)
respect the capacity setting so that the receive_buffer does not grow without bounds see: #212
1 parent 2075071 commit 1d8db66

File tree

3 files changed

+42
-2
lines changed

3 files changed

+42
-2
lines changed

CHANGELOG.txt

+6
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
UNRELEASED
2+
----------
3+
4+
* Ensured per-channel queues are bounded in size to avoid a slow memory leak
5+
if consumers stop reading.
6+
17
3.0.1 (2020-07-15)
28
------------------
39

channels_redis/core.py

+22-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import base64
33
import binascii
44
import collections
5+
import functools
56
import hashlib
67
import itertools
78
import logging
@@ -179,6 +180,19 @@ class UnsupportedRedis(Exception):
179180
pass
180181

181182

183+
class BoundedQueue(asyncio.Queue):
184+
def put_nowait(self, item):
185+
if self.full():
186+
# see: https://github.com/django/channels_redis/issues/212
187+
# if we actually get into this code block, it likely means that
188+
# this specific consumer has stopped reading
189+
# if we get into this code block, it's better to drop messages
190+
# that exceed the channel layer capacity than to continue to
191+
# malloc() forever
192+
self.get_nowait()
193+
return super(BoundedQueue, self).put_nowait(item)
194+
195+
182196
class RedisChannelLayer(BaseChannelLayer):
183197
"""
184198
Redis channel layer.
@@ -226,7 +240,9 @@ def __init__(
226240
# Event loop they are trying to receive on
227241
self.receive_event_loop = None
228242
# Buffered messages by process-local channel name
229-
self.receive_buffer = collections.defaultdict(asyncio.Queue)
243+
self.receive_buffer = collections.defaultdict(
244+
functools.partial(BoundedQueue, self.capacity)
245+
)
230246
# Detached channel cleanup tasks
231247
self.receive_cleaners = []
232248
# Per-channel cleanup locks to prevent a receive starting and moving
@@ -544,7 +560,11 @@ async def new_channel(self, prefix="specific"):
544560
Returns a new channel name that can be used by something in our
545561
process as a specific channel.
546562
"""
547-
return "%s.%s!%s" % (prefix, self.client_prefix, uuid.uuid4().hex,)
563+
return "%s.%s!%s" % (
564+
prefix,
565+
self.client_prefix,
566+
uuid.uuid4().hex,
567+
)
548568

549569
### Flush extension ###
550570

tests/test_core.py

+14
Original file line numberDiff line numberDiff line change
@@ -627,3 +627,17 @@ def test_custom_group_key_format():
627627
channel_layer = RedisChannelLayer(prefix="test_prefix")
628628
group_name = channel_layer._group_key("test_group")
629629
assert group_name == b"test_prefix:group:test_group"
630+
631+
632+
def test_receive_buffer_respects_capacity():
633+
channel_layer = RedisChannelLayer()
634+
buff = channel_layer.receive_buffer["test-group"]
635+
for i in range(10000):
636+
buff.put_nowait(i)
637+
638+
capacity = 100
639+
assert channel_layer.capacity == capacity
640+
assert buff.full() is True
641+
assert buff.qsize() == capacity
642+
messages = [buff.get_nowait() for _ in range(capacity)]
643+
assert list(range(9900, 10000)) == messages

0 commit comments

Comments
 (0)