Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion packages/amgi-aiokafka/src/amgi_aiokafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from collections.abc import Callable
from collections.abc import Iterable
from typing import Any
from typing import Literal

from aiokafka import AIOKafkaConsumer
from aiokafka import AIOKafkaProducer
Expand All @@ -23,14 +24,22 @@
logger = logging.getLogger("amgi-aiokafka.error")


AutoOffsetReset = Literal["earliest", "latest", "none"]


def run(
app: AMGIApplication,
*topics: str,
bootstrap_servers: str | list[str] = "localhost",
group_id: str | None = None,
auto_offset_reset: AutoOffsetReset = "latest",
) -> None:
server = Server(
app, *topics, bootstrap_servers=bootstrap_servers, group_id=group_id
app,
*topics,
bootstrap_servers=bootstrap_servers,
group_id=group_id,
auto_offset_reset=auto_offset_reset,
)
server_serve(server)

Expand All @@ -40,12 +49,14 @@ def _run_cli(
topics: list[str],
bootstrap_servers: list[str] | None = None,
group_id: str | None = None,
auto_offset_reset: AutoOffsetReset = "latest",
) -> None:
run(
app,
*topics,
bootstrap_servers=bootstrap_servers or ["localhost"],
group_id=group_id,
auto_offset_reset=auto_offset_reset,
)


Expand Down Expand Up @@ -100,11 +111,13 @@ def __init__(
*topics: str,
bootstrap_servers: str | list[str],
group_id: str | None,
auto_offset_reset: AutoOffsetReset = "latest",
) -> None:
self._app = app
self._topics = topics
self._bootstrap_servers = bootstrap_servers
self._group_id = group_id
self._auto_offset_reset = auto_offset_reset
self._ackable_consumer = self._group_id is not None
self._producer: AIOKafkaProducer | None = None
self._producer_lock = Lock()
Expand All @@ -116,6 +129,7 @@ async def serve(self) -> None:
bootstrap_servers=self._bootstrap_servers,
group_id=self._group_id,
enable_auto_commit=False,
auto_offset_reset=self._auto_offset_reset,
)
async with self._consumer:
async with Lifespan(self._app) as state:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import pytest
from aiokafka import AIOKafkaConsumer
from aiokafka import AIOKafkaProducer
from aiokafka.admin import AIOKafkaAdminClient
from aiokafka.admin import NewTopic
from aiokafka.errors import TopicAlreadyExistsError
from amgi_aiokafka import _run_cli
from amgi_aiokafka import run
from amgi_aiokafka import Server
Expand All @@ -24,34 +27,60 @@ def bootstrap_server(kafka_container: KafkaContainer) -> str:
return kafka_container.get_bootstrap_server() # type: ignore


async def create_topic(bootstrap_server: str, topic: str) -> str:
admin = AIOKafkaAdminClient(bootstrap_servers=bootstrap_server)
await admin.start()

try:
await admin.create_topics(
[NewTopic(name=topic, num_partitions=1, replication_factor=1)]
)
except TopicAlreadyExistsError: # pragma: no cover
pass
finally:
await admin.close()

return topic


@pytest.fixture
def topic() -> str:
return f"receive-{uuid4()}"
async def receive_topic(bootstrap_server: str) -> str:
return await create_topic(bootstrap_server, f"receive-{uuid4()}")


@pytest.fixture
async def app(bootstrap_server: str, topic: str) -> AsyncGenerator[MockApp, None]:
async def send_topic(bootstrap_server: str) -> str:
return await create_topic(bootstrap_server, f"send-{uuid4()}")


@pytest.fixture
async def app(
bootstrap_server: str, receive_topic: str
) -> AsyncGenerator[MockApp, None]:
app = MockApp()
server = Server(
app,
topic,
receive_topic,
bootstrap_servers=bootstrap_server,
group_id=str(uuid4()),
auto_offset_reset="earliest",
)

async with app.lifespan(server=server):
yield app


@pytest.mark.integration
async def test_message(bootstrap_server: str, app: MockApp, topic: str) -> None:
async def test_message(bootstrap_server: str, app: MockApp, receive_topic: str) -> None:
producer = AIOKafkaProducer(bootstrap_servers=bootstrap_server)
await producer.start()

await producer.send_and_wait(topic, b"value", b"key", headers=[("test", b"test")])
await producer.send_and_wait(
receive_topic, b"value", b"key", headers=[("test", b"test")]
)
async with app.call() as (scope, receive, send):
assert scope == {
"address": topic,
"address": receive_topic,
"amgi": {"spec_version": "1.0", "version": "1.0"},
"type": "message",
"state": {},
Expand All @@ -61,7 +90,7 @@ async def test_message(bootstrap_server: str, app: MockApp, topic: str) -> None:
assert message_receive["type"] == "message.receive"
assert message_receive == {
"headers": [(b"test", b"test")],
"id": f"{topic}:0:0",
"id": f"{receive_topic}:0:0",
"more_messages": False,
"payload": b"value",
"bindings": {"kafka": {"key": b"key"}},
Expand All @@ -78,12 +107,13 @@ async def test_message(bootstrap_server: str, app: MockApp, topic: str) -> None:


@pytest.mark.integration
async def test_message_send(bootstrap_server: str, app: MockApp, topic: str) -> None:
async def test_message_send(
bootstrap_server: str, app: MockApp, receive_topic: str, send_topic: str
) -> None:
producer = AIOKafkaProducer(bootstrap_servers=bootstrap_server)
await producer.start()

await producer.send_and_wait(topic, b"")
send_topic = f"send-{uuid4()}"
await producer.send_and_wait(receive_topic, b"")

async with AIOKafkaConsumer(
send_topic, bootstrap_servers=bootstrap_server
Expand All @@ -108,13 +138,12 @@ async def test_message_send(bootstrap_server: str, app: MockApp, topic: str) ->

@pytest.mark.integration
async def test_message_send_kafka_key(
bootstrap_server: str, app: MockApp, topic: str
bootstrap_server: str, app: MockApp, receive_topic: str, send_topic: str
) -> None:
producer = AIOKafkaProducer(bootstrap_servers=bootstrap_server)
await producer.start()

await producer.send_and_wait(topic, b"")
send_topic = f"send-{uuid4()}"
await producer.send_and_wait(receive_topic, b"")

async with AIOKafkaConsumer(
send_topic, bootstrap_servers=bootstrap_server
Expand All @@ -137,11 +166,11 @@ async def test_message_send_kafka_key(


@pytest.mark.integration
async def test_lifespan(bootstrap_server: str, topic: str) -> None:
async def test_lifespan(bootstrap_server: str, receive_topic: str) -> None:
app = MockApp()
server = Server(
app,
topic,
receive_topic,
bootstrap_servers=bootstrap_server,
group_id=None,
)
Expand All @@ -152,23 +181,25 @@ async def test_lifespan(bootstrap_server: str, topic: str) -> None:

async with app.lifespan({"item": state_item}, server):
await producer.send_and_wait(
topic,
receive_topic,
b"",
)
async with app.call() as (scope, receive, send):
assert scope == {
"address": topic,
"address": receive_topic,
"amgi": {"spec_version": "1.0", "version": "1.0"},
"type": "message",
"state": {"item": state_item},
}


@pytest.mark.integration
def test_run(bootstrap_server: str, topic: str) -> None:
assert_run_can_terminate(run, topic, bootstrap_servers=bootstrap_server)
def test_run(bootstrap_server: str, receive_topic: str) -> None:
assert_run_can_terminate(run, receive_topic, bootstrap_servers=bootstrap_server)


@pytest.mark.integration
def test_run_cli(bootstrap_server: str, topic: str) -> None:
assert_run_can_terminate(_run_cli, [topic], bootstrap_servers=bootstrap_server)
def test_run_cli(bootstrap_server: str, receive_topic: str) -> None:
assert_run_can_terminate(
_run_cli, [receive_topic], bootstrap_servers=bootstrap_server
)