Commit 6d0e10a

[pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
pre-commit-ci[bot] committed Jun 21, 2024
1 parent 6496089 commit 6d0e10a
Showing 4 changed files with 48 additions and 138 deletions.
74 changes: 18 additions & 56 deletions python-sdk/src/astro/databases/base.py
@@ -67,9 +67,7 @@ class BaseDatabase(ABC):
     NATIVE_PATHS: dict[Any, Any] = {}
     DEFAULT_SCHEMA = SCHEMA
     NATIVE_LOAD_EXCEPTIONS: Any = DatabaseCustomError
-    NATIVE_AUTODETECT_SCHEMA_CONFIG: Mapping[
-        FileLocation, Mapping[str, list[FileType] | Callable]
-    ] = {}
+    NATIVE_AUTODETECT_SCHEMA_CONFIG: Mapping[FileLocation, Mapping[str, list[FileType] | Callable]] = {}
     FILE_PATTERN_BASED_AUTODETECT_SCHEMA_SUPPORTED: set[FileLocation] = set()

     def __init__(
@@ -184,8 +182,7 @@ def columns_exist(self, table: BaseTable, columns: list[str]) -> bool:
         """
         sqla_table = self.get_sqla_table(table)
         return all(
-            any(sqla_column.name == column for sqla_column in sqla_table.columns)
-            for column in columns
+            any(sqla_column.name == column for sqla_column in sqla_table.columns) for column in columns
         )

     def table_exists(self, table: BaseTable) -> bool:
@@ -281,9 +278,7 @@ def create_table_using_native_schema_autodetection(
         :param table: The table to be created.
         :param file: File used to infer the new table columns.
         """
-        raise NotImplementedError(
-            "Missing implementation of native schema autodetection."
-        )
+        raise NotImplementedError("Missing implementation of native schema autodetection.")

     def create_table_using_schema_autodetection(
         self,
@@ -308,9 +303,7 @@ def create_table_using_schema_autodetection(
                 )
             source_dataframe = dataframe
         else:
-            source_dataframe = file.export_to_dataframe(
-                nrows=LOAD_TABLE_AUTODETECT_ROWS_COUNT
-            )
+            source_dataframe = file.export_to_dataframe(nrows=LOAD_TABLE_AUTODETECT_ROWS_COUNT)

         try:
             db = SQLDatabase(engine=self.sqlalchemy_engine)
@@ -357,11 +350,7 @@ def create_table(
         """
         if table.columns:
             self.create_table_using_columns(table)
-        elif (
-            use_native_support
-            and file
-            and self.is_native_autodetect_schema_available(file)
-        ):
+        elif use_native_support and file and self.is_native_autodetect_schema_available(file):
             self.create_table_using_native_schema_autodetection(table, file)
         else:
             self.create_table_using_schema_autodetection(
@@ -389,19 +378,15 @@ def create_table_from_select_statement(
         statement = self._create_table_statement.format(
             self.get_table_qualified_name(target_table), statement
         )
-        self.run_sql(
-            sql=statement, parameters=parameters, query_modifier=query_modifier
-        )
+        self.run_sql(sql=statement, parameters=parameters, query_modifier=query_modifier)

     def drop_table(self, table: BaseTable) -> None:
         """
         Delete a SQL table, if it exists.
         :param table: The table to be deleted.
         """
-        statement = self._drop_table_statement.format(
-            self.get_table_qualified_name(table)
-        )
+        statement = self._drop_table_statement.format(self.get_table_qualified_name(table))
         self.run_sql(statement)

     # ---------------------------------------------------------
@@ -426,21 +411,13 @@ def create_table_if_needed(
         :param if_exists: Overwrite file if exists
         :param use_native_support: Use native support for data transfer if available on the destination
         """
-        is_schema_autodetection_supported = (
-            self.check_schema_autodetection_is_supported(source_file=file)
-        )
+        is_schema_autodetection_supported = self.check_schema_autodetection_is_supported(source_file=file)
         is_file_pattern_based_schema_autodetection_supported = (
-            self.check_file_pattern_based_schema_autodetection_is_supported(
-                source_file=file
-            )
+            self.check_file_pattern_based_schema_autodetection_is_supported(source_file=file)
         )
         if if_exists == "replace":
             self.drop_table(table)
-        if (
-            use_native_support
-            and is_schema_autodetection_supported
-            and not file.is_pattern()
-        ):
+        if use_native_support and is_schema_autodetection_supported and not file.is_pattern():
             return
         if (
             use_native_support
@@ -532,9 +509,7 @@ def load_file_to_table(
             )
             use_native_support = False

-        self.create_schema_if_applicable(
-            output_table.metadata.schema, assume_schema_exists
-        )
+        self.create_schema_if_applicable(output_table.metadata.schema, assume_schema_exists)

         self.create_table_if_needed(
             file=input_file,
@@ -711,12 +686,8 @@ def append_table(
             target_columns = [column(col) for col in target_table_sqla.c.keys()]
             source_columns = target_columns
         else:
-            target_columns = [
-                column(col) for col in source_to_target_columns_map.values()
-            ]
-            source_columns = [
-                column(col) for col in source_to_target_columns_map.keys()
-            ]
+            target_columns = [column(col) for col in source_to_target_columns_map.values()]
+            source_columns = [column(col) for col in source_to_target_columns_map.keys()]

         sel = select(source_columns).select_from(source_table_sqla)
         # TODO: We should fix the following Type Error
@@ -773,15 +744,11 @@ def export_table_to_pandas_dataframe(

         if self.table_exists(source_table):
             sqla_table = self.get_sqla_table(source_table)
-            df = pd.read_sql(
-                sql=sqla_table.select(**select_kwargs), con=self.sqlalchemy_engine
-            )
+            df = pd.read_sql(sql=sqla_table.select(**select_kwargs), con=self.sqlalchemy_engine)
             return PandasDataframe.from_pandas_df(df)

         table_qualified_name = self.get_table_qualified_name(source_table)
-        raise NonExistentTableException(
-            f"The table {table_qualified_name} does not exist"
-        )
+        raise NonExistentTableException(f"The table {table_qualified_name} does not exist")

     def export_table_to_file(
         self,
@@ -926,9 +893,7 @@ def check_schema_autodetection_is_supported(self, source_file: File) -> bool:
         :param source_file: File from which we need to transfer data
         """
-        filetype_supported = self.NATIVE_AUTODETECT_SCHEMA_CONFIG.get(
-            source_file.location.location_type
-        )
+        filetype_supported = self.NATIVE_AUTODETECT_SCHEMA_CONFIG.get(source_file.location.location_type)

         source_filetype = (
             source_file
@@ -945,18 +910,15 @@ def check_schema_autodetection_is_supported(self, source_file: File) -> bool:
         location_type = self.NATIVE_PATHS.get(source_file.location.location_type)
         return bool(location_type and is_source_filetype_supported)

-    def check_file_pattern_based_schema_autodetection_is_supported(
-        self, source_file: File
-    ) -> bool:
+    def check_file_pattern_based_schema_autodetection_is_supported(self, source_file: File) -> bool:
         """
         Checks if schema autodetection is handled natively by the database for file
         patterns and prefixes.
         :param source_file: File from which we need to transfer data
         """
         is_file_pattern_based_schema_autodetection_supported = (
-            source_file.location.location_type
-            in self.FILE_PATTERN_BASED_AUTODETECT_SCHEMA_SUPPORTED
+            source_file.location.location_type in self.FILE_PATTERN_BASED_AUTODETECT_SCHEMA_SUPPORTED
         )
         return is_file_pattern_based_schema_autodetection_supported
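
For reference, the membership check collapsed onto one line in the columns_exist hunk above simply verifies that every requested column matches one of the table's columns. A minimal standalone sketch of that logic, using plain lists of column names instead of the SDK's SQLAlchemy table object (the helper name and inputs below are illustrative only, not the SDK's API):

# Hypothetical standalone version of the all()/any() membership check.
def columns_exist(table_columns: list[str], required: list[str]) -> bool:
    # Every required column must match at least one existing column name.
    return all(
        any(existing == column for existing in table_columns) for column in required
    )

print(columns_exist(["id", "name"], ["id"]))          # True
print(columns_exist(["id", "name"], ["id", "age"]))   # False
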
8 changes: 2 additions & 6 deletions python-sdk/src/astro/databases/duckdb.py
@@ -110,9 +110,7 @@ def merge_table(

         append_column_names = list(source_to_target_columns_map.keys())
         target_column_names = list(source_to_target_columns_map.values())
-        update_statements = [
-            f"{col_name}=EXCLUDED.{col_name}" for col_name in target_column_names
-        ]
+        update_statements = [f"{col_name}=EXCLUDED.{col_name}" for col_name in target_column_names]

         query = statement.format(
             target_columns=",".join(target_column_names),
@@ -130,9 +128,7 @@ def get_sqla_table(self, table: BaseTable) -> SqlaTable:
         :param table: Astro Table to be converted to SQLAlchemy table instance
         """
-        return SqlaTable(
-            table.name, SqlaMetaData(), autoload_with=self.sqlalchemy_engine
-        )
+        return SqlaTable(table.name, SqlaMetaData(), autoload_with=self.sqlalchemy_engine)

     def openlineage_dataset_name(self, table: BaseTable) -> str:
         """
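
The merge_table hunk above builds one "col=EXCLUDED.col" clause per target column and joins the clauses into the upsert statement. A standalone sketch of that construction, assuming a hypothetical INSERT ... ON CONFLICT template (the template, table names, and merge key are illustrative; only the clause-building lines mirror the diff):

source_to_target_columns_map = {"id": "id", "name": "name"}

append_column_names = list(source_to_target_columns_map.keys())
target_column_names = list(source_to_target_columns_map.values())
# One assignment per target column, pulling the incoming value from EXCLUDED.
update_statements = [f"{col_name}=EXCLUDED.{col_name}" for col_name in target_column_names]

# Assumed upsert template, for illustration only.
statement = (
    "INSERT INTO target ({target_columns}) "
    "SELECT {append_columns} FROM source "
    "ON CONFLICT (id) DO UPDATE SET {update_statements}"
)
query = statement.format(
    target_columns=",".join(target_column_names),
    append_columns=",".join(append_column_names),
    update_statements=", ".join(update_statements),
)
print(query)
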
(diffs for the remaining 2 changed files not shown)
