|
1 | 1 | # This is a helper module to obfuscate types used by different broker implementations.
|
| 2 | +from collections import namedtuple |
2 | 3 | from typing import Union, Dict, Tuple, Type
|
3 | 4 |
|
4 | 5 | import redis
|
|
18 | 19 | PipelineType = Union[redis.client.Pipeline, valkey.client.Pipeline]
|
19 | 20 | SentinelType = Union[redis.sentinel.Sentinel, valkey.sentinel.Sentinel]
|
20 | 21 |
|
21 |
| -BrokerMetaData: Dict[Tuple[Broker, bool], Tuple[Type[ConnectionType], Type[SentinelType], str]] = { |
| 22 | +BrokerMetaDataType = namedtuple("BrokerMetaDataType", ["connection_type", "sentinel_type", "ssl_prefix"]) |
| 23 | + |
| 24 | +BrokerMetaData: Dict[Tuple[Broker, bool], BrokerMetaDataType] = { |
22 | 25 | # Map of (Broker, Strict flag) => Connection Class, Sentinel Class, SSL Connection Prefix
|
23 |
| - (Broker.REDIS, False): (redis.Redis, redis.sentinel.Sentinel, "rediss"), |
24 |
| - (Broker.VALKEY, False): (valkey.Valkey, valkey.sentinel.Sentinel, "valkeys"), |
25 |
| - (Broker.REDIS, True): (redis.StrictRedis, redis.sentinel.Sentinel, "rediss"), |
26 |
| - (Broker.VALKEY, True): (valkey.StrictValkey, valkey.sentinel.Sentinel, "valkeys"), |
| 26 | + (Broker.REDIS, False): BrokerMetaDataType(redis.Redis, redis.sentinel.Sentinel, "rediss"), |
| 27 | + (Broker.VALKEY, False): BrokerMetaDataType(valkey.Valkey, valkey.sentinel.Sentinel, "valkeys"), |
| 28 | + (Broker.REDIS, True): BrokerMetaDataType(redis.StrictRedis, redis.sentinel.Sentinel, "rediss"), |
| 29 | + (Broker.VALKEY, True): BrokerMetaDataType(valkey.StrictValkey, valkey.sentinel.Sentinel, "valkeys"), |
27 | 30 | }
|
0 commit comments