Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,8 @@ def get_template_env(self, dag: DAG | None = None) -> jinja2.Environment:
render_op_template_as_native_obj = getattr(self, "render_template_as_native_obj", None)
if render_op_template_as_native_obj is not None:
if dag:
# Use dag's template settings (searchpath, macros, filters, etc.)
searchpath = [dag.folder]
if dag.template_searchpath:
searchpath += dag.template_searchpath
# Use dag's resolved searchpath (handles relative paths in zipped DAGs)
searchpath = dag._get_resolved_searchpath()
return create_template_env(
native=render_op_template_as_native_obj,
searchpath=searchpath,
Expand Down
240 changes: 238 additions & 2 deletions task-sdk/src/airflow/sdk/definitions/_internal/templater.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import datetime
import logging
import os
from collections.abc import Collection, Iterable, Sequence
import re
import zipfile
from collections.abc import Callable, Collection, Iterable, Sequence
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any

Expand All @@ -38,6 +40,11 @@
from airflow.sdk.types import Operator


# Regex pattern to detect zip file paths, e.g. "path/to/archive.zip/inner/path".
# Group 1 is the archive path (greedy, so it ends at the last ".zip" that is followed by a
# separator or by the end of the string); optional group 2 is the path inside the archive.
# The separator after ".zip" may be "/" or "\" so that Windows-style paths are recognised too.
ZIP_REGEX = re.compile(r"(.*\.zip)(?:[/\\](.*))?$")


@dataclass(frozen=True)
class LiteralValue(ResolveMixin):
"""
Expand All @@ -58,6 +65,235 @@ def resolve(self, context: Context) -> Any:
log = logging.getLogger(__name__)


# This loader addresses the issue where template files in zipped DAG packages
# could not be resolved by the standard FileSystemLoader.
# See: https://github.com/apache/airflow/issues/59310
class ZipAwareFileSystemLoader(jinja2.FileSystemLoader):
    """
    A Jinja2 template loader that supports resolving templates from zipped DAG packages.

    Search paths may include filesystem directories, zip files, or subdirectories
    within zip files. Searchpath ordering is preserved across zip and non-zip entries.
    """

    def __init__(
        self,
        searchpath: str | os.PathLike[str] | Sequence[str | os.PathLike[str]],
        encoding: str = "utf-8",
        followlinks: bool = False,
    ) -> None:
        """
        Classify every searchpath entry as either a zip location or a regular directory.

        :param searchpath: A single path or a sequence of paths to search, in priority order
        :param encoding: Encoding used to decode template files
        :param followlinks: Whether ``list_templates`` follows symlinks on the filesystem
        """
        if isinstance(searchpath, (str, os.PathLike)):
            searchpath = [searchpath]
        all_paths = [os.fspath(p) for p in searchpath]

        # Zip entries are keyed by their position in the searchpath so that get_source
        # can walk all entries in their original order: {index: (archive, internal_base)}
        self._zip_path_map: dict[int, tuple[str, str]] = {}
        regular_paths: list[str] = []

        for idx, path in enumerate(all_paths):
            zip_info = self._parse_zip_path(path)
            if zip_info:
                self._zip_path_map[idx] = zip_info
            else:
                regular_paths.append(path)

        # Regular paths are kept separately for the filesystem walk in list_templates
        self._regular_searchpaths = regular_paths

        # The parent only ever sees regular paths; get_source and list_templates are overridden
        super().__init__(regular_paths, encoding, followlinks)

        # Expose the complete list (zip entries included) for lookups and error messages
        self._all_searchpaths = all_paths
        self.searchpath = all_paths

    @staticmethod
    def _parse_zip_path(path: str) -> tuple[str, str] | None:
        """
        Parse a path to extract zip archive and internal path components.

        The internal base path is normalised to zip conventions: forward slashes
        only, with no leading or trailing separator.

        :param path: The path to parse
        :return: Tuple of (archive_path, internal_base_path) if path is a zip path,
            None otherwise
        """
        # The path itself is a zip file (no internal path)
        if path.endswith(".zip") and os.path.isfile(path) and zipfile.is_zipfile(path):
            return (path, "")

        # A path inside a zip (e.g., "archive.zip/subdir" or "archive.zip\subdir")
        match = ZIP_REGEX.match(path)
        if match:
            archive, internal = match.groups()
            if archive and os.path.isfile(archive) and zipfile.is_zipfile(archive):
                # Zip member names always use "/"; a trailing separator would otherwise
                # produce a "dir//" prefix that never matches any member.
                internal_base = (internal or "").replace("\\", "/").strip("/")
                return (archive, internal_base)

        return None

    def _read_from_zip(self, archive_path: str, internal_path: str) -> str:
        """
        Read a file from inside a zip archive.

        :param archive_path: Path to the zip file
        :param internal_path: Path to the file inside the zip
        :return: The file contents as a string
        :raises TemplateNotFound: If the file doesn't exist in the zip or the archive
            cannot be read
        """
        try:
            with zipfile.ZipFile(archive_path, "r") as zf:
                # Normalize path separators for zip (always forward slashes)
                normalized_path = internal_path.replace(os.sep, "/")
                with zf.open(normalized_path) as f:
                    return f.read().decode(self.encoding)
        except KeyError as exc:
            # ZipFile.open raises KeyError for a missing member
            raise jinja2.TemplateNotFound(internal_path) from exc
        except (OSError, zipfile.BadZipFile) as exc:
            # Keep the template path as the exception's name; details go in the message
            raise jinja2.TemplateNotFound(
                internal_path,
                message=f"{internal_path} (error reading from {archive_path}: {exc})",
            ) from exc

    def _get_source_from_single_zip(
        self, archive_path: str, base_internal_path: str, template: str
    ) -> tuple[str, str, Callable[[], bool]] | None:
        """
        Try to get template source from a single zip archive.

        :param archive_path: Path to the zip file
        :param base_internal_path: Base path inside the zip (may be empty)
        :param template: The name of the template to load
        :return: A tuple of (source, filename, up_to_date_func) if found, None otherwise
        """
        import posixpath

        from jinja2.loaders import split_template_path

        pieces = split_template_path(template)
        if base_internal_path:
            internal_path = posixpath.join(base_internal_path, *pieces)
        else:
            internal_path = "/".join(pieces)

        try:
            source = self._read_from_zip(archive_path, internal_path)
        except jinja2.TemplateNotFound:
            # Not in this archive (or archive unreadable): let the caller try the next entry
            return None

        filename = os.path.join(archive_path, internal_path)
        archive_mtime = os.path.getmtime(archive_path)

        # The template is considered stale as soon as the archive's mtime changes
        def up_to_date(archive: str = archive_path, mtime: float = archive_mtime) -> bool:
            try:
                return os.path.getmtime(archive) == mtime
            except OSError:
                return False

        return source, filename, up_to_date

    def _get_source_from_filesystem(
        self, searchpath: str, template: str
    ) -> tuple[str, str, Callable[[], bool]] | None:
        """
        Try to get template source from a single filesystem path.

        :param searchpath: The directory to search in
        :param template: The name of the template to load
        :return: A tuple of (source, filename, up_to_date_func) if found, None otherwise
        """
        from jinja2.loaders import split_template_path

        pieces = split_template_path(template)
        filename = os.path.join(searchpath, *pieces)

        if not os.path.isfile(filename):
            return None

        try:
            with open(filename, encoding=self.encoding) as f:
                contents = f.read()

            mtime = os.path.getmtime(filename)
        except OSError:
            # Unreadable or vanished file: treat as "not found here"
            return None

        def up_to_date(filepath: str = filename, file_mtime: float = mtime) -> bool:
            try:
                return os.path.getmtime(filepath) == file_mtime
            except OSError:
                return False

        return contents, os.path.normpath(filename), up_to_date

    def get_source(
        self, environment: jinja2.Environment, template: str
    ) -> tuple[str, str, Callable[[], bool]]:
        """
        Get the template source, filename, and reload helper for a template.

        Searches through searchpaths in order, handling both zip archives and
        regular filesystem paths according to their original order.

        :param environment: The Jinja2 environment
        :param template: The name of the template to load
        :return: A tuple of (source, filename, up_to_date_func)
        :raises TemplateNotFound: If the template cannot be found
        """
        for idx, path in enumerate(self._all_searchpaths):
            if idx in self._zip_path_map:
                archive_path, base_internal_path = self._zip_path_map[idx]
                result = self._get_source_from_single_zip(archive_path, base_internal_path, template)
            else:
                result = self._get_source_from_filesystem(path, template)
            if result is not None:
                return result

        # Template not found in any searchpath. The template name is passed as ``name`` so
        # that ``exc.name`` / ``exc.templates`` stay usable; the explanation is the message.
        raise jinja2.TemplateNotFound(
            template,
            message=(
                f"'{template}' not found in search path: "
                f"{', '.join(repr(p) for p in self._all_searchpaths)}"
            ),
        )

    def list_templates(self) -> list[str]:
        """
        Return a list of available templates.

        Combines templates from both zip archives and regular filesystem paths.
        Archives that cannot be opened are skipped.

        :return: A sorted list of template names
        """
        found: set[str] = set()

        # Get templates from zip paths
        for archive_path, base_internal_path in self._zip_path_map.values():
            # An empty prefix matches every member, i.e. the whole archive is searched
            prefix = f"{base_internal_path}/" if base_internal_path else ""
            try:
                with zipfile.ZipFile(archive_path, "r") as zf:
                    for name in zf.namelist():
                        # Skip directories
                        if name.endswith("/"):
                            continue
                        if name.startswith(prefix):
                            found.add(name[len(prefix) :])
            except (OSError, zipfile.BadZipFile):
                continue

        # Get templates from regular paths
        for searchpath in self._regular_searchpaths:
            if not os.path.isdir(searchpath):
                continue
            for dirpath, _, filenames in os.walk(searchpath, followlinks=self.followlinks):
                for filename in filenames:
                    filepath = os.path.join(dirpath, filename)
                    relative = os.path.relpath(filepath, searchpath)
                    found.add(relative.replace(os.sep, "/"))

        return sorted(found)


class Templater:
"""
This renders the template fields of object.
Expand Down Expand Up @@ -317,7 +553,7 @@ def create_template_env(
"cache_size": 0,
}
if searchpath:
jinja_env_options["loader"] = jinja2.FileSystemLoader(searchpath)
jinja_env_options["loader"] = ZipAwareFileSystemLoader(searchpath)
if jinja_environment_kwargs:
jinja_env_options.update(jinja_environment_kwargs)

Expand Down
23 changes: 18 additions & 5 deletions task-sdk/src/airflow/sdk/definitions/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -816,15 +816,28 @@ def resolve_template_files(self):
if hasattr(t, "resolve_template_files"):
t.resolve_template_files()

def get_template_env(self, *, force_sandboxed: bool = False) -> jinja2.Environment:
"""Build a Jinja2 environment."""
from airflow.sdk.definitions._internal.templater import create_template_env
def _get_resolved_searchpath(self) -> list[str]:
    """
    Return the template searchpath with relative entries resolved for zipped DAGs.

    The DAG folder always comes first, followed by the ``template_searchpath``
    entries in their original order. For a zipped DAG (folder ending in ``.zip``),
    a relative entry (e.g. ``"templates"``) is joined onto the DAG folder so that it
    points inside the archive. Absolute entries, and every entry of a non-zipped DAG,
    are kept as given. Each entry appears exactly once.

    :return: A new list of searchpath entries
    """
    searchpath = [self.folder]
    if not self.template_searchpath:
        return searchpath

    is_zipped_dag = self.folder.endswith(".zip")
    for path in self.template_searchpath:
        if is_zipped_dag and not os.path.isabs(path):
            # Relative entry of a zipped DAG: anchor it at the zip file
            searchpath.append(os.path.join(self.folder, path))
        else:
            searchpath.append(path)
    return searchpath

def get_template_env(self, *, force_sandboxed: bool = False) -> jinja2.Environment:
"""Build a Jinja2 environment."""
from airflow.sdk.definitions._internal.templater import create_template_env

searchpath = self._get_resolved_searchpath()
use_native = self.render_template_as_native_obj and not force_sandboxed
return create_template_env(
native=use_native,
Expand Down
Loading