Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions airflow-core/newsfragments/66787.improvement.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Validate ``conf`` payload size on Dag trigger, failing fast with a 413 instead of an opaque DB error.

A new ``[core] max_dagrun_conf_size_bytes`` config (default 65535) bounds the JSON-encoded size of
``conf`` accepted by ``DAG.create_dagrun()`` and the ``POST /api/v2/dags/{dag_id}/dagRuns`` route.
Oversized payloads now raise ``airflow.exceptions.DagRunConfTooLargeError`` (returns 413 Payload
Too Large) with a message pointing users to XCom, Variables, or external storage as the right
place to keep large payloads. Set the limit to ``0`` to disable the check.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@
)
from airflow.api_fastapi.core_api.services.public.dag_run import DagRunWaiter
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.exceptions import ParamValidationError
from airflow.exceptions import DagRunConfTooLargeError, ParamValidationError
from airflow.listeners.listener import get_listener_manager
from airflow.models import DagModel, DagRun
from airflow.models.asset import AssetEvent
Expand Down Expand Up @@ -560,6 +560,7 @@ def get_dag_runs(
status.HTTP_400_BAD_REQUEST,
status.HTTP_404_NOT_FOUND,
status.HTTP_409_CONFLICT,
status.HTTP_413_CONTENT_TOO_LARGE,
]
),
dependencies=[
Expand Down Expand Up @@ -615,6 +616,8 @@ def trigger_dag_run(
partition_key=params["partition_key"],
session=session,
)
except DagRunConfTooLargeError as e:
raise HTTPException(status.HTTP_413_CONTENT_TOO_LARGE, str(e)) from e
except (ParamValidationError, ValueError) as e:
raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e)) from e

Expand Down
12 changes: 12 additions & 0 deletions airflow-core/src/airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,18 @@ core:
type: integer
example: ~
default: "0"
max_dagrun_conf_size_bytes:
description: |
Maximum size in bytes of the JSON-serialized ``conf`` payload accepted when a Dag run is
triggered. Requests with a larger ``conf`` are rejected with a clear error before the row
reaches the database, instead of failing later with a ``Data too long for column 'conf'``
DB error. The default of 65535 fits the smallest MySQL ``JSON`` column variant; deployments
on Postgres or with larger MySQL column types may raise it. Set to ``0`` to disable the
check entirely.
version_added: 3.2.0
type: integer
example: ~
default: "65535"
load_examples:
description: |
Whether to load the Dag examples that ship with Airflow. It's good to
Expand Down
24 changes: 24 additions & 0 deletions airflow-core/src/airflow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,30 @@ class AirflowBadRequest(AirflowException):
status_code = HTTPStatus.BAD_REQUEST


class DagRunConfTooLargeError(AirflowException):
    """
    Raise when the JSON-encoded ``conf`` of a triggered Dag run exceeds the configured maximum.

    The serialized payload size and the configured limit are kept as attributes so that
    API-layer handlers (e.g. the FastAPI trigger route) can turn this into an actionable
    413 response carrying the exact numbers involved.

    :param size: Byte length of the serialized conf payload.
    :param limit: Maximum allowed byte length (``[core] max_dagrun_conf_size_bytes``).
    """

    # Consumed by handlers that map Airflow exceptions onto HTTP responses.
    status_code = HTTPStatus.REQUEST_ENTITY_TOO_LARGE

    def __init__(self, size: int, limit: int) -> None:
        message = (
            f"Dag run conf payload is {size} bytes, which exceeds the configured limit of "
            f"{limit} bytes ([core] max_dagrun_conf_size_bytes). "
            f"Store large payloads externally (XCom, Variables, or file storage) and pass a "
            f"reference (e.g. a URI or key) in conf instead."
        )
        super().__init__(message)
        self.size = size
        self.limit = limit


class InvalidStatsNameException(AirflowException):
"""Raise when name of the stats is invalid."""

Expand Down
29 changes: 28 additions & 1 deletion airflow-core/src/airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from __future__ import annotations

import itertools
import json
import logging
import os
import re
Expand Down Expand Up @@ -66,7 +67,7 @@
from airflow._shared.timezones import timezone
from airflow.callbacks.callback_requests import DagCallbackRequest, DagRunContext
from airflow.configuration import conf as airflow_conf
from airflow.exceptions import AirflowException, NotMapped, TaskNotFound
from airflow.exceptions import AirflowException, DagRunConfTooLargeError, NotMapped, TaskNotFound
from airflow.listeners.listener import get_listener_manager
from airflow.models import Deadline, Log
from airflow.models.backfill import Backfill
Expand Down Expand Up @@ -147,6 +148,32 @@ def _creator_note(val):
return DagRunNote(*val)


def validate_dagrun_conf_size(conf: dict | None) -> None:
    """
    Validate that ``conf`` serializes to at most ``[core] max_dagrun_conf_size_bytes`` bytes.

    Invoked at the trigger boundary (Dag run creation paths) so that an oversized payload
    fails fast with a clear error instead of surfacing later as a backend-specific size
    error such as MySQL's ``Data too long for column 'conf'``.

    ``None`` and empty dicts pass without touching the config; a configured limit of ``0``
    (or any non-positive value) disables the check entirely.

    :param conf: The conf payload to validate.
    :raises DagRunConfTooLargeError: If the serialized payload exceeds the configured limit.
    """
    if not conf:
        # Nothing to measure for an absent or empty payload.
        return
    max_bytes = airflow_conf.getint("core", "max_dagrun_conf_size_bytes", fallback=65535)
    if max_bytes <= 0:
        # The operator has opted out of the size guard.
        return
    # Measure the UTF-8 encoded JSON bytes rather than character count: with
    # ``ensure_ascii=False`` non-ASCII characters stay literal in the string and expand
    # under UTF-8, which is what the database backend will actually store.
    encoded = json.dumps(conf, ensure_ascii=False).encode("utf-8")
    if len(encoded) > max_bytes:
        raise DagRunConfTooLargeError(size=len(encoded), limit=max_bytes)


class DagRun(Base, LoggingMixin):
"""
Invocation instance of a DAG.
Expand Down
3 changes: 2 additions & 1 deletion airflow-core/src/airflow/serialization/definitions/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ def create_dagrun(

:meta private:
"""
from airflow.models.dagrun import RUN_ID_REGEX
from airflow.models.dagrun import RUN_ID_REGEX, validate_dagrun_conf_size

log.info(
"creating dag run",
Expand All @@ -538,6 +538,7 @@ def create_dagrun(
logical_date=logical_date,
partition_key=partition_key,
)
validate_dagrun_conf_size(conf)
logical_date = coerce_datetime(logical_date)
# For manual runs where logical_date is None, ensure no data_interval is set.
if logical_date is None and data_interval is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2218,6 +2218,24 @@ def test_dagrun_creation_param_validation_error_returns_400(self, mock_create_da
assert response.status_code == 400
assert response.json() == {"detail": error_message}

@mock.patch("airflow.serialization.definitions.dag.SerializedDAG.create_dagrun")
def test_dagrun_creation_conf_too_large_returns_413(self, mock_create_dagrun, test_client):
    """A conf payload over the configured limit surfaces as HTTP 413, not an opaque 500."""
    from airflow.exceptions import DagRunConfTooLargeError

    mock_create_dagrun.side_effect = DagRunConfTooLargeError(size=200, limit=100)
    payload = {"logical_date": timezone.utcnow().isoformat(), "conf": {"k": "x" * 200}}

    response = test_client.post(f"/dags/{DAG1_ID}/dagRuns", json=payload)

    assert response.status_code == 413
    message = response.json()["detail"]
    # The detail must carry the offending size, the limit, and the config knob to change.
    for fragment in ("200 bytes", "100 bytes", "max_dagrun_conf_size_bytes"):
        assert fragment in message

@mock.patch("airflow.serialization.definitions.dag.SerializedDAG.create_dagrun")
def test_dagrun_creation_non_validation_error_propagates(self, mock_create_dagrun, test_client):
"""
Expand Down
57 changes: 57 additions & 0 deletions airflow-core/tests/unit/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -3382,6 +3382,63 @@ def test_dag_run_id_rejects_path_traversal(session, dag_maker, run_id):
dag_maker.create_dagrun(run_id=run_id, run_type=DagRunType.MANUAL)


class TestValidateDagRunConfSize:
    """Unit tests for ``validate_dagrun_conf_size``, the trigger-boundary conf size guard."""

    @pytest.mark.parametrize(
        "conf",
        [
            pytest.param(None, id="none"),
            pytest.param({}, id="empty_dict"),
        ],
    )
    def test_empty_conf_is_noop(self, conf):
        from airflow.models.dagrun import validate_dagrun_conf_size

        # Even the tightest possible limit must not reject an absent or empty conf.
        with conf_vars({("core", "max_dagrun_conf_size_bytes"): "1"}):
            validate_dagrun_conf_size(conf)

    def test_conf_at_or_below_limit_passes(self):
        from airflow.models.dagrun import validate_dagrun_conf_size

        # A small payload comfortably under the limit must pass silently.
        with conf_vars({("core", "max_dagrun_conf_size_bytes"): "100"}):
            validate_dagrun_conf_size({"key": "value"})

    def test_conf_above_limit_raises_with_size_and_limit(self):
        from airflow.exceptions import DagRunConfTooLargeError
        from airflow.models.dagrun import validate_dagrun_conf_size

        with conf_vars({("core", "max_dagrun_conf_size_bytes"): "100"}):
            with pytest.raises(DagRunConfTooLargeError) as exc_info:
                validate_dagrun_conf_size({"k": "x" * 200})

        caught = exc_info.value
        assert caught.limit == 100
        assert caught.size > 100
        # The error text must point the user at both the knob and the workaround.
        message = str(caught)
        assert "max_dagrun_conf_size_bytes" in message
        assert "XCom" in message

    def test_conf_size_check_disabled_when_limit_is_zero(self):
        from airflow.models.dagrun import validate_dagrun_conf_size

        # With the limit set to 0 the guard is off entirely; a 1 MiB conf must pass.
        with conf_vars({("core", "max_dagrun_conf_size_bytes"): "0"}):
            validate_dagrun_conf_size({"k": "x" * (1024 * 1024)})

    def test_conf_size_check_measures_utf8_bytes_not_chars(self):
        """The limit applies to UTF-8 encoded bytes, not Python character counts."""
        from airflow.exceptions import DagRunConfTooLargeError
        from airflow.models.dagrun import validate_dagrun_conf_size

        # Ten light-bulb emoji: 4 UTF-8 bytes each, so the encoded form clears a 20-byte cap
        # even though the character count alone would not.
        with conf_vars({("core", "max_dagrun_conf_size_bytes"): "20"}):
            with pytest.raises(DagRunConfTooLargeError):
                validate_dagrun_conf_size({"emoji": "\U0001f4a1" * 10})


def _get_states(dr):
"""
For a given dag run, get a dict of states.
Expand Down
Loading