Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python SDK Rework #73

Merged
merged 60 commits into from
Feb 1, 2025
Merged
Show file tree
Hide file tree
Changes from 51 commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
4271c18
chore: add Pydantic
tubarao312 Jan 14, 2025
4bf75a3
refactor: Completely rework task models
tubarao312 Jan 14, 2025
96d8972
docs: Improve types documentation
tubarao312 Jan 14, 2025
29a29ea
chore: Add aiohttp-retry lib
tubarao312 Jan 14, 2025
2d55487
refactor: Refactor manager config to use aiohttp-retry
tubarao312 Jan 14, 2025
a32fb39
refactor: Rework manager client to only have the required functions a…
tubarao312 Jan 14, 2025
4f1e3cb
refactor: Rework publisher client
tubarao312 Jan 14, 2025
ac7caf1
feat: Add from_str method to TaskKind model
tubarao312 Jan 14, 2025
272ef70
refactor: Slightly improve interface for brokers
tubarao312 Jan 14, 2025
eee42bf
docs: Fix error in Task docss
tubarao312 Jan 14, 2025
468f385
fix: Remove useless broker client interface code
tubarao312 Jan 14, 2025
6d5949b
refactor: Rework RabbitMQ clients so there exists one for the worker …
tubarao312 Jan 14, 2025
246087c
feat: Add durable and auto_delete config to BrokerConfig
tubarao312 Jan 14, 2025
41264df
chore: dependencies organization
tubarao312 Jan 14, 2025
674fd6d
fix: Fix broker to find exchanges and queues based on the published task
tubarao312 Jan 14, 2025
aa3b126
feat: Set prefetch count to 1
tubarao312 Jan 23, 2025
6b26d4d
Merge branch 'pull-rework/main' into pull-rework/publisher
tubarao312 Jan 23, 2025
dd9e848
docs: Simplify docs for worker app config
tubarao312 Jan 23, 2025
8ee0b26
feat: Add worker kind registration to manager client
tubarao312 Jan 23, 2025
56991b0
feat: Add worker kind broker info model
tubarao312 Jan 23, 2025
99aae04
feat: Register worker kind on worker app init
tubarao312 Jan 23, 2025
0c7ca7e
refactor: Remove irrelevant fields from broker config
tubarao312 Jan 23, 2025
ebaf6f3
refactor: Remove broker instance create function
tubarao312 Jan 23, 2025
d7188d5
refactor: Rework broker clients to be more lightweight and only conne…
tubarao312 Jan 23, 2025
7234987
docs: Add RabbitMQ to brokerconfig docs
tubarao312 Jan 23, 2025
602232c
refactor: Remove broker client interface
tubarao312 Jan 23, 2025
537f7c8
chore: add all brokers to __init__ export
tubarao312 Jan 23, 2025
bca4bfb
rework: Refactor worker client to use new broker clients
tubarao312 Jan 23, 2025
fdbc9e7
chore: Fix inconsistent url name
tubarao312 Jan 23, 2025
1b5198b
docs: Update examples
tubarao312 Jan 23, 2025
c8d0b0e
refactor: Refactor publisher client to use new broker client
tubarao312 Jan 23, 2025
b46d6bc
fix: Fix task status name
tubarao312 Jan 24, 2025
37f01b6
Merge branch 'pull-rework/main' into pull-rework/publisher
tubarao312 Jan 27, 2025
ed8f815
chore: Add new deps and settings for tests
tubarao312 Jan 28, 2025
d469559
chore: Update UV lock
tubarao312 Jan 28, 2025
68a0b46
chore: Remove outdated tests
tubarao312 Jan 28, 2025
07f6bed
chore: Update package info
tubarao312 Jan 28, 2025
1070eeb
fix: Fix multiple pydantic models to use proper pydantic inits
tubarao312 Jan 28, 2025
ef3c8e4
chore: Improve inits for exports
tubarao312 Jan 28, 2025
69c6026
chore: Tune modal config to account for tests
tubarao312 Jan 28, 2025
15a4985
test: Refactor worker client tests
tubarao312 Jan 28, 2025
6247847
test: Refactor publisher client tests
tubarao312 Jan 28, 2025
42dba83
test: Refactor manager client tests
tubarao312 Jan 28, 2025
4c23a2c
test: Simplify conftest
tubarao312 Jan 28, 2025
c0fb066
test: Temporarily disable benchmarks
tubarao312 Jan 28, 2025
4da7b12
fix: Improve pydantic model usage for WorkerApplication
tubarao312 Jan 28, 2025
6bc8d59
feat: Completely remove task kind model
tubarao312 Jan 28, 2025
f6348b8
fix: Fix URL for mock manager client
tubarao312 Jan 28, 2025
b0a81ef
docs: Fix examples to use new interfaces
tubarao312 Jan 28, 2025
06aac68
docs: Inline worker app config example
tubarao312 Jan 28, 2025
c738677
docs: Tweak examples
tubarao312 Jan 28, 2025
a87ddd1
feat: Add health check to local docker compose
tubarao312 Jan 29, 2025
d1a78a8
chore: Move all tests into unit test folder
tubarao312 Jan 29, 2025
fbe13c8
test: Add e2e test suite
tubarao312 Jan 29, 2025
ef78890
feat: Move all queue setup logic back to client SDK
tubarao312 Jan 29, 2025
3d5c3b3
fix: Remove worker kind broker info model and references, make task r…
tubarao312 Jan 29, 2025
1d2559e
feat: Create an auto-serializable exception. This must be improved in…
tubarao312 Jan 29, 2025
87b8481
fix: Fix Rust queues to be compatible with Python ones
tubarao312 Jan 29, 2025
fae2f1b
fix: Fix wrong default shutdown signal
tubarao312 Jan 29, 2025
3e9e3bf
feat: Make tasks only get ackowledged AFTER being completed
tubarao312 Jan 29, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions client_sdks/python/examples/example_producer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio

from broker.config import BrokerConfig
from manager.config import ManagerConfig
from publisher.client import PublisherClient

Expand All @@ -9,21 +10,35 @@
# Setup the manager location configuration
manager_config = ManagerConfig(url="http://localhost:3000")

# Setup the broker configuration
broker_config = BrokerConfig(url="amqp://user:password@localhost:5672")

# Both the publisher and the worker need to know about the task kinds and
# should have unified names for them.
WORKER_KIND_NAME = "worker_kind"
TASK_1_NAME = "task_1"
TASK_2_NAME = "task_2"

# APPLICATION CONFIGURATION ___________________________________________________

# 1. Create a producer application
worker_application = PublisherClient(manager_config)
worker_application = PublisherClient(
manager_config=manager_config, broker_config=broker_config
)


# 2. Start the application
async def main():
task1 = await worker_application.publish_task(TASK_1_NAME, {"data": "task_1_data"})
task2 = await worker_application.publish_task(TASK_2_NAME, {"data": "task_2_data"})
task1 = await worker_application.publish_task(
TASK_1_NAME,
WORKER_KIND_NAME,
{"data": "task_1_data"},
)
task2 = await worker_application.publish_task(
TASK_2_NAME,
WORKER_KIND_NAME,
{"data": "task_2_data"},
)

print(f"Task 1: {task1}")
print(f"Task 2: {task2}")
Expand Down
23 changes: 13 additions & 10 deletions client_sdks/python/examples/example_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,37 @@

# Both the publisher and the worker need to know about the task kinds and
# should have unified names for them.
WORKER_KIND_NAME = "worker_kind"
TASK_1_NAME = "task_1"
TASK_2_NAME = "task_2"

# APPLICATION CONFIGURATION ___________________________________________________

# 2. Configure the worker
worker_config = WorkerApplicationConfig(
name="test_worker",
manager_config=manager_config,
broker_config=broker_config,
# 1. Create a worker application
worker_application = WorkerApplication(
config=WorkerApplicationConfig(
name="test_worker",
manager_config=manager_config,
broker_config=broker_config,
kind=WORKER_KIND_NAME,
),
)

# 3. Create a worker application
worker_application = WorkerApplication(worker_config)


# 4. Create tasks and register them with the worker application
# 2. Create tasks and register them with the worker application
@worker_application.task(TASK_1_NAME)
async def task_1(input_data: dict[Any, Any]) -> dict[Any, Any]:
await asyncio.sleep(1)
return input_data


@worker_application.task(TASK_2_NAME)
async def task_2(input_data: dict[Any, Any]) -> dict[Any, Any]:
async def task_2(_: dict[Any, Any]) -> dict[Any, Any]:
raise Exception("This is a test exception")


# 3. Run the worker application

if __name__ == "__main__":
# Application can be run either as a standalone script or via the CLI.
asyncio.run(worker_application.entrypoint())
6 changes: 6 additions & 0 deletions client_sdks/python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ classifiers = [
requires-python = ">=3.12"
dependencies = [
"aio-pika>=9.5.3",
"aiohttp-retry>=2.9.1",
"aiohttp>=3.11.8",
"aioredis>=2.0.1",
"aioresponses>=0.7.8",
"click>=8.1.7",
"pydantic>=2.10.5",
"uuid>=1.30",
"watchfiles>=1.0.3",
]
Expand All @@ -28,6 +31,9 @@ asyncio_mode = "auto"
pythonpath = ["."]
markers = [
"bench: benchmark a function",
"target: current target test. Used locally.",
"unit: unit tests",
"service: tests that interact with external services",
]

[build-system]
Expand Down
33 changes: 7 additions & 26 deletions client_sdks/python/src/broker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,9 @@
from broker.client import PublisherBrokerClient, WorkerBrokerClient, BaseBrokerClient
from broker.config import BrokerConfig
from broker.core import BrokerClient

# TODO: Import only rabbit if tacoq[amqp] is installed
from broker.rabbitmq import RabbitMQBroker


def create_broker_instance(
config: BrokerConfig, exchange_name: str, worker_id: str
) -> BrokerClient:
"""Create appropriate broker client based on configuration.

### Parameters
- `config`: Configuration for the broker connection
- `exchange_name`: Name of the exchange to use
- `worker_id`: Unique identifier for this worker instance

### Returns
- `BrokerClient`: Configured broker client instance

### Raises
- `ValueError`: If broker URL scheme is not supported
"""

if config.url.startswith("amqp"):
return RabbitMQBroker(config, exchange_name, worker_id)
else:
raise ValueError(f"Unsupported broker URL: {config.url}")
__all__ = [
"PublisherBrokerClient",
"WorkerBrokerClient",
"BaseBrokerClient",
"BrokerConfig",
]
224 changes: 224 additions & 0 deletions client_sdks/python/src/broker/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
import json
from typing import AsyncGenerator, Optional
from broker.config import BrokerConfig
from aio_pika import Message, connect_robust
from models.task import Task
from pydantic import BaseModel

from aio_pika.abc import (
AbstractChannel,
AbstractQueue,
AbstractRobustConnection,
AbstractExchange,
)

# =========================================
# Constants - Exchange and queue names
# =========================================

TASK_ASSIGNMENT_EXCHANGE = "task_assignment_exchange"
""" Exchange for task assignments. """

TASK_RESULT_EXCHANGE = "task_result_exchange"
""" Exchange for task results. """
tubarao312 marked this conversation as resolved.
Show resolved Hide resolved

# =========================================
# Errors
# =========================================


class NoChannelError(Exception):
"""Raised when a RabbitMQ client is not connected to the broker while
trying to perform an operation that requires a channel."""

pass


class NotConnectedError(Exception):
"""Raised when a RabbitMQ client is not connected to the broker while
trying to perform an operation that requires a connection."""

pass


class QueueNotDeclaredError(Exception):
"""Raised when a RabbitMQ client tries to use a queue that has not been
declared."""

pass


class ExchangeNotDeclaredError(Exception):
"""Raised when a RabbitMQ client tries to use an exchange that has not been
declared."""

pass


## =========================================
## Base Client
## =========================================


class BaseBrokerClient(BaseModel):
"""RabbitMQ implementation of the broker interface."""

config: BrokerConfig
""" Configuration for the broker. """

_connection: Optional[AbstractRobustConnection] = None
""" The connection to the RabbitMQ server. """

_channel: Optional[AbstractChannel] = None
""" The channel to the RabbitMQ server. """

async def connect(self) -> None:
"""Establish connection to RabbitMQ server and setup channel.

### Raises
- `ConnectionError`: If connection to RabbitMQ fails
"""

self._connection = await connect_robust(self.config.url)
tubarao312 marked this conversation as resolved.
Show resolved Hide resolved
self._channel = await self._connection.channel()

async def disconnect(self) -> None:
"""Close RabbitMQ connection.

### Raises
- `RabbitMQNotConnectedError`: If connection is not established
"""

if self._connection is None:
raise NotConnectedError(
"Tried to disconnect from RabbitMQ, but connection was not established."
)

# Remove the exchanges
await self._connection.close()


## =========================================
## Publisher Client
## =========================================


class PublisherBrokerClient(BaseBrokerClient):
"""RabbitMQ client for publishing tasks to workers.
Uses a fanout exchange to send tasks to both the manager queue
and the appropriate worker kind queue."""

_task_exchange: Optional[AbstractExchange] = None
""" The exchange for task assignments. """

async def connect(self) -> None:
await super().connect()

if self._channel is None:
raise NoChannelError(
"Tried to connect to RabbitMQ, but channel was not established."
)

# Connect to the exchange so to which we will publish tasks.
self._task_exchange = await self._channel.declare_exchange(
TASK_ASSIGNMENT_EXCHANGE,
passive=True, # We only want to connect to the exchange if it already exists.
)

async def publish_task(self, routing_key: str, task: Task) -> None:
"""Publish a task to both manager and worker queues via exchange and routing mechanisms.

### Arguments
- `routing_key`: The routing key for the task. This is based on the worker kind. The publisher
client will know the routing key based on the requests it has made to the manager, who creates
the queues and binds them to the exchange.

### Raises
- `RuntimeError`: If the exchange was not declared.
"""

if self._task_exchange is None:
raise RuntimeError("Tried to publish task, but exchange was not declared.")

message = Message(body=task.model_dump_json().encode())

await self._task_exchange.publish(message, routing_key)


## =========================================
## Worker Client
## =========================================


class WorkerBrokerClient(BaseBrokerClient):
"""RabbitMQ client for workers to consume tasks and publish results.
Each worker kind has its own queue for task assignments, but all workers
share a single queue for publishing results."""

task_assignment_queue_name: str
""" The name of the task assignment queue. """

_task_assignment_queue: Optional[AbstractQueue] = None
""" Queue for task assignments. """

_result_exchange: Optional[AbstractExchange] = None
""" Exchange for publishing results (shared by all workers). """

async def connect(self) -> None:
await super().connect()

if self._channel is None:
raise NoChannelError(
"Tried to connect to RabbitMQ, but channel was not established."
)

# =========================================
# Setup task assignment queue for this worker kind
# =========================================

self._task_assignment_queue = await self._channel.declare_queue(
self.task_assignment_queue_name,
passive=True, # We only want to connect to the queue if it already exists.
)

# =========================================
# Setup result publishing infrastructure
# =========================================

# Set prefetch to one to enable fair dispatching
await self._channel.set_qos(prefetch_count=1)

# Setup result publishing infrastructure
self._result_exchange = await self._channel.declare_exchange(
TASK_RESULT_EXCHANGE,
passive=True, # We only want to connect to the queue if it already exists.
)

async def listen(self) -> AsyncGenerator[Task, None]:
"""Listen for tasks assigned to this worker's kind."""
if self._task_assignment_queue is None:
raise QueueNotDeclaredError(
"Tried to listen for tasks, but queue was not declared."
)

async for message in self._task_assignment_queue.iterator():
async with message.process():
yield Task(**json.loads(message.body.decode()))

async def publish_task_result(self, task: Task) -> None:
"""Publish a task result to the shared results queue."""

# Check if the task has a result attached
if task.result is None:
raise ValueError(
"Tried to publish task result, but task has no result attached. How did it get to this point?"
)

if self._result_exchange is None:
raise ExchangeNotDeclaredError(
"Tried to publish task result, but exchange was not declared."
)

message = Message(body=task.model_dump_json().encode())

await self._result_exchange.publish(message, routing_key="")
8 changes: 4 additions & 4 deletions client_sdks/python/src/broker/config.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from dataclasses import dataclass
from pydantic import BaseModel


@dataclass
class BrokerConfig:
"""Configuration for a broker."""
class BrokerConfig(BaseModel):
"""Configuration for a RabbitMQ broker."""

url: str
""" The URL of the broker. """
Loading