@@ -179,6 +179,40 @@ class UnsupportedRedis(Exception):
179
179
pass
180
180
181
181
182
+ class ExpiringCache (collections .defaultdict ):
183
+ def __init__ (self , default , ttl = 60 , * args , ** kw ):
184
+ collections .defaultdict .__init__ (self , default )
185
+ self ._expires = collections .OrderedDict ()
186
+ self .ttl = ttl
187
+
188
+ def __setitem__ (self , k , v ):
189
+ collections .defaultdict .__setitem__ (self , k , v )
190
+ self ._expires [k ] = time .time () + self .ttl
191
+
192
+ def __delitem__ (self , k ):
193
+ try :
194
+ collections .defaultdict .__delitem__ (self , k )
195
+ except KeyError :
196
+ # RedisChannelLayer itself _does_ periodically clean up this
197
+ # dictionary (e.g., when exceptions like asyncio.CancelledError
198
+ # occur)
199
+ pass
200
+
201
+ def expire (self ):
202
+ expired = []
203
+ for k in self ._expires .keys ():
204
+ if self ._expires [k ] < time .time ():
205
+ expired .append (k )
206
+ else :
207
+ # as this is an OrderedDict, every key after this
208
+ # was inserted *later*, so if _this_ key is *not* expired,
209
+ # the ones after it aren't either (so we can stop iterating)
210
+ break
211
+ for k in expired :
212
+ del self ._expires [k ]
213
+ del self [k ]
214
+
215
+
182
216
class RedisChannelLayer (BaseChannelLayer ):
183
217
"""
184
218
Redis channel layer.
@@ -226,7 +260,7 @@ def __init__(
226
260
# Event loop they are trying to receive on
227
261
self .receive_event_loop = None
228
262
# Buffered messages by process-local channel name
229
- self .receive_buffer = collections . defaultdict (asyncio .Queue )
263
+ self .receive_buffer = ExpiringCache (asyncio .Queue , ttl = self . expiry )
230
264
# Detached channel cleanup tasks
231
265
self .receive_cleaners = []
232
266
# Per-channel cleanup locks to prevent a receive starting and moving
@@ -616,6 +650,7 @@ async def group_discard(self, group, channel):
616
650
key = self ._group_key (group )
617
651
async with self .connection (self .consistent_hash (group )) as connection :
618
652
await connection .zrem (key , channel )
653
+ self .receive_buffer .expire ()
619
654
620
655
async def group_send (self , group , message ):
621
656
"""
0 commit comments