|
7 | 7 | import logging |
8 | 8 | import random |
9 | 9 | import time |
10 | | -import types |
11 | 10 | import uuid |
12 | 11 |
|
13 | 12 | import msgpack |
|
21 | 20 | logger = logging.getLogger(__name__) |
22 | 21 |
|
23 | 22 |
|
24 | | -def _wrap_close(loop, pool): |
25 | | - """ |
26 | | - Decorate an event loop's close method with our own. |
27 | | - """ |
28 | | - original_impl = loop.close |
29 | | - |
30 | | - def _wrapper(self, *args, **kwargs): |
31 | | - # If the event loop was closed, there's nothing we can do anymore. |
32 | | - if not self.is_closed(): |
33 | | - self.run_until_complete(pool.close_loop(self)) |
34 | | - # Restore the original close() implementation after we're done. |
35 | | - self.close = original_impl |
36 | | - return self.close(*args, **kwargs) |
37 | | - |
38 | | - loop.close = types.MethodType(_wrapper, loop) |
39 | | - |
40 | | - |
41 | 23 | class ChannelLock: |
42 | 24 | """ |
43 | 25 | Helper class for per-channel locking. |
@@ -74,10 +56,6 @@ def release(self, channel): |
74 | 56 | del self.wait_counts[channel] |
75 | 57 |
|
76 | 58 |
|
77 | | -class UnsupportedRedis(Exception): |
78 | | - pass |
79 | | - |
80 | | - |
81 | 59 | class BoundedQueue(asyncio.Queue): |
82 | 60 | def put_nowait(self, item): |
83 | 61 | if self.full(): |
@@ -635,41 +613,6 @@ async def group_send(self, group, message): |
635 | 613 | group, |
636 | 614 | ) |
637 | 615 |
|
638 | | - def _map_channel_to_connection(self, channel_names, message): |
639 | | - """ |
640 | | - For a list of channel names, bucket each one to a dict keyed by the |
641 | | - connection index |
642 | | - Also for each channel create a message specific to that channel, adding |
643 | | - the __asgi_channel__ key to the message |
644 | | - We also return a mapping from channel names to their corresponding Redis |
645 | | - keys, and a mapping of channels to their capacity |
646 | | - """ |
647 | | - connection_to_channels = collections.defaultdict(list) |
648 | | - channel_to_message = dict() |
649 | | - channel_to_capacity = dict() |
650 | | - channel_to_key = dict() |
651 | | - |
652 | | - for channel in channel_names: |
653 | | - channel_non_local_name = channel |
654 | | - if "!" in channel: |
655 | | - message = dict(message.items()) |
656 | | - message["__asgi_channel__"] = channel |
657 | | - channel_non_local_name = self.non_local_name(channel) |
658 | | - channel_key = self.prefix + channel_non_local_name |
659 | | - idx = self.consistent_hash(channel_non_local_name) |
660 | | - connection_to_channels[idx].append(channel_key) |
661 | | - channel_to_capacity[channel] = self.get_capacity(channel) |
662 | | - channel_to_message[channel] = self.serialize(message) |
663 | | - # We build a |
664 | | - channel_to_key[channel] = channel_key |
665 | | - |
666 | | - return ( |
667 | | - connection_to_channels, |
668 | | - channel_to_message, |
669 | | - channel_to_capacity, |
670 | | - channel_to_key, |
671 | | - ) |
672 | | - |
673 | 616 | def _map_channel_keys_to_connection(self, channel_names, message): |
674 | 617 | """ |
675 | 618 | For a list of channel names, GET |
|
0 commit comments