Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
5266c79
Add async producer with metrics
k-raina Sep 9, 2025
d03a13c
Add producer strategy
k-raina Sep 9, 2025
c4306ea
Fix decorator
k-raina Sep 9, 2025
91998de
Try await in AsyncStrategy Produce.produce call
k-raina Sep 10, 2025
469a6b4
Rearchetect AIOProducer
k-raina Sep 10, 2025
6e9d67e
Pass producer config to strategy
k-raina Sep 11, 2025
63bfdd7
Pass polling interval
k-raina Sep 11, 2025
77315ec
Address feedback
k-raina Sep 11, 2025
762ebaa
Address feedback
k-raina Sep 11, 2025
babeabc
Temp commit to check latency of adding function to threadpool
k-raina Sep 12, 2025
be25ab4
Add produce batch api to producer
k-raina Sep 15, 2025
3aa9656
Implement AIO with produce_batch
k-raina Sep 15, 2025
1a1b26f
Add buffer timeout code
k-raina Sep 18, 2025
8e1f2c4
Reorganise code
k-raina Sep 18, 2025
ce534cf
Add capability of sync and async callbacks
k-raina Sep 18, 2025
287026d
Add unit tests
k-raina Sep 18, 2025
7ac679c
Add code comments for better understanding
k-raina Sep 18, 2025
ffadbaa
Add per message callback and callback pool
k-raina Sep 18, 2025
b066a15
Move batch to Producerbatchprocessor
k-raina Sep 18, 2025
80a0696
Minor
k-raina Sep 18, 2025
efcf6d4
Per message callbacks should be called for failed messages in batch
k-raina Sep 19, 2025
9014db9
refactor: extract callback handling into separate AsyncCallbackHandle…
k-raina Sep 19, 2025
1c36d3d
refactor: inject AsyncCallbackHandler into CallbackPool for better in…
k-raina Sep 19, 2025
35edc28
refactor: extract KafkaBatchExecutor for dedicated Kafka operations
k-raina Sep 19, 2025
fd772b4
refactor: eliminate AIOProducer dependency from BatchProcessor for tr…
k-raina Sep 19, 2025
215379d
refactor: complete Steps 5-7 - BufferManager, MessageBatch, and clean…
k-raina Sep 19, 2025
fe0df1c
refactor: rename BufferManager to BufferTimeoutManager for clarity
k-raina Sep 19, 2025
c44fbe8
refactor: organize producer components into dedicated src/aio/produce…
k-raina Sep 19, 2025
1562df2
refactor: merge CallbackPool and AsyncCallbackHandler into unified Ca…
k-raina Sep 19, 2025
2ab6208
Cleanup architecture
k-raina Sep 19, 2025
e91f163
refactor: rename classes for better clarity and consistency
k-raina Sep 19, 2025
e839926
Clean meta files
k-raina Sep 19, 2025
6a236e5
cleanup: remove duplicate test files in tests/aio/producer
k-raina Sep 19, 2025
b779fb8
cleanup: remove unrelated test result files and improve gitignore
k-raina Sep 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,5 @@ venv_examples
.coverage
**/coverage.xml
**/test-report.xml
.ducktape
results
47 changes: 47 additions & 0 deletions aio_producer_simple_diagram.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# AIOProducer Simple Class Diagram

```
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ AIOProducer │ │BufferTimeout │ │ MessageBatch │
│ (Orchestrator) │ │ Manager │ │ (Value Object) │
├─────────────────┤ ├─────────────────┤ ├─────────────────┤
│ • produce() │───▶│ • timeout │ │ • immutable │
│ • flush() │ │ monitoring │ │ • type safe │
│ • orchestrate │ │ • mark_activity │ │ • clean data │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ ▲
▼ ▼ │
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ProducerBatch │ │ProducerBatch │ │CallbackManager │
│ Manager │───▶│ Executor │ │ (Unified Mgmt) │
├─────────────────┤ ├─────────────────┤ ├─────────────────┤
│ • create_batches│ │ • execute_batch │ │ • sync/async │
│ • group topics │ │ • thread pool │ │ • object pool │
│ • manage buffer │ │ • poll results │ │ • event loop │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│create_message │ │ ReusableMessage │
│ _batch() │ │ Callback Pool │
├─────────────────┤ ├─────────────────┤
│ • factory func │ │ • pooled objs │
│ • type safety │◄─────────────────────────│ • auto-return │
│ • validation │ │ • thread safe │
└─────────────────┘ └─────────────────┘
```

## Architecture Summary

**7 Components Total:**
- **1 Orchestrator**: AIOProducer (main API)
- **3 Core Services**: ProducerBatchManager, ProducerBatchExecutor, BufferTimeoutManager
- **1 Unified Manager**: CallbackManager (merged handler + pool)
- **2 Data Objects**: MessageBatch + factory function

**Key Benefits:**
- ✅ Single responsibility per component
- ✅ Clean dependency injection
- ✅ Unified callback management
- ✅ Immutable data structures
- ✅ Performance optimized pooling
9 changes: 1 addition & 8 deletions src/confluent_kafka/aio/_AIOConsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,7 @@ def __init__(self, consumer_conf, max_workers=2, executor=None):
self._consumer = confluent_kafka.Consumer(consumer_conf)

async def _call(self, blocking_task, *args, **kwargs):
return (await asyncio.gather(
asyncio.get_running_loop().run_in_executor(self.executor,
functools.partial(
blocking_task,
*args,
**kwargs))

))[0]
return await _common.async_call(self.executor, blocking_task, *args, **kwargs)

def _wrap_callback(self, loop, callback, edit_args=None, edit_kwargs=None):
def ret(*args, **kwargs):
Expand Down
106 changes: 0 additions & 106 deletions src/confluent_kafka/aio/_AIOProducer.py

This file was deleted.

2 changes: 1 addition & 1 deletion src/confluent_kafka/aio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@
# limitations under the License.

from ._AIOConsumer import AIOConsumer
from ._AIOProducer import AIOProducer
from .producer import AIOProducer
__all__ = ['AIOConsumer', 'AIOProducer']
21 changes: 21 additions & 0 deletions src/confluent_kafka/aio/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import asyncio
import functools


class AsyncLogger:
Expand Down Expand Up @@ -48,6 +49,26 @@ def wrap_conf_logger(loop, conf):
conf['logger'] = AsyncLogger(loop, conf['logger'])


async def async_call(executor, blocking_task, *args, **kwargs):
"""Helper function for blocking operations that need ThreadPool execution

Args:
executor: ThreadPoolExecutor to use for blocking operations
blocking_task: The blocking function to execute
*args, **kwargs: Arguments to pass to the blocking function

Returns:
Result of the blocking function execution
"""
return (await asyncio.gather(
asyncio.get_running_loop().run_in_executor(executor,
functools.partial(
blocking_task,
*args,
**kwargs))
))[0]


def wrap_common_callbacks(loop, conf):
wrap_conf_callback(loop, conf, 'error_cb')
wrap_conf_callback(loop, conf, 'throttle_cb')
Expand Down
Loading