File tree 2 files changed +32
-1
lines changed
2 files changed +32
-1
lines changed Original file line number Diff line number Diff line change 2
2
import base64
3
3
import binascii
4
4
import collections
5
+ import functools
5
6
import hashlib
6
7
import itertools
7
8
import logging
@@ -179,6 +180,19 @@ class UnsupportedRedis(Exception):
179
180
pass
180
181
181
182
183
+ class BoundedQueue (asyncio .Queue ):
184
+ def put_nowait (self , item ):
185
+ if self .full ():
186
+ # see: https://github.com/django/channels_redis/issues/212
187
+ # if we actually get into this code block, it likely means that
188
+ # this specific consumer has stopped reading
189
+ # if we get into this code block, it's better to drop messages
190
+ # that exceed the channel layer capacity than to continue to
191
+ # malloc() forever
192
+ self .get_nowait ()
193
+ return super (BoundedQueue , self ).put_nowait (item )
194
+
195
+
182
196
class RedisChannelLayer (BaseChannelLayer ):
183
197
"""
184
198
Redis channel layer.
@@ -226,7 +240,9 @@ def __init__(
226
240
# Event loop they are trying to receive on
227
241
self .receive_event_loop = None
228
242
# Buffered messages by process-local channel name
229
- self .receive_buffer = collections .defaultdict (asyncio .Queue )
243
+ self .receive_buffer = collections .defaultdict (
244
+ functools .partial (BoundedQueue , self .capacity )
245
+ )
230
246
# Detached channel cleanup tasks
231
247
self .receive_cleaners = []
232
248
# Per-channel cleanup locks to prevent a receive starting and moving
Original file line number Diff line number Diff line change @@ -627,3 +627,18 @@ def test_custom_group_key_format():
627
627
channel_layer = RedisChannelLayer (prefix = "test_prefix" )
628
628
group_name = channel_layer ._group_key ("test_group" )
629
629
assert group_name == b"test_prefix:group:test_group"
630
+
631
+
632
+ @pytest .mark .asyncio
633
+ async def test_receive_buffer_respects_capacity ():
634
+ channel_layer = RedisChannelLayer ()
635
+ buff = channel_layer .receive_buffer ['test-group' ]
636
+ for i in range (10000 ):
637
+ buff .put_nowait (i )
638
+
639
+ capacity = 100
640
+ assert channel_layer .capacity == capacity
641
+ assert buff .full () is True
642
+ assert buff .qsize () == capacity
643
+ messages = [buff .get_nowait () for _ in range (capacity )]
644
+ assert list (range (9900 , 10000 )) == messages
You can’t perform that action at this time.
0 commit comments