Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/newsfragments/66888.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Translate ``sqlalchemy.exc.DataError`` (raised when the database rejects an INSERT/UPDATE because a value exceeds the column type or is out of range) to an actionable ``413 Content Too Large`` / ``422 Unprocessable Entity`` response on both the public REST API and the task-execution API, instead of a generic ``500 Internal Server Error``. This covers every write endpoint (DagRun ``conf``, Connection ``extra``, Variable ``val``, XCom ``value``, TaskInstance ``note``, HITL fields, and so on) on every backend automatically.
48 changes: 46 additions & 2 deletions airflow-core/src/airflow/api_fastapi/common/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from typing import Generic, TypeVar

from fastapi import HTTPException, Request, status
from sqlalchemy.exc import IntegrityError
from sqlalchemy.exc import DataError, IntegrityError

from airflow.configuration import conf
from airflow.exceptions import DeserializationError
Expand Down Expand Up @@ -108,6 +108,46 @@ def _is_dialect_matched(self, exc: IntegrityError) -> bool:
return False


class _DataErrorHandler(BaseErrorHandler[DataError]):
    """
    Translate ``sqlalchemy.exc.DataError`` into an actionable HTTP response.

    ``DataError`` wraps the database rejecting an INSERT/UPDATE because a value
    exceeds the column's declared type (MySQL ``1406 Data too long``, Postgres
    ``value too long for type``) or is out of range (MySQL ``1264 Out of range
    value``, Postgres ``numeric field overflow``). The wrapped value came from
    request input that passed Pydantic validation, so the failure is always a
    client problem — translate to a 4xx with an actionable hint rather than
    surfacing as a generic 500.
    """

    _TOO_LARGE_MARKERS: tuple[str, ...] = ("too long", "too large", "too big")

    def __init__(self):
        super().__init__(DataError)

    def exception_handler(self, request: Request, exc: DataError):
        """Re-raise *exc* as an HTTPException: 413 for size overflows, 422 otherwise."""
        driver_message = str(exc.orig)
        lowered = driver_message.lower()
        # "too long/large/big" markers cover the size-overflow wording of all
        # supported backends; anything else (out-of-range etc.) maps to 422.
        if any(marker in lowered for marker in self._TOO_LARGE_MARKERS):
            status_code = status.HTTP_413_CONTENT_TOO_LARGE
            reason = "Payload exceeded database column limit"
        else:
            status_code = status.HTTP_422_UNPROCESSABLE_ENTITY
            reason = "Value rejected by database"
        detail = {
            "reason": reason,
            "orig_error": driver_message,
            "message": (
                "Database rejected the payload. Reduce the field size, or "
                "your operator may widen the column type (e.g. MEDIUMTEXT / "
                "LONGTEXT on MySQL)."
            ),
        }
        raise HTTPException(status_code=status_code, detail=detail)


class DagErrorHandler(BaseErrorHandler[DeserializationError]):
"""Handler for Dag related errors."""

Expand All @@ -122,4 +162,8 @@ def exception_handler(self, request: Request, exc: DeserializationError):
)


ERROR_HANDLERS: list[BaseErrorHandler] = [_UniqueConstraintErrorHandler(), DagErrorHandler()]
# Handlers installed on the public REST API app. The task-execution API is a
# separate sub-app and registers ``_DataErrorHandler`` on its own (see
# ``execution_api/app.py``), so both APIs translate DataError consistently.
ERROR_HANDLERS: list[BaseErrorHandler] = [
    _UniqueConstraintErrorHandler(),
    _DataErrorHandler(),
    DagErrorHandler(),
]
12 changes: 12 additions & 0 deletions airflow-core/src/airflow/api_fastapi/execution_api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,18 @@ def custom_generate_unique_id(route: APIRoute):

app.generate_and_include_versioned_routers(execution_api_router)

# Same translation as the public API: DB-rejected payloads become 4xx with an
# actionable hint instead of being swallowed by the generic 500 handler below.
# The narrower ``Callable[[Request, DataError], ...]`` signature is widened to
# ``Callable[[Request, Exception], ...]`` by Starlette at registration time;
# the same variance pattern is masked by type erasure in
# ``core_api/app.py``'s ``ERROR_HANDLERS`` loop.
from sqlalchemy.exc import DataError

from airflow.api_fastapi.common.exceptions import _DataErrorHandler

app.add_exception_handler(DataError, _DataErrorHandler().exception_handler) # type: ignore[arg-type]

# As we are mounted as a sub app, we don't get any logs for unhandled exceptions without this!
@app.exception_handler(Exception)
def handle_exceptions(request: Request, exc: Exception):
Expand Down
87 changes: 85 additions & 2 deletions airflow-core/tests/unit/api_fastapi/common/test_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@
from unittest.mock import Mock, patch

import pytest
from fastapi import HTTPException, status
from fastapi import FastAPI, HTTPException, status
from fastapi.testclient import TestClient
from sqlalchemy import select, update
from sqlalchemy.exc import IntegrityError
from sqlalchemy.exc import DataError, IntegrityError
from sqlalchemy.orm import Session

from airflow.api_fastapi.common.exceptions import (
ERROR_HANDLERS,
DagErrorHandler,
_DatabaseDialect,
_DataErrorHandler,
_UniqueConstraintErrorHandler,
)
from airflow.configuration import conf
Expand Down Expand Up @@ -388,6 +391,86 @@ def test_handle_multiple_columns_unique_constraint_error_with_stacktrace(
assert exeinfo_response_error.value.detail == expected_exception.detail


class TestDataErrorHandler:
    """Unit and end-to-end coverage for ``_DataErrorHandler``."""

    handler = _DataErrorHandler()

    @staticmethod
    def _make_data_error(orig_msg: str) -> DataError:
        """Wrap *orig_msg* in a DataError the way SQLAlchemy would on flush."""
        return DataError(
            statement="INSERT INTO dag_run (conf) VALUES (?)",
            params={},
            orig=Exception(orig_msg),
        )

    @pytest.mark.parametrize(
        ("orig_msg", "expected_status", "expected_reason"),
        [
            pytest.param(
                "(1406, \"Data too long for column 'conf' at row 1\")",
                status.HTTP_413_CONTENT_TOO_LARGE,
                "Payload exceeded database column limit",
                id="mysql-1406-data-too-long",
            ),
            pytest.param(
                "value too long for type character varying(250)",
                status.HTTP_413_CONTENT_TOO_LARGE,
                "Payload exceeded database column limit",
                id="postgres-value-too-long",
            ),
            pytest.param(
                "string or blob too big",
                status.HTTP_413_CONTENT_TOO_LARGE,
                "Payload exceeded database column limit",
                id="sqlite-blob-too-big",
            ),
            pytest.param(
                "(1264, \"Out of range value for column 'slots' at row 1\")",
                status.HTTP_422_UNPROCESSABLE_ENTITY,
                "Value rejected by database",
                id="mysql-1264-out-of-range",
            ),
            pytest.param(
                "numeric field overflow",
                status.HTTP_422_UNPROCESSABLE_ENTITY,
                "Value rejected by database",
                id="postgres-numeric-field-overflow",
            ),
        ],
    )
    def test_dataerror_translates_to_actionable_http_response(
        self,
        orig_msg: str,
        expected_status: int,
        expected_reason: str,
    ) -> None:
        """Each driver-specific message maps to the expected status and reason."""
        with pytest.raises(HTTPException) as raised:
            self.handler.exception_handler(Mock(), self._make_data_error(orig_msg))

        http_exc = raised.value
        assert http_exc.status_code == expected_status
        payload = http_exc.detail
        assert isinstance(payload, dict)
        assert payload["reason"] == expected_reason
        assert payload["orig_error"] == orig_msg
        assert "MEDIUMTEXT" in payload["message"]

    def test_dataerror_dispatched_through_fastapi_app(self) -> None:
        """End-to-end: a route raising DataError returns 413 via the registered handler."""
        app = FastAPI()
        for registered in ERROR_HANDLERS:
            app.add_exception_handler(registered.exception_cls, registered.exception_handler)

        @app.post("/test")
        def trigger_data_error():
            raise self._make_data_error("(1406, \"Data too long for column 'conf' at row 1\")")

        client = TestClient(app, raise_server_exceptions=False)
        response = client.post("/test")

        assert response.status_code == status.HTTP_413_CONTENT_TOO_LARGE
        detail = response.json()["detail"]
        assert detail["reason"] == "Payload exceeded database column limit"
        assert "MEDIUMTEXT" in detail["message"]


class TestDagErrorHandler:
@pytest.mark.parametrize(
"cause",
Expand Down
Loading