Skip to content

Commit 8d7ac0b

Browse files
committed
refactor: infer the queue name from task priority
1 parent be77eeb commit 8d7ac0b

File tree

7 files changed

+11
-31
lines changed

7 files changed

+11
-31
lines changed

server/polar/checkout/tasks.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import uuid
22

33
from polar.exceptions import PolarTaskError
4-
from polar.worker import AsyncSessionMaker, CronTrigger, TaskPriority, TaskQueue, actor
4+
from polar.worker import AsyncSessionMaker, CronTrigger, TaskPriority, actor
55

66
from .repository import CheckoutRepository
77
from .service import checkout as checkout_service
@@ -20,7 +20,6 @@ def __init__(self, checkout_id: uuid.UUID) -> None:
2020
@actor(
2121
actor_name="checkout.handle_free_success",
2222
priority=TaskPriority.HIGH,
23-
queue_name=TaskQueue.HIGH_PRIORITY,
2423
)
2524
async def handle_free_success(checkout_id: uuid.UUID) -> None:
2625
async with AsyncSessionMaker() as session:

server/polar/email/tasks.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
from polar.worker import TaskPriority, TaskQueue, actor
1+
from polar.worker import TaskPriority, actor
22

33
from .sender import email_sender
44

55

6-
@actor(actor_name="email.send", priority=TaskPriority.HIGH, queue_name=TaskQueue.HIGH_PRIORITY)
6+
@actor(actor_name="email.send", priority=TaskPriority.HIGH)
77
async def email_send(
88
to_email_addr: str,
99
subject: str,

server/polar/eventstream/tasks.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
1-
from polar.worker import RedisMiddleware, TaskPriority, TaskQueue, actor
1+
from polar.worker import RedisMiddleware, TaskPriority, actor
22

33
from .service import send_event
44

55

66
@actor(
77
actor_name="eventstream.publish",
88
priority=TaskPriority.HIGH,
9-
queue_name=TaskQueue.HIGH_PRIORITY,
109
)
1110
async def eventstream_publish(event: str, channels: list[str]) -> None:
1211
await send_event(RedisMiddleware.get(), event, channels)

server/polar/integrations/stripe/tasks.py

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
AsyncSessionMaker,
4343
RedisMiddleware,
4444
TaskPriority,
45-
TaskQueue,
4645
actor,
4746
can_retry,
4847
get_retries,
@@ -81,7 +80,6 @@ class StripeTaskError(PolarTaskError): ...
8180
@actor(
8281
actor_name="stripe.webhook.account.updated",
8382
priority=TaskPriority.HIGH,
84-
queue_name=TaskQueue.HIGH_PRIORITY,
8583
)
8684
@stripe_api_connection_error_retry
8785
async def account_updated(event_id: uuid.UUID) -> None:
@@ -96,7 +94,6 @@ async def account_updated(event_id: uuid.UUID) -> None:
9694
@actor(
9795
actor_name="stripe.webhook.payment_intent.succeeded",
9896
priority=TaskPriority.HIGH,
99-
queue_name=TaskQueue.HIGH_PRIORITY,
10097
)
10198
@stripe_api_connection_error_retry
10299
async def payment_intent_succeeded(event_id: uuid.UUID) -> None:
@@ -141,7 +138,6 @@ async def payment_intent_succeeded(event_id: uuid.UUID) -> None:
141138
@actor(
142139
actor_name="stripe.webhook.payment_intent.payment_failed",
143140
priority=TaskPriority.HIGH,
144-
queue_name=TaskQueue.HIGH_PRIORITY,
145141
)
146142
@stripe_api_connection_error_retry
147143
async def payment_intent_payment_failed(event_id: uuid.UUID) -> None:
@@ -167,7 +163,6 @@ async def payment_intent_payment_failed(event_id: uuid.UUID) -> None:
167163
@actor(
168164
actor_name="stripe.webhook.setup_intent.succeeded",
169165
priority=TaskPriority.HIGH,
170-
queue_name=TaskQueue.HIGH_PRIORITY,
171166
)
172167
@stripe_api_connection_error_retry
173168
async def setup_intent_succeeded(event_id: uuid.UUID) -> None:
@@ -189,7 +184,6 @@ async def setup_intent_succeeded(event_id: uuid.UUID) -> None:
189184
@actor(
190185
actor_name="stripe.webhook.setup_intent.setup_failed",
191186
priority=TaskPriority.HIGH,
192-
queue_name=TaskQueue.HIGH_PRIORITY,
193187
)
194188
@stripe_api_connection_error_retry
195189
async def setup_intent_setup_failed(event_id: uuid.UUID) -> None:
@@ -210,7 +204,6 @@ async def setup_intent_setup_failed(event_id: uuid.UUID) -> None:
210204
@actor(
211205
actor_name="stripe.webhook.charge.pending",
212206
priority=TaskPriority.HIGH,
213-
queue_name=TaskQueue.HIGH_PRIORITY,
214207
)
215208
async def charge_pending(event_id: uuid.UUID) -> None:
216209
async with AsyncSessionMaker() as session:
@@ -234,7 +227,6 @@ async def charge_pending(event_id: uuid.UUID) -> None:
234227
@actor(
235228
actor_name="stripe.webhook.charge.failed",
236229
priority=TaskPriority.HIGH,
237-
queue_name=TaskQueue.HIGH_PRIORITY,
238230
)
239231
async def charge_failed(event_id: uuid.UUID) -> None:
240232
async with AsyncSessionMaker() as session:
@@ -254,7 +246,6 @@ async def charge_failed(event_id: uuid.UUID) -> None:
254246
@actor(
255247
actor_name="stripe.webhook.charge.succeeded",
256248
priority=TaskPriority.HIGH,
257-
queue_name=TaskQueue.HIGH_PRIORITY,
258249
)
259250
@stripe_api_connection_error_retry
260251
async def charge_succeeded(event_id: uuid.UUID) -> None:
@@ -276,7 +267,6 @@ async def charge_succeeded(event_id: uuid.UUID) -> None:
276267
@actor(
277268
actor_name="stripe.webhook.refund.created",
278269
priority=TaskPriority.HIGH,
279-
queue_name=TaskQueue.HIGH_PRIORITY,
280270
)
281271
@stripe_api_connection_error_retry
282272
async def refund_created(event_id: uuid.UUID) -> None:
@@ -295,7 +285,6 @@ async def refund_created(event_id: uuid.UUID) -> None:
295285
@actor(
296286
actor_name="stripe.webhook.refund.updated",
297287
priority=TaskPriority.HIGH,
298-
queue_name=TaskQueue.HIGH_PRIORITY,
299288
)
300289
@stripe_api_connection_error_retry
301290
async def refund_updated(event_id: uuid.UUID) -> None:
@@ -314,7 +303,6 @@ async def refund_updated(event_id: uuid.UUID) -> None:
314303
@actor(
315304
actor_name="stripe.webhook.refund.failed",
316305
priority=TaskPriority.HIGH,
317-
queue_name=TaskQueue.HIGH_PRIORITY,
318306
)
319307
@stripe_api_connection_error_retry
320308
async def refund_failed(event_id: uuid.UUID) -> None:
@@ -333,7 +321,6 @@ async def refund_failed(event_id: uuid.UUID) -> None:
333321
@actor(
334322
actor_name="stripe.webhook.charge.dispute.closed",
335323
priority=TaskPriority.HIGH,
336-
queue_name=TaskQueue.HIGH_PRIORITY,
337324
)
338325
@stripe_api_connection_error_retry
339326
async def charge_dispute_closed(event_id: uuid.UUID) -> None:
@@ -353,7 +340,6 @@ async def charge_dispute_closed(event_id: uuid.UUID) -> None:
353340
@actor(
354341
actor_name="stripe.webhook.customer.subscription.updated",
355342
priority=TaskPriority.HIGH,
356-
queue_name=TaskQueue.HIGH_PRIORITY,
357343
)
358344
@stripe_api_connection_error_retry
359345
async def customer_subscription_updated(event_id: uuid.UUID) -> None:
@@ -380,7 +366,6 @@ async def customer_subscription_updated(event_id: uuid.UUID) -> None:
380366
@actor(
381367
actor_name="stripe.webhook.customer.subscription.deleted",
382368
priority=TaskPriority.HIGH,
383-
queue_name=TaskQueue.HIGH_PRIORITY,
384369
)
385370
@stripe_api_connection_error_retry
386371
async def customer_subscription_deleted(event_id: uuid.UUID) -> None:
@@ -407,7 +392,6 @@ async def customer_subscription_deleted(event_id: uuid.UUID) -> None:
407392
@actor(
408393
actor_name="stripe.webhook.invoice.created",
409394
priority=TaskPriority.HIGH,
410-
queue_name=TaskQueue.HIGH_PRIORITY,
411395
)
412396
@stripe_api_connection_error_retry
413397
async def invoice_created(event_id: uuid.UUID) -> None:
@@ -433,7 +417,6 @@ async def invoice_created(event_id: uuid.UUID) -> None:
433417
@actor(
434418
actor_name="stripe.webhook.invoice.paid",
435419
priority=TaskPriority.HIGH,
436-
queue_name=TaskQueue.HIGH_PRIORITY,
437420
)
438421
@stripe_api_connection_error_retry
439422
async def invoice_paid(event_id: uuid.UUID) -> None:
@@ -474,7 +457,6 @@ async def payout_paid(event_id: uuid.UUID) -> None:
474457
@actor(
475458
actor_name="stripe.webhook.identity.verification_session.verified",
476459
priority=TaskPriority.HIGH,
477-
queue_name=TaskQueue.HIGH_PRIORITY,
478460
)
479461
async def identity_verification_session_verified(event_id: uuid.UUID) -> None:
480462
async with AsyncSessionMaker() as session:
@@ -490,7 +472,6 @@ async def identity_verification_session_verified(event_id: uuid.UUID) -> None:
490472
@actor(
491473
actor_name="stripe.webhook.identity.verification_session.processing",
492474
priority=TaskPriority.HIGH,
493-
queue_name=TaskQueue.HIGH_PRIORITY,
494475
)
495476
async def identity_verification_session_processing(event_id: uuid.UUID) -> None:
496477
async with AsyncSessionMaker() as session:
@@ -506,7 +487,6 @@ async def identity_verification_session_processing(event_id: uuid.UUID) -> None:
506487
@actor(
507488
actor_name="stripe.webhook.identity.verification_session.requires_input",
508489
priority=TaskPriority.HIGH,
509-
queue_name=TaskQueue.HIGH_PRIORITY,
510490
)
511491
async def identity_verification_session_requires_input(event_id: uuid.UUID) -> None:
512492
async with AsyncSessionMaker() as session:

server/polar/subscription/tasks.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
)
1616
from polar.product.repository import ProductRepository
1717
from polar.subscription.repository import SubscriptionRepository
18-
from polar.worker import AsyncSessionMaker, TaskPriority, TaskQueue, actor, enqueue_job
18+
from polar.worker import AsyncSessionMaker, TaskPriority, actor, enqueue_job
1919

2020
from .service import SubscriptionNotReadyForMigration
2121
from .service import subscription as subscription_service
@@ -89,7 +89,6 @@ async def subscription_update_meters(subscription_id: uuid.UUID) -> None:
8989
@actor(
9090
actor_name="subscription.cancel_customer",
9191
priority=TaskPriority.HIGH,
92-
queue_name=TaskQueue.HIGH_PRIORITY,
9392
)
9493
async def subscription_cancel_customer(customer_id: uuid.UUID) -> None:
9594
async with AsyncSessionMaker() as session:

server/polar/webhook/tasks.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from polar.kit.utils import utc_now
1515
from polar.logging import Logger
1616
from polar.models.webhook_delivery import WebhookDelivery
17-
from polar.worker import AsyncSessionMaker, TaskPriority, TaskQueue, actor, can_retry, enqueue_job
17+
from polar.worker import AsyncSessionMaker, TaskPriority, actor, can_retry, enqueue_job
1818

1919
from .service import webhook as webhook_service
2020

@@ -129,7 +129,6 @@ async def _webhook_event_send(
129129
@actor(
130130
actor_name="webhook_event.success",
131131
priority=TaskPriority.HIGH,
132-
queue_name=TaskQueue.HIGH_PRIORITY,
133132
)
134133
async def webhook_event_success(webhook_event_id: UUID) -> None:
135134
async with AsyncSessionMaker() as session:

server/polar/worker/__init__.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,11 +189,15 @@ class TaskQueue:
189189
def actor[**P, R](
190190
actor_class: Callable[..., dramatiq.Actor[Any, Any]] = dramatiq.Actor,
191191
actor_name: str | None = None,
192-
queue_name: str = "default",
192+
queue_name: str | None = None,
193193
priority: TaskPriority = TaskPriority.LOW,
194194
broker: dramatiq.Broker | None = None,
195195
**options: Any,
196196
) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]:
197+
# Auto-map priority to queue if not explicitly set
198+
if queue_name is None:
199+
queue_name = TaskQueue.HIGH_PRIORITY if priority == TaskPriority.HIGH else TaskQueue.DEFAULT
200+
197201
def decorator(
198202
fn: Callable[P, Awaitable[R]],
199203
) -> Callable[P, Awaitable[R]]:

0 commit comments

Comments
 (0)