Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 79 additions & 1 deletion render.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ services:
fromRegistryCreds:
name: Registry Credentials (by fvoron)
plan: pro
dockerCommand: uv run dramatiq -p 4 -t 8 -f polar.worker.scheduler:start polar.worker.run
dockerCommand: uv run dramatiq -p 2 -t 4 --queues default -f polar.worker.scheduler:start polar.worker.run
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't start the scheduler on both workers, otherwise we'll get duplicated jobs 🙂

Next stop would be to have a dedicated server just for the scheduler, so we can independently scale the worker horizontally

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha! Good catch

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scheduler is considered a high or normal/medium priority? 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pushed to default queue for now

healthCheckPath: /
region: ohio
numInstances: 1
Expand Down Expand Up @@ -174,6 +174,84 @@ services:
- fromGroup: aws-s3-production
- fromGroup: openai-production

# Worker - High Priority
- type: web # Use a web service to allow health checks
name: worker-high-priority
runtime: image
image:
url: ghcr.io/polarsource/polar:latest
creds:
fromRegistryCreds:
name: Registry Credentials (by fvoron)
plan: pro
dockerCommand: uv run dramatiq -p 2 -t 4 --queues high_priority polar.worker.run
healthCheckPath: /
region: ohio
numInstances: 1
autoDeploy: false # deploys are triggered by github actions
envVars:
- key: dramatiq_prom_port
value: 10001
- key: POLAR_POSTGRES_DATABASE
fromDatabase:
name: db
property: database
- key: POLAR_POSTGRES_HOST
fromDatabase:
name: db
property: host
- key: POLAR_POSTGRES_PORT
fromDatabase:
name: db
property: port
- key: POLAR_POSTGRES_PWD
fromDatabase:
name: db
property: password
- key: POLAR_POSTGRES_USER
fromDatabase:
name: db
property: user
- key: POLAR_POSTGRES_READ_DATABASE
fromDatabase:
name: polar-read
property: database
- key: POLAR_POSTGRES_READ_HOST
fromDatabase:
name: polar-read
property: host
- key: POLAR_POSTGRES_READ_PORT
fromDatabase:
name: polar-read
property: port
- key: POLAR_POSTGRES_READ_PWD
fromDatabase:
name: polar-read
property: password
- key: POLAR_POSTGRES_READ_USER
fromDatabase:
name: polar-read
property: user
- key: POLAR_REDIS_HOST
fromService:
type: redis
name: redis
property: host
- key: POLAR_REDIS_PORT
fromService:
type: redis
name: redis
property: port
- key: POLAR_REDIS_DB
value: 0
- fromGroup: google-production
- fromGroup: github-production
- fromGroup: backend-production
- fromGroup: stripe-production
- fromGroup: logfire-worker
- fromGroup: aws-s3-production
- fromGroup: openai-production

- type: redis
name: redis
plan: standard
Expand Down
16 changes: 15 additions & 1 deletion server/polar/worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,17 +178,29 @@ class TaskPriority(IntEnum):
LOW = 100


class TaskQueue:
HIGH_PRIORITY = "high_priority"
DEFAULT = "default"


P = ParamSpec("P")


def actor[**P, R](
actor_class: Callable[..., dramatiq.Actor[Any, Any]] = dramatiq.Actor,
actor_name: str | None = None,
queue_name: str = "default",
queue_name: str | None = None,
priority: TaskPriority = TaskPriority.LOW,
broker: dramatiq.Broker | None = None,
**options: Any,
) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]:
if queue_name is None:
queue_name = (
TaskQueue.HIGH_PRIORITY
if priority == TaskPriority.HIGH
else TaskQueue.DEFAULT
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clever! I like the idea that it's made transparently :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hate duplicated code :D


def decorator(
fn: Callable[P, Awaitable[R]],
) -> Callable[P, Awaitable[R]]:
Expand Down Expand Up @@ -225,4 +237,6 @@ async def _wrapped_fn(*args: P.args, **kwargs: P.kwargs) -> R:
"enqueue_events",
"get_retries",
"can_retry",
"TaskPriority",
"TaskQueue",
]
2 changes: 1 addition & 1 deletion server/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ dev = [
emails = { cmd = "cd emails && pnpm i --frozen-lockfile && pnpm run build", help = "build emails" }
backoffice = { cmd = "pnpm -C polar/web_backoffice install && pnpm -C polar/web_backoffice build", help = "build backoffice CSS and JS" }
api = { cmd = "env AUTHLIB_INSECURE_TRANSPORT=true uvicorn polar.app:app --reload --workers 1 --host 127.0.0.1 --port 8000", help = "run api service" }
worker = { cmd = "dramatiq -p 1 -t 1 --watch polar -f polar.worker.scheduler:start polar.worker.run", help = "run worker" }
worker = { cmd = "dramatiq -p 1 -t 1 --queues high_priority default --watch polar -f polar.worker.scheduler:start polar.worker.run", help = "run worker" }
test = { cmd = "POLAR_ENV=testing python -m pytest --cov polar/ --cov-report=term-missing", help = "run all tests" }
test_fast = { cmd = "POLAR_ENV=testing python -m pytest -n auto -p no:sugar --no-cov", help = "run all tests, but fast" }
lint = { cmd = "ruff format . && ruff check --fix .", help = "run linters with autofix" }
Expand Down
Loading