Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion airflow-core/src/airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,29 @@ def utc_epoch() -> datetime:
return result


class _StubSelector(selectors.BaseSelector):
"""
Stub to stand in until the real selector is created.

This is used in DagFileProcessorManager to keep Mypy happy, and emit a
slightly better error message than TypeError (if None is used) if a
contributor accidentally initializes a selector in a wrong place in the
future.

Some selectors do not work well in daemon mode after fork (exact reason
unknown; it's CPython internal). This stub allows us to delay creating a
selector until after forking and work around the issue.
"""

def __getattribute__(self, name):
raise RuntimeError("Selector not initialized")

def register(self, fileobj, events, data=None): ...
def unregister(self, fileobj): ...
def select(self, timeout=None): ...
def get_map(self): ...


@attrs.define(kw_only=True)
class DagFileProcessorManager(LoggingMixin):
"""
Expand All @@ -171,7 +194,7 @@ class DagFileProcessorManager(LoggingMixin):
processor_timeout: float = attrs.field(
factory=_config_int_factory("dag_processor", "dag_file_processor_timeout")
)
selector: selectors.BaseSelector = attrs.field(factory=selectors.DefaultSelector)
selector: selectors.BaseSelector = attrs.field(factory=_StubSelector)

_parallelism: int = attrs.field(factory=_config_int_factory("dag_processor", "parsing_processes"))

Expand Down Expand Up @@ -280,6 +303,10 @@ def run(self):
# Related: https://github.com/apache/airflow/pull/57459
os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "server"

# Initialization is delayed until here to avoid fork issues in some
# selector implementations. Also see _StubSelector documentation.
self.selector = selectors.DefaultSelector()

stats_factory = stats_utils.get_stats_factory(Stats)
Stats.initialize(factory=stats_factory)

Expand Down
Loading