Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ _build

# coverage
.coverage
.coverage.*
coverage.xml
1 change: 1 addition & 0 deletions .python-versions
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
3.11
3.12
3.13
3.14
72 changes: 41 additions & 31 deletions packages/amgi-aiokafka/src/amgi_aiokafka/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import logging
from asyncio import Lock
from collections import deque
Expand All @@ -24,7 +25,7 @@

def run(
app: AMGIApplication,
*topics: Iterable[str],
*topics: str,
bootstrap_servers: str | list[str] = "localhost",
group_id: str | None = None,
) -> None:
Expand Down Expand Up @@ -96,7 +97,7 @@ class Server:
def __init__(
self,
app: AMGIApplication,
*topics: Iterable[str],
*topics: str,
bootstrap_servers: str | list[str],
group_id: str | None,
) -> None:
Expand Down Expand Up @@ -127,35 +128,44 @@ async def _main_loop(self, state: dict[str, Any]) -> None:
async for messages in self._stoppable.call(
self._consumer.getmany, timeout_ms=1000
):
for topic_partition, records in messages.items():
if records:
scope: MessageScope = {
"type": "message",
"amgi": {"version": "1.0", "spec_version": "1.0"},
"address": topic_partition.topic,
"state": state.copy(),
}

message_receive_ids = {
f"{record.topic}:{record.partition}:{record.offset}": {
TopicPartition(
record.topic, record.partition
): record.offset
+ 1
}
for record in records
}

await self._app(
scope,
_Receive(records),
_Send(
self._consumer,
message_receive_ids,
self._message_send,
self._ackable_consumer,
),
)
await asyncio.gather(
*[
self._handle_partition_records(topic_partition, records, state)
for topic_partition, records in messages.items()
]
)

async def _handle_partition_records(
self,
topic_partition: TopicPartition,
records: list[ConsumerRecord],
state: dict[str, Any],
) -> None:
if records:
scope: MessageScope = {
"type": "message",
"amgi": {"version": "1.0", "spec_version": "1.0"},
"address": topic_partition.topic,
"state": state.copy(),
}

message_receive_ids = {
f"{record.topic}:{record.partition}:{record.offset}": {
TopicPartition(record.topic, record.partition): record.offset + 1
}
for record in records
}

await self._app(
scope,
_Receive(records),
_Send(
self._consumer,
message_receive_ids,
self._message_send,
self._ackable_consumer,
),
)

async def _get_producer(self) -> AIOKafkaProducer:
async with self._producer_lock:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ def __init__(
self._endpoint_url = endpoint_url
self._aws_access_key_id = aws_access_key_id
self._aws_secret_access_key = aws_secret_access_key
self._loop = asyncio.get_event_loop()
self._loop = asyncio.new_event_loop()
self._lifespan = lifespan
self._lifespan_context: Lifespan | None = None
self._state: dict[str, Any] = {}
Expand Down
23 changes: 14 additions & 9 deletions packages/test-utils/src/test_utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import asyncio
import multiprocessing
import multiprocessing.synchronize
from asyncio import Event
from asyncio import Queue
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from functools import partial
from typing import Any
from typing import Callable
from typing import Optional
Expand Down Expand Up @@ -74,21 +75,25 @@ async def __call__(
await return_event.wait()


async def _app(
scope: Scope,
receive: AMGIReceiveCallable,
send: AMGISendCallable,
lifespan_event: multiprocessing.synchronize.Event,
) -> None:
if scope["type"] == "lifespan":
lifespan_event.set()
raise Exception


def assert_run_can_terminate(
run: Callable[..., None], *args: Any, **kwargs: Any
) -> None:
lifespan_event = multiprocessing.Event()

async def _app(
scope: Scope, receive: AMGIReceiveCallable, send: AMGISendCallable
) -> None:
if scope["type"] == "lifespan":
lifespan_event.set()
raise Exception

process = multiprocessing.Process(
target=run,
args=(_app, *args),
args=(partial(_app, lifespan_event=lifespan_event), *args),
kwargs=kwargs,
)
process.start()
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ members = [

[tool.pytest.ini_options]
asyncio_mode = "auto"
timeout = 60
timeout = 10
timeout_func_only = true
filterwarnings = [
"ignore:^The wait_for_logs function with string or callable predicates is deprecated:DeprecationWarning",
Expand Down
41 changes: 21 additions & 20 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
requires =
tox>=4.2
env_list =
py314-asyncfast-pydantic212
py313-asyncfast-pydantic2{8-12}
clean
py3{10-13}-{amgi-aiobotocore, amgi-aiokafka, amgi-common, amgi-paho-mqtt, amgi-redis, amgi-sqs-event-source-mapping, asyncfast-cli}
py3{10-14}-{amgi-aiobotocore, amgi-aiokafka, amgi-common, amgi-paho-mqtt, amgi-redis, amgi-sqs-event-source-mapping, asyncfast-cli}
py3{10-12}-asyncfast-pydantic2{0-12}
py3{10-13}-{amgi-aiobotocore, amgi-aiokafka, amgi-common, amgi-paho-mqtt, amgi-redis, amgi-sqs-event-source-mapping, amgi-types, asyncfast, asyncfast-cli}-import
py3{10-14}-{amgi-aiobotocore, amgi-aiokafka, amgi-common, amgi-paho-mqtt, amgi-redis, amgi-sqs-event-source-mapping, amgi-types, asyncfast, asyncfast-cli}-import

[testenv]
runner = uv-venv-lock-runner
Expand All @@ -21,7 +22,7 @@ pass_env =
commands =
pytest {tty:--color=yes} {posargs:-v --cov --cov-append --cov-report=xml}
depends =
{py313, py312, py311, py310}: clean
{py314, py313, py312, py311, py310}: clean
uv_sync_flags = --all-packages

[testenv:clean]
Expand All @@ -30,7 +31,7 @@ commands =
coverage erase
uv_sync_flags = --all-packages

[testenv:py3{10-13}-asyncfast-pydantic2{0-12}]
[testenv:py3{10-14}-asyncfast-pydantic2{0-12}]
commands_pre =
pydantic20: uv pip install "pydantic>=2.0,<2.1"
pydantic21: uv pip install "pydantic>=2.1,<2.2"
Expand All @@ -49,99 +50,99 @@ commands =
{[testenv]commands} packages/asyncfast
uv_sync_flags = --package=asyncfast

[testenv:py3{10-13}-amgi-aiobotocore]
[testenv:py3{10-14}-amgi-aiobotocore]
commands =
{[testenv]commands} packages/amgi-aiobotocore
uv_sync_flags = --package=amgi-aiobotocore

[testenv:py3{10-13}-amgi-aiokafka]
[testenv:py3{10-14}-amgi-aiokafka]
commands =
{[testenv]commands} packages/amgi-aiokafka
uv_sync_flags = --package=amgi-aiokafka

[testenv:py3{10-13}-amgi-common]
[testenv:py3{10-14}-amgi-common]
commands =
{[testenv]commands} packages/amgi-common
uv_sync_flags = --package=amgi-common

[testenv:py3{10-13}-amgi-paho-mqtt]
[testenv:py3{10-14}-amgi-paho-mqtt]
commands =
{[testenv]commands} packages/amgi-paho-mqtt
uv_sync_flags = --package=amgi-paho-mqtt

[testenv:py3{10-13}-amgi-redis]
[testenv:py3{10-14}-amgi-redis]
commands =
{[testenv]commands} packages/amgi-redis
uv_sync_flags = --package=amgi-redis

[testenv:py3{10-13}-amgi-sqs-event-source-mapping]
[testenv:py3{10-14}-amgi-sqs-event-source-mapping]
commands =
{[testenv]commands} packages/amgi-sqs-event-source-mapping
uv_sync_flags = --package=amgi-sqs-event-source-mapping

[testenv:py3{10-13}-amgi-aiobotocore-import]
[testenv:py3{10-14}-amgi-aiobotocore-import]
commands =
python -c "import amgi_aiobotocore"
uv_sync_flags =
--package=amgi-aiobotocore
--no-dev

[testenv:py3{10-13}-amgi-aiokafka-import]
[testenv:py3{10-14}-amgi-aiokafka-import]
commands =
python -c "import amgi_aiokafka"
uv_sync_flags =
--package=amgi-aiokafka
--no-dev

[testenv:py3{10-13}-amgi-common-import]
[testenv:py3{10-14}-amgi-common-import]
commands =
python -c "import amgi_common"
uv_sync_flags =
--package=amgi-common
--no-dev

[testenv:py3{10-13}-amgi-paho-mqtt-import]
[testenv:py3{10-14}-amgi-paho-mqtt-import]
commands =
python -c "import amgi_paho_mqtt"
uv_sync_flags =
--package=amgi-paho-mqtt
--no-dev

[testenv:py3{10-13}-amgi-redis-import]
[testenv:py3{10-14}-amgi-redis-import]
commands =
python -c "import amgi_redis"
uv_sync_flags =
--package=amgi-redis
--no-dev

[testenv:py3{10-13}-amgi-sqs-event-source-mapping-import]
[testenv:py3{10-14}-amgi-sqs-event-source-mapping-import]
commands =
python -c "import amgi_sqs_event_source_mapping"
uv_sync_flags =
--package=amgi-sqs-event-source-mapping
--no-dev
--extra=boto3

[testenv:py3{10-13}-amgi-types-import]
[testenv:py3{10-14}-amgi-types-import]
commands =
python -c "import amgi_types"
uv_sync_flags =
--package=amgi-types
--no-dev

[testenv:py3{10-13}-asyncfast-import]
[testenv:py3{10-14}-asyncfast-import]
commands =
python -c "import asyncfast"
uv_sync_flags =
--package=asyncfast
--no-dev

[testenv:py3{10-13}-asyncfast-cli]
[testenv:py3{10-14}-asyncfast-cli]
commands =
{[testenv]commands} packages/asyncfast-cli
uv_sync_flags = --package=asyncfast-cli

[testenv:py3{10-13}-asyncfast-cli-import]
[testenv:py3{10-14}-asyncfast-cli-import]
commands =
python -c "import asyncfast_cli"
uv_sync_flags =
Expand Down