Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions airflow-core/src/airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,11 @@ def clear_task_instances(
ti.state = None
ti.external_executor_id = None
ti.clear_next_method_args()
# Match DagVersion to latest serialized DAG when run_on_latest_version.
if run_on_latest_version:
latest_dag_version = DagVersion.get_latest_version(ti.dag_id, session=session)
if latest_dag_version is not None:
ti.dag_version_id = latest_dag_version.id
session.merge(ti)

if dag_run_state is not False and tis:
Expand Down Expand Up @@ -418,8 +423,7 @@ def clear_task_instances(
dr.created_dag_version_id = dag_version.id
dr.dag = dr_dag
dr.verify_integrity(session=session, dag_version_id=dag_version.id)
for ti in dr.task_instances:
ti.dag_version_id = dag_version.id
# Only cleared TIs get latest dag_version_id above; do not rewrite others.
else:
dr_dag = scheduler_dagbag.get_dag_for_run(dag_run=dr, session=session)
if not dr_dag:
Expand All @@ -431,6 +435,20 @@ def clear_task_instances(
if dag_run_state == DagRunState.QUEUED:
dr.last_scheduling_decision = None
dr.start_date = None
elif run_on_latest_version:
# Queued/running DagRun: update DR to latest version/bundle for workloads that use it.
dag_version = DagVersion.get_latest_version(dr.dag_id, session=session)
if dag_version and dr.created_dag_version_id != dag_version.id:
dr_dag = scheduler_dagbag.get_latest_version_of_dag(dr.dag_id, session=session)
if not dr_dag:
log.warning("No serialized dag found for dag '%s'", dr.dag_id)
else:
dr.created_dag_version_id = dag_version.id
dr.dag = dr_dag
if not dr_dag.disable_bundle_versioning:
bundle_version = dr.dag_model.bundle_version
if bundle_version is not None:
dr.bundle_version = bundle_version
for ti in tis:
ti.context_carrier = new_task_run_carrier(ti.dag_run.context_carrier)
session.flush()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/
import { Button, Flex, Heading, VStack } from "@chakra-ui/react";
import { useState } from "react";
import { useEffect, useRef, useState } from "react";
import { useTranslation } from "react-i18next";
import { CgRedo } from "react-icons/cg";
import { useParams } from "react-router-dom";
Expand All @@ -31,6 +31,8 @@ import { useClearTaskInstances } from "src/queries/useClearTaskInstances";
import { useClearTaskInstancesDryRun } from "src/queries/useClearTaskInstancesDryRun";
import { isStatePending, useAutoRefresh } from "src/utils";

import { getRunOnLatestVersionState } from "./runOnLatestVersion";

type Props = {
readonly onClose: () => void;
readonly open: boolean;
Expand All @@ -56,6 +58,7 @@ export const ClearGroupTaskInstanceDialog = ({ onClose, open, taskInstance }: Pr
const upstream = selectedOptions.includes("upstream");
const downstream = selectedOptions.includes("downstream");
const [runOnLatestVersion, setRunOnLatestVersion] = useState(false);
const userToggledRunOnLatestRef = useRef(false);

const [note, setNote] = useState<string>("");

Expand Down Expand Up @@ -106,8 +109,21 @@ export const ClearGroupTaskInstanceDialog = ({ onClose, open, taskInstance }: Pr
total_entries: 0,
};

const shouldShowBundleVersionOption =
dagDetails?.bundle_version !== null && dagDetails?.bundle_version !== "";
const { dagVersionsDiffer, shouldShowRunOnLatestOption } = getRunOnLatestVersionState({
latestBundleVersion: dagDetails?.bundle_version,
latestDagVersionNumber: dagDetails?.latest_dag_version?.version_number,
selectedDagVersionNumber: taskInstance.dag_version_number,
// Fall back to legacy heuristic when grid summary has no version (older API).
useLatestBundleVersionAsFallback: true,
});

useEffect(() => {
if (!open) {
userToggledRunOnLatestRef.current = false;
} else if (!userToggledRunOnLatestRef.current) {
setRunOnLatestVersion(dagVersionsDiffer);
}
}, [open, dagVersionsDiffer]);

return (
<Dialog.Root lazyMount onOpenChange={onClose} open={open} size="xl">
Expand Down Expand Up @@ -160,14 +176,18 @@ export const ClearGroupTaskInstanceDialog = ({ onClose, open, taskInstance }: Pr
</Flex>
<ActionAccordion affectedTasks={affectedTasks} note={note} setNote={setNote} />
<Flex
{...(shouldShowBundleVersionOption ? { alignItems: "center" } : {})}
justifyContent={shouldShowBundleVersionOption ? "space-between" : "end"}
{...(shouldShowRunOnLatestOption ? { alignItems: "center" } : {})}
gap={3}
justifyContent={shouldShowRunOnLatestOption ? "space-between" : "end"}
mt={3}
>
{shouldShowBundleVersionOption ? (
{shouldShowRunOnLatestOption ? (
<Checkbox
checked={runOnLatestVersion}
onCheckedChange={(event) => setRunOnLatestVersion(Boolean(event.checked))}
onCheckedChange={(event) => {
userToggledRunOnLatestRef.current = true;
setRunOnLatestVersion(Boolean(event.checked));
}}
>
{translate("dags:runAndTaskActions.options.runOnLatestVersion")}
</Checkbox>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/
import { Button, Flex, Heading, useDisclosure, VStack } from "@chakra-ui/react";
import { useState } from "react";
import { useEffect, useRef, useState } from "react";
import { useTranslation } from "react-i18next";
import { CgRedo } from "react-icons/cg";

Expand All @@ -33,6 +33,7 @@ import { usePatchTaskInstance } from "src/queries/usePatchTaskInstance";
import { isStatePending, useAutoRefresh } from "src/utils";

import ClearTaskInstanceConfirmationDialog from "./ClearTaskInstanceConfirmationDialog";
import { getRunOnLatestVersionState } from "./runOnLatestVersion";

type Props = {
readonly onClose: () => void;
Expand Down Expand Up @@ -63,6 +64,7 @@ const ClearTaskInstanceDialog = ({ onClose: onCloseDialog, open: openDialog, tas
const upstream = selectedOptions.includes("upstream");
const downstream = selectedOptions.includes("downstream");
const [runOnLatestVersion, setRunOnLatestVersion] = useState(false);
const userToggledRunOnLatestRef = useRef(false);
const [preventRunningTask, setPreventRunningTask] = useState(true);

const [note, setNote] = useState<string | null>(taskInstance.note);
Expand Down Expand Up @@ -107,14 +109,20 @@ const ClearTaskInstanceDialog = ({ onClose: onCloseDialog, open: openDialog, tas
total_entries: 0,
};

// Check if bundle versions are different
const currentDagBundleVersion = dagDetails?.bundle_version;
const taskInstanceDagVersionBundleVersion = taskInstance.dag_version?.bundle_version;
const bundleVersionsDiffer = currentDagBundleVersion !== taskInstanceDagVersionBundleVersion;
const shouldShowBundleVersionOption =
bundleVersionsDiffer &&
taskInstanceDagVersionBundleVersion !== null &&
taskInstanceDagVersionBundleVersion !== "";
const { dagVersionsDiffer, shouldShowRunOnLatestOption } = getRunOnLatestVersionState({
latestBundleVersion: dagDetails?.bundle_version,
latestDagVersionNumber: dagDetails?.latest_dag_version?.version_number,
selectedBundleVersion: taskInstance.dag_version?.bundle_version,
selectedDagVersionNumber: taskInstance.dag_version?.version_number,
});

useEffect(() => {
if (!openDialog) {
userToggledRunOnLatestRef.current = false;
} else if (!userToggledRunOnLatestRef.current) {
setRunOnLatestVersion(dagVersionsDiffer);
}
}, [openDialog, dagVersionsDiffer]);

return (
<>
Expand Down Expand Up @@ -170,14 +178,18 @@ const ClearTaskInstanceDialog = ({ onClose: onCloseDialog, open: openDialog, tas
</Flex>
<ActionAccordion affectedTasks={affectedTasks} note={note} setNote={setNote} />
<Flex
{...(shouldShowBundleVersionOption ? { alignItems: "center" } : {})}
justifyContent={shouldShowBundleVersionOption ? "space-between" : "end"}
{...(shouldShowRunOnLatestOption ? { alignItems: "center" } : {})}
gap={3}
justifyContent={shouldShowRunOnLatestOption ? "space-between" : "end"}
mt={3}
>
{shouldShowBundleVersionOption ? (
{shouldShowRunOnLatestOption ? (
<Checkbox
checked={runOnLatestVersion}
onCheckedChange={(event) => setRunOnLatestVersion(Boolean(event.checked))}
onCheckedChange={(event) => {
userToggledRunOnLatestRef.current = true;
setRunOnLatestVersion(Boolean(event.checked));
}}
>
{translate("dags:runAndTaskActions.options.runOnLatestVersion")}
</Checkbox>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*!
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { describe, expect, it } from "vitest";

import { getRunOnLatestVersionState } from "./runOnLatestVersion";

describe("getRunOnLatestVersionState", () => {
it.each([
{
expectedDagVersionsDiffer: true,
expectedShouldShowRunOnLatestOption: true,
latestDagVersionNumber: 3,
name: "shows and defaults on when DAG version numbers differ",
selectedDagVersionNumber: 2,
},
{
expectedDagVersionsDiffer: false,
expectedShouldShowRunOnLatestOption: false,
latestDagVersionNumber: 3,
name: "does not show when DAG version numbers match and there is no bundle difference",
selectedDagVersionNumber: 3,
},
{
expectedDagVersionsDiffer: false,
expectedShouldShowRunOnLatestOption: false,
latestDagVersionNumber: undefined,
name: "does not treat a missing latest DAG version number as different",
selectedDagVersionNumber: 3,
},
{
expectedDagVersionsDiffer: false,
expectedShouldShowRunOnLatestOption: false,
latestDagVersionNumber: 3,
name: "does not treat a missing selected DAG version number as different",
selectedDagVersionNumber: undefined,
},
{
expectedDagVersionsDiffer: false,
expectedShouldShowRunOnLatestOption: false,
latestBundleVersion: "bundle-a",
name: "does not show when task bundle versions match",
selectedBundleVersion: "bundle-a",
},
{
expectedDagVersionsDiffer: false,
expectedShouldShowRunOnLatestOption: true,
latestBundleVersion: "bundle-b",
name: "shows when task bundle versions differ",
selectedBundleVersion: "bundle-a",
},
{
expectedDagVersionsDiffer: false,
expectedShouldShowRunOnLatestOption: true,
latestBundleVersion: null,
name: "shows when task bundle version differs from a known null latest bundle",
selectedBundleVersion: "bundle-a",
},
{
expectedDagVersionsDiffer: false,
expectedShouldShowRunOnLatestOption: false,
latestBundleVersion: undefined,
name: "does not compare task bundle version before the latest bundle is loaded",
selectedBundleVersion: "bundle-a",
},
{
expectedDagVersionsDiffer: false,
expectedShouldShowRunOnLatestOption: false,
latestBundleVersion: "bundle-b",
name: "does not show for task bundle comparison when selected bundle is null",
selectedBundleVersion: null,
},
{
expectedDagVersionsDiffer: false,
expectedShouldShowRunOnLatestOption: false,
latestBundleVersion: "bundle-b",
name: "does not show for task bundle comparison when selected bundle is empty",
selectedBundleVersion: "",
},
{
expectedDagVersionsDiffer: false,
expectedShouldShowRunOnLatestOption: false,
latestBundleVersion: "bundle-b",
name: "does not show for task bundle comparison when selected bundle is missing",
selectedBundleVersion: undefined,
},
{
expectedDagVersionsDiffer: false,
expectedShouldShowRunOnLatestOption: true,
latestBundleVersion: "bundle-b",
name: "shows for group fallback when latest bundle is available",
selectedBundleVersion: undefined,
useLatestBundleVersionAsFallback: true,
},
{
expectedDagVersionsDiffer: false,
expectedShouldShowRunOnLatestOption: false,
latestBundleVersion: null,
name: "does not show for group fallback when latest bundle is null",
useLatestBundleVersionAsFallback: true,
},
{
expectedDagVersionsDiffer: false,
expectedShouldShowRunOnLatestOption: false,
latestBundleVersion: "",
name: "does not show for group fallback when latest bundle is empty",
useLatestBundleVersionAsFallback: true,
},
{
expectedDagVersionsDiffer: false,
expectedShouldShowRunOnLatestOption: false,
latestBundleVersion: undefined,
name: "does not show for group fallback when latest bundle is missing",
useLatestBundleVersionAsFallback: true,
},
])(
"$name",
({
expectedDagVersionsDiffer,
expectedShouldShowRunOnLatestOption,
latestBundleVersion,
latestDagVersionNumber,
selectedBundleVersion,
selectedDagVersionNumber,
useLatestBundleVersionAsFallback,
}) => {
expect(
getRunOnLatestVersionState({
latestBundleVersion,
latestDagVersionNumber,
selectedBundleVersion,
selectedDagVersionNumber,
useLatestBundleVersionAsFallback,
}),
).toEqual({
dagVersionsDiffer: expectedDagVersionsDiffer,
shouldShowRunOnLatestOption: expectedShouldShowRunOnLatestOption,
});
},
);
});
Loading
Loading