Skip to content

Commit 8e1ef1c

Browse files
committed
Disable group_expiry, regardless of the spec
See django/channels#1371 for rationale. Also, document "normal channels" (i.e., job queues) and what users should do if they want job queues. [closes #18]
1 parent 9471da2 commit 8e1ef1c

File tree

5 files changed

+82
-33
lines changed

5 files changed

+82
-33
lines changed

CHANGELOG.rst

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
v1.2.0 - 2019-12-30
2+
~~~~~~~~~~~~~~~~~~~
3+
4+
* Support Python 3.8 (upgrade to aioamqp=0.14.0) [issue #20] [PR #21]
5+
* Deprecate ``group_expiry`` [issue #18] [PR #19]. Rationale:
6+
https://github.com/django/channels/issues/1371
7+
18
v1.1.5 - 2019-08-22
29
~~~~~~~~~~~~~~~~~~~
310

README.rst

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,6 @@ Defaults to ``60``. You generally shouldn't need to change this, but you may
4444
want to turn it down if you have peaky traffic you wish to drop, or up if you
4545
have peaky traffic you want to backlog until you get to it.
4646

47-
``group_expiry``
48-
~~~~~~~~~~~~~~~~
49-
50-
Group expiry in seconds. Defaults to ``86400``. Channels will be removed from
51-
the group after this amount of time. It's recommended that you increase this
52-
parameter to ``86400000`` (1 year) and rely on explicit ``group_discard()`` to
53-
cancel subscriptions. (If your process halts, the group membership will
54-
disappear from RabbitMQ immediately: you needn't worry about leaks.)
55-
5647
``local_capacity``
5748
~~~~~~~~~~~~~~~~~~
5849

@@ -149,6 +140,52 @@ messages from RabbitMQ and routes them to receiver queues; each ``receive()``
149140
queries receiver queues. Empty queues are deleted. TODO delete queues that
150141
only contain expired messages, so we don't leak when sending to dead channels.
151142

143+
Deviations from the Channel Layer Specification
144+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
145+
146+
The `Channel Layer Specification
147+
<https://channels.readthedocs.io/en/latest/channel_layer_spec.html>`_ bends to
148+
Redis-related restrictions. RabbitMQ cannot emulate Redis. Here are the
149+
differences:
150+
151+
* **No ``flush`` extension**: To flush all state, simply disconnect all clients.
152+
(RabbitMQ won't allow one client to delete another client's data structures.)
153+
* **No ``group_expiry`` option**: the `group_expiry option
154+
<https://channels.readthedocs.io/en/latest/channel_layer_spec.html#persistence>`_
155+
recovers when a ``group_add()`` has no matching ``group_discard()``. But the
156+
"group membership expiry" logic has a fatal flaw: it disconnects legitimate
157+
members. ``channels_rabbitmq`` addresses the root problems directly:
158+
* Web-server crash: RabbitMQ cleans all traces of a web server when it
159+
disconnects. There's no problem here for ``group_expiry`` to solve.
160+
* Programming errors: You may err and call ``group_add()`` without
161+
eventually calling ``group_discard()``. Redis can't detect this
162+
programming error (because it can't detect web-server crashes.) RabbitMQ
163+
can. The ``local_expiry`` option keeps your site running when you
164+
erroneously miss a ``group_discard()``. The channel layer warns when
165+
discarding expired messages. Monitor your server logs to detect your
166+
errors.
167+
* **No "normal channels"**: `normal channels
168+
<https://channels.readthedocs.io/en/latest/channel_layer_spec.html#channels>`_
169+
are job queues. In most projects, "normal channel" readers are worker
170+
processes, ideally divorced from Websockets and Django.
171+
172+
You are welcome to submit a ``channels_rabbitmq`` pull request to support this
173+
nigh-undocumented aspect of the Channel Layer Specification. But why reinvent
174+
the wheel? There are thousands of job-queue implementations out there already.
175+
Django Channels is a bad fit, because it is tuned for Websockets.
176+
177+
If you want an async, RabbitMQ-based job queue, investigate `aiormq
178+
<https://github.com/mosquito/aiormq>`_ and `aioamqp
179+
<https://github.com/polyconseil/aioamqp>`_. You can even send your jobs
180+
to a separate RabbitMQ server or virtual host.
181+
182+
Currently, this project's strategy is to wait for `Celery 5.0.0
183+
<https://github.com/celery/celery/milestone/7>`_, evaluate it, and then
184+
recommend an alternative to "normal channels." (With Celery 4, it's
185+
inefficient for workers to send messages to the Django Channels layer, because
186+
they need to launch a new event loop and RabbitMQ connection per task.
187+
Celery 5 may fix this.)
188+
152189
Dependencies
153190
------------
154191

channels_rabbitmq/connection.py

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -135,14 +135,13 @@ async def get(self):
135135

136136
return item
137137

138-
def __init__(self, loop, capacity, local_expiry, group_expiry):
138+
def __init__(self, loop, capacity, local_expiry):
139139
self.loop = loop
140140
self.capacity = capacity
141141
self.local_expiry = local_expiry
142-
self.group_expiry = group_expiry
143142
self.n = 0
144143

145-
self.local_groups = defaultdict(dict) # group => {channel => expire}
144+
self.local_groups = {} # group => {channel, ...}
146145
self._out = defaultdict(lambda: MultiQueue.OutQueue(self))
147146
self._closed = asyncio.Event(loop=loop)
148147
self._putter_wakeup = asyncio.Event(loop=loop)
@@ -293,8 +292,8 @@ def group_add(self, group, asgi_channel):
293292
if self._closed.is_set():
294293
return None
295294

296-
channels = self.local_groups[group] # may create set
297-
channels[asgi_channel] = time.time() + self.group_expiry
295+
channels = self.local_groups.setdefault(group, set())
296+
channels.add(asgi_channel)
298297
return len(channels)
299298

300299
def group_discard(self, group, asgi_channel):
@@ -311,20 +310,10 @@ def group_discard(self, group, asgi_channel):
311310

312311
channels = self.local_groups[group]
313312
try:
314-
del channels[asgi_channel]
313+
channels.remove(asgi_channel)
315314
except KeyError:
316315
return None # it was already removed
317316

318-
# Discard stale group memberships. These will happen if a
319-
# group_add() has no matching group_discard(). We only discard
320-
# within this one group because after we've discarded memberships,
321-
# the caller needs to check whether it should unbind from RabbitMQ.
322-
other_keys = list(channels.keys())
323-
now = time.time()
324-
for other_key in other_keys:
325-
if channels[other_key] < now:
326-
del channels[other_key]
327-
328317
ret = len(channels)
329318

330319
if ret == 0:
@@ -445,7 +434,6 @@ def __init__(
445434
prefetch_count=10,
446435
expiry=60,
447436
local_expiry=None,
448-
group_expiry=86400,
449437
ssl_context=None,
450438
groups_exchange="groups",
451439
):
@@ -458,17 +446,14 @@ def __init__(
458446
self.prefetch_count = prefetch_count
459447
self.expiry = expiry
460448
self.local_expiry = local_expiry
461-
self.group_expiry = group_expiry
462449
self.queue_name = queue_name
463450
self.ssl_context = ssl_context
464451
self.groups_exchange = groups_exchange
465452

466453
# incoming_messages: await `get()` on any channel-name queue to receive
467454
# the next message. If the `get()` is canceled, that's probably because
468455
# the caller is going away: we'll delete the queue in that case.
469-
self._incoming_messages = MultiQueue(
470-
loop, local_capacity, local_expiry, group_expiry
471-
)
456+
self._incoming_messages = MultiQueue(loop, local_capacity, local_expiry)
472457

473458
# pending_puts: a "purgatory" for messages as we put them into
474459
# incoming_messages.

channels_rabbitmq/core.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import string
44
import threading
55
import types
6+
import warnings
67

78
from channels.layers import BaseChannelLayer
89

@@ -50,7 +51,7 @@ def __init__(
5051
prefetch_count=10,
5152
expiry=60,
5253
local_expiry=None,
53-
group_expiry=86400,
54+
group_expiry=None,
5455
ssl_context=None,
5556
groups_exchange="groups",
5657
):
@@ -60,10 +61,19 @@ def __init__(
6061
self.prefetch_count = prefetch_count
6162
self.expiry = expiry
6263
self.local_expiry = local_expiry
63-
self.group_expiry = 86400
6464
self.ssl_context = ssl_context
6565
self.groups_exchange = groups_exchange
6666

67+
if group_expiry is not None:
68+
warnings.warn(
69+
(
70+
"channels_rabbitmq does not support group_expiry. Please do not configure it. "
71+
"For rationale, see "
72+
"https://github.com/CJWorkbench/channels_rabbitmq/issues/18#issuecomment-547052373"
73+
),
74+
category=DeprecationWarning,
75+
)
76+
6777
# In inefficient client code (e.g., async_to_sync()), there may be
6878
# several send() or receive() calls within different event loops --
6979
# meaning callers might be coming from different threads at the same
@@ -94,7 +104,6 @@ def _create_connection(self, loop):
94104
prefetch_count=self.prefetch_count,
95105
expiry=self.expiry,
96106
local_expiry=self.local_expiry,
97-
group_expiry=self.group_expiry,
98107
ssl_context=self.ssl_context,
99108
groups_exchange=self.groups_exchange,
100109
)

tests/test_core.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,3 +153,14 @@ def run():
153153
thread = threading.Thread(target=run, daemon=True)
154154
thread.start()
155155
thread.join()
156+
157+
158+
def test_warn_when_group_expiry_set():
159+
with pytest.warns(DeprecationWarning) as record:
160+
RabbitmqChannelLayer(host=HOST, group_expiry=86400)
161+
assert str(record[0].message) == (
162+
"channels_rabbitmq does not support group_expiry. Please do not configure it. "
163+
"For rationale, see "
164+
"https://github.com/CJWorkbench/channels_rabbitmq/issues/18#issuecomment-547052373"
165+
)
166+
assert len(record) == 1

0 commit comments

Comments
 (0)