Merged
2 changes: 1 addition & 1 deletion .gitignore
@@ -34,4 +34,4 @@ venv_examples
.coverage
**/coverage.xml
**/test-report.xml
*.ducktape
.ducktape/
24 changes: 24 additions & 0 deletions src/confluent_kafka/aio/_AIOConsumer.py
@@ -62,9 +62,33 @@ def ret(*args, **kwargs):
return ret

async def poll(self, *args, **kwargs):
"""
Polls for a single message from the subscribed topics.

Performance Note:
For high-throughput applications, prefer consume() over poll():
consume() can retrieve multiple messages per call and amortize the async
overhead across the entire batch.

On the other hand, poll() retrieves one message per call, which means
the ThreadPoolExecutor overhead is applied to each individual message.
This can result in lower throughput compared to the synchronous consumer.poll(),
because the async coordination overhead is not amortized.

"""
return await self._call(self._consumer.poll, *args, **kwargs)

async def consume(self, *args, **kwargs):
"""
Consumes a batch of messages from the subscribed topics.

Performance Note:
This method is recommended for high-throughput applications.

By retrieving multiple messages per ThreadPoolExecutor call, the async
coordination overhead is shared across all messages in the batch,
resulting in much better throughput compared to repeated poll() calls.
"""
return await self._call(self._consumer.consume, *args, **kwargs)

def _edit_rebalance_callbacks_args(self, args):
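The docstrings added above describe the poll()/consume() trade-off in prose; the sketch below shows the batching pattern in application code. It is illustrative only and rests on assumptions not confirmed by this diff: that AIOConsumer is constructed from a plain config dict (mirroring the synchronous Consumer), that its wrapped subscribe() and close() methods are awaitable like the poll() and consume() shown here, and process_message() is a hypothetical placeholder handler.

import asyncio

from confluent_kafka.aio import AIOConsumer


def process_message(msg):
    # Hypothetical application handler.
    print(msg.topic(), msg.partition(), msg.offset())


async def consume_loop(conf, topics):
    consumer = AIOConsumer(conf)  # assumed: plain config dict, mirroring Consumer(conf)
    await consumer.subscribe(topics)
    try:
        while True:
            # One executor round trip fetches up to 500 messages, so the async
            # coordination cost is amortized across the whole batch ...
            msgs = await consumer.consume(num_messages=500, timeout=1.0)
            # ... whereas `msg = await consumer.poll(1.0)` would pay that cost
            # once per message.
            for msg in msgs:
                if msg.error():
                    continue
                process_message(msg)
    finally:
        await consumer.close()


# Example entry point (broker address, group id and topic are placeholders):
# asyncio.run(consume_loop({"bootstrap.servers": "localhost:9092",
#                           "group.id": "demo-group"}, ["demo-topic"]))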
4 changes: 1 addition & 3 deletions tests/ducktape/benchmark_metrics.py
@@ -1,5 +1,5 @@
"""
Benchmark metrics collection and validation for Kafka performance testing.
Producer benchmark metrics collection and validation for Kafka performance testing.

Implements comprehensive metrics tracking including latency percentiles,
per-topic/partition breakdowns, memory monitoring, and batch efficiency analysis.
@@ -25,8 +25,6 @@ def __init__(self):
self.messages_sent = 0
self.messages_delivered = 0
self.messages_failed = 0

# Latency tracking
self.delivery_latencies = [] # in milliseconds

# Data tracking
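As context for the metrics the module docstring mentions, here is a minimal, illustrative sketch of the delivery-latency bookkeeping: the attribute names mirror the __init__ shown above, while the class name and the nearest-rank percentile helper are hypothetical and not taken from the benchmark code.

class DeliveryLatencyTracker:
    # Toy accumulator mirroring the message-count and latency fields above.

    def __init__(self):
        self.messages_sent = 0
        self.messages_delivered = 0
        self.messages_failed = 0
        self.delivery_latencies = []  # in milliseconds

    def record_delivery(self, latency_ms):
        self.messages_delivered += 1
        self.delivery_latencies.append(latency_ms)

    def latency_percentile(self, p):
        # Nearest-rank percentile over the recorded delivery latencies (ms).
        data = sorted(self.delivery_latencies)
        if not data:
            return 0.0
        rank = min(len(data) - 1, max(0, round(p / 100.0 * (len(data) - 1))))
        return data[rank]


tracker = DeliveryLatencyTracker()
for latency_ms in (12.5, 9.8, 30.1, 7.4):
    tracker.record_delivery(latency_ms)
print("p99 delivery latency (ms):", tracker.latency_percentile(99))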