Add ducktape benchmark tests for consumer (sync + async) #2045
🎉 All Contributor License Agreements have been signed. Ready to merge.
Thanks for the PR! Added initial review comments.
@@ -0,0 +1,363 @@
"""
Maybe we could share a single file, `tests/ducktape/benchmark_metrics.py`, for both the producer and consumer benchmarks, and list the default producer and consumer bounds in `bounds.json`.
Reason: the benchmarks for producer and consumer should mostly be the same, i.e. latency, throughput, messages processed, etc.
Yes, I created this new file to avoid the merge conflicts we would otherwise have to deal with :). Let's refactor the test suite (consolidating shared code, renaming files properly, etc.) once both of our PRs are merged.
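For the later refactor, one possible shape of the shared-bounds idea is sketched below. This is only an illustration: the JSON keys, the threshold values, and the helper names `load_bounds` and `violations` are all assumptions made up for the example, not the contents of the actual `bounds.json` or `benchmark_metrics.py`.

```python
import json

# Placeholder thresholds for illustration only; these are NOT measured values
# and NOT the real contents of bounds.json.
BOUNDS_JSON = """
{
  "producer": {"min_throughput_msg_per_sec": 1000, "max_p95_latency_ms": 500},
  "consumer": {"min_throughput_msg_per_sec": 1000, "max_p95_latency_ms": 500}
}
"""


def load_bounds(role, raw=BOUNDS_JSON):
    """Return the bounds section for 'producer' or 'consumer' (hypothetical helper)."""
    bounds = json.loads(raw)
    if role not in bounds:
        raise KeyError(f"no bounds defined for role {role!r}")
    return bounds[role]


def violations(role, measured):
    """Compare measured metrics against the role's bounds; return a list of problems."""
    bounds = load_bounds(role)
    problems = []
    if measured["throughput_msg_per_sec"] < bounds["min_throughput_msg_per_sec"]:
        problems.append("throughput below bound")
    if measured["p95_latency_ms"] > bounds["max_p95_latency_ms"]:
        problems.append("p95 latency above bound")
    return problems
```

With a layout like this, the same metrics code could validate either benchmark by passing the role name, which is the consolidation the review comment is asking about.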
return messages_consumed


class AsyncConsumerStrategy(ConsumerStrategy):
There is a lot of duplicated code between this class and the prior one. It would be nice to remove more of that duplication. Not blocking the merge on it, though.
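As a rough sketch of what reducing that duplication might look like, the per-batch bookkeeping could move into the base class so the sync and async variants differ only in how they fetch. The class name `SyncConsumerStrategy`, the `fetch` callable, the `target` parameter, and the `_count_valid` helper are assumptions for illustration; the PR's real classes may be structured differently.

```python
import asyncio
from abc import ABC, abstractmethod


class ConsumerStrategy(ABC):
    """Base class holding the bookkeeping both variants would share."""

    def __init__(self, batch_size=1):
        self.batch_size = batch_size

    @staticmethod
    def _count_valid(batch):
        # Shared helper: count delivered messages, skipping empty polls (None).
        return sum(1 for msg in batch if msg is not None)

    @abstractmethod
    def consume_messages(self, fetch, target):
        """Consume until at least `target` messages have been counted."""


class SyncConsumerStrategy(ConsumerStrategy):  # hypothetical name
    def consume_messages(self, fetch, target):
        messages_consumed = 0
        while messages_consumed < target:
            messages_consumed += self._count_valid(fetch(self.batch_size))
        return messages_consumed


class AsyncConsumerStrategy(ConsumerStrategy):
    async def consume_messages(self, fetch, target):
        messages_consumed = 0
        while messages_consumed < target:
            # Only the await differs; the accounting is inherited.
            messages_consumed += self._count_valid(await fetch(self.batch_size))
        return messages_consumed
```

The two loops still mirror each other because one has to `await`; the sketch only shows that everything around the fetch call can live in one place.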
What
This PR implements comprehensive performance benchmarking for `AIOConsumer` and adds critical performance guidance to help developers optimize their async Kafka consumer applications.

What This PR Does
- `AIOConsumer` with configurable batch sizes (`batch_size=[1, 5, 20]`)
- `AIOConsumer.poll()` and `consume()` method docstrings

Performance Results
[Results table lost in extraction. Surviving row labels, in two groups of four: `poll()`, `consume()`, `consume()`, `consume()`.]
Performance Explanation
The dramatic performance difference stems from `AIOConsumer`'s use of `ThreadPoolExecutor` to make blocking librdkafka calls async-compatible. For single-message operations (`poll()` or `consume(1)`), each message pays the full thread pool coordination overhead (~7x slower). However, with larger batch sizes, this overhead is amortized across multiple messages, achieving performance parity with sync consumers at `batch_size=20`.

Developer Guidance Added
- Use `consume()` with `batch_size >= 20` for optimal async performance
- Use `consume()` with `batch_size=5` for balanced performance (65K msg/s)
- Avoid `poll()` for high-throughput scenarios (7x performance penalty)

This PR provides developers with concrete, data-driven guidance for optimizing their Kafka consumer performance based on their specific throughput and latency requirements.
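To make the amortization argument concrete, here is a minimal standard-library sketch that counts thread pool round trips for different batch sizes. It does not use `confluent_kafka` at all: `FakeBlockingConsumer`, `drain`, and `hops_for` are hypothetical stand-ins, and the sketch counts hand-offs rather than measuring real throughput.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


class FakeBlockingConsumer:
    """Stand-in for a blocking client; NOT the confluent_kafka API."""

    def __init__(self, total):
        self._remaining = total

    def consume(self, num_messages):
        # Return up to num_messages items, or an empty list once drained.
        n = min(num_messages, self._remaining)
        self._remaining -= n
        return ["msg"] * n


async def drain(total, batch_size):
    """Consume `total` messages and count thread pool round trips."""
    loop = asyncio.get_running_loop()
    consumer = FakeBlockingConsumer(total)
    hops = 0
    consumed = 0
    with ThreadPoolExecutor(max_workers=1) as pool:
        while consumed < total:
            # Each await is one hand-off to the pool and back to the event loop.
            batch = await loop.run_in_executor(pool, consumer.consume, batch_size)
            hops += 1
            consumed += len(batch)
    return consumed, hops


def hops_for(total, batch_size):
    """Number of executor round trips needed to drain `total` messages."""
    return asyncio.run(drain(total, batch_size))[1]


if __name__ == "__main__":
    for size in (1, 5, 20):
        print(f"batch_size={size}: {hops_for(100, size)} executor round trips")
```

The fixed cost per round trip is what gets divided across the batch: draining 100 messages takes 100 hand-offs at `batch_size=1` but only 5 at `batch_size=20`.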
Checklist
References
JIRA: https://confluentinc.atlassian.net/browse/DGS-22195
Test & Review
Open questions / Follow-ups