1+ from __future__ import annotations
2+
13import collections
24import copy
35import logging
46import threading
57import time
68from concurrent .futures import Future
7- from typing import Optional , Set
9+ from typing import Any , Callable , Dict , Iterable , Optional , Set , Tuple , Union
10+
11+ from typing_extensions import TypeAlias , TypedDict
812
913from aiokafka import errors as Errors
14+ from aiokafka .client import CoordinationType
1015from aiokafka .conn import collect_hosts
16+ from aiokafka .protocol .commit import (
17+ GroupCoordinatorResponse_v0 ,
18+ GroupCoordinatorResponse_v1 ,
19+ )
20+ from aiokafka .protocol .metadata import (
21+ MetadataResponse_v0 ,
22+ MetadataResponse_v1 ,
23+ MetadataResponse_v2 ,
24+ MetadataResponse_v3 ,
25+ MetadataResponse_v4 ,
26+ MetadataResponse_v5 ,
27+ )
1128from aiokafka .structs import BrokerMetadata , PartitionMetadata , TopicPartition
1229
30+ ListenerCallable : TypeAlias = Callable [["ClusterMetadata" ], None ]
31+ NodeId : TypeAlias = Union [str , int ]
32+
1333log = logging .getLogger (__name__ )
1434
1535
36+ class Config (TypedDict ):
37+ retry_backoff_ms : float
38+ metadata_max_age_ms : float
39+ bootstrap_servers : Union [str , Iterable [str ]]
40+
41+
1642class ClusterMetadata :
1743 """
1844 A class to manage kafka cluster metadata.
@@ -35,61 +61,67 @@ class ClusterMetadata:
3561 specified, will default to localhost:9092.
3662 """
3763
38- DEFAULT_CONFIG = {
64+ DEFAULT_CONFIG : Config = {
3965 "retry_backoff_ms" : 100 ,
4066 "metadata_max_age_ms" : 300000 ,
4167 "bootstrap_servers" : [],
4268 }
4369
44- def __init__ (self , ** configs ):
45- self ._brokers = {} # node_id -> BrokerMetadata
46- self ._partitions = {} # topic -> partition -> PartitionMetadata
70+ def __init__ (self , ** configs : Any ) -> None :
71+ self ._brokers : Dict [NodeId , BrokerMetadata ] = {} # node_id -> BrokerMetadata
72+ self ._partitions : Dict [
73+ str , Dict [int , PartitionMetadata ]
74+ ] = {} # topic -> partition -> PartitionMetadata
4775 # node_id -> {TopicPartition...}
48- self ._broker_partitions = collections .defaultdict (set )
49- self ._groups = {} # group_name -> node_id
50- self ._last_refresh_ms = 0
51- self ._last_successful_refresh_ms = 0
76+ self ._broker_partitions : Dict [NodeId , Set [TopicPartition ]] = (
77+ collections .defaultdict (set )
78+ )
79+ self ._groups : Dict [str , NodeId ] = {} # group_name -> node_id
80+ self ._last_refresh_ms : float = 0
81+ self ._last_successful_refresh_ms : float = 0
5282 self ._need_update = True
53- self ._future = None
54- self ._listeners = set ()
83+ self ._future : Optional [ Future [ ClusterMetadata ]] = None
84+ self ._listeners : Set [ ListenerCallable ] = set ()
5585 self ._lock = threading .Lock ()
5686 self .need_all_topic_metadata = False
57- self .unauthorized_topics = set ()
58- self .internal_topics = set ()
59- self .controller = None
87+ self .unauthorized_topics : Set [ str ] = set ()
88+ self .internal_topics : Set [ str ] = set ()
89+ self .controller : Optional [ BrokerMetadata ] = None
6090
61- self .config = copy .copy (self .DEFAULT_CONFIG )
91+ self .config : Config = copy .copy (self .DEFAULT_CONFIG )
6292 for key in self .config :
6393 if key in configs :
64- self .config [key ] = configs [key ]
94+ self .config [key ] = configs [key ] # type: ignore[literal-required]
6595
66- self ._bootstrap_brokers = self ._generate_bootstrap_brokers ()
67- self ._coordinator_brokers = {}
68- self ._coordinators = {}
69- self ._coordinator_by_key = {}
96+ self ._bootstrap_brokers : Dict [NodeId , BrokerMetadata ] = (
97+ self ._generate_bootstrap_brokers ()
98+ )
99+ self ._coordinator_brokers : Dict [NodeId , BrokerMetadata ] = {}
100+ self ._coordinators : Dict [NodeId , BrokerMetadata ] = {}
101+ self ._coordinator_by_key : Dict [Tuple [CoordinationType , str ], NodeId ] = {}
70102
71- def _generate_bootstrap_brokers (self ):
103+ def _generate_bootstrap_brokers (self ) -> Dict [ NodeId , BrokerMetadata ] :
72104 # collect_hosts does not perform DNS, so we should be fine to re-use
73105 bootstrap_hosts = collect_hosts (self .config ["bootstrap_servers" ])
74106
75- brokers = {}
107+ brokers : Dict [ NodeId , BrokerMetadata ] = {}
76108 for i , (host , port , _ ) in enumerate (bootstrap_hosts ):
77109 node_id = "bootstrap-%s" % i
78110 brokers [node_id ] = BrokerMetadata (node_id , host , port , None )
79111 return brokers
80112
81- def is_bootstrap (self , node_id ) :
113+ def is_bootstrap (self , node_id : NodeId ) -> bool :
82114 return node_id in self ._bootstrap_brokers
83115
84- def brokers (self ):
116+ def brokers (self ) -> Set [ BrokerMetadata ] :
85117 """Get all BrokerMetadata
86118
87119 Returns:
88120 set: {BrokerMetadata, ...}
89121 """
90122 return set (self ._brokers .values ()) or set (self ._bootstrap_brokers .values ())
91123
92- def broker_metadata (self , broker_id ) :
124+ def broker_metadata (self , broker_id : NodeId ) -> Optional [ BrokerMetadata ] :
93125 """Get BrokerMetadata
94126
95127 Arguments:
@@ -117,7 +149,7 @@ def partitions_for_topic(self, topic: str) -> Optional[Set[int]]:
117149 return None
118150 return set (self ._partitions [topic ].keys ())
119151
120- def available_partitions_for_topic (self , topic ) :
152+ def available_partitions_for_topic (self , topic : str ) -> Optional [ Set [ int ]] :
121153 """Return set of partitions with known leaders
122154
123155 Arguments:
@@ -135,7 +167,7 @@ def available_partitions_for_topic(self, topic):
135167 if metadata .leader != - 1
136168 }
137169
138- def leader_for_partition (self , partition ) :
170+ def leader_for_partition (self , partition : PartitionMetadata ) -> Optional [ int ] :
139171 """Return node_id of leader, -1 unavailable, None if unknown."""
140172 if partition .topic not in self ._partitions :
141173 return None
@@ -144,7 +176,7 @@ def leader_for_partition(self, partition):
144176 return None
145177 return partitions [partition .partition ].leader
146178
147- def partitions_for_broker (self , broker_id ) :
179+ def partitions_for_broker (self , broker_id : NodeId ) -> Optional [ Set [ TopicPartition ]] :
148180 """Return TopicPartitions for which the broker is a leader.
149181
150182 Arguments:
@@ -156,7 +188,7 @@ def partitions_for_broker(self, broker_id):
156188 """
157189 return self ._broker_partitions .get (broker_id )
158190
159- def coordinator_for_group (self , group ) :
191+ def coordinator_for_group (self , group : str ) -> Optional [ NodeId ] :
160192 """Return node_id of group coordinator.
161193
162194 Arguments:
@@ -168,7 +200,7 @@ def coordinator_for_group(self, group):
168200 """
169201 return self ._groups .get (group )
170202
171- def request_update (self ):
203+ def request_update (self ) -> Future [ ClusterMetadata ] :
172204 """Flags metadata for update, return Future()
173205
174206 Actual update must be handled separately. This method will only
@@ -179,11 +211,11 @@ def request_update(self):
179211 """
180212 with self ._lock :
181213 self ._need_update = True
182- if not self ._future or self ._future .is_done :
214+ if not self ._future or self ._future .done () :
183215 self ._future = Future ()
184216 return self ._future
185217
186- def topics (self , exclude_internal_topics = True ):
218+ def topics (self , exclude_internal_topics : bool = True ) -> Set [ str ] :
187219 """Get set of known topics.
188220
189221 Arguments:
@@ -201,18 +233,28 @@ def topics(self, exclude_internal_topics=True):
201233 else :
202234 return topics
203235
204- def failed_update (self , exception ) :
236+ def failed_update (self , exception : Exception ) -> None :
205237 """Update cluster state given a failed MetadataRequest."""
206238 f = None
207239 with self ._lock :
208240 if self ._future :
209241 f = self ._future
210242 self ._future = None
211243 if f :
212- f .failure (exception )
244+ f .set_exception (exception )
213245 self ._last_refresh_ms = time .time () * 1000
214246
215- def update_metadata (self , metadata ):
247+ def update_metadata (
248+ self ,
249+ metadata : Union [
250+ MetadataResponse_v0 ,
251+ MetadataResponse_v1 ,
252+ MetadataResponse_v2 ,
253+ MetadataResponse_v3 ,
254+ MetadataResponse_v4 ,
255+ MetadataResponse_v5 ,
256+ ],
257+ ) -> None :
216258 """Update cluster state given a MetadataResponse.
217259
218260 Arguments:
@@ -225,27 +267,29 @@ def update_metadata(self, metadata):
225267 self .failed_update (Errors .MetadataEmptyBrokerList (metadata ))
226268 return
227269
228- _new_brokers = {}
270+ _new_brokers : Dict [ NodeId , BrokerMetadata ] = {}
229271 for broker in metadata .brokers :
230- if metadata . API_VERSION == 0 :
272+ if isinstance ( metadata , MetadataResponse_v0 ) :
231273 node_id , host , port = broker
232274 rack = None
233275 else :
234276 node_id , host , port , rack = broker
235277 _new_brokers .update ({node_id : BrokerMetadata (node_id , host , port , rack )})
236278
237- if metadata . API_VERSION == 0 :
279+ if isinstance ( metadata , MetadataResponse_v0 ) :
238280 _new_controller = None
239281 else :
240282 _new_controller = _new_brokers .get (metadata .controller_id )
241283
242- _new_partitions = {}
243- _new_broker_partitions = collections .defaultdict (set )
244- _new_unauthorized_topics = set ()
245- _new_internal_topics = set ()
284+ _new_partitions : Dict [str , Dict [int , PartitionMetadata ]] = {}
285+ _new_broker_partitions : Dict [NodeId , Set [TopicPartition ]] = (
286+ collections .defaultdict (set )
287+ )
288+ _new_unauthorized_topics : Set [str ] = set ()
289+ _new_internal_topics : Set [str ] = set ()
246290
247291 for topic_data in metadata .topics :
248- if metadata . API_VERSION == 0 :
292+ if isinstance ( metadata , MetadataResponse_v0 ) :
249293 error_code , topic , partitions = topic_data
250294 is_internal = False
251295 else :
@@ -307,7 +351,7 @@ def update_metadata(self, metadata):
307351 self ._last_successful_refresh_ms = now
308352
309353 if f :
310- f .success (self )
354+ f .set_result (self )
311355 log .debug ("Updated cluster metadata to %s" , self )
312356
313357 for listener in self ._listeners :
@@ -320,15 +364,19 @@ def update_metadata(self, metadata):
320364 # another fetch should be unnecessary.
321365 self ._need_update = False
322366
323- def add_listener (self , listener ) :
367+ def add_listener (self , listener : ListenerCallable ) -> None :
324368 """Add a callback function to be called on each metadata update"""
325369 self ._listeners .add (listener )
326370
327- def remove_listener (self , listener ) :
371+ def remove_listener (self , listener : ListenerCallable ) -> None :
328372 """Remove a previously added listener callback"""
329373 self ._listeners .remove (listener )
330374
331- def add_group_coordinator (self , group , response ):
375+ def add_group_coordinator (
376+ self ,
377+ group : str ,
378+ response : Union [GroupCoordinatorResponse_v0 , GroupCoordinatorResponse_v1 ],
379+ ) -> Optional [str ]:
332380 """Update with metadata for a group coordinator
333381
334382 Arguments:
@@ -355,7 +403,9 @@ def add_group_coordinator(self, group, response):
355403 self ._groups [group ] = node_id
356404 return node_id
357405
358- def with_partitions (self , partitions_to_add ):
406+ def with_partitions (
407+ self , partitions_to_add : Iterable [PartitionMetadata ]
408+ ) -> ClusterMetadata :
359409 """Returns a copy of cluster metadata with partitions added"""
360410 new_metadata = ClusterMetadata (** self .config )
361411 new_metadata ._brokers = copy .deepcopy (self ._brokers )
@@ -375,10 +425,18 @@ def with_partitions(self, partitions_to_add):
375425
376426 return new_metadata
377427
378- def coordinator_metadata (self , node_id ) :
428+ def coordinator_metadata (self , node_id : NodeId ) -> Optional [ BrokerMetadata ] :
379429 return self ._coordinators .get (node_id )
380430
381- def add_coordinator (self , node_id , host , port , rack = None , * , purpose ):
431+ def add_coordinator (
432+ self ,
433+ node_id : int ,
434+ host : str ,
435+ port : int ,
436+ rack : Optional [str ] = None ,
437+ * ,
438+ purpose : Tuple [CoordinationType , str ],
439+ ) -> None :
382440 """Keep track of all coordinator nodes separately and remove them if
383441 a new one was elected for the same purpose (For example group
384442 coordinator for group X).
@@ -390,7 +448,7 @@ def add_coordinator(self, node_id, host, port, rack=None, *, purpose):
390448 self ._coordinators [node_id ] = BrokerMetadata (node_id , host , port , rack )
391449 self ._coordinator_by_key [purpose ] = node_id
392450
393- def __str__ (self ):
451+ def __str__ (self ) -> str :
394452 return "ClusterMetadata(brokers: %d, topics: %d, groups: %d)" % (
395453 len (self ._brokers ),
396454 len (self ._partitions ),
0 commit comments