Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions providers/amazon/docs/operators/mwaa_serverless.rst
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,17 @@ To stop a running Amazon MWAA Serverless workflow run, use
:dedent: 4
:start-after: [START howto_operator_mwaa_serverless_stop_workflow_run]
:end-before: [END howto_operator_mwaa_serverless_stop_workflow_run]

.. _howto/operator:MwaaServerlessDeleteWorkflowOperator:

Delete a Workflow
-----------------

To delete an Amazon MWAA Serverless workflow, use
:class:`~airflow.providers.amazon.aws.operators.mwaa_serverless.MwaaServerlessDeleteWorkflowOperator`.

.. exampleinclude:: /../../amazon/tests/system/amazon/aws/example_mwaa_serverless.py
:language: python
:dedent: 4
:start-after: [START howto_operator_mwaa_serverless_delete_workflow]
:end-before: [END howto_operator_mwaa_serverless_delete_workflow]
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,49 @@ def execute(self, context: Context) -> str:
return workflow_arn


class MwaaServerlessDeleteWorkflowOperator(AwsBaseOperator[AwsBaseHook]):
"""
Delete an Amazon MWAA Serverless workflow.

.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:MwaaServerlessDeleteWorkflowOperator`

:param workflow_arn: The ARN of the workflow to delete. (templated)
:param workflow_version: Optional specific version to delete. If not specified,
all versions are deleted. (templated)
"""

aws_hook_class = AwsBaseHook
template_fields: tuple[str, ...] = aws_template_fields("workflow_arn", "workflow_version")

def __init__(
self,
*,
workflow_arn: str,
workflow_version: str | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.workflow_arn = workflow_arn
self.workflow_version = workflow_version

@property
def _hook_parameters(self) -> dict[str, Any]:
return {**super()._hook_parameters, "client_type": "mwaa-serverless"}

def execute(self, context: Context) -> None:
self.log.info("Deleting MWAA Serverless workflow %s", self.workflow_arn)
kwargs: dict[str, Any] = prune_dict(
{
"WorkflowArn": self.workflow_arn,
"WorkflowVersion": self.workflow_version,
}
)
self.hook.conn.delete_workflow(**kwargs)
self.log.info("Workflow %s deleted.", self.workflow_arn)


class MwaaServerlessStopWorkflowRunOperator(AwsBaseOperator[AwsBaseHook]):
"""
Stop a running Amazon MWAA Serverless workflow run.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from airflow.providers.amazon.aws.operators.mwaa_serverless import (
MwaaServerlessCreateWorkflowOperator,
MwaaServerlessDeleteWorkflowOperator,
MwaaServerlessStartWorkflowRunOperator,
MwaaServerlessStopWorkflowRunOperator,
MwaaServerlessUpdateWorkflowOperator,
Expand All @@ -36,9 +37,8 @@
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk import TriggerRule, task
from airflow.sdk import TriggerRule
Comment thread
kars0508 marked this conversation as resolved.
else:
from airflow.decorators import task # type: ignore[attr-defined,no-redef]
from airflow.utils.trigger_rule import TriggerRule # type: ignore[no-redef,attr-defined]

DAG_ID = "example_mwaa_serverless"
Expand All @@ -62,14 +62,6 @@
sys_test_context_task = SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).build()


@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_workflow(workflow_arn: str):
"""Delete the MWAA Serverless workflow."""
import boto3

boto3.client("mwaa-serverless").delete_workflow(WorkflowArn=workflow_arn)


with DAG(
dag_id=DAG_ID,
schedule=None,
Expand Down Expand Up @@ -142,6 +134,14 @@ def delete_workflow(workflow_arn: str):
)
# [END howto_operator_mwaa_serverless_stop_workflow_run]

# [START howto_operator_mwaa_serverless_delete_workflow]
delete_workflow = MwaaServerlessDeleteWorkflowOperator(
task_id="delete_workflow",
workflow_arn=workflow_arn,
trigger_rule=TriggerRule.ALL_DONE,
)
# [END howto_operator_mwaa_serverless_delete_workflow]

delete_bucket = S3DeleteBucketOperator(
task_id="delete_bucket",
bucket_name=bucket_name,
Expand All @@ -150,16 +150,19 @@ def delete_workflow(workflow_arn: str):
)

chain(
# TEST SETUP
test_context,
create_bucket,
upload_workflow_yaml,
workflow_arn,
# TEST BODY
start_workflow,
wait_for_run,
update_workflow,
Comment thread
kars0508 marked this conversation as resolved.
start_workflow_2,
stop_workflow_run,
delete_workflow(workflow_arn=workflow_arn),
# TEST TEARDOWN
delete_workflow,
delete_bucket,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.operators.mwaa_serverless import (
MwaaServerlessCreateWorkflowOperator,
MwaaServerlessDeleteWorkflowOperator,
MwaaServerlessStartWorkflowRunOperator,
MwaaServerlessStopWorkflowRunOperator,
MwaaServerlessUpdateWorkflowOperator,
Expand Down Expand Up @@ -220,6 +221,61 @@ def test_template_fields(self):
validate_template_fields(self.operator)


class TestMwaaServerlessDeleteWorkflowOperator:
def setup_method(self):
self.operator = MwaaServerlessDeleteWorkflowOperator(
task_id="delete_workflow",
workflow_arn=WORKFLOW_ARN,
)

@mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
def test_execute(self, mock_conn):
mock_client = mock.MagicMock()
mock_client.delete_workflow.return_value = {"WorkflowArn": WORKFLOW_ARN}
mock_conn.return_value = mock_client

result = self.operator.execute({})

mock_client.delete_workflow.assert_called_once_with(WorkflowArn=WORKFLOW_ARN)
assert result is None

@mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
def test_execute_with_version(self, mock_conn):
op = MwaaServerlessDeleteWorkflowOperator(
task_id="delete_workflow",
workflow_arn=WORKFLOW_ARN,
workflow_version="abc123def456abc123def456abc123de",
)
mock_client = mock.MagicMock()
mock_client.delete_workflow.return_value = {
"WorkflowArn": WORKFLOW_ARN,
"WorkflowVersion": "abc123def456abc123def456abc123de",
}
mock_conn.return_value = mock_client

result = op.execute({})
Comment thread
kars0508 marked this conversation as resolved.

mock_client.delete_workflow.assert_called_once_with(
WorkflowArn=WORKFLOW_ARN,
WorkflowVersion="abc123def456abc123def456abc123de",
)
assert result is None

@mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
def test_execute_not_found(self, mock_conn):
mock_client = mock.MagicMock()
mock_client.delete_workflow.side_effect = ClientError(
{"Error": {"Code": "ResourceNotFoundException", "Message": "not found"}}, "DeleteWorkflow"
Comment thread
kars0508 marked this conversation as resolved.
)
mock_conn.return_value = mock_client

with pytest.raises(ClientError, match="ResourceNotFoundException"):
self.operator.execute({})

def test_template_fields(self):
validate_template_fields(self.operator)


class TestMwaaServerlessStopWorkflowRunOperator:
def setup_method(self):
self.operator = MwaaServerlessStopWorkflowRunOperator(
Expand Down
Loading