Skip to content

Commit

Permalink
style: fix ruff issues
Browse files (browse the repository at this point in the history)
  • Loading branch information
Lee-W committed Jun 21, 2024
1 parent e5f5e82 commit 6496089
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 80 deletions.
87 changes: 65 additions & 22 deletions python-sdk/src/astro/databases/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ class BaseDatabase(ABC):
NATIVE_PATHS: dict[Any, Any] = {}
DEFAULT_SCHEMA = SCHEMA
NATIVE_LOAD_EXCEPTIONS: Any = DatabaseCustomError
NATIVE_AUTODETECT_SCHEMA_CONFIG: Mapping[FileLocation, Mapping[str, list[FileType] | Callable]] = {}
NATIVE_AUTODETECT_SCHEMA_CONFIG: Mapping[
FileLocation, Mapping[str, list[FileType] | Callable]
] = {}
FILE_PATTERN_BASED_AUTODETECT_SCHEMA_SUPPORTED: set[FileLocation] = set()

def __init__(
Expand Down Expand Up @@ -182,7 +184,8 @@ def columns_exist(self, table: BaseTable, columns: list[str]) -> bool:
"""
sqla_table = self.get_sqla_table(table)
return all(
any(sqla_column.name == column for sqla_column in sqla_table.columns) for column in columns
any(sqla_column.name == column for sqla_column in sqla_table.columns)
for column in columns
)

def table_exists(self, table: BaseTable) -> bool:
Expand All @@ -205,7 +208,7 @@ def get_merge_initialization_query(parameters: tuple) -> str:
it agnostic to database.
"""
constraints = ",".join(parameters)
sql = "ALTER TABLE {{table}} ADD CONSTRAINT airflow UNIQUE (%s)" % constraints
sql = f"ALTER TABLE {{{{table}}}} ADD CONSTRAINT airflow UNIQUE ({constraints})"
return sql

@staticmethod
Expand Down Expand Up @@ -278,7 +281,9 @@ def create_table_using_native_schema_autodetection(
:param table: The table to be created.
:param file: File used to infer the new table columns.
"""
raise NotImplementedError("Missing implementation of native schema autodetection.")
raise NotImplementedError(
"Missing implementation of native schema autodetection."
)

def create_table_using_schema_autodetection(
self,
Expand All @@ -303,7 +308,9 @@ def create_table_using_schema_autodetection(
)
source_dataframe = dataframe
else:
source_dataframe = file.export_to_dataframe(nrows=LOAD_TABLE_AUTODETECT_ROWS_COUNT)
source_dataframe = file.export_to_dataframe(
nrows=LOAD_TABLE_AUTODETECT_ROWS_COUNT
)

try:
db = SQLDatabase(engine=self.sqlalchemy_engine)
Expand All @@ -319,7 +326,8 @@ def create_table_using_schema_autodetection(
)

def is_native_autodetect_schema_available( # skipcq: PYL-R0201
self, file: File # skipcq: PYL-W0613
self,
file: File, # skipcq: PYL-W0613
) -> bool:
"""
Check if native auto detection of schema is available.
Expand Down Expand Up @@ -349,7 +357,11 @@ def create_table(
"""
if table.columns:
self.create_table_using_columns(table)
elif use_native_support and file and self.is_native_autodetect_schema_available(file):
elif (
use_native_support
and file
and self.is_native_autodetect_schema_available(file)
):
self.create_table_using_native_schema_autodetection(table, file)
else:
self.create_table_using_schema_autodetection(
Expand Down Expand Up @@ -377,15 +389,19 @@ def create_table_from_select_statement(
statement = self._create_table_statement.format(
self.get_table_qualified_name(target_table), statement
)
self.run_sql(sql=statement, parameters=parameters, query_modifier=query_modifier)
self.run_sql(
sql=statement, parameters=parameters, query_modifier=query_modifier
)

def drop_table(self, table: BaseTable) -> None:
"""
Delete a SQL table, if it exists.
:param table: The table to be deleted.
"""
statement = self._drop_table_statement.format(self.get_table_qualified_name(table))
statement = self._drop_table_statement.format(
self.get_table_qualified_name(table)
)
self.run_sql(statement)

# ---------------------------------------------------------
Expand All @@ -410,13 +426,21 @@ def create_table_if_needed(
:param if_exists: Overwrite file if exists
:param use_native_support: Use native support for data transfer if available on the destination
"""
is_schema_autodetection_supported = self.check_schema_autodetection_is_supported(source_file=file)
is_schema_autodetection_supported = (
self.check_schema_autodetection_is_supported(source_file=file)
)
is_file_pattern_based_schema_autodetection_supported = (
self.check_file_pattern_based_schema_autodetection_is_supported(source_file=file)
self.check_file_pattern_based_schema_autodetection_is_supported(
source_file=file
)
)
if if_exists == "replace":
self.drop_table(table)
if use_native_support and is_schema_autodetection_supported and not file.is_pattern():
if (
use_native_support
and is_schema_autodetection_supported
and not file.is_pattern()
):
return
if (
use_native_support
Expand Down Expand Up @@ -508,7 +532,9 @@ def load_file_to_table(
)
use_native_support = False

self.create_schema_if_applicable(output_table.metadata.schema, assume_schema_exists)
self.create_schema_if_applicable(
output_table.metadata.schema, assume_schema_exists
)

self.create_table_if_needed(
file=input_file,
Expand Down Expand Up @@ -685,8 +711,12 @@ def append_table(
target_columns = [column(col) for col in target_table_sqla.c.keys()]
source_columns = target_columns
else:
target_columns = [column(col) for col in source_to_target_columns_map.values()]
source_columns = [column(col) for col in source_to_target_columns_map.keys()]
target_columns = [
column(col) for col in source_to_target_columns_map.values()
]
source_columns = [
column(col) for col in source_to_target_columns_map.keys()
]

sel = select(source_columns).select_from(source_table_sqla)
# TODO: We should fix the following Type Error
Expand Down Expand Up @@ -743,11 +773,15 @@ def export_table_to_pandas_dataframe(

if self.table_exists(source_table):
sqla_table = self.get_sqla_table(source_table)
df = pd.read_sql(sql=sqla_table.select(**select_kwargs), con=self.sqlalchemy_engine)
df = pd.read_sql(
sql=sqla_table.select(**select_kwargs), con=self.sqlalchemy_engine
)
return PandasDataframe.from_pandas_df(df)

table_qualified_name = self.get_table_qualified_name(source_table)
raise NonExistentTableException(f"The table {table_qualified_name} does not exist")
raise NonExistentTableException(
f"The table {table_qualified_name} does not exist"
)

def export_table_to_file(
self,
Expand Down Expand Up @@ -801,7 +835,9 @@ def schema_exists(self, schema: str) -> bool:
# ---------------------------------------------------------

def get_sqlalchemy_template_table_identifier_and_parameter(
self, table: BaseTable, jinja_table_identifier: str # skipcq PYL-W0613
self,
table: BaseTable,
jinja_table_identifier: str, # skipcq PYL-W0613
) -> tuple[str, str]:
"""
During the conversion from a Jinja-templated SQL query to a SQLAlchemy query, there is the need to
Expand Down Expand Up @@ -853,7 +889,9 @@ def parameterize_variable(self, variable: str):
return ":" + variable

def is_native_load_file_available( # skipcq: PYL-R0201
self, source_file: File, target_table: BaseTable # skipcq: PYL-W0613
self,
source_file: File,
target_table: BaseTable, # skipcq: PYL-W0613
) -> bool:
"""
Check if there is an optimised path for source to destination.
Expand Down Expand Up @@ -888,7 +926,9 @@ def check_schema_autodetection_is_supported(self, source_file: File) -> bool:
:param source_file: File from which we need to transfer data
"""
filetype_supported = self.NATIVE_AUTODETECT_SCHEMA_CONFIG.get(source_file.location.location_type)
filetype_supported = self.NATIVE_AUTODETECT_SCHEMA_CONFIG.get(
source_file.location.location_type
)

source_filetype = (
source_file
Expand All @@ -905,15 +945,18 @@ def check_schema_autodetection_is_supported(self, source_file: File) -> bool:
location_type = self.NATIVE_PATHS.get(source_file.location.location_type)
return bool(location_type and is_source_filetype_supported)

def check_file_pattern_based_schema_autodetection_is_supported(self, source_file: File) -> bool:
def check_file_pattern_based_schema_autodetection_is_supported(
self, source_file: File
) -> bool:
"""
Checks if schema autodetection is handled natively by the database for file
patterns and prefixes.
:param source_file: File from which we need to transfer data
"""
is_file_pattern_based_schema_autodetection_supported = (
source_file.location.location_type in self.FILE_PATTERN_BASED_AUTODETECT_SCHEMA_SUPPORTED
source_file.location.location_type
in self.FILE_PATTERN_BASED_AUTODETECT_SCHEMA_SUPPORTED
)
return is_file_pattern_based_schema_autodetection_supported

Expand Down
11 changes: 8 additions & 3 deletions python-sdk/src/astro/databases/duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ def get_merge_initialization_query(parameters: tuple) -> str:
"""
Handles database-specific logic to handle index for DuckDB.
"""
return "CREATE UNIQUE INDEX merge_index ON {{table}}(%s)" % ",".join(parameters) # skipcq PYL-C0209
joined_parameters = ",".join(parameters)
return f"CREATE UNIQUE INDEX merge_index ON {{{{table}}}}({joined_parameters})"

def merge_table(
self,
Expand Down Expand Up @@ -109,7 +110,9 @@ def merge_table(

append_column_names = list(source_to_target_columns_map.keys())
target_column_names = list(source_to_target_columns_map.values())
update_statements = [f"{col_name}=EXCLUDED.{col_name}" for col_name in target_column_names]
update_statements = [
f"{col_name}=EXCLUDED.{col_name}" for col_name in target_column_names
]

query = statement.format(
target_columns=",".join(target_column_names),
Expand All @@ -127,7 +130,9 @@ def get_sqla_table(self, table: BaseTable) -> SqlaTable:
:param table: Astro Table to be converted to SQLAlchemy table instance
"""
return SqlaTable(table.name, SqlaMetaData(), autoload_with=self.sqlalchemy_engine)
return SqlaTable(
table.name, SqlaMetaData(), autoload_with=self.sqlalchemy_engine
)

def openlineage_dataset_name(self, table: BaseTable) -> str:
"""
Expand Down
Loading

0 comments on commit 6496089

Please sign in to comment.