File tree 1 file changed +18
-1
lines changed
1 file changed +18
-1
lines changed Original file line number Diff line number Diff line change 2
2
import base64
3
3
import binascii
4
4
import collections
5
+ import functools
5
6
import hashlib
6
7
import itertools
7
8
import logging
@@ -179,6 +180,20 @@ class UnsupportedRedis(Exception):
179
180
pass
180
181
181
182
183
+ class BoundedQueue (asyncio .Queue ):
184
+
185
+ def put_nowait (self , item ):
186
+ if self .full ():
187
+ # see: https://github.com/django/channels_redis/issues/212
188
+ # if we actually get into this code block, it likely means that
189
+ # this specific consumer has stopped reading
190
+ # if we get into this code block, it's better to drop messages
191
+ # that exceed the channel layer capacity than to continue to
192
+ # malloc() forever
193
+ self .get_nowait ()
194
+ return super (BoundedQueue , self ).put_nowait (item )
195
+
196
+
182
197
class RedisChannelLayer (BaseChannelLayer ):
183
198
"""
184
199
Redis channel layer.
@@ -226,7 +241,9 @@ def __init__(
226
241
# Event loop they are trying to receive on
227
242
self .receive_event_loop = None
228
243
# Buffered messages by process-local channel name
229
- self .receive_buffer = collections .defaultdict (asyncio .Queue )
244
+ self .receive_buffer = collections .defaultdict (
245
+ functools .partial (BoundedQueue , self .capacity )
246
+ )
230
247
# Detached channel cleanup tasks
231
248
self .receive_cleaners = []
232
249
# Per-channel cleanup locks to prevent a receive starting and moving
You can’t perform that action at this time.
0 commit comments