File tree 1 file changed +17
-1
lines changed
1 file changed +17
-1
lines changed Original file line number Diff line number Diff line change 2
2
import base64
3
3
import binascii
4
4
import collections
5
+ import functools
5
6
import hashlib
6
7
import itertools
7
8
import logging
@@ -179,6 +180,19 @@ class UnsupportedRedis(Exception):
179
180
pass
180
181
181
182
183
+ class BoundedQueue (asyncio .Queue ):
184
+ def put_nowait (self , item ):
185
+ if self .full ():
186
+ # see: https://github.com/django/channels_redis/issues/212
187
+ # if we actually get into this code block, it likely means that
188
+ # this specific consumer has stopped reading
189
+ # if we get into this code block, it's better to drop messages
190
+ # that exceed the channel layer capacity than to continue to
191
+ # malloc() forever
192
+ self .get_nowait ()
193
+ return super (BoundedQueue , self ).put_nowait (item )
194
+
195
+
182
196
class RedisChannelLayer (BaseChannelLayer ):
183
197
"""
184
198
Redis channel layer.
@@ -226,7 +240,9 @@ def __init__(
226
240
# Event loop they are trying to receive on
227
241
self .receive_event_loop = None
228
242
# Buffered messages by process-local channel name
229
- self .receive_buffer = collections .defaultdict (asyncio .Queue )
243
+ self .receive_buffer = collections .defaultdict (
244
+ functools .partial (BoundedQueue , self .capacity )
245
+ )
230
246
# Detached channel cleanup tasks
231
247
self .receive_cleaners = []
232
248
# Per-channel cleanup locks to prevent a receive starting and moving
You can’t perform that action at this time.
0 commit comments