Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/confluent_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,19 @@ class ThrottleEvent(object):
:ivar float throttle_time: The amount of time (in seconds) the broker throttled (delayed) the request
"""

def __init__(self, broker_name,
broker_id,
throttle_time):
def __init__(self, broker_name: str,
broker_id: int,
throttle_time: float) -> None:
self.broker_name = broker_name
self.broker_id = broker_id
self.throttle_time = throttle_time

def __str__(self):
def __str__(self) -> str:
return "{}/{} throttled for {} ms".format(self.broker_name, self.broker_id,
int(self.throttle_time * 1000))


def _resolve_plugins(plugins):
def _resolve_plugins(plugins: str) -> str:
""" Resolve embedded plugins from the wheel's library directory.

For internal module use only.
Expand Down
20 changes: 11 additions & 9 deletions src/confluent_kafka/_model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Optional, Any
from enum import Enum
from .. import cimpl
from ..cimpl import TopicPartition


class Node:
Expand All @@ -35,14 +37,14 @@ class Node:
The rack for this node.
"""

def __init__(self, id, host, port, rack=None):
def __init__(self, id: int, host: str, port: int, rack: Optional[str] = None) -> None:
self.id = id
self.id_string = str(id)
self.host = host
self.port = port
self.rack = rack

def __str__(self):
def __str__(self) -> str:
return f"({self.id}) {self.host}:{self.port} {f'(Rack - {self.rack})' if self.rack else ''}"


Expand All @@ -60,7 +62,7 @@ class ConsumerGroupTopicPartitions:
List of topic partitions information.
"""

def __init__(self, group_id, topic_partitions=None):
def __init__(self, group_id: str, topic_partitions: Optional[List[TopicPartition]] = None) -> None:
self.group_id = group_id
self.topic_partitions = topic_partitions

Expand Down Expand Up @@ -89,7 +91,7 @@ class ConsumerGroupState(Enum):
#: Consumer Group is empty.
EMPTY = cimpl.CONSUMER_GROUP_STATE_EMPTY

def __lt__(self, other):
def __lt__(self, other) -> Any:
if self.__class__ != other.__class__:
return NotImplemented
return self.value < other.value
Expand All @@ -109,7 +111,7 @@ class ConsumerGroupType(Enum):
#: Classic Type
CLASSIC = cimpl.CONSUMER_GROUP_TYPE_CLASSIC

def __lt__(self, other):
def __lt__(self, other) -> Any:
if self.__class__ != other.__class__:
return NotImplemented
return self.value < other.value
Expand All @@ -126,7 +128,7 @@ class TopicCollection:
List of topic names.
"""

def __init__(self, topic_names):
def __init__(self, topic_names: List[str]) -> None:
self.topic_names = topic_names


Expand All @@ -147,7 +149,7 @@ class TopicPartitionInfo:
In-Sync-Replica brokers for the partition.
"""

def __init__(self, id, leader, replicas, isr):
def __init__(self, id: int, leader: Node, replicas: List[Node], isr: List[Node]) -> None:
self.id = id
self.leader = leader
self.replicas = replicas
Expand All @@ -165,7 +167,7 @@ class IsolationLevel(Enum):
READ_UNCOMMITTED = cimpl.ISOLATION_LEVEL_READ_UNCOMMITTED #: Receive all the offsets.
READ_COMMITTED = cimpl.ISOLATION_LEVEL_READ_COMMITTED #: Skip offsets belonging to an aborted transaction.

def __lt__(self, other):
def __lt__(self, other) -> Any:
if self.__class__ != other.__class__:
return NotImplemented
return self.value < other.value
Expand All @@ -184,7 +186,7 @@ class ElectionType(Enum):
#: Unclean election
UNCLEAN = cimpl.ELECTION_TYPE_UNCLEAN

def __lt__(self, other):
def __lt__(self, other) -> Any:
if self.__class__ != other.__class__:
return NotImplemented
return self.value < other.value
44 changes: 44 additions & 0 deletions src/confluent_kafka/_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2025 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Common type definitions for confluent_kafka package.

This module provides centralized type aliases to maintain DRY principle
and ensure consistency across the package.
"""

from typing import Any, Optional, Dict, Union, Callable, List, Tuple

# Configuration dictionary type
ConfigDict = Dict[str, Union[str, int, float, bool]]

# Headers can be either dict format or list of tuples format
HeadersType = Union[
Dict[str, Union[str, bytes, None]],
List[Tuple[str, Union[str, bytes, None]]]
]

# Serializer/Deserializer callback types (will need SerializationContext import where used)
Serializer = Callable[[Any, Any], bytes] # (obj, SerializationContext) -> bytes
Deserializer = Callable[[Optional[bytes], Any], Any] # (Optional[bytes], SerializationContext) -> obj

# Forward declarations for callback types that reference classes from cimpl
# These are defined here to avoid circular imports
DeliveryCallback = Callable[[Optional[Any], Any], None] # (KafkaError, Message) -> None
RebalanceCallback = Callable[[Any, List[Any]], None] # (Consumer, List[TopicPartition]) -> None
3 changes: 2 additions & 1 deletion src/confluent_kafka/_util/conversion_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Union, Type
from enum import Enum


class ConversionUtil:
@staticmethod
def convert_to_enum(val, enum_clazz):
def convert_to_enum(val: Union[str, int, Enum], enum_clazz: Type[Enum]) -> Enum:
if type(enum_clazz) is not type(Enum):
raise TypeError("'enum_clazz' must be of type Enum")

Expand Down
13 changes: 7 additions & 6 deletions src/confluent_kafka/_util/validation_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, List
from ..cimpl import KafkaError

try:
Expand All @@ -22,35 +23,35 @@

class ValidationUtil:
@staticmethod
def check_multiple_not_none(obj, vars_to_check):
def check_multiple_not_none(obj: Any, vars_to_check: List[str]) -> None:
for param in vars_to_check:
ValidationUtil.check_not_none(obj, param)

@staticmethod
def check_not_none(obj, param):
def check_not_none(obj: Any, param: str) -> None:
if getattr(obj, param) is None:
raise ValueError("Expected %s to be not None" % (param,))

@staticmethod
def check_multiple_is_string(obj, vars_to_check):
def check_multiple_is_string(obj: Any, vars_to_check: List[str]) -> None:
for param in vars_to_check:
ValidationUtil.check_is_string(obj, param)

@staticmethod
def check_is_string(obj, param):
def check_is_string(obj: Any, param: str) -> None:
param_value = getattr(obj, param)
if param_value is not None and not isinstance(param_value, string_type):
raise TypeError("Expected %s to be a string" % (param,))

@staticmethod
def check_kafka_errors(errors):
def check_kafka_errors(errors: List[KafkaError]) -> None:
if not isinstance(errors, list):
raise TypeError("errors should be None or a list")
for error in errors:
if not isinstance(error, KafkaError):
raise TypeError("Expected list of KafkaError")

@staticmethod
def check_kafka_error(error):
def check_kafka_error(error: KafkaError) -> None:
if not isinstance(error, KafkaError):
raise TypeError("Expected error to be a KafkaError")
39 changes: 21 additions & 18 deletions src/confluent_kafka/admin/_acl.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, List, Dict, Union, Tuple
from enum import Enum
import functools

from .. import cimpl as _cimpl
from ._resource import ResourceType, ResourcePatternType
from .._util import ValidationUtil, ConversionUtil
Expand Down Expand Up @@ -42,7 +44,7 @@ class AclOperation(Enum):
ALTER_CONFIGS = _cimpl.ACL_OPERATION_ALTER_CONFIGS #: ALTER_CONFIGS operation
IDEMPOTENT_WRITE = _cimpl.ACL_OPERATION_IDEMPOTENT_WRITE #: IDEMPOTENT_WRITE operation

def __lt__(self, other):
def __lt__(self, other: 'AclOperation') -> Any:
if self.__class__ != other.__class__:
return NotImplemented
return self.value < other.value
Expand All @@ -57,7 +59,7 @@ class AclPermissionType(Enum):
DENY = _cimpl.ACL_PERMISSION_TYPE_DENY #: Disallows access
ALLOW = _cimpl.ACL_PERMISSION_TYPE_ALLOW #: Grants access

def __lt__(self, other):
def __lt__(self, other: 'AclPermissionType') -> Any:
if self.__class__ != other.__class__:
return NotImplemented
return self.value < other.value
Expand Down Expand Up @@ -89,9 +91,10 @@ class AclBinding(object):
The permission type for the specified operation.
"""

def __init__(self, restype, name,
resource_pattern_type, principal, host,
operation, permission_type):
def __init__(self, restype: Union[ResourceType, str, int], name: str,
resource_pattern_type: Union[ResourcePatternType, str, int], principal: str, host: str,
operation: Union[AclOperation, str, int],
permission_type: Union[AclPermissionType, str, int]) -> None:
self.restype = restype
self.name = name
self.resource_pattern_type = resource_pattern_type
Expand All @@ -106,7 +109,7 @@ def __init__(self, restype, name,
self.operation_int = int(self.operation.value)
self.permission_type_int = int(self.permission_type.value)

def _convert_enums(self):
def _convert_enums(self) -> None:
self.restype = ConversionUtil.convert_to_enum(self.restype, ResourceType)
self.resource_pattern_type = ConversionUtil.convert_to_enum(
self.resource_pattern_type, ResourcePatternType)
Expand All @@ -115,20 +118,20 @@ def _convert_enums(self):
self.permission_type = ConversionUtil.convert_to_enum(
self.permission_type, AclPermissionType)

def _check_forbidden_enums(self, forbidden_enums):
def _check_forbidden_enums(self, forbidden_enums: Dict[str, List[Enum]]) -> None:
for k, v in forbidden_enums.items():
enum_value = getattr(self, k)
if enum_value in v:
raise ValueError("Cannot use enum %s, value %s in this class" % (k, enum_value.name))

def _not_none_args(self):
def _not_none_args(self) -> List[str]:
return ["restype", "name", "resource_pattern_type",
"principal", "host", "operation", "permission_type"]

def _string_args(self):
def _string_args(self) -> List[str]:
return ["name", "principal", "host"]

def _forbidden_enums(self):
def _forbidden_enums(self) -> Dict[str, List[Enum]]:
return {
"restype": [ResourceType.ANY],
"resource_pattern_type": [ResourcePatternType.ANY,
Expand All @@ -137,7 +140,7 @@ def _forbidden_enums(self):
"permission_type": [AclPermissionType.ANY]
}

def _convert_args(self):
def _convert_args(self) -> None:
not_none_args = self._not_none_args()
string_args = self._string_args()
forbidden_enums = self._forbidden_enums()
Expand All @@ -146,24 +149,24 @@ def _convert_args(self):
self._convert_enums()
self._check_forbidden_enums(forbidden_enums)

def __repr__(self):
def __repr__(self) -> str:
type_name = type(self).__name__
return "%s(%s,%s,%s,%s,%s,%s,%s)" % ((type_name,) + self._to_tuple())

def _to_tuple(self):
def _to_tuple(self) -> Tuple[ResourceType, str, ResourcePatternType, str, str, AclOperation, AclPermissionType]:
return (self.restype, self.name, self.resource_pattern_type,
self.principal, self.host, self.operation,
self.permission_type)

def __hash__(self):
def __hash__(self) -> int:
return hash(self._to_tuple())

def __lt__(self, other):
def __lt__(self, other: 'AclBinding') -> Any:
if self.__class__ != other.__class__:
return NotImplemented
return self._to_tuple() < other._to_tuple()

def __eq__(self, other):
def __eq__(self, other: 'AclBinding') -> Any:
if self.__class__ != other.__class__:
return NotImplemented
return self._to_tuple() == other._to_tuple()
Expand Down Expand Up @@ -194,11 +197,11 @@ class AclBindingFilter(AclBinding):
The permission type to match or :attr:`AclPermissionType.ANY` to match any value.
"""

def _not_none_args(self):
def _not_none_args(self) -> List[str]:
return ["restype", "resource_pattern_type",
"operation", "permission_type"]

def _forbidden_enums(self):
def _forbidden_enums(self) -> Dict[str, List[Enum]]:
return {
"restype": [ResourceType.UNKNOWN],
"resource_pattern_type": [ResourcePatternType.UNKNOWN],
Expand Down
5 changes: 4 additions & 1 deletion src/confluent_kafka/admin/_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Optional, Union

from .._util import ConversionUtil
from .._model import Node
from ._acl import AclOperation


Expand All @@ -34,7 +36,8 @@ class DescribeClusterResult:
AclOperations allowed for the cluster.
"""

def __init__(self, controller, nodes, cluster_id=None, authorized_operations=None):
def __init__(self, controller: Node, nodes: List[Node], cluster_id: Optional[str] = None,
authorized_operations: Optional[List[Union[str, int, AclOperation]]] = None) -> None:
self.cluster_id = cluster_id
self.controller = controller
self.nodes = nodes
Expand Down
Loading