Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python SDK Rework #73

Merged
merged 60 commits into from
Feb 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
4271c18
chore: add Pydantic
tubarao312 Jan 14, 2025
4bf75a3
refactor: Completely rework task models
tubarao312 Jan 14, 2025
96d8972
docs: Improve types documentation
tubarao312 Jan 14, 2025
29a29ea
chore: Add aiohttp-retry lib
tubarao312 Jan 14, 2025
2d55487
refactor: Refactor manager config to use aiohttp-retry
tubarao312 Jan 14, 2025
a32fb39
refactor: Rework manager client to only have the required functions a…
tubarao312 Jan 14, 2025
4f1e3cb
refactor: Rework publisher client
tubarao312 Jan 14, 2025
ac7caf1
feat: Add from_str method to TaskKind model
tubarao312 Jan 14, 2025
272ef70
refactor: Slightly improve interface for brokers
tubarao312 Jan 14, 2025
eee42bf
docs: Fix error in Task docss
tubarao312 Jan 14, 2025
468f385
fix: Remove useless broker client interface code
tubarao312 Jan 14, 2025
6d5949b
refactor: Rework RabbitMQ clients so there exists one for the worker …
tubarao312 Jan 14, 2025
246087c
feat: Add durable and auto_delete config to BrokerConfig
tubarao312 Jan 14, 2025
41264df
chore: dependencies organization
tubarao312 Jan 14, 2025
674fd6d
fix: Fix broker to find exchanges and queues based on the published task
tubarao312 Jan 14, 2025
aa3b126
feat: Set prefetch count to 1
tubarao312 Jan 23, 2025
6b26d4d
Merge branch 'pull-rework/main' into pull-rework/publisher
tubarao312 Jan 23, 2025
dd9e848
docs: Simplify docs for worker app config
tubarao312 Jan 23, 2025
8ee0b26
feat: Add worker kind registration to manager client
tubarao312 Jan 23, 2025
56991b0
feat: Add worker kind broker info model
tubarao312 Jan 23, 2025
99aae04
feat: Register worker kind on worker app init
tubarao312 Jan 23, 2025
0c7ca7e
refactor: Remove irrelevant fields from broker config
tubarao312 Jan 23, 2025
ebaf6f3
refactor: Remove broker instance create function
tubarao312 Jan 23, 2025
d7188d5
refactor: Rework broker clients to be more lightweight and only conne…
tubarao312 Jan 23, 2025
7234987
docs: Add RabbitMQ to brokerconfig docs
tubarao312 Jan 23, 2025
602232c
refactor: Remove broker client interface
tubarao312 Jan 23, 2025
537f7c8
chore: add all brokers to __init__ export
tubarao312 Jan 23, 2025
bca4bfb
rework: Refactor worker client to use new broker clients
tubarao312 Jan 23, 2025
fdbc9e7
chore: Fix inconsistent url name
tubarao312 Jan 23, 2025
1b5198b
docs: Update examples
tubarao312 Jan 23, 2025
c8d0b0e
refactor: Refactor publisher client to use new broker client
tubarao312 Jan 23, 2025
b46d6bc
fix: Fix task status name
tubarao312 Jan 24, 2025
37f01b6
Merge branch 'pull-rework/main' into pull-rework/publisher
tubarao312 Jan 27, 2025
ed8f815
chore: Add new deps and settings for tests
tubarao312 Jan 28, 2025
d469559
chore: Update UV lock
tubarao312 Jan 28, 2025
68a0b46
chore: Remove outdated tests
tubarao312 Jan 28, 2025
07f6bed
chore: Update package info
tubarao312 Jan 28, 2025
1070eeb
fix: Fix multiple pydantic models to use proper pydantic inits
tubarao312 Jan 28, 2025
ef3c8e4
chore: Improve inits for exports
tubarao312 Jan 28, 2025
69c6026
chore: Tune modal config to account for tests
tubarao312 Jan 28, 2025
15a4985
test: Refactor worker client tests
tubarao312 Jan 28, 2025
6247847
test: Refactor publisher client tests
tubarao312 Jan 28, 2025
42dba83
test: Refactor manager client tests
tubarao312 Jan 28, 2025
4c23a2c
test: Simplify conftest
tubarao312 Jan 28, 2025
c0fb066
test: Temporarily disable benchmarks
tubarao312 Jan 28, 2025
4da7b12
fix: Improve pydantic model usage for WorkerApplication
tubarao312 Jan 28, 2025
6bc8d59
feat: Completely remove task kind model
tubarao312 Jan 28, 2025
f6348b8
fix: Fix URL for mock manager client
tubarao312 Jan 28, 2025
b0a81ef
docs: Fix examples to use new interfaces
tubarao312 Jan 28, 2025
06aac68
docs: Inline worker app config example
tubarao312 Jan 28, 2025
c738677
docs: Tweak examples
tubarao312 Jan 28, 2025
a87ddd1
feat: Add health check to local docker compose
tubarao312 Jan 29, 2025
d1a78a8
chore: Move all tests into unit test folder
tubarao312 Jan 29, 2025
fbe13c8
test: Add e2e test suite
tubarao312 Jan 29, 2025
ef78890
feat: Move all queue setup logic back to client SDK
tubarao312 Jan 29, 2025
3d5c3b3
fix: Remove worker kind broker info model and references, make task r…
tubarao312 Jan 29, 2025
1d2559e
feat: Create an auto-serializable exception. This must be improved in…
tubarao312 Jan 29, 2025
87b8481
fix: Fix Rust queues to be compatible with Python ones
tubarao312 Jan 29, 2025
fae2f1b
fix: Fix wrong default shutdown signal
tubarao312 Jan 29, 2025
3e9e3bf
feat: Make tasks only get ackowledged AFTER being completed
tubarao312 Jan 29, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions client_sdks/python/examples/example_producer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio

from broker.config import BrokerConfig
from manager.config import ManagerConfig
from publisher.client import PublisherClient

Expand All @@ -9,21 +10,35 @@
# Setup the manager location configuration
manager_config = ManagerConfig(url="http://localhost:3000")

# Setup the broker configuration
broker_config = BrokerConfig(url="amqp://user:password@localhost:5672")

# Both the publisher and the worker need to know about the task kinds and
# should have unified names for them.
WORKER_KIND_NAME = "worker_kind"
TASK_1_NAME = "task_1"
TASK_2_NAME = "task_2"

# APPLICATION CONFIGURATION ___________________________________________________

# 1. Create a producer application
worker_application = PublisherClient(manager_config)
worker_application = PublisherClient(
manager_config=manager_config, broker_config=broker_config
)


# 2. Start the application
async def main():
task1 = await worker_application.publish_task(TASK_1_NAME, {"data": "task_1_data"})
task2 = await worker_application.publish_task(TASK_2_NAME, {"data": "task_2_data"})
task1 = await worker_application.publish_task(
TASK_1_NAME,
WORKER_KIND_NAME,
{"data": "task_1_data"},
)
task2 = await worker_application.publish_task(
TASK_2_NAME,
WORKER_KIND_NAME,
{"data": "task_2_data"},
)

print(f"Task 1: {task1}")
print(f"Task 2: {task2}")
Expand Down
23 changes: 13 additions & 10 deletions client_sdks/python/examples/example_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,37 @@

# Both the publisher and the worker need to know about the task kinds and
# should have unified names for them.
WORKER_KIND_NAME = "worker_kind"
TASK_1_NAME = "task_1"
TASK_2_NAME = "task_2"

# APPLICATION CONFIGURATION ___________________________________________________

# 2. Configure the worker
worker_config = WorkerApplicationConfig(
name="test_worker",
manager_config=manager_config,
broker_config=broker_config,
# 1. Create a worker application
worker_application = WorkerApplication(
config=WorkerApplicationConfig(
name="test_worker",
manager_config=manager_config,
broker_config=broker_config,
kind=WORKER_KIND_NAME,
),
)

# 3. Create a worker application
worker_application = WorkerApplication(worker_config)


# 4. Create tasks and register them with the worker application
# 2. Create tasks and register them with the worker application
@worker_application.task(TASK_1_NAME)
async def task_1(input_data: dict[Any, Any]) -> dict[Any, Any]:
await asyncio.sleep(1)
return input_data


@worker_application.task(TASK_2_NAME)
async def task_2(input_data: dict[Any, Any]) -> dict[Any, Any]:
async def task_2(_: dict[Any, Any]) -> dict[Any, Any]:
raise Exception("This is a test exception")


# 3. Run the worker application

if __name__ == "__main__":
# Application can be run either as a standalone script or via the CLI.
asyncio.run(worker_application.entrypoint())
6 changes: 6 additions & 0 deletions client_sdks/python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ classifiers = [
requires-python = ">=3.12"
dependencies = [
"aio-pika>=9.5.3",
"aiohttp-retry>=2.9.1",
"aiohttp>=3.11.8",
"aioredis>=2.0.1",
"aioresponses>=0.7.8",
"click>=8.1.7",
"pydantic>=2.10.5",
"uuid>=1.30",
"watchfiles>=1.0.3",
]
Expand All @@ -28,6 +31,9 @@ asyncio_mode = "auto"
pythonpath = ["."]
markers = [
"bench: benchmark a function",
"target: current target test. Used locally.",
"unit: unit tests",
"service: tests that interact with external services",
]

[build-system]
Expand Down
33 changes: 7 additions & 26 deletions client_sdks/python/src/broker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,9 @@
from broker.client import PublisherBrokerClient, WorkerBrokerClient, BaseBrokerClient
from broker.config import BrokerConfig
from broker.core import BrokerClient

# TODO: Import only rabbit if tacoq[amqp] is installed
from broker.rabbitmq import RabbitMQBroker


def create_broker_instance(
config: BrokerConfig, exchange_name: str, worker_id: str
) -> BrokerClient:
"""Create appropriate broker client based on configuration.

### Parameters
- `config`: Configuration for the broker connection
- `exchange_name`: Name of the exchange to use
- `worker_id`: Unique identifier for this worker instance

### Returns
- `BrokerClient`: Configured broker client instance

### Raises
- `ValueError`: If broker URL scheme is not supported
"""

if config.url.startswith("amqp"):
return RabbitMQBroker(config, exchange_name, worker_id)
else:
raise ValueError(f"Unsupported broker URL: {config.url}")
__all__ = [
"PublisherBrokerClient",
"WorkerBrokerClient",
"BaseBrokerClient",
"BrokerConfig",
]
Loading