[bug] Fix ExecutionMode.AIRFLOW_ASYNC query #1260

Closed
2 tasks
tatiana opened this issue Oct 21, 2024 · 9 comments · Fixed by #1474
Assignees
Labels
area:execution Related to the execution environment/mode, like Docker, Kubernetes, Local, VirtualEnv, etc bug Something isn't working dbt:run Primarily related to dbt run command or functionality execution:async Related to the Async execution mode priority:high High priority issues are blocking or critical issues without a workaround and large impact profile:bigquery Related to BigQuery ProfileConfig
Milestone

Comments

@tatiana
Collaborator

tatiana commented Oct 21, 2024

Context

In Cosmos 1.7, we introduced experimental BQ support to run dbt models with ExecutionMode.AIRFLOW_ASYNC in #1224 and #1230.

@joppevos pointed out in a chat that when running:

dbt run --full-refresh

the BQ adaptor seems to issue a create or replace on top of the table, rather than a drop/create:
https://github.com/dbt-labs/dbt-bigquery/blob/455c76887c9886c517df9619335066bedb1e1a43/dbt/include/bigquery/macros/adapters.sql#L16

It only drops the table if the partitioning or clustering has changed:
https://github.com/dbt-labs/dbt-bigquery/blob/455c76887c9886c517df9619335066bedb1e1a43/dbt/include/bigquery/macros/materializations/table.sql#L27

Action

  • Confirm this behaviour, by adding a breakpoint in the dbt run command
  • Fix the query behaviour in Cosmos
@tatiana tatiana added the bug Something isn't working label Oct 21, 2024
@dosubot dosubot bot added area:execution Related to the execution environment/mode, like Docker, Kubernetes, Local, VirtualEnv, etc dbt:run Primarily related to dbt run command or functionality profile:bigquery Related to BigQuery ProfileConfig labels Oct 21, 2024
@joppevos
Contributor

Thanks for creating the issue @tatiana! I see it's assigned, but I'd love to give it a shot or collaborate.

@pankajkoti
Contributor

@joppevos just wanted to quickly check on this one: in your deployments, do you install Airflow & dbt in the same virtualenv? Asking because we're considering an approach that could take advantage of them being in the same env.

@joppevos
Contributor

joppevos commented Nov 12, 2024

Hey @pankajkoti - I do not use the virtualenv mode. I had a look myself at how to approach this issue, but getting the SQL header information (create, etc.) from dbt is difficult.

@pankajkoti
Contributor

Hi @joppevos, I didn't mean the virtualenv mode. I was more curious about whether the dbt & Airflow Python packages are installed in the same virtual environment or in different virtual environments in your deployments. I guess it's fine; we will try to cater to both scenarios.

@pankajkoti
Contributor

@tatiana and I paired up to debug the dbt run command using the dbt runner included with the Python package. While it took some time to trace through dbt’s multiple abstraction layers—starting from the click CLI interface that triggers the dbt runner—we were able to use breakpoints to locate where the SQL gets constructed & is available for execution. This happens here in the raw_execute method for the dbt BigQuery adapter.

We realized there are several layers of abstraction involved in building the final SQL, including CTEs and DDLs, which account for modes like full-refresh, incremental, and snapshot. Since dbt doesn’t provide an interface to directly expose the SQL, we’ll need to intercept this flow, likely by monkey patching the raw_execute method to capture the SQL. Here are the proposed approaches:
1. Patch the raw_execute method: Capture the generated SQL by writing it to a temporary file, and upload these files to remote storage so they’re accessible to subsequent tasks running on different workers. All this could happen in our Compile task in the async mode that could run the dbt run command & potentially conduct these steps.
2. Use SQL from the target/run directory: According to this reference (thanks to Tatiana), dbt already writes SQL to the target/run directory, and I verified that this directory contains the same SQL we’d capture with approach 1. We could modify dbt run in the Compile tasks for our async execution mode, monkey-patching only the database calls in the raw_execute method to avoid making changes to the database itself while still generating these SQL files.

Monkey patching would be straightforward if dbt and Airflow run in the same virtual environment and process. However, if we use a subprocess or separate virtual environments, the patch won’t work in the subprocess. We’ll need to figure out an effective approach to modify and mock the source code to yield the SQL in these cases.
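
The subprocess limitation can be shown with a small sketch that does not involve dbt at all. It patches the standard-library `json.dumps` in the current process (purely a stand-in for `raw_execute`, chosen so the example is self-contained), then calls the same function from a child interpreter, where the patch is absent. The helper names are hypothetical.

```python
import json
import subprocess
import sys
from unittest import mock


def fake_dumps(obj, **kwargs):
    """Stand-in replacement, playing the role of a patched raw_execute."""
    return "patched"


def run_in_child() -> str:
    """Call json.dumps in a fresh interpreter and return what it printed."""
    result = subprocess.run(
        [sys.executable, "-c", "import json; print(json.dumps({'a': 1}))"],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


with mock.patch.object(json, "dumps", fake_dumps):
    in_process = json.dumps({"a": 1})  # the patch is visible in this process
    in_child = run_in_child()          # the child imports an unpatched json

print(in_process)
print(in_child)
```

The patch only lives in the memory of the process that applied it, so any approach that launches dbt through a subprocess or a separate virtual environment would need to apply the patch inside that process instead.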

Since this is more than a quick bug fix, I’d like to discuss whether we should proceed with one of these proposals for Cosmos 1.8 or defer it to Cosmos 1.9. This approach would also help streamline a lot of the async work planned for Cosmos 1.9. If feasible, shifting other priorities to 1.9 to make room for this in Cosmos 1.8 could give us a chance to gather feedback sooner and accelerate our async support. I’m looking forward to everyone’s thoughts.

@tatiana
Collaborator Author

tatiana commented Nov 13, 2024

From what we're seeing, the fix for this ticket will be to implement #1261

By implementing #1261, we'd also be closing #1271, #1265 and #1261

@tatiana
Collaborator Author

tatiana commented Nov 15, 2024

@pankajkoti @phanikumv and I just spoke to @cmarteepants and she prefers we return to this in January, once we're working on the async support.

@pankajkoti
Contributor

Thanks @tatiana for the update! I will move this ticket to the backlog & mark it for Cosmos 1.9.0

@pankajkoti pankajkoti added this to the Cosmos 1.9.0 milestone Nov 15, 2024

This issue is stale because it has been open for 30 days with no activity.

@github-actions github-actions bot added the stale Issue has not had recent activity or appears to be solved. Stale issues will be automatically closed label Dec 17, 2024
@tatiana tatiana added the execution:async Related to the Async execution mode label Jan 13, 2025
@tatiana tatiana removed stale Issue has not had recent activity or appears to be solved. Stale issues will be automatically closed stretch goal labels Jan 21, 2025
@tatiana tatiana added the priority:high High priority issues are blocking or critical issues without a workaround and large impact label Jan 27, 2025
pankajkoti added a commit that referenced this issue Feb 5, 2025
…LOW_ASYNC` (#1474)

# Overview

This PR introduces a reliable way to extract SQL statements run by
`dbt-core` so Airflow asynchronous operators can use them. It fixes the
experimental BQ implementation of `ExecutionMode.AIRFLOW_ASYNC`
introduced in Cosmos 1.7 (#1230).

Previously, in #1230, we attempted to understand the implementation of
how `dbt-core` runs `--full-refresh` for BQ, and we hard-coded the SQL
header in Cosmos as an experimental feature. Since then, we realised
that this approach was prone to errors (e.g. #1260) and that it is
unrealistic for Cosmos to try to recreate the logic of how `dbt-core`
and its adaptors generate all the SQL statements for different
operations, data warehouses, and types of materialisation.

With this PR, we use `dbt-core` to create the complete SQL statements
without `dbt-core` running those transformations. This enables better
compatibility with various `dbt-core` features while ensuring
correctness in running models.

The drawback of the current approach is that it relies on monkey
patching, a technique used to dynamically update the behaviour of a
piece of code at run-time. Cosmos monkey patches the `dbt-core` adaptor
methods at the point where they would normally execute SQL statements:
Cosmos modifies this behaviour so that the SQL statements are written to
disk without performing any operations on the actual data warehouse.
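
As a rough illustration of the technique (not the code in this PR), the
sketch below replaces the `execute` method of a stand-in class so that
each statement is written to a file instead of being run.
`FakeConnectionManager`, `patch_execute` and the file naming are
hypothetical and exist only for this example.

```python
import tempfile
from pathlib import Path


class FakeConnectionManager:
    """Hypothetical stand-in for an adaptor's connection manager."""

    def execute(self, sql: str) -> str:
        raise RuntimeError("would run against the warehouse")


def patch_execute(cls, output_dir: Path) -> None:
    """Replace cls.execute so SQL is written to disk instead of being run."""
    counter = {"n": 0}

    def execute(self, sql: str) -> str:
        counter["n"] += 1
        path = output_dir / f"statement_{counter['n']}.sql"
        path.write_text(sql)
        return "mocked"  # nothing is sent to the warehouse

    cls.execute = execute


output_dir = Path(tempfile.mkdtemp())
patch_execute(FakeConnectionManager, output_dir)

status = FakeConnectionManager().execute("create or replace table t as select 1")
written = sorted(p.name for p in output_dir.iterdir())
print(status, written)
```

Because the patch is applied to the class, every instance created
afterwards in the same process picks up the replacement method.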

The main risk of this strategy is that dbt may change its interface.
For this reason, we logged the follow-up ticket
#1489 to make sure
we test the latest version of dbt and its adapters and confirm the
monkey patching works as expected regardless of the version being used.
That said, since the method being monkey patched is part of the
`dbt-core` interface with its adaptors, we believe the risk of breaking
changes is low.

The other challenge with the current approach is that every Cosmos task
relies on the following:
1. `dbt-core` being installed alongside the Airflow installation
2. the execution of a significant part of the `dbtRunner` logic

We have logged a follow-up ticket to evaluate the possibility of
overcoming these challenges: #1477

## Key Changes

1. Mocked BigQuery Adapter Execution:
- Introduced `_mock_bigquery_adapter()` to override
`BigQueryConnectionManager.execute`, ensuring SQL is only written to the
`target` directory and skipping execution in the warehouse.
- The generated SQL is then submitted using Airflow’s
`BigQueryInsertJobOperator` in deferrable mode.
2. Refactoring `AbstractDbtBaseOperator`:
- Previously, `AbstractDbtBaseOperator` inherited from `BaseOperator`,
causing conflicts when used together with `BigQueryInsertJobOperator` in
our `ExecutionMode.AIRFLOW_ASYNC` classes and the interface built in
#1483.
- Refactored to `AbstractDbtBase` (no longer inheriting from
`BaseOperator`), requiring explicit `BaseOperator` initialization in all
derived operators.
- Updated the following existing operators so that they initialise
`BaseOperator` themselves:
        - `DbtAzureContainerInstanceBaseOperator`
        - `DbtDockerBaseOperator`
        - `DbtGcpCloudRunJobBaseOperator`
        - `DbtKubernetesBaseOperator`
3. Changes to dbt Compilation Workflow:
- Removed `_add_dbt_compile_task`, which previously pre-generated SQL
and uploaded it to remote storage, from where subsequent tasks
downloaded the compiled SQL for their execution.
- Instead, `dbt run` is now directly invoked in each task using the
mocked adapter to generate the full SQL.
- A future
[issue](#1477)
will assess whether we should reintroduce a compile task using the
mocked adapter for SQL generation and upload, reducing redundant dbt
calls in each task.
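
The refactoring in item 2 can be pictured with stand-in classes. This is
only a sketch under assumptions: `BaseOperator` and `InsertJobOperator`
below are simplified placeholders rather than the Airflow classes, and
`DbtRunAsyncOperator` is a hypothetical name. The point is that the dbt
base class no longer is an operator, so the derived class initialises
each parent explicitly.

```python
class BaseOperator:
    """Simplified placeholder for Airflow's BaseOperator."""

    def __init__(self, task_id: str, **kwargs):
        self.task_id = task_id


class InsertJobOperator(BaseOperator):
    """Simplified placeholder for an operator such as BigQueryInsertJobOperator."""

    def __init__(self, configuration: dict, **kwargs):
        super().__init__(**kwargs)
        self.configuration = configuration


class AbstractDbtBase:
    """Holds dbt-specific state only; deliberately not an operator."""

    def __init__(self, project_dir: str, **kwargs):
        self.project_dir = project_dir


class DbtRunAsyncOperator(InsertJobOperator, AbstractDbtBase):
    """Hypothetical derived operator that initialises both parents explicitly."""

    def __init__(self, project_dir: str, **kwargs):
        AbstractDbtBase.__init__(self, project_dir=project_dir)
        InsertJobOperator.__init__(self, configuration={}, **kwargs)


op = DbtRunAsyncOperator(project_dir="/tmp/project", task_id="run_model")
print(op.task_id, op.project_dir)
```

With this layout there is a single path to `BaseOperator` (through the
job operator), which is the kind of conflict the refactoring describes
avoiding.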

## Issue updates
The PR fixes the following issues:
1. closes: #1260
- Previously, we only supported `dbt run --full-refresh` with static SQL
headers (e.g., CREATE/DROP TABLE).
- Now, we support dynamic SQL headers based on materializations,
including CREATE OR REPLACE TABLE, CREATE OR REPLACE VIEW, etc.
2. closes: #1271
- dbt macros are evaluated at runtime during the `dbt run` invocation
using the mocked adapter, and this PR lays the groundwork for supporting
them in async execution mode.
3. closes: #1265
- Now, large datasets can avoid full drops and recreations, enabling
incremental model updates.
4. closes: #1261
- Previously, only tables (--full-refresh) were supported; this PR
implements logic for handling the different materializations that dbt
supports, like table, view, incremental, ephemeral, and materialized
views.
5. closes: #1266
- Instead of relying on `dbt compile` (which only outputs SELECT
statements), we now let dbt generate complete SQL queries, including SQL
headers/DDL statements, for the queries corresponding to the resource
nodes and the state of tables/views in the backend warehouse.
6. closes: #1264
- With this PR, we also support emitting datasets for
`ExecutionMode.AIRFLOW_ASYNC`.
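
To make the hand-off from generated SQL to an Airflow job more concrete,
here is a minimal sketch that reads one SQL file and wraps it in a
BigQuery query job configuration. The helper name, the file name and the
temporary directory are hypothetical; the dictionary follows the
`configuration.query` shape of the BigQuery jobs API, and passing it as
the `configuration` argument of `BigQueryInsertJobOperator` with
`deferrable=True` is an assumption about usage, not a copy of the Cosmos
code.

```python
import tempfile
from pathlib import Path


def build_query_job_configuration(sql_path: Path) -> dict:
    """Wrap the SQL stored in sql_path in a BigQuery query job configuration."""
    sql = sql_path.read_text()
    return {"query": {"query": sql, "useLegacySql": False}}


# Simulate a SQL file such as the ones dbt writes during a run.
run_dir = Path(tempfile.mkdtemp())
sql_file = run_dir / "my_model.sql"  # hypothetical file name
sql_file.write_text("create or replace table my_model as select 1 as id")

configuration = build_query_job_configuration(sql_file)
print(configuration["query"]["query"])
```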

## Example DAG showing `ExecutionMode.AIRFLOW_ASYNC` deferring tasks and the dynamic query submitted in the logs

<img width="1532" alt="Screenshot 2025-02-04 at 1 02 42 PM"
src="https://github.com/user-attachments/assets/baf15864-9bf8-4f35-95b7-954a1f547bfe"
/>


## Next Steps & Considerations:
- We acknowledge that monkey patching has downsides; however, it
currently seems the best approach to achieve our goals, and we
understand and accept the risks associated with it. To mitigate them,
we are expanding our test coverage to include all currently supported
dbt adapter versions in our test matrix in #1489. This will ensure
compatibility across different dbt versions and help us catch potential
issues early.
- Further validate different dbt macros and materializations with
`ExecutionMode.AIRFLOW_ASYNC` by seeking feedback from users testing
the alpha release
https://github.com/astronomer/astronomer-cosmos/releases/tag/astronomer-cosmos-v1.9.0a5
created with changes from this PR.
- #1477: compare
the efficiency of generating SQL dynamically vs. pre-compiling and
uploading SQL via a separate task.
- Add compatibility across all major cloud data warehouse backends (dbt
adapters).

---------

Co-authored-by: Tatiana Al-Chueyr <[email protected]>
Co-authored-by: Pankaj Singh <[email protected]>
5 participants