11
11
import types
12
12
import uuid
13
13
14
- import aioredis
14
+ from redis import asyncio as aioredis
15
15
import msgpack
16
16
17
17
from channels .exceptions import ChannelFull
21
21
22
22
logger = logging .getLogger (__name__ )
23
23
24
- AIOREDIS_VERSION = tuple (map (int , aioredis .__version__ .split ("." )))
25
-
26
24
27
25
def _wrap_close (loop , pool ):
28
26
"""
@@ -49,8 +47,9 @@ class ConnectionPool:
49
47
"""
50
48
51
49
def __init__ (self , host ):
52
- self .host = host .copy ()
53
- self .master_name = self .host .pop ("master_name" , None )
50
+ self .host = host
51
+ # TODO: re-add support for master_name
52
+ self .master_name = None # self.host.pop("master_name", None)
54
53
self .conn_map = {}
55
54
self .sentinel_map = {}
56
55
self .in_use = {}
@@ -72,11 +71,11 @@ def _ensure_loop(self, loop):
72
71
73
72
async def create_conn (self , loop ):
74
73
# One connection per pool since we are emulating a single connection
75
- kwargs = {"minsize " : 1 , "maxsize" : 1 , ** self . host }
76
- if not ( sys .version_info >= (3 , 8 , 0 ) and AIOREDIS_VERSION >= ( 1 , 3 , 1 ) ):
77
- kwargs ["loop" ] = loop
74
+ kwargs = {"max_connections " : 1 }
75
+ # if not sys.version_info >= (3, 8, 0):
76
+ # kwargs["loop"] = loop
78
77
if self .master_name is None :
79
- return await aioredis .create_redis_pool ( ** kwargs )
78
+ return aioredis .ConnectionPool . from_url ( self . host , ** kwargs )
80
79
else :
81
80
kwargs = {"timeout" : 2 , ** kwargs } # aioredis default is way too low
82
81
sentinel = await aioredis .sentinel .create_sentinel (** kwargs )
@@ -93,7 +92,9 @@ async def pop(self, loop=None):
93
92
conn = await self .create_conn (loop )
94
93
conns .append (conn )
95
94
conn = conns .pop ()
96
- if conn .closed :
95
+ # Redis ConnectionPool has no closed attribute
96
+ # if conn.closed:
97
+ if False :
97
98
conn = await self .pop (loop = loop )
98
99
return conn
99
100
self .in_use [conn ] = loop
@@ -131,8 +132,7 @@ async def _close_conn(self, conn, sentinel_map=None):
131
132
sentinel_map [conn ].close ()
132
133
await sentinel_map [conn ].wait_closed ()
133
134
del sentinel_map [conn ]
134
- conn .close ()
135
- await conn .wait_closed ()
135
+ await conn .disconnect ()
136
136
137
137
async def close_loop (self , loop ):
138
138
"""
@@ -279,7 +279,7 @@ def decode_hosts(self, hosts):
279
279
"""
280
280
# If no hosts were provided, return a default value
281
281
if not hosts :
282
- return [{ "address" : ( " localhost" , 6379 )} ]
282
+ return ["redis:// localhost: 6379" ]
283
283
# If they provided just a string, scold them.
284
284
if isinstance (hosts , (str , bytes )):
285
285
raise ValueError (
@@ -289,10 +289,11 @@ def decode_hosts(self, hosts):
289
289
# Decode each hosts entry into a kwargs dict
290
290
result = []
291
291
for entry in hosts :
292
+ # TODO: re-add support for dict-based connections
292
293
if isinstance (entry , dict ):
293
294
result .append (entry )
294
295
else :
295
- result .append ({ "address" : entry } )
296
+ result .append (entry )
296
297
return result
297
298
298
299
def _setup_encryption (self , symmetric_encryption_keys ):
@@ -348,11 +349,11 @@ async def send(self, channel, message):
348
349
349
350
# Check the length of the list before send
350
351
# This can allow the list to leak slightly over capacity, but that's fine.
351
- if await connection .zcount (channel_key ) >= self .get_capacity (channel ):
352
+ if await connection .zcount (channel_key , "-inf" , "+inf" ) >= self .get_capacity (channel ):
352
353
raise ChannelFull ()
353
354
354
355
# Push onto the list then set it to expire in case it's not consumed
355
- await connection .zadd (channel_key , time . time (), self . serialize ( message ) )
356
+ await connection .zadd (channel_key , { self . serialize ( message ): time . time ()} )
356
357
await connection .expire (channel_key , int (self .expiry ))
357
358
358
359
def _backup_channel_name (self , channel ):
@@ -380,13 +381,13 @@ async def _brpop_with_clean(self, index, channel, timeout):
380
381
async with self .connection (index ) as connection :
381
382
# Cancellation here doesn't matter, we're not doing anything destructive
382
383
# and the script executes atomically...
383
- await connection .eval (cleanup_script , keys = [], args = [ channel , backup_queue ] )
384
+ await connection .eval (cleanup_script , 0 , channel , backup_queue )
384
385
# ...and it doesn't matter here either, the message will be safe in the backup.
385
386
result = await connection .bzpopmin (channel , timeout = timeout )
386
387
387
388
if result is not None :
388
389
_ , member , timestamp = result
389
- await connection .zadd (backup_queue , float (timestamp ), member )
390
+ await connection .zadd (backup_queue , { member : float (timestamp )} )
390
391
else :
391
392
member = None
392
393
@@ -610,7 +611,7 @@ async def flush(self):
610
611
# Go through each connection and remove all with prefix
611
612
for i in range (self .ring_size ):
612
613
async with self .connection (i ) as connection :
613
- await connection .eval (delete_prefix , keys = [], args = [ self .prefix + "*" ] )
614
+ await connection .eval (delete_prefix , 0 , self .prefix + "*" )
614
615
# Now clear the pools as well
615
616
await self .close_pools ()
616
617
@@ -645,7 +646,7 @@ async def group_add(self, group, channel):
645
646
group_key = self ._group_key (group )
646
647
async with self .connection (self .consistent_hash (group )) as connection :
647
648
# Add to group sorted set with creation time as timestamp
648
- await connection .zadd (group_key , time .time (), channel )
649
+ await connection .zadd (group_key , { channel : time .time ()} )
649
650
# Set expiration to be group_expiry, since everything in
650
651
# it at this point is guaranteed to expire before that
651
652
await connection .expire (group_key , self .group_expiry )
@@ -730,7 +731,7 @@ async def group_send(self, group, message):
730
731
# channel_keys does not contain a single redis key more than once
731
732
async with self .connection (connection_index ) as connection :
732
733
channels_over_capacity = await connection .eval (
733
- group_send_lua , keys = channel_redis_keys , args = args
734
+ group_send_lua , len ( channel_redis_keys ), * channel_redis_keys , * args
734
735
)
735
736
if channels_over_capacity > 0 :
736
737
logger .info (
@@ -900,7 +901,7 @@ def __init__(self, pool):
900
901
901
902
async def __aenter__ (self ):
902
903
self .conn = await self .pool .pop ()
903
- return self .conn
904
+ return aioredis . Redis ( connection_pool = self .conn )
904
905
905
906
async def __aexit__ (self , exc_type , exc , tb ):
906
907
if exc :
0 commit comments