-
Notifications
You must be signed in to change notification settings - Fork 197
Migrate from aioredis to redis-py #296
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
I have no idea what I'm doing here, but these changes got a good chunk of the test suite passing. 😄 I'm a little confused on how the connection pools are used and could use some guidance. |
💪 |
@Andrew-Chen-Wang — Don't suppose you have a cycle to look at this do you? 👀 (Some informed input will speed us over the line 🙂) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR!
@carltongibson thanks for the ping. So far, it LGTM!
Obviously it's WIP so there are still some places of 1.3.1 that are not converted yet (like self._receiver
). One concern I do have is that, although we have a solution underway to fix the performance bottleneck, I'd still highly encourage that aioredis 1.3.1 maintenance is still guaranteed for now.
That is, unless you think it would be too much of a burden to maintain both versions or channels_redis doesn't really get that many features, so 1.3.1 can be dropped.
I'm having trouble getting the pub/sub tests to run locally. It just hangs when it gets to those tests. When I kill the process, I get the message:
Any tips? |
I think that's the way... — if folks need to hang-out with aioredis 1.3.1, they can defer updating
@acu192 — Do you have a cycle to checkout this branch and advice ref @ipmb's question here ? Thanks! |
Hi @ipmb, thanks for working on this! I haven't dug deep, but at first glance it looks like redis-py does PubSub differently (see these docs). That is, this line from self._receiver = aioredis.pubsub.Receiver(on_close=on_close_noop) And that error, being deep inside the event loop somewhere, is not propagating all the way to the main thread so you're not seeing it. Again, that's just a theory on my part, not confirmed. Anyway, then the tests never finish because somewhere somebody is I suspect once you port the receiver code, it may all start working. |
Getting there. If anybody who has a better understanding of sentinel could take a look that would be great. There is an issue in a bunch of tests with how we're trying to release connections ( |
@acu192 any thoughts on this #296 (comment)? |
Hi @ipmb, it was actually @qeternity who contributed the Sentinel connection feature (see this commit). Maybe @qeternity can see what is going on here? PS: Thank you @ipmb for starting this work. It is VERY much appreciated. Does it seems these Sentinel issues are the final issues from having all tests passing? |
Hi all - catching up here. First off, definitely a big thanks to @ipmb for taking this up. My memory is a bit hazy at this point, but there was an issue with using the native pools (a sentinel connection is a pool that maintains a pubsub connection to sentinels for changes, and then a pool of regular redis connections to redis nodes). The (hacky) workaround was to get/put connections manually from the pool, which is what you're seeing. I would not emulate this behavior, and I suspect that redis-py does not have the weird corner cases that the old and unmaintained aioredis 1.3.x did so hopefully we can dispense with this. |
You're referring to this code @qeternity ? channels_redis/channels_redis/core.py Lines 87 to 110 in bba9319
|
@ipmb the core layer definitely should not manage its own pool (if memory serves, it has no functionality to ever decrease pool size). But in the pubsub layer as well, we should avoid getting/putting raw connections where possible. I haven't spent any time playing with your branch, but I can't see why we wouldn't want to have one native redis-py pool per event loop and let redis-py manage things from there. This would also give us one common interface between redis and sentinel connections, and may even allow supporting redis cluster (need to look at redis-py api closer). |
if self._pub_conn is not None and self._pub_conn.closed: | ||
self._put_redis_conn(self._pub_conn) | ||
if self._pub_conn is not None and self._pub_conn.connection is None: | ||
await self._put_redis_conn(self._pub_conn) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not clear to me if this check is still necessary now that we are using the built-in pool functionality from redis-py.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we are using the native pools functionality, this should not be necessary.
We should only be concerning ourselves with the creation and destruction of pools at that point, and redis-py can handle everything else.
Ok, I removed all the connection pool stuff and am pretty close to a passing test suite. Locally I get one failure ( I split out a new I still don't have a deep understanding of all this and mostly got here by banging it into submission with a hammer, so a review from someone more knowledgeable would be good. 😄 |
Good work @ipmb — thanks. (Sorry for the slow pick-up. I hope to swing back to channels once Django 4.1a1 is out the door...) |
@ipmb I'm not seeing a new layer for sentinel...has this changed since your last commit? Ideally we would have a single layer for all redis types. I could be wrong, but I think once you get past the redis-py connection initialization, the pool API is the same. So we should just create a regular/sentinel/cluster pool on init, and then channels can be naive about which redis implementation is actually being used behind the scenes. |
@qeternity this is the Sentinel layer channels_redis/channels_redis/core.py Lines 774 to 791 in 9f2efcb
I also have one working for pubsub, but it requires changing the order of the class definitions which makes the diff harder to read. I'd like to get confirmation/sign-off on this before making that change. |
It made sense to me to use the same pattern as redis-py where you choose the appropriate class (ConnectionPool or SentinelConnectionPool) and pass in the required arguments (which are very different). This would also make type definitions easier in the future instead of accepting arguments in two different formats. It's easy to switch back if you prefer to keep the single class, lmk. |
When might we be able to get an update on the above question? |
It would be incredible to close this issue. |
@Luferov It would! Are you able to pick it up? It's on my list, but a contribution would be even better! 🏅 |
Unfortunately, no. But I am very interested in the development of this library. If I were a maintainer, I would develop this library, but alas. |
Firstly, my intention is not to hijack. 99.99% of the work here was done by the author not me. https://github.com/bbrowning918/channels_redis/tree/migrate-aioredis-to-redis-py
I spent more time than I would like to admit chasing what I now believe to be the underlying memory leak this library migration aims to fix so I am eager and willing to help. |
Thanks @bbrowning918! I merged your changes into this PR. The tests pass with the exception of the two ======================================================= FAILURES =======================================================
_________________________________________________ test_receive_cancel __________________________________________________
channel_layer = <channels_redis.core.RedisChannelLayer object at 0x1023eb0a0>
@pytest.mark.asyncio
async def test_receive_cancel(channel_layer):
"""
Makes sure we can cancel a receive without blocking
"""
channel_layer = RedisChannelLayer(capacity=20)
channel = await channel_layer.new_channel()
delay = 0
while delay < 0.01:
> await channel_layer.send(channel, {"type": "test.message", "text": "Ahoy-hoy!"})
tests/test_sentinel.py:419:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <channels_redis.core.RedisChannelLayer object at 0x1023eb0a0>
channel = 'specific.8e548be93cd4436db8bd497843f98732!b1c0a3afe4b44ed1aa8a37a99f551e56'
message = {'__asgi_channel__': 'specific.8e548be93cd4436db8bd497843f98732!b1c0a3afe4b44ed1aa8a37a99f551e56', 'text': 'Ahoy-hoy!', 'type': 'test.message'}
async def send(self, channel, message):
"""
Send a message onto a (general or specific) channel.
"""
# Typecheck
assert isinstance(message, dict), "message is not a dict"
assert self.valid_channel_name(channel), "Channel name not valid"
# Make sure the message does not contain reserved keys
assert "__asgi_channel__" not in message
# If it's a process-local channel, strip off local part and stick full name in message
channel_non_local_name = channel
if "!" in channel:
message = dict(message.items())
message["__asgi_channel__"] = channel
channel_non_local_name = self.non_local_name(channel)
# Write out message into expiring key (avoids big items in list)
channel_key = self.prefix + channel_non_local_name
# Pick a connection to the right server - consistent for specific
# channels, random for general channels
if "!" in channel:
index = self.consistent_hash(channel)
else:
index = next(self._send_index_generator)
connection = self.connection(index)
# Discard old messages based on expiry
await connection.zremrangebyscore(
channel_key, min=0, max=int(time.time()) - int(self.expiry)
)
# Check the length of the list before send
# This can allow the list to leak slightly over capacity, but that's fine.
if await connection.zcount(channel_key, "-inf", "+inf") >= self.get_capacity(
channel
):
> raise ChannelFull()
E channels.exceptions.ChannelFull
channels_redis/core.py:251: ChannelFull |
Hey @ipmb.
See #277 (and #229 (comment)) @revoteon had it that 15 was the in-practice limit, but that seems to have gone up (Any thought on a why?) — If you want to suggest a new limit, 40?, that's likely sufficient. |
I confirmed that if I bump it up from Here's what I did locally, diff --git a/tests/test_core.py b/tests/test_core.py
index 9a0d84c..928c94d 100644
--- a/tests/test_core.py
+++ b/tests/test_core.py
@@ -409,7 +409,7 @@ async def test_receive_cancel(channel_layer):
"""
Makes sure we can cancel a receive without blocking
"""
- channel_layer = RedisChannelLayer(capacity=20)
+ channel_layer = RedisChannelLayer(capacity=30)
channel = await channel_layer.new_channel()
delay = 0
while delay < 0.01:
diff --git a/tests/test_sentinel.py b/tests/test_sentinel.py
index a3e990d..4d3c624 100644
--- a/tests/test_sentinel.py
+++ b/tests/test_sentinel.py
@@ -412,7 +412,7 @@ async def test_receive_cancel(channel_layer):
"""
Makes sure we can cancel a receive without blocking
"""
- channel_layer = RedisChannelLayer(capacity=20)
+ channel_layer = RedisChannelLayer(capacity=30)
channel = await channel_layer.new_channel()
delay = 0
while delay < 0.01: |
OK, let's go with that. Thanks! 👍 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, super!
I will roll this for v4b1
to let folks opt-in, and give it a test.
Thanks all!
Here you go: https://pypi.org/project/channels-redis/4.0.0b1/ — please report any issues and we can clean those up before dropping the Thanks @ipmb and @bbrowning918 🎩 |
For folks watching this, there's an issue running tests (with a reproduce) in #312. |
Hey! I get this error with channels_redis==4.1.0 and redis==3.5.3
|
Upgrading redis fixed this issue |
Migrated from aioredis to redis-py (django#296) Co-authored-by: bbrowning918 <[email protected]>
aioredis is no longer maintained and the official library has absorbed its functionality
refs #285