Skip to content

Commit 369dde0

Browse files
committed
refactor: add failsafe mechanism for the stable compiler configuration
1 parent 915cce5 commit 369dde0

File tree

3 files changed

+239
-69
lines changed

3 files changed

+239
-69
lines changed

bigframes/core/compile/__init__.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,28 +13,30 @@
1313
# limitations under the License.
1414
from __future__ import annotations
1515

16-
from typing import Any
16+
from typing import Literal
1717

18-
from bigframes import options
1918
from bigframes.core.compile.api import test_only_ibis_inferred_schema
2019
from bigframes.core.compile.configs import CompileRequest, CompileResult
2120

2221

23-
def compile_sql(
    request: CompileRequest,
    compiler_name: Literal["sqlglot", "ibis"] = "sqlglot",
) -> CompileResult:
    """Compiles a BigFrameNode according to the request into SQL.

    Args:
        request: The compile request wrapping the node and compile options.
        compiler_name: Which compiler backend to use; defaults to "sqlglot".

    Returns:
        The compile result produced by the selected backend.
    """
    # Backends are imported lazily so that only the selected compiler
    # module (and its dependencies) is actually loaded.
    if compiler_name == "sqlglot":
        import bigframes.core.compile.sqlglot.compiler as sqlglot_compiler

        compile_fn = sqlglot_compiler.compile_sql
    else:
        import bigframes.core.compile.ibis_compiler.ibis_compiler as ibis_compiler

        compile_fn = ibis_compiler.compile_sql
    return compile_fn(request)


__all__ = [
    "test_only_ibis_inferred_schema",
    "CompileRequest",
    "CompileResult",
    "compile_sql",
]

bigframes/session/bq_caching_executor.py

Lines changed: 119 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@
1717
import math
1818
import threading
1919
from typing import Literal, Mapping, Optional, Sequence, Tuple
20+
import uuid
21+
import warnings
2022

2123
import google.api_core.exceptions
2224
from google.cloud import bigquery
2325
import google.cloud.bigquery.job as bq_job
2426
import google.cloud.bigquery.table as bq_table
2527
import google.cloud.bigquery_storage_v1
28+
import google.cloud.exceptions
2629

2730
import bigframes
2831
from bigframes import exceptions as bfe
@@ -108,6 +111,43 @@ def __init__(
108111
)
109112
self._upload_lock = threading.Lock()
110113

114+
def _compile(
115+
self,
116+
node: nodes.BigFrameNode,
117+
*,
118+
ordered: bool = False,
119+
peek: Optional[int] = None,
120+
materialize_all_order_keys: bool = False,
121+
compiler_name: Literal["sqlglot", "ibis"] = "sqlglot",
122+
) -> compile.CompileResult:
123+
return compile.compile_sql(
124+
compile.CompileRequest(
125+
node,
126+
sort_rows=ordered,
127+
peek_count=peek,
128+
materialize_all_order_keys=materialize_all_order_keys,
129+
),
130+
compiler_name=compiler_name,
131+
)
132+
133+
def _with_fallback(self, run_fn):
134+
compiler_option = bigframes.options.experiments.sql_compiler
135+
if compiler_option == "legacy":
136+
return run_fn("ibis")
137+
elif compiler_option == "experimental":
138+
return run_fn("sqlglot")
139+
else: # stable
140+
compiler_id = f"{uuid.uuid1().hex[:12]}"
141+
try:
142+
return run_fn("sqlglot", compiler_id=compiler_id)
143+
except google.cloud.exceptions.BadRequest as e:
144+
msg = bfe.format_message(
145+
f"Compiler ID {compiler_id}: BadRequest on sqlglot. "
146+
f"Falling back to ibis. Details: {e.message}"
147+
)
148+
warnings.warn(msg, category=UserWarning)
149+
return run_fn("ibis", compiler_id=compiler_id)
150+
111151
def to_sql(
112152
self,
113153
array_value: bigframes.core.ArrayValue,
@@ -123,9 +163,7 @@ def to_sql(
123163
else array_value.node
124164
)
125165
node = self._substitute_large_local_sources(node)
126-
compiled = compile.compiler().compile_sql(
127-
compile.CompileRequest(node, sort_rows=ordered)
128-
)
166+
compiled = self._compile(node, ordered=ordered)
129167
return compiled.sql
130168

131169
def execute(
@@ -241,46 +279,56 @@ def _export_gbq(
241279
# validate destination table
242280
existing_table = self._maybe_find_existing_table(spec)
243281

244-
compiled = compile.compiler().compile_sql(
245-
compile.CompileRequest(plan, sort_rows=False)
246-
)
247-
sql = compiled.sql
282+
def run_with_compiler(compiler_name, compiler_id=None):
283+
compiled = self._compile(plan, ordered=False, compiler_name=compiler_name)
284+
sql = compiled.sql
248285

249-
if (existing_table is not None) and _if_schema_match(
250-
existing_table.schema, array_value.schema
251-
):
252-
# b/409086472: Uses DML for table appends and replacements to avoid
253-
# BigQuery `RATE_LIMIT_EXCEEDED` errors, as per quota limits:
254-
# https://cloud.google.com/bigquery/quotas#standard_tables
255-
job_config = bigquery.QueryJobConfig()
286+
if (existing_table is not None) and _if_schema_match(
287+
existing_table.schema, array_value.schema
288+
):
289+
# b/409086472: Uses DML for table appends and replacements to avoid
290+
# BigQuery `RATE_LIMIT_EXCEEDED` errors, as per quota limits:
291+
# https://cloud.google.com/bigquery/quotas#standard_tables
292+
job_config = bigquery.QueryJobConfig()
293+
294+
ir = sqlglot_ir.SQLGlotIR.from_unparsed_query(sql)
295+
if spec.if_exists == "append":
296+
sql = sg_sql.to_sql(
297+
sg_sql.insert(ir.expr.as_select_all(), spec.table)
298+
)
299+
else: # for "replace"
300+
assert spec.if_exists == "replace"
301+
sql = sg_sql.to_sql(
302+
sg_sql.replace(ir.expr.as_select_all(), spec.table)
303+
)
304+
else:
305+
dispositions = {
306+
"fail": bigquery.WriteDisposition.WRITE_EMPTY,
307+
"replace": bigquery.WriteDisposition.WRITE_TRUNCATE,
308+
"append": bigquery.WriteDisposition.WRITE_APPEND,
309+
}
310+
job_config = bigquery.QueryJobConfig(
311+
write_disposition=dispositions[spec.if_exists],
312+
destination=spec.table,
313+
clustering_fields=spec.cluster_cols if spec.cluster_cols else None,
314+
)
256315

257-
ir = sqlglot_ir.SQLGlotIR.from_unparsed_query(sql)
258-
if spec.if_exists == "append":
259-
sql = sg_sql.to_sql(sg_sql.insert(ir.expr.as_select_all(), spec.table))
260-
else: # for "replace"
261-
assert spec.if_exists == "replace"
262-
sql = sg_sql.to_sql(sg_sql.replace(ir.expr.as_select_all(), spec.table))
263-
else:
264-
dispositions = {
265-
"fail": bigquery.WriteDisposition.WRITE_EMPTY,
266-
"replace": bigquery.WriteDisposition.WRITE_TRUNCATE,
267-
"append": bigquery.WriteDisposition.WRITE_APPEND,
268-
}
269-
job_config = bigquery.QueryJobConfig(
270-
write_disposition=dispositions[spec.if_exists],
271-
destination=spec.table,
272-
clustering_fields=spec.cluster_cols if spec.cluster_cols else None,
316+
# Attach data type usage to the job labels
317+
job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
318+
job_config.labels["bigframes-compiler"] = (
319+
f"{compiler_name}-{compiler_id}" if compiler_id else compiler_name
273320
)
274321

275-
# Attach data type usage to the job labels
276-
job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
277-
# TODO(swast): plumb through the api_name of the user-facing api that
278-
# caused this query.
279-
iterator, job = self._run_execute_query(
280-
sql=sql,
281-
job_config=job_config,
282-
session=array_value.session,
283-
)
322+
# TODO(swast): plumb through the api_name of the user-facing api that
323+
# caused this query.
324+
iterator, job = self._run_execute_query(
325+
sql=sql,
326+
job_config=job_config,
327+
session=array_value.session,
328+
)
329+
return iterator, job
330+
331+
iterator, job = self._with_fallback(run_with_compiler)
284332

285333
has_special_dtype_col = any(
286334
t in (bigframes.dtypes.TIMEDELTA_DTYPE, bigframes.dtypes.OBJ_REF_DTYPE)
@@ -599,34 +647,44 @@ def _execute_plan_gbq(
599647
]
600648
cluster_cols = cluster_cols[:_MAX_CLUSTER_COLUMNS]
601649

602-
compiled = compile.compiler().compile_sql(
603-
compile.CompileRequest(
650+
def run_with_compiler(compiler_name, compiler_id=None):
651+
compiled = self._compile(
604652
plan,
605-
sort_rows=ordered,
606-
peek_count=peek,
653+
ordered=ordered,
654+
peek=peek,
607655
materialize_all_order_keys=(cache_spec is not None),
656+
compiler_name=compiler_name,
608657
)
609-
)
610-
# might have more columns than og schema, for hidden ordering columns
611-
compiled_schema = compiled.sql_schema
658+
# might have more columns than og schema, for hidden ordering columns
659+
compiled_schema = compiled.sql_schema
612660

613-
destination_table: Optional[bigquery.TableReference] = None
661+
destination_table: Optional[bigquery.TableReference] = None
614662

615-
job_config = bigquery.QueryJobConfig()
616-
if create_table:
617-
destination_table = self.storage_manager.create_temp_table(
618-
compiled_schema, cluster_cols
663+
job_config = bigquery.QueryJobConfig()
664+
if create_table:
665+
destination_table = self.storage_manager.create_temp_table(
666+
compiled_schema, cluster_cols
667+
)
668+
job_config.destination = destination_table
669+
670+
# Attach data type usage to the job labels
671+
job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
672+
job_config.labels["bigframes-compiler"] = (
673+
f"{compiler_name}-{compiler_id}" if compiler_id else compiler_name
619674
)
620-
job_config.destination = destination_table
621-
622-
# Attach data type usage to the job labels
623-
job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
624-
iterator, query_job = self._run_execute_query(
625-
sql=compiled.sql,
626-
job_config=job_config,
627-
query_with_job=(destination_table is not None),
628-
session=plan.session,
629-
)
675+
676+
iterator, query_job = self._run_execute_query(
677+
sql=compiled.sql,
678+
job_config=job_config,
679+
query_with_job=(destination_table is not None),
680+
session=plan.session,
681+
)
682+
return iterator, query_job, compiled
683+
684+
iterator, query_job, compiled = self._with_fallback(run_with_compiler)
685+
686+
# might have more columns than og schema, for hidden ordering columns
687+
compiled_schema = compiled.sql_schema
630688

631689
# we could actually cache even when caching is not explicitly requested, but being conservative for now
632690
result_bq_data = None
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
# Copyright 2024 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from unittest import mock
16+
17+
import google.cloud.bigquery as bigquery
18+
import google.cloud.exceptions
19+
import pyarrow as pa
20+
import pytest
21+
22+
import bigframes
23+
import bigframes.core.nodes as nodes
24+
import bigframes.core.schema as schemata
25+
from bigframes.session.bq_caching_executor import BigQueryCachingExecutor
26+
27+
28+
@pytest.fixture
def mock_executor():
    """Build a BigQueryCachingExecutor wired entirely with mocks."""
    mock_bqclient = mock.create_autospec(bigquery.Client)
    mock_bqclient.project = "test-project"
    return BigQueryCachingExecutor(
        mock_bqclient,
        mock.Mock(),  # storage_manager
        mock.Mock(),  # bqstoragereadclient
        mock.Mock(),  # loader
        publisher=mock.Mock(),
    )
39+
40+
41+
def test_compiler_with_fallback_legacy(mock_executor):
    """With the "legacy" option, ibis is used directly and no id is passed."""
    runner = mock.Mock()
    with bigframes.option_context("experiments.sql_compiler", "legacy"):
        mock_executor._with_fallback(runner)
    runner.assert_called_once_with("ibis")
46+
47+
48+
def test_compiler_with_fallback_experimental(mock_executor):
    """With the "experimental" option, sqlglot is used directly."""
    runner = mock.Mock()
    with bigframes.option_context("experiments.sql_compiler", "experimental"):
        mock_executor._with_fallback(runner)
    runner.assert_called_once_with("sqlglot")
53+
54+
55+
def test_compiler_with_fallback_stable_success(mock_executor):
    """With the "stable" option, sqlglot runs first, tagged with an id."""
    runner = mock.Mock()
    with bigframes.option_context("experiments.sql_compiler", "stable"):
        mock_executor._with_fallback(runner)
    runner.assert_called_once_with("sqlglot", compiler_id=mock.ANY)
60+
61+
62+
def test_compiler_execute_plan_gbq_fallback_labels(mock_executor):
    """The stable-mode failsafe tags both query jobs with matching labels.

    Forces the first `_run_execute_query` call to raise BadRequest so the
    executor falls back from sqlglot to ibis, then checks that both job
    configs carry a "bigframes-compiler" label with the same compiler id.
    """
    plan = mock.create_autospec(nodes.BigFrameNode)
    plan.schema = schemata.ArraySchema(tuple())
    plan.session = None

    # Mock prepare_plan so the plan passes through unchanged.
    mock_executor.prepare_plan = mock.Mock(return_value=plan)

    # Mock _compile with a canned result so no real compilation happens.
    from bigframes.core.compile.configs import CompileResult

    fake_compiled = CompileResult(
        sql="SELECT 1", sql_schema=[], row_order=None, encoded_type_refs="fake_refs"
    )
    mock_executor._compile = mock.Mock(return_value=fake_compiled)

    # Mock _run_execute_query to fail first time, then succeed.
    mock_iterator = mock.Mock()
    mock_iterator.total_rows = 0
    mock_iterator.to_arrow.return_value = pa.Table.from_arrays([], names=[])
    mock_query_job = mock.Mock(spec=bigquery.QueryJob)
    mock_query_job.destination = None

    # BadRequest is the specific exception type the failsafe catches.
    error = google.cloud.exceptions.BadRequest("failed")
    error.job = mock.Mock(spec=bigquery.QueryJob)  # type: ignore
    error.job.job_id = "failed_job_id"  # type: ignore

    # First call (sqlglot) raises; second call (ibis) succeeds.
    mock_executor._run_execute_query = mock.Mock(
        side_effect=[error, (mock_iterator, mock_query_job)]
    )

    with bigframes.option_context("experiments.sql_compiler", "stable"), pytest.warns(
        UserWarning, match="Falling back to ibis"
    ):
        mock_executor._execute_plan_gbq(plan, ordered=False, must_create_table=False)

    # Verify labels for both calls: exactly one sqlglot attempt + one fallback.
    assert mock_executor._run_execute_query.call_count == 2

    call_1_kwargs = mock_executor._run_execute_query.call_args_list[0][1]
    call_2_kwargs = mock_executor._run_execute_query.call_args_list[1][1]

    label_1 = call_1_kwargs["job_config"].labels["bigframes-compiler"]
    label_2 = call_2_kwargs["job_config"].labels["bigframes-compiler"]

    assert label_1.startswith("sqlglot-")
    assert label_2.startswith("ibis-")
    # Both should have the same compiler_id suffix
    assert label_1.split("-")[1] == label_2.split("-")[1]

0 commit comments

Comments
 (0)