Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added docker tests
Browse files Browse the repository at this point in the history
atulya-astronomer committed Dec 17, 2024
1 parent 118cf57 commit 6f23f93
Showing 2 changed files with 35 additions and 8 deletions.
25 changes: 17 additions & 8 deletions astronomer_starship/compat/starship_compatability.py
Original file line number Diff line number Diff line change
@@ -150,7 +150,7 @@ def generic_set_one(session: Session, qualname: str, attrs: dict, **kwargs):
raise e


def generic_delete(session: Session, qualname: str, **kwargs):
def generic_delete(session: Session, qualname: str, **kwargs) -> Response:
from http import HTTPStatus
from sqlalchemy import delete

@@ -161,9 +161,9 @@ def generic_delete(session: Session, qualname: str, **kwargs):
deleted_rows = session.execute(delete(thing_cls).where(*filters)).rowcount
session.commit()
logger.info(f"Deleted {deleted_rows} rows for table {qualname}")
return Response(None, status=HTTPStatus.NO_CONTENT)
return Response(status=HTTPStatus.NO_CONTENT)
except Exception as e:
logger.error(f"Error deleting rows for table {qualname}: {e}")
logger.error(f"Error deleting row(s) for table {qualname}: {e}")
session.rollback()
raise e

@@ -251,7 +251,8 @@ def set_variable(self, **kwargs):
)

def delete_variable(self, **kwargs):
return generic_delete(self.session, "airflow.models.Variable", **kwargs)
attrs = {self.variable_attrs()[k]["attr"]: v for k, v in kwargs.items()}
return generic_delete(self.session, "airflow.models.Variable", **attrs)

@classmethod
def pool_attrs(cls) -> "Dict[str, AttrDesc]":
@@ -278,7 +279,12 @@ def set_pool(self, **kwargs):
)

def delete_pool(self, **kwargs):
return generic_delete(self.session, "airflow.models.Pool", **kwargs)
attrs = {
self.pool_attrs()[k]["attr"]: v
for k, v in kwargs.items()
if k in self.pool_attrs()
}
return generic_delete(self.session, "airflow.models.Pool", **attrs)

@classmethod
def connection_attrs(cls) -> "Dict[str, AttrDesc]":
@@ -341,7 +347,8 @@ def set_connection(self, **kwargs):
)

def delete_connection(self, **kwargs):
return generic_delete(self.session, "airflow.models.Connection", **kwargs)
attrs = {self.connection_attrs()[k]["attr"]: v for k, v in kwargs.items()}
return generic_delete(self.session, "airflow.models.Connection", **attrs)

@classmethod
def dag_attrs(cls) -> "Dict[str, AttrDesc]":
@@ -634,7 +641,8 @@ def set_dag_runs(self, dag_runs: list):
return {"dag_runs": dag_runs, "dag_run_count": self._get_dag_run_count(dag_id)}

def delete_dag_runs(self, **kwargs):
return generic_delete(self.session, "airflow.models.DagRun", **kwargs)
attrs = {self.dag_runs_attrs()[k]["attr"]: v for k, v in kwargs.items()}
return generic_delete(self.session, "airflow.models.DagRun", **attrs)

@classmethod
def task_instances_attrs(cls) -> "Dict[str, AttrDesc]":
@@ -899,7 +907,8 @@ def set_task_instances(self, task_instances: list):
return {"task_instances": task_instances}

def delete_task_instances(self, **kwargs):
return generic_delete(self.session, "airflow.models.TaskInstance", **kwargs)
attrs = {self.task_instances_attrs()[k]["attr"]: v for k, v in kwargs.items()}
return generic_delete(self.session, "airflow.models.TaskInstance", **attrs)

def insert_directly(self, table_name, items):
from sqlalchemy.exc import InvalidRequestError
18 changes: 18 additions & 0 deletions tests/docker_test/docker_test.py
Original file line number Diff line number Diff line change
@@ -3,6 +3,8 @@
import os
import pytest

from http import HTTPStatus

from astronomer_starship.compat.starship_compatability import (
StarshipCompatabilityLayer,
get_test_data,
@@ -27,6 +29,10 @@ def test_variables(starship):
actual = starship.get_variables()
assert test_input in actual, actual

test_input = get_test_data(method="DELETE", attrs=starship.variable_attrs())
actual = starship.delete_variable(**test_input)
assert actual.status_code == HTTPStatus.NO_CONTENT, actual


@docker_test
def test_pools(starship):
@@ -45,6 +51,10 @@ def test_pools(starship):
actual = starship.get_pools()
assert expected in actual, actual

test_input = get_test_data(method="DELETE", attrs=starship.pool_attrs())
actual = starship.delete_pool(**test_input)
assert actual.status_code == HTTPStatus.NO_CONTENT, actual


@docker_test
def test_connections(starship):
@@ -55,6 +65,10 @@ def test_connections(starship):
actual = starship.get_connections()
assert test_input in actual, actual

test_input = get_test_data(method="DELETE", attrs=starship.connection_attrs())
actual = starship.delete_connection(**test_input)
assert actual.status_code == HTTPStatus.NO_CONTENT, actual


@docker_test
def test_dags(starship):
@@ -118,3 +132,7 @@ def test_dag_runs_and_task_instances(starship):
assert json.dumps(actual_task_instances, default=str) in json.dumps(
test_input["task_instances"], default=str
), actual_task_instances

test_input = get_test_data(method="DELETE", attrs=starship.dag_runs_attrs())
actual = starship.delete_dag_runs(**test_input)
assert actual.status_code == HTTPStatus.NO_CONTENT, actual

0 comments on commit 6f23f93

Please sign in to comment.