@@ -1646,7 +1646,7 @@ class ClusterPubSub(PubSub):
1646
1646
https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html
1647
1647
"""
1648
1648
1649
- def __init__ (self , redis_cluster , node = None , host = None , port = None , replica = False , ** kwargs ):
1649
+ def __init__ (self , redis_cluster , node = None , host = None , port = None , ** kwargs ):
1650
1650
"""
1651
1651
When a pubsub instance is created without specifying a node, a single
1652
1652
node will be transparently chosen for the pubsub connection on the
@@ -1661,7 +1661,6 @@ def __init__(self, redis_cluster, node=None, host=None, port=None, replica=False
1661
1661
:type port: int
1662
1662
"""
1663
1663
self .node = None
1664
- self .replica = replica
1665
1664
self .set_pubsub_node (redis_cluster , node , host , port )
1666
1665
connection_pool = (
1667
1666
None
@@ -1795,8 +1794,16 @@ def get_sharded_message(
1795
1794
if message ["channel" ] in self .pending_unsubscribe_shard_channels :
1796
1795
self .pending_unsubscribe_shard_channels .remove (message ["channel" ])
1797
1796
self .shard_channels .pop (message ["channel" ], None )
1798
- node = self .cluster .get_node_from_key (message ["channel" ], self .replica )
1799
- if self .node_pubsub_mapping [node .name ].subscribed is False :
1797
+ slot = self .cluster .keyslot (message ["channel" ])
1798
+ slot_cache = self .cluster .nodes_manager .slots_cache .get (slot )
1799
+ if slot_cache is None or len (slot_cache ) == 0 :
1800
+ raise SlotNotCoveredError (f'Slot "{ slot } " is not covered by the cluster.' )
1801
+ for node in slot_cache :
1802
+ p = self .node_pubsub_mapping .get (node .name )
1803
+ if p is None :
1804
+ continue
1805
+ if p .subscribed is not False :
1806
+ continue
1800
1807
self .node_pubsub_mapping .pop (node .name )
1801
1808
if not self .channels and not self .patterns and not self .shard_channels :
1802
1809
# There are no subscriptions anymore, set subscribed_event flag
@@ -1812,7 +1819,10 @@ def ssubscribe(self, *args, **kwargs):
1812
1819
s_channels = dict .fromkeys (args )
1813
1820
s_channels .update (kwargs )
1814
1821
for s_channel , handler in s_channels .items ():
1815
- node = self .cluster .get_node_from_key (s_channel , self .replica )
1822
+ slot = self .cluster .keyslot (s_channel )
1823
+ node = self .cluster .nodes_manager .get_node_from_slot (
1824
+ slot , self .cluster .read_from_replicas
1825
+ )
1816
1826
pubsub = self ._get_node_pubsub (node )
1817
1827
if handler :
1818
1828
pubsub .ssubscribe (** {s_channel : handler })
@@ -1833,12 +1843,20 @@ def sunsubscribe(self, *args):
1833
1843
args = self .shard_channels
1834
1844
1835
1845
for s_channel in args :
1836
- node = self .cluster .get_node_from_key (s_channel , self .replica )
1837
- p = self ._get_node_pubsub (node )
1838
- p .sunsubscribe (s_channel )
1839
- self .pending_unsubscribe_shard_channels .update (
1840
- p .pending_unsubscribe_shard_channels
1841
- )
1846
+ slot = self .cluster .keyslot (s_channel )
1847
+ slot_cache = self .cluster .nodes_manager .slots_cache .get (slot )
1848
+ if slot_cache is None or len (slot_cache ) == 0 :
1849
+ raise SlotNotCoveredError (f'Slot "{ slot } " is not covered by the cluster.' )
1850
+ for node in slot_cache :
1851
+ p = self .node_pubsub_mapping .get (node .name )
1852
+ if p is None :
1853
+ continue
1854
+ if s_channel not in p .shard_channels :
1855
+ continue
1856
+ p .sunsubscribe (s_channel )
1857
+ self .pending_unsubscribe_shard_channels .update (
1858
+ p .pending_unsubscribe_shard_channels
1859
+ )
1842
1860
1843
1861
def get_redis_connection (self ):
1844
1862
"""
0 commit comments