Skip to content

Commit 480e769

Browse files
authored
Fix: Fetch column types if the type is unknown for a partition column in the BQ engine adapter (#1326)
1 parent 89bedc0 commit 480e769

File tree

2 files changed

+49
-1
lines changed

2 files changed

+49
-1
lines changed

sqlmesh/core/engine_adapter/bigquery.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,9 @@ def insert_overwrite_by_partition(
391391
)
392392

393393
with self.session(), self.temp_table(query_or_df, name=table_name) as temp_table_name:
394-
if columns_to_types is None:
394+
if columns_to_types is None or columns_to_types[
395+
partition_column.name
396+
] == exp.DataType.build("unknown"):
395397
columns_to_types = self.columns(temp_table_name)
396398

397399
partition_type_sql = columns_to_types[partition_column.name].sql(dialect=self.dialect)

tests/core/engine_adapter/test_bigquery.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,52 @@ def test_insert_overwrite_by_partition_query(
7575
]
7676

7777

78+
def test_insert_overwrite_by_partition_query_unknown_column_types(
79+
make_mocked_engine_adapter: t.Callable, mocker: MockerFixture
80+
):
81+
adapter = make_mocked_engine_adapter(BigQueryEngineAdapter)
82+
execute_mock = mocker.patch(
83+
"sqlmesh.core.engine_adapter.bigquery.BigQueryEngineAdapter.execute"
84+
)
85+
86+
columns_mock = mocker.patch(
87+
"sqlmesh.core.engine_adapter.bigquery.BigQueryEngineAdapter.columns"
88+
)
89+
columns_mock.return_value = {
90+
"a": exp.DataType.build("int"),
91+
"ds": exp.DataType.build("DATETIME"),
92+
}
93+
94+
temp_table_uuid = uuid.uuid4()
95+
uuid4_mock = mocker.patch("uuid.uuid4")
96+
uuid4_mock.return_value = temp_table_uuid
97+
98+
adapter.insert_overwrite_by_partition(
99+
"test_schema.test_table",
100+
parse_one("SELECT a, ds FROM tbl"),
101+
partitioned_by=[
102+
d.parse_one("DATETIME_TRUNC(ds, MONTH)"),
103+
],
104+
columns_to_types={
105+
"a": exp.DataType.build("unknown"),
106+
"ds": exp.DataType.build("UNKNOWN"),
107+
},
108+
)
109+
110+
columns_mock.assert_called_once_with(
111+
exp.to_table(f"test_schema.__temp_test_table_{temp_table_uuid.hex}")
112+
)
113+
114+
sql_calls = _to_sql_calls(execute_mock)
115+
assert sql_calls == [
116+
"CREATE SCHEMA IF NOT EXISTS `test_schema`",
117+
f"CREATE TABLE IF NOT EXISTS `test_schema`.`__temp_test_table_{temp_table_uuid.hex}` AS SELECT `a`, `ds` FROM `tbl`",
118+
f"DECLARE _sqlmesh_target_partitions_ ARRAY<DATETIME> DEFAULT (SELECT ARRAY_AGG(DISTINCT DATETIME_TRUNC(ds, MONTH)) FROM test_schema.__temp_test_table_{temp_table_uuid.hex});",
119+
f"MERGE INTO `test_schema`.`test_table` AS `__MERGE_TARGET__` USING (SELECT * FROM (SELECT * FROM `test_schema`.`__temp_test_table_{temp_table_uuid.hex}`) AS `_subquery` WHERE DATETIME_TRUNC(`ds`, MONTH) IN UNNEST(`_sqlmesh_target_partitions_`)) AS `__MERGE_SOURCE__` ON FALSE WHEN NOT MATCHED BY SOURCE AND DATETIME_TRUNC(`ds`, MONTH) IN UNNEST(`_sqlmesh_target_partitions_`) THEN DELETE WHEN NOT MATCHED THEN INSERT (`a`, `ds`) VALUES (`a`, `ds`)",
120+
f"DROP TABLE IF EXISTS `test_schema`.`__temp_test_table_{temp_table_uuid.hex}`",
121+
]
122+
123+
78124
def test_insert_overwrite_by_time_partition_pandas(
79125
make_mocked_engine_adapter: t.Callable, mocker: MockerFixture
80126
):

0 commit comments

Comments
 (0)