Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 71 additions & 1 deletion airflow-core/src/airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
from fastapi import Depends, HTTPException, Query, status
from pendulum.parsing.exceptions import ParserError
from pydantic import AfterValidator, BaseModel, NonNegativeInt
from sqlalchemy import Column, and_, func, not_, or_, select as sql_select, true as sql_true
from sqlalchemy import Column, String, and_, func, not_, or_, select as sql_select, true as sql_true
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.inspection import inspect
from sqlalchemy.sql.functions import FunctionElement

from airflow._shared.timezones import timezone
from airflow.api_fastapi.compat import HTTP_422_UNPROCESSABLE_CONTENT
Expand Down Expand Up @@ -72,6 +74,7 @@
if TYPE_CHECKING:
from sqlalchemy.orm.attributes import InstrumentedAttribute
from sqlalchemy.sql import ColumnElement, Select
from sqlalchemy.sql.compiler import SQLCompiler

from airflow.serialization.definitions.dag import SerializedDAG

Expand All @@ -80,6 +83,44 @@
_FALLBACK_PAGE_LIMIT: int = conf.getint("api", "fallback_page_limit")


class _MySQLCollate(FunctionElement):
"""
Wraps a SQL expression so that on MySQL it is emitted with an explicit ``COLLATE`` clause.

On every other dialect the expression is passed through unchanged.

This is needed when a computed expression (e.g. a ``CASE … END`` that mixes
a stored ``VARCHAR`` column with a ``CAST(integer AS CHAR)``) ends up with
MySQL coercibility ``NONE`` because the two branches carry different implicit
collations. Comparing such an expression with a bound parameter fails with
"Illegal mix of collations". Wrapping the expression in an explicit
``COLLATE`` gives it ``EXPLICIT`` coercibility, which MySQL accepts in all
comparison operators.
"""

type = String()
inherit_cache = True

def __init__(self, expr: ColumnElement[Any], collation: str) -> None:
super().__init__(expr)
self.collation = collation


@compiles(_MySQLCollate)
def _compile_mysql_collate_default(element: _MySQLCollate, compiler: SQLCompiler, **kw: Any) -> str:
"""Non-MySQL: render the inner expression without any COLLATE clause."""
(expr,) = element.clauses
return compiler.process(expr, **kw)


@compiles(_MySQLCollate, "mysql")
def _compile_mysql_collate_mysql(element: _MySQLCollate, compiler: SQLCompiler, **kw: Any) -> str:
"""MySQL: wrap the inner expression with the requested COLLATE clause."""
(expr,) = element.clauses
inner = compiler.process(expr, **kw)
return f"({inner}) COLLATE {element.collation}"


class BaseParam(OrmClause[T], ABC):
"""Base class for path or query parameters with ORM transformation."""

Expand Down Expand Up @@ -1358,6 +1399,35 @@ def _transform_ti_states(states: list[str] | None) -> list[TaskInstanceState | N
)
),
]
# On MySQL the CASE expression that backs rendered_map_index mixes a stored
# VARCHAR column (utf8mb4_bin, IMPLICIT) with CAST(map_index AS CHAR)
# (utf8mb4_0900_ai_ci, IMPLICIT), which gives the whole expression NONE
# coercibility. Comparing it against a bound parameter then fails with
# "Illegal mix of collations". _MySQLCollate wraps the expression so that
# on MySQL an explicit COLLATE clause is emitted (giving EXPLICIT coercibility);
# on PostgreSQL and SQLite the wrapper is transparent.
_rendered_map_index_collated = _MySQLCollate(
cast("ColumnElement[Any]", TaskInstance.rendered_map_index), "utf8mb4_0900_ai_ci"
)

QueryTIRenderedMapIndexPatternSearch = Annotated[
_SearchParam,
Depends(
search_param_factory(
_rendered_map_index_collated,
"rendered_map_index_pattern",
)
),
]
QueryTIRenderedMapIndexPrefixPatternSearch = Annotated[
_PrefixSearchParam,
Depends(
prefix_search_param_factory(
_rendered_map_index_collated,
"rendered_map_index_prefix_pattern",
)
),
]

# XCom
QueryXComKeyPatternSearch = Annotated[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6892,6 +6892,51 @@ paths:
items:
type: integer
title: Map Index
- name: rendered_map_index_pattern
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\
\ or the pipe `|` operator for OR logic (e.g. `dag1 | dag2`). Regular\
\ expressions are **not** supported. \n\n**Performance note:** this full-match\
\ pattern is evaluated as ``ILIKE '%term%'`` and most of the time prevents\
\ the database from using B-tree indexes, which can be very slow on large\
\ tables. Prefer the equivalent ``rendered_map_index_prefix_pattern``\
\ parameter when possible."
title: Rendered Map Index Pattern
description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\
\ or the pipe `|` operator for OR logic (e.g. `dag1 | dag2`). Regular expressions\
\ are **not** supported. \n\n**Performance note:** this full-match pattern\
\ is evaluated as ``ILIKE '%term%'`` and most of the time prevents the database\
\ from using B-tree indexes, which can be very slow on large tables. Prefer\
\ the equivalent ``rendered_map_index_prefix_pattern`` parameter when possible."
- name: rendered_map_index_prefix_pattern
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
description: "Prefix match \u2014 returns items whose value starts with\
\ the given string (case-sensitive, index-friendly). Use the pipe `|`\
\ operator for OR logic (e.g. `dag1|dag2`). Use `~` to match all. Wildcard\
\ characters (`%`, `_`) are treated as literal characters. Trailing non-alphanumeric\
\ characters in the prefix are stripped before matching so the range scan\
\ stays index-compatible under locale-aware collations \u2014 e.g. `test_`\
\ effectively matches items starting with `test`, and `s3://` matches\
\ items starting with `s3`."
title: Rendered Map Index Prefix Pattern
description: "Prefix match \u2014 returns items whose value starts with the\
\ given string (case-sensitive, index-friendly). Use the pipe `|` operator\
\ for OR logic (e.g. `dag1|dag2`). Use `~` to match all. Wildcard characters\
\ (`%`, `_`) are treated as literal characters. Trailing non-alphanumeric\
\ characters in the prefix are stripped before matching so the range scan\
\ stays index-compatible under locale-aware collations \u2014 e.g. `test_`\
\ effectively matches items starting with `test`, and `s3://` matches items\
\ starting with `s3`."
- name: limit
in: query
required: false
Expand Down Expand Up @@ -8009,6 +8054,51 @@ paths:
items:
type: integer
title: Map Index
- name: rendered_map_index_pattern
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\
\ or the pipe `|` operator for OR logic (e.g. `dag1 | dag2`). Regular\
\ expressions are **not** supported. \n\n**Performance note:** this full-match\
\ pattern is evaluated as ``ILIKE '%term%'`` and most of the time prevents\
\ the database from using B-tree indexes, which can be very slow on large\
\ tables. Prefer the equivalent ``rendered_map_index_prefix_pattern``\
\ parameter when possible."
title: Rendered Map Index Pattern
description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\
\ or the pipe `|` operator for OR logic (e.g. `dag1 | dag2`). Regular expressions\
\ are **not** supported. \n\n**Performance note:** this full-match pattern\
\ is evaluated as ``ILIKE '%term%'`` and most of the time prevents the database\
\ from using B-tree indexes, which can be very slow on large tables. Prefer\
\ the equivalent ``rendered_map_index_prefix_pattern`` parameter when possible."
- name: rendered_map_index_prefix_pattern
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
description: "Prefix match \u2014 returns items whose value starts with\
\ the given string (case-sensitive, index-friendly). Use the pipe `|`\
\ operator for OR logic (e.g. `dag1|dag2`). Use `~` to match all. Wildcard\
\ characters (`%`, `_`) are treated as literal characters. Trailing non-alphanumeric\
\ characters in the prefix are stripped before matching so the range scan\
\ stays index-compatible under locale-aware collations \u2014 e.g. `test_`\
\ effectively matches items starting with `test`, and `s3://` matches\
\ items starting with `s3`."
title: Rendered Map Index Prefix Pattern
description: "Prefix match \u2014 returns items whose value starts with the\
\ given string (case-sensitive, index-friendly). Use the pipe `|` operator\
\ for OR logic (e.g. `dag1|dag2`). Use `~` to match all. Wildcard characters\
\ (`%`, `_`) are treated as literal characters. Trailing non-alphanumeric\
\ characters in the prefix are stripped before matching so the range scan\
\ stays index-compatible under locale-aware collations \u2014 e.g. `test_`\
\ effectively matches items starting with `test`, and `s3://` matches items\
\ starting with `s3`."
- name: limit
in: query
required: false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
QueryTIQueueFilter,
QueryTIQueueNamePatternSearch,
QueryTIQueueNamePrefixPatternSearch,
QueryTIRenderedMapIndexPatternSearch,
QueryTIRenderedMapIndexPrefixPatternSearch,
QueryTIStateFilter,
QueryTITaskDisplayNamePatternSearch,
QueryTITaskDisplayNamePrefixPatternSearch,
Expand Down Expand Up @@ -184,6 +186,8 @@ def get_mapped_task_instances(
operator_name_pattern: QueryTIOperatorNamePatternSearch,
operator_name_prefix_pattern: QueryTIOperatorNamePrefixPatternSearch,
map_index: QueryTIMapIndexFilter,
rendered_map_index_pattern: QueryTIRenderedMapIndexPatternSearch,
rendered_map_index_prefix_pattern: QueryTIRenderedMapIndexPrefixPatternSearch,
limit: QueryLimit,
offset: QueryOffset,
order_by: Annotated[
Expand Down Expand Up @@ -260,6 +264,8 @@ def get_mapped_task_instances(
operator_name_pattern,
operator_name_prefix_pattern,
map_index,
rendered_map_index_pattern,
rendered_map_index_prefix_pattern,
],
order_by=order_by,
offset=offset,
Expand Down Expand Up @@ -472,6 +478,8 @@ def get_task_instances(
operator_name_pattern: QueryTIOperatorNamePatternSearch,
operator_name_prefix_pattern: QueryTIOperatorNamePrefixPatternSearch,
map_index: QueryTIMapIndexFilter,
rendered_map_index_pattern: QueryTIRenderedMapIndexPatternSearch,
rendered_map_index_prefix_pattern: QueryTIRenderedMapIndexPrefixPatternSearch,
limit: QueryLimit,
offset: QueryOffset,
order_by: Annotated[
Expand Down Expand Up @@ -579,6 +587,8 @@ def get_task_instances(
operator_name_pattern,
operator_name_prefix_pattern,
map_index,
rendered_map_index_pattern,
rendered_map_index_prefix_pattern,
]

if use_cursor:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,8 @@ def ti_update_state(
exclude={"task_outlets", "outlet_events", "retry_delay_seconds", "retry_reason"},
exclude_unset=True,
)
if "rendered_map_index" in data:
data["_rendered_map_index"] = data.pop("rendered_map_index")
query = update(TI).where(TI.id == task_instance_id).values(data)

try:
Expand Down Expand Up @@ -905,7 +907,7 @@ def ti_patch_rendered_map_index(

log.debug("Updating rendered_map_index", length=len(rendered_map_index))

query = update(TI).where(TI.id == task_instance_id).values(rendered_map_index=rendered_map_index)
query = update(TI).where(TI.id == task_instance_id).values(_rendered_map_index=rendered_map_index)
result = session.execute(query)

result = cast("CursorResult[Any]", result)
Expand Down
9 changes: 9 additions & 0 deletions airflow-core/src/airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
Uuid,
and_,
case,
cast,
delete,
extract,
false,
Expand Down Expand Up @@ -783,6 +784,14 @@ def rendered_map_index(self) -> str | None:
return str(self.map_index)
return None

@rendered_map_index.expression # type: ignore[no-redef]
def rendered_map_index(cls):
return case(
(cls._rendered_map_index.isnot(None), cls._rendered_map_index),
(cls.map_index >= 0, cast(cls.map_index, String)),
else_=None,
)

@property
def log_url(self) -> str:
"""Log URL for TaskInstance."""
Expand Down
Loading
Loading