Skip to content

Commit

Permalink
Python SDK Rework (#73)
Browse files Browse the repository at this point in the history
* chore: add Pydantic

* refactor: Completely rework task models

* docs: Improve types documentation

* chore: Add aiohttp-retry lib

* refactor: Refactor manager config to use aiohttp-retry

* refactor: Rework manager client to only have the required functions and use Pydantic

* refactor: Rework publisher client

* feat: Add from_str method to TaskKind model

* refactor: Slightly improve interface for brokers

* docs: Fix error in Task docss

* fix: Remove useless broker client interface code

* refactor: Rework RabbitMQ clients so there exists one for the worker and one for the publisher

* feat: Add durable and auto_delete config to BrokerConfig

* chore: dependencies organization

* fix: Fix broker to find exchanges and queues based on the published task

* feat: Set prefetch count to 1

* docs: Simplify docs for worker app config

* feat: Add worker kind registration to manager client

* feat: Add worker kind broker info  model

* feat: Register worker kind on worker app init

* refactor: Remove irrelevant fields from broker config

* refactor: Remove broker instance create function

Doesn't make sense to keep it given that we only support RabbitMQ

* refactor: Rework broker clients to be more lightweight and only connect to exchanges and queues instead of creating them

* docs: Add RabbitMQ to brokerconfig docs

* refactor: Remove broker client interface

* chore: add all brokers to __init__ export

* rework: Refactor worker client to use new broker clients

* chore: Fix inconsistent url name

* docs: Update examples

* refactor: Refactor publisher client to use new broker client

* fix: Fix task status name

* chore: Add new deps and settings for tests

* chore: Update UV lock

* chore: Remove outdated tests

* chore: Update package info

* fix: Fix multiple pydantic models to use proper pydantic inits

* chore: Improve inits for exports

* chore: Tune modal config to account for tests

* test: Refactor worker client tests

* test: Refactor publisher client tests

* test: Refactor manager client tests

* test: Simplify conftest

* test: Temporarily disable benchmarks

* fix: Improve pydantic model usage for WorkerApplication

* feat: Completely remove task kind model

* fix: Fix URL for mock manager client

* docs: Fix examples to use new interfaces

* docs: Inline worker app config example

* docs: Tweak examples

* feat: Add health check to local docker compose

* chore: Move all tests into unit test folder

* test: Add e2e test suite

* feat: Move all queue setup logic back to client SDK

* fix: Remove worker kind broker info model and references, make task result optional

* feat: Create an auto-serializable exception. This must be improved in the future.

* fix: Fix Rust queues to be compatible with Python ones

* fix: Fix wrong default shutdown signal

* feat: Make tasks only get ackowledged AFTER being completed
  • Loading branch information
tubarao312 authored Feb 1, 2025
1 parent 0a8233a commit c54bf3b
Show file tree
Hide file tree
Showing 37 changed files with 1,438 additions and 764 deletions.
21 changes: 18 additions & 3 deletions client_sdks/python/examples/example_producer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio

from broker.config import BrokerConfig
from manager.config import ManagerConfig
from publisher.client import PublisherClient

Expand All @@ -9,21 +10,35 @@
# Setup the manager location configuration
manager_config = ManagerConfig(url="http://localhost:3000")

# Setup the broker configuration
broker_config = BrokerConfig(url="amqp://user:password@localhost:5672")

# Both the publisher and the worker need to know about the task kinds and
# should have unified names for them.
WORKER_KIND_NAME = "worker_kind"
TASK_1_NAME = "task_1"
TASK_2_NAME = "task_2"

# APPLICATION CONFIGURATION ___________________________________________________

# 1. Create a producer application
worker_application = PublisherClient(manager_config)
worker_application = PublisherClient(
manager_config=manager_config, broker_config=broker_config
)


# 2. Start the application
async def main():
task1 = await worker_application.publish_task(TASK_1_NAME, {"data": "task_1_data"})
task2 = await worker_application.publish_task(TASK_2_NAME, {"data": "task_2_data"})
task1 = await worker_application.publish_task(
TASK_1_NAME,
WORKER_KIND_NAME,
{"data": "task_1_data"},
)
task2 = await worker_application.publish_task(
TASK_2_NAME,
WORKER_KIND_NAME,
{"data": "task_2_data"},
)

print(f"Task 1: {task1}")
print(f"Task 2: {task2}")
Expand Down
23 changes: 13 additions & 10 deletions client_sdks/python/examples/example_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,37 @@

# Both the publisher and the worker need to know about the task kinds and
# should have unified names for them.
WORKER_KIND_NAME = "worker_kind"
TASK_1_NAME = "task_1"
TASK_2_NAME = "task_2"

# APPLICATION CONFIGURATION ___________________________________________________

# 2. Configure the worker
worker_config = WorkerApplicationConfig(
name="test_worker",
manager_config=manager_config,
broker_config=broker_config,
# 1. Create a worker application
worker_application = WorkerApplication(
config=WorkerApplicationConfig(
name="test_worker",
manager_config=manager_config,
broker_config=broker_config,
kind=WORKER_KIND_NAME,
),
)

# 3. Create a worker application
worker_application = WorkerApplication(worker_config)


# 4. Create tasks and register them with the worker application
# 2. Create tasks and register them with the worker application
@worker_application.task(TASK_1_NAME)
async def task_1(input_data: dict[Any, Any]) -> dict[Any, Any]:
await asyncio.sleep(1)
return input_data


@worker_application.task(TASK_2_NAME)
async def task_2(input_data: dict[Any, Any]) -> dict[Any, Any]:
async def task_2(_: dict[Any, Any]) -> dict[Any, Any]:
raise Exception("This is a test exception")


# 3. Run the worker application

if __name__ == "__main__":
# Application can be run either as a standalone script or via the CLI.
asyncio.run(worker_application.entrypoint())
6 changes: 6 additions & 0 deletions client_sdks/python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ classifiers = [
requires-python = ">=3.12"
dependencies = [
"aio-pika>=9.5.3",
"aiohttp-retry>=2.9.1",
"aiohttp>=3.11.8",
"aioredis>=2.0.1",
"aioresponses>=0.7.8",
"click>=8.1.7",
"pydantic>=2.10.5",
"uuid>=1.30",
"watchfiles>=1.0.3",
]
Expand All @@ -28,6 +31,9 @@ asyncio_mode = "auto"
pythonpath = ["."]
markers = [
"bench: benchmark a function",
"target: current target test. Used locally.",
"unit: unit tests",
"service: tests that interact with external services",
]

[build-system]
Expand Down
33 changes: 7 additions & 26 deletions client_sdks/python/src/broker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,9 @@
from broker.client import PublisherBrokerClient, WorkerBrokerClient, BaseBrokerClient
from broker.config import BrokerConfig
from broker.core import BrokerClient

# TODO: Import only rabbit if tacoq[amqp] is installed
from broker.rabbitmq import RabbitMQBroker


def create_broker_instance(
config: BrokerConfig, exchange_name: str, worker_id: str
) -> BrokerClient:
"""Create appropriate broker client based on configuration.
### Parameters
- `config`: Configuration for the broker connection
- `exchange_name`: Name of the exchange to use
- `worker_id`: Unique identifier for this worker instance
### Returns
- `BrokerClient`: Configured broker client instance
### Raises
- `ValueError`: If broker URL scheme is not supported
"""

if config.url.startswith("amqp"):
return RabbitMQBroker(config, exchange_name, worker_id)
else:
raise ValueError(f"Unsupported broker URL: {config.url}")
__all__ = [
"PublisherBrokerClient",
"WorkerBrokerClient",
"BaseBrokerClient",
"BrokerConfig",
]
Loading

0 comments on commit c54bf3b

Please sign in to comment.