Skip to content

Add transport worker factory and abstract base class for background worker #4580

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: srothh/transport-class-hierarchy
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions sentry_sdk/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

from sentry_sdk.consts import EndpointType
from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions
from sentry_sdk.worker import BackgroundWorker
from sentry_sdk.worker import BackgroundWorker, Worker
from sentry_sdk.envelope import Envelope, Item, PayloadRef

from typing import TYPE_CHECKING
Expand Down Expand Up @@ -173,7 +173,7 @@ def __init__(self: Self, options: Dict[str, Any]) -> None:
Transport.__init__(self, options)
assert self.parsed_dsn is not None
self.options: Dict[str, Any] = options
self._worker = BackgroundWorker(queue_size=options["transport_queue_size"])
self._worker = self._create_worker(options)
self._auth = self.parsed_dsn.to_auth("sentry.python/%s" % VERSION)
self._disabled_until: Dict[Optional[str], datetime] = {}
# We only use this Retry() class for the `get_retry_after` method it exposes
Expand Down Expand Up @@ -224,6 +224,10 @@ def __init__(self: Self, options: Dict[str, Any]) -> None:
elif self._compression_algo == "br":
self._compression_level = 4

def _create_worker(self: Self, options: Dict[str, Any]) -> Worker:
# For now, we only support the threaded sync background worker.
return BackgroundWorker(queue_size=options["transport_queue_size"])

def record_lost_event(
self: Self,
reason: str,
Expand Down
47 changes: 45 additions & 2 deletions sentry_sdk/worker.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from __future__ import annotations
from abc import ABC, abstractmethod
import os
import threading

Expand All @@ -16,7 +17,49 @@
_TERMINATOR = object()


class BackgroundWorker:
class Worker(ABC):
"""
Base class for all workers.

A worker is used to process events in the background and send them to Sentry.
"""

@property
@abstractmethod
def is_alive(self) -> bool:
pass

@abstractmethod
def kill(self) -> None:
pass

def flush(
self, timeout: float, callback: Optional[Callable[[int, float], Any]] = None
) -> None:
"""
Flush the worker.

This method blocks until the worker has flushed all events or the specified timeout is reached.
Default implementation is a no-op, since this method may only be relevant to some workers.
Subclasses should override this method if necessary.
"""
return None

@abstractmethod
def full(self) -> bool:
pass

@abstractmethod
def submit(self, callback: Callable[[], Any]) -> bool:
"""
Schedule a callback to be executed by the worker.

Returns True if the callback was scheduled, False if the queue is full.
"""
pass


class BackgroundWorker(Worker):
def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None:
self._queue: Queue = Queue(queue_size)
self._lock = threading.Lock()
Expand Down Expand Up @@ -106,7 +149,7 @@ def _wait_flush(self, timeout: float, callback: Optional[Any]) -> None:
pending = self._queue.qsize() + 1
logger.error("flush timed out, dropped %s events", pending)

def submit(self, callback: Callable[[], None]) -> bool:
def submit(self, callback: Callable[[], Any]) -> bool:
self._ensure_thread()
try:
self._queue.put_nowait(callback)
Expand Down
Loading