General framework to decorrelate the subqueries #5492

Open
Tracked by #5483 ...
mingmwang opened this issue Mar 7, 2023 · 30 comments
Labels
enhancement New feature or request

Comments

@mingmwang
Contributor

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

DataFusion currently has very limited support for correlated subqueries. It can only decorrelate (NOT) IN/EXISTS predicate subqueries into Semi/Anti Joins. Even in the simplest IN/EXISTS cases, the current decorrelation rules do not apply if the correlated expressions are not in the Filter/Join conditions.

The paper "Unnesting Arbitrary Queries" by T. Neumann and A. Kemper
(http://www.btw-2015.de/res/proceedings/Hauptband/Wiss/Neumann-Unnesting_Arbitrary_Querie.pdf) proposes a mechanism to unnest arbitrary queries. This is already implemented in the Hyper DB:

For example:
select * from orders
where 1 in (select 1 from part left join (select l_partkey from lineitem where o_orderkey = 2) lineitem on p_partkey = lineitem.l_partkey)

https://hyper-db.de/interface.html#

Neither SparkSQL nor PostgreSQL supports decorrelating this kind of query.
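To make the currently supported case concrete, here is a minimal sketch in plain Rust (no DataFusion APIs) of what decorrelating an EXISTS predicate into a semi join means. The table shapes and the function names `exists_correlated` and `exists_semi_join` are assumptions made up for illustration; they are not taken from DataFusion.

```rust
use std::collections::HashSet;

/// Correlated evaluation: for every outer row, re-scan the inner side.
/// Models: SELECT o FROM orders o
///         WHERE EXISTS (SELECT 1 FROM lineitem l WHERE l.orderkey = o)
fn exists_correlated(orders: &[i64], lineitem_orderkeys: &[i64]) -> Vec<i64> {
    orders
        .iter()
        .copied()
        .filter(|o| lineitem_orderkeys.iter().any(|l| l == o))
        .collect()
}

/// Decorrelated evaluation: build the inner side once, then probe it.
/// This is the shape of a left semi join on orderkey.
fn exists_semi_join(orders: &[i64], lineitem_orderkeys: &[i64]) -> Vec<i64> {
    let build: HashSet<i64> = lineitem_orderkeys.iter().copied().collect();
    orders.iter().copied().filter(|o| build.contains(o)).collect()
}
```

Both functions return the same rows; the second one no longer depends on the outer row while scanning the inner side, which is the property the decorrelation rules are after.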

Describe the solution you'd like

Describe alternatives you've considered

Additional context

@duongcongtoai
Contributor

Based on the discussion in this issue, I think we can list the following items needed for a subquery decorrelation framework:

  • Unify the optimizer for correlated queries, regardless of the query type (EXISTS query, scalar query, etc.)
  • Support a flexible decorrelation scheme (simple vs. general approach). We can achieve this by following the algorithm in the 2nd paper. A prerequisite is to introduce an index algebra during the rewrite. Building this index requires a pre-traversal of the whole query to detect all non-trivial subqueries and to decide whether simple unnesting is sufficient or the framework should continue with the general approach
  • Implement general-purpose, recursion-aware subquery decorrelation for the major operators (projection, filter, group by) using the top-down algorithm in the 2nd paper
  • Gradually support more complex expressions (group by, order, limit, window function)

@alamb
Contributor

alamb commented Apr 28, 2025

@suibianwanwank
Contributor

suibianwanwank commented Apr 29, 2025

Unify the optimizer for correlated queries, regardless of the query type (EXISTS query, scalar query, etc.)

I think a crucial starting point is to transform all correlated exprs into dependent joins. (I'm unsure whether we need to implement an execution plan for it before completing all of the decorrelation.)
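As a rough sketch of that first step, the toy rewrite below lifts a subquery embedded in a filter predicate into an explicit dependent join under the filter. The `Plan` enum and `to_dependent_join` are hypothetical stand-ins written for this comment; they are not DataFusion's `LogicalPlan` and say nothing about how the real rule should be structured.

```rust
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(String),
    /// A filter whose predicate may embed one subquery plan.
    Filter {
        input: Box<Plan>,
        predicate: String,
        subquery: Option<Box<Plan>>,
    },
    /// The right side is (conceptually) evaluated once per row of the left side.
    DependentJoin { left: Box<Plan>, right: Box<Plan> },
}

/// Recursively replace "Filter with embedded subquery" by
/// "Filter over DependentJoin(input, subquery)".
fn to_dependent_join(plan: Plan) -> Plan {
    match plan {
        Plan::Filter { input, predicate, subquery } => {
            let input = Box::new(to_dependent_join(*input));
            match subquery {
                Some(sq) => Plan::Filter {
                    input: Box::new(Plan::DependentJoin {
                        left: input,
                        right: Box::new(to_dependent_join(*sq)),
                    }),
                    predicate,
                    subquery: None,
                },
                None => Plan::Filter { input, predicate, subquery: None },
            }
        }
        Plan::DependentJoin { left, right } => Plan::DependentJoin {
            left: Box::new(to_dependent_join(*left)),
            right: Box::new(to_dependent_join(*right)),
        },
        scan => scan,
    }
}
```

After this pass no expression holds a plan any more, so a later decorrelation pass only has to reason about plan nodes.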

@alamb
Contributor

alamb commented May 15, 2025

In case others haven't heard, @irenjj is working on additional subquery support as part of a Google Summer of Code Project (where @jayzhan211 and I are helping mentor).

Perhaps @suibianwanwank and @duongcongtoai would be interested in being involved too

@duongcongtoai
Contributor

duongcongtoai commented May 15, 2025

Cool. Actually, in PR #16016 I'm trying to introduce a unified structure that handles decorrelation for the following use cases:

  • simple decorrelation of plans with linear operators (straightforward projection/predicate pull-up/dependent join push-down)
  • more complex decorrelation (group by, windows, limit ...)
  • recursive/nested decorrelation

The parts that are not implemented yet are left as unimplemented! errors. I'm trying to make the PR mergeable so that everyone can continue to contribute in parallel, but my current goal is to implement enough to satisfy the existing slt tests.

@alamb
Contributor

alamb commented May 18, 2025

From what I can see, this ticket (a general framework to decorrelate subqueries) is going to be the core of any subsequent improvements for subqueries, so I suggest we pool our efforts together to focus on this first.

I suggest we all first review the PR from @duongcongtoai.

cc @suibianwanwank @duongcongtoai @irenjj

@alamb
Contributor

alamb commented May 18, 2025

I recommend we also do some research on existing systems. Can someone provide links to existing implementations in other systems?

@irenjj
Contributor

irenjj commented May 18, 2025

I recommend we also do some research on existing systems. Can someone provide links to existing implementations in other systems?

For DuckDB: the basic logic is in the plan_subquery.cpp file (I can't find a specific PR): https://github.com/duckdb/duckdb/blob/main/src/planner/binder/query_node/plan_subquery.cpp. It is a complete implementation of "Unnesting Arbitrary Queries".
The implementation of the 2nd paper is in duckdb/duckdb#17294.

@suibianwanwank
Contributor

Thanks for the pointer, @irenjj. Based on your input I found this: https://github.com/duckdb/duckdb/blob/main/src/planner/binder/query_node/plan_subquery.cpp. If I understand correctly, this is DuckDB's logic for converting a subquery to a DependentJoin.
Additionally, the corresponding logic in Calcite is here: https://github.com/apache/calcite/blob/main/core/src/main/java/org/apache/calcite/rel/rules/SubQueryRemoveRule.java
There should be many similarities between the two.

@duongcongtoai
Contributor

duongcongtoai commented May 19, 2025

In this draft, dependent join detection is implemented inside this function. But I wonder whether creating an explicit LogicalPlan::DependentJoin makes sense in our case, if the only place it is used is inside the optimizer that does the decorrelation.

The DuckDB PR description mentions:

With this PR, that changes. Subqueries are fully algebraized, before entering a single decorrelation pass. This potentially provides optimization opportunities, that would not have been possible before.

I can relate this to the idea in the 2nd paper:

At this point the accessing list is empty and we have two choices: either we introduce a join with D := Π_{a:=T1.a}(σ_{f=123}(T1)) and use that to substitute T1.a; or we exploit the fact that all outer column references are bound in cclasses and we simply replace T1.a with the equivalent T2.a. The optimal choice depends on the selectivity of σ

which corresponds to this part of the draft PR, and is still an implementation detail of a specific optimizer.

@suibianwanwank
Contributor

I'm not sure I fully understand your point. In this paper, decorrelation is based on DependentJoin, as can be seen from the example relational algebra diagram.

[Image: example relational algebra diagram]

@duongcongtoai
Contributor

There is one thing we know for sure should be implemented: detecting which nodes in the LogicalPlan tree are dependent join nodes. However, we don't need to create a new LogicalPlan type like LogicalPlan::DependentJoin if this plan type is only an intermediate form during the decorrelation phase; I expect that no dependent join node exists after the rewrite completes.

=> We can keep this concept internal to the decorrelation framework
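A minimal sketch of what "keeping it internal" could look like: instead of adding a plan variant, a traversal records which existing nodes would act as dependent joins. The `Node` enum, the pre-order numbering, and `dependent_join_nodes` are all assumptions invented for this illustration, not part of the draft PR.

```rust
/// Hypothetical, heavily simplified plan tree.
enum Node {
    Scan,
    Projection { input: Box<Node> },
    /// A filter that may reference subquery plans from its predicate.
    Filter { input: Box<Node>, subqueries: Vec<Node> },
}

/// Pre-order walk that assigns ids and records nodes owning subqueries.
fn visit(node: &Node, next_id: &mut usize, out: &mut Vec<usize>) {
    let id = *next_id;
    *next_id += 1;
    match node {
        Node::Scan => {}
        Node::Projection { input } => visit(input, next_id, out),
        Node::Filter { input, subqueries } => {
            if !subqueries.is_empty() {
                // This node would become a dependent join during the rewrite.
                out.push(id);
            }
            visit(input, next_id, out);
            for sq in subqueries {
                visit(sq, next_id, out);
            }
        }
    }
}

/// Returns the pre-order ids of all nodes that act as dependent joins.
fn dependent_join_nodes(root: &Node) -> Vec<usize> {
    let mut out = Vec::new();
    let mut next_id = 0;
    visit(root, &mut next_id, &mut out);
    out
}
```

The side table of ids lives only inside the rule, so nothing outside the decorrelation pass ever sees a dependent join.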

@irenjj
Contributor

irenjj commented May 19, 2025

There is one thing we know for sure should be implemented: detecting which nodes in the LogicalPlan tree are dependent join nodes. However, we don't need to create a new LogicalPlan type like LogicalPlan::DependentJoin if this plan type is only an intermediate form during the decorrelation phase; I expect that no dependent join node exists after the rewrite completes.

=> We can keep this concept internal to the decorrelation framework

In DuckDB, the final logical plan generated is a DelimJoin. Besides identifying a join as one produced by decorrelation, DelimJoin also makes it possible to eliminate redundant DelimJoin and DelimGet operators, which is a further optimization. I agree with @suibianwanwank's point that adding a new logical plan type can make the current process clearer. By breaking the work down into multiple steps (transform to dependent join -> top-down/bottom-up decorrelation -> further optimize), tasks can be better divided for collaborative work, and it also opens up possibilities for further optimizations later (although unrelated to decorrelation).

@duongcongtoai
Contributor

duongcongtoai commented May 20, 2025

Thank you everyone for your opinions. It looks like my implementation tries to wrap everything inside a single optimizer, which is hard to follow and leaves little room for collaborative work. Since we can follow DuckDB's implementation details, the next steps can be the following:

  • Define a new DelimGet LogicalPlan type and implement the existing methods of a standard LogicalPlan for it. For DelimJoin we can reuse the existing LogicalPlan::Join with a new join_type, because its only purpose is to tell whether a join comes from a dependent join or not
  • Rewrite the existing subqueries into 2 operators, DelimJoin and DelimGet (either at the planning stage or the optimizer stage; DuckDB does this at the planning stage)
  • Decorrelation (DuckDB does this at the planning stage)
  • DelimGet removal
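For readers new to the DelimJoin/DelimGet pair, the sketch below evaluates a correlated scalar count the "delim" way on in-memory data: collect the distinct correlated values from the outer side, evaluate the subquery once over that domain, then join the results back. This is only my reading of the idea, written in plain Rust; `scalar_count_via_delim` and the data layout are assumptions, not DuckDB or DataFusion code.

```rust
use std::collections::{BTreeSet, HashMap};

/// Models:
///   SELECT id, (SELECT count(*) FROM inner WHERE inner.ref_id = outer.id)
///   FROM outer
fn scalar_count_via_delim(outer_ids: &[i64], inner_ref_ids: &[i64]) -> Vec<(i64, u64)> {
    // "DelimGet": the distinct correlated values coming from the outer side.
    let delim: BTreeSet<i64> = outer_ids.iter().copied().collect();

    // The subquery is evaluated once over the whole domain, grouped by the
    // correlated column. Starting every group at 0 keeps count(*) = 0 for
    // outer values without any match.
    let mut counts: HashMap<i64, u64> = delim.iter().map(|id| (*id, 0)).collect();
    for r in inner_ref_ids {
        if let Some(c) = counts.get_mut(r) {
            *c += 1;
        }
    }

    // "DelimJoin": join the per-value results back to every outer row,
    // duplicates included.
    outer_ids.iter().map(|id| (*id, counts[id])).collect()
}
```

When the outer side has many duplicate keys, the subquery work is done once per distinct key rather than once per row, which is the benefit of the distinct step.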

@duongcongtoai
Contributor

In this case, we may need at least 2 new optimizer passes 🤔: SubqueryDecorrelation and DelimGetRemoval.
It also looks like this PR can be closed in favor of the new approach. Could you give your opinions on how to go further?

@irenjj
Contributor

irenjj commented May 20, 2025

Thank you everyone for your opinions. It looks like my implementation tries to wrap everything inside a single optimizer, which is hard to follow and leaves little room for collaborative work. Since we can follow DuckDB's implementation details, the next steps can be the following:

  • Define a new DelimGet LogicalPlan type and implement the existing methods of a standard LogicalPlan for it. For DelimJoin we can reuse the existing LogicalPlan::Join with a new join_type, because its only purpose is to tell whether a join comes from a dependent join or not
  • Rewrite the existing subqueries into 2 operators, DelimJoin and DelimGet (either at the planning stage or the optimizer stage; DuckDB does this at the planning stage)
  • Decorrelation (DuckDB does this at the planning stage)
  • DelimGet removal

Thanks @duongcongtoai! This is a very good idea! I also think we can start with simple unnest. We may need to introduce some DuckDB structures: new logical plans/exprs (DelimScan, ...) and some new structures (delim_offset, has_correlated_expressions, ...). By trying to refactor simple unnest, we can discover limitations of DataFusion and adjust our follow-up plans in a timely manner.
cc @alamb @jayzhan211 @suibianwanwank @xudong963

@duongcongtoai
Contributor

From what I understand, DuckDB does not directly differentiate between simple unnest and general unnest, right? It achieves the simplified query plan thanks to the DelimGetRemove optimizer, where the DelimGet operator on the RHS is removed?

@irenjj
Contributor

irenjj commented May 21, 2025

From what I understand, DuckDB does not directly differentiate between simple unnest and general unnest, right?

Yep, general unnest should include the simple unnest case.

It achieves the simplified query plan thanks to the DelimGetRemove optimizer, where the DelimGet operator on the RHS is removed?

Sorry, I couldn't find the DelimGetRemove keyword in the DuckDB source code. Could you provide a link to it? If you mean Deliminator, that optimization rule collects all DelimJoins, together with the DelimGets under each DelimJoin, as candidates. For each candidate's joins array, it sorts the joins by depth from largest to smallest to find the deepest DelimGet. Keeping the deepest join with a DelimGet is meaningful because its condition can filter out large amounts of data early. The rule then attempts to remove the other, shallower joins where it is safe to do so, since the deepest one already ensures correctness. This helps avoid redundant join operations.
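The selection step described above can be sketched as follows. This is a simplification of my description, not DuckDB's Deliminator code: `DelimCandidate` and `split_by_depth` are hypothetical names, and the safety check that decides whether a shallower join may really be removed is left out entirely.

```rust
#[derive(Debug, Clone, PartialEq)]
struct DelimCandidate {
    name: &'static str,
    /// Depth of the join that contains the DelimGet, counted from the DelimJoin.
    depth: usize,
}

/// Sort candidates from deepest to shallowest; keep the deepest one and
/// return the rest as removal candidates (still subject to a safety check).
fn split_by_depth(
    mut joins: Vec<DelimCandidate>,
) -> Option<(DelimCandidate, Vec<DelimCandidate>)> {
    joins.sort_by(|a, b| b.depth.cmp(&a.depth));
    let mut iter = joins.into_iter();
    let kept = iter.next()?;
    Some((kept, iter.collect()))
}
```

An empty candidate list yields `None`, which corresponds to a DelimJoin with nothing to optimize.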

@duongcongtoai
Contributor

Sorry, I couldn't find the DelimGetRemove keyword in the DuckDB source code. Could you provide a link to it? If you mean Deliminator

Yes

Nice, then let's follow DuckDB's approach. To me it also makes the code cleaner.

@alamb
Contributor

alamb commented May 21, 2025

Thanks @duongcongtoai! This is a very good idea! I also think we can start with simple unnest. We may need to introduce some DuckDB structures: new logical plans/exprs (DelimScan, ...) and some new structures (delim_offset, has_correlated_expressions, ...). By trying to refactor simple unnest, we can discover limitations of DataFusion and adjust our follow-up plans in a timely manner.

I don't fully follow what this is proposing.

Are you proposing to add a new LogicalPlan::DelimScan variant? It was not entirely clear to me from reading this thread whether DelimScan is something that anything other than the decorrelation optimizer pass would use.

@duongcongtoai
Contributor

duongcongtoai commented May 22, 2025

From my understanding, DelimScan is a LogicalPlan::Aggregate wrapped around a LogicalPlan::TableScan, but maybe @irenjj can provide more information.

Update: DelimScan is a physical optimization to efficiently retrieve unique column values across multiple tables (e.g. some correlated subqueries require values from table1.column1 and table2.column2). Doing this scan only once is an optimization, but the logical plan should have some abstraction to represent it.


I may be able to work on part of this story this week: detecting all dependent join nodes (my previous PR will not go to waste, and a lot of it can be reused for this).

After this implementation we will have a logical plan similar to this one from DuckDB:

D explain select * from table1 where value > (select count(*) from table2 t2 where t2.ref_id=table1.id and quantity=(select count(*) from table1 t1 where t1.id=table1.id));

┌─────────────────────────────┐
│┌───────────────────────────┐│
││ Unoptimized Logical Plan  ││
│└───────────────────────────┘│
└─────────────────────────────┘
┌───────────────────────────┐
│         PROJECTION        │
│    ────────────────────   │
│        Expressions:       │
│             id            │
│           value           │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│           FILTER          │
│    ────────────────────   │
│        Expressions:       │
│  (CAST(value AS BIGINT) > │
│          SUBQUERY)        │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│       DEPENDENT_JOIN      │
│    ────────────────────   ├──────────────┐
│     Join Type: SINGLE     │              │
└─────────────┬─────────────┘              │
┌─────────────┴─────────────┐┌─────────────┴─────────────┐
│          SEQ_SCAN         ││         PROJECTION        │
│    ────────────────────   ││    ────────────────────   │
│       Table: table1       ││        Expressions:       │
│   Type: Sequential Scan   ││        count_star()       │
└───────────────────────────┘└─────────────┬─────────────┘
                             ┌─────────────┴─────────────┐
                             │         AGGREGATE         │
                             │    ────────────────────   │
                             │        Expressions:       │
                             │        count_star()       │
                             └─────────────┬─────────────┘
                             ┌─────────────┴─────────────┐
                             │           FILTER          │
                             │    ────────────────────   │
                             │        Expressions:       │
                             │       (ref_id = id)       │
                             │ (CAST(quantity AS BIGINT) │
                             │        = SUBQUERY)        │
                             └─────────────┬─────────────┘
                             ┌─────────────┴─────────────┐
                             │       DEPENDENT_JOIN      │
                             │    ────────────────────   ├──────────────┐
                             │     Join Type: SINGLE     │              │
                             └─────────────┬─────────────┘              │
                             ┌─────────────┴─────────────┐┌─────────────┴─────────────┐
                             │          SEQ_SCAN         ││         PROJECTION        │
                             │    ────────────────────   ││    ────────────────────   │
                             │       Table: table2       ││        Expressions:       │
                             │   Type: Sequential Scan   ││        count_star()       │
                             └───────────────────────────┘└─────────────┬─────────────┘
                                                          ┌─────────────┴─────────────┐
                                                          │         AGGREGATE         │
                                                          │    ────────────────────   │
                                                          │        Expressions:       │
                                                          │        count_star()       │
                                                          └─────────────┬─────────────┘
                                                          ┌─────────────┴─────────────┐
                                                          │           FILTER          │
                                                          │    ────────────────────   │
                                                          │        Expressions:       │
                                                          │         (id = id)         │
                                                          └─────────────┬─────────────┘
                                                          ┌─────────────┴─────────────┐
                                                          │          SEQ_SCAN         │
                                                          │    ────────────────────   │
                                                          │       Table: table1       │
                                                          │   Type: Sequential Scan   │
                                                          └───────────────────────────┘

@logan-keede
Contributor

logan-keede commented May 22, 2025

@duongcongtoai do you think we should have a feature branch for this?

I think that as you work on detecting correlated queries (dependent joins) you will end up creating a logical plan that can't actually be executed, because we currently do not support Dependent Join (and it seems we don't plan to, since correlated subqueries can now be completely decorrelated, if I read the paper right). This will force you to either put the decorrelation logic in the same PR or implement a dummy Dependent Join, which I think will leave some existing queries broken.

You will end up with either a long PR or broken queries, making it difficult to review/merge.

Alternatively, you could try putting this code behind feature gates, though I am not sure how that would work out or whether it would be easier than having a feature branch.

Let me know if I am wrong, or if you have other plans.

@duongcongtoai
Contributor

duongcongtoai commented May 23, 2025

@logan-keede I expect to implement an optimizer that does 2 things (split into 2 PRs):

  • translate all subqueries (recursion-aware) into dependent joins
  • decorrelate the dependent joins into DelimJoin and DelimGet

In the first PR I don't expect to integrate this optimizer into the main flow yet; the behavior is tested through in-code tests instead of sqllogictests => this PR can be merged to main without breaking existing behavior.

The second PR is where all the integration happens: we can integrate this optimizer into the main branch and test it with sqllogictest.

But as you mention, a lot of sqllogictests will break if we completely deprecate the following:

            Arc::new(DecorrelatePredicateSubquery::new()),
            Arc::new(ScalarSubqueryToJoin::new())

So I think we should divide the 2nd phase into 2 more subphases:

2nd phase
Let 3 optimizers exist temporarily, such as

impl Optimizer {
    /// Create a new optimizer using the recommended list of rules
    pub fn new() -> Self {
...
            Arc::new(DecorrelatePredicateSubquery::new()),
            Arc::new(ScalarSubqueryToJoin::new()),
            Arc::new(GeneralSubqueryDecorrelation::new()), // <-- newly added

If any query cannot be decorrelated by the previous 2 optimizers, the new optimizer will come into play.

In this phase a lot of work can be done in parallel to support different complex use cases, as more and more complex subqueries become supported (DelimGetRemoval can also be implemented in this phase).

3rd phase
Deprecate the old optimizers
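To illustrate the fallback order of the 2nd phase, here is a toy model in plain Rust. The three structs reuse the rule names from the snippet above purely as labels; the `Rule` trait, the `Subquery` enum, and `run` are assumptions made for this sketch and do not mirror DataFusion's actual optimizer interfaces.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Subquery {
    Predicate, // (NOT) IN / EXISTS in a filter
    Scalar,    // simple scalar subquery
    Complex,   // anything the two existing rules give up on
}

trait Rule {
    fn name(&self) -> &'static str;
    fn handles(&self, s: Subquery) -> bool;
}

struct DecorrelatePredicateSubquery;
struct ScalarSubqueryToJoin;
struct GeneralSubqueryDecorrelation;

impl Rule for DecorrelatePredicateSubquery {
    fn name(&self) -> &'static str { "DecorrelatePredicateSubquery" }
    fn handles(&self, s: Subquery) -> bool { s == Subquery::Predicate }
}
impl Rule for ScalarSubqueryToJoin {
    fn name(&self) -> &'static str { "ScalarSubqueryToJoin" }
    fn handles(&self, s: Subquery) -> bool { s == Subquery::Scalar }
}
impl Rule for GeneralSubqueryDecorrelation {
    fn name(&self) -> &'static str { "GeneralSubqueryDecorrelation" }
    // The new rule accepts whatever is still left over.
    fn handles(&self, _s: Subquery) -> bool { true }
}

/// Rules in the order proposed for the 2nd phase.
fn phase_two_rules() -> Vec<Box<dyn Rule>> {
    vec![
        Box::new(DecorrelatePredicateSubquery),
        Box::new(ScalarSubqueryToJoin),
        Box::new(GeneralSubqueryDecorrelation),
    ]
}

/// Apply rules in order; each subquery is claimed by the first rule that handles it.
fn run(rules: &[Box<dyn Rule>], mut pending: Vec<Subquery>) -> Vec<(Subquery, &'static str)> {
    let mut handled = Vec::new();
    for rule in rules {
        pending.retain(|s| {
            if rule.handles(*s) {
                handled.push((*s, rule.name()));
                false
            } else {
                true
            }
        });
    }
    handled
}
```

In the 3rd phase the first two entries would simply be dropped from `phase_two_rules`, leaving the general rule to claim everything.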

@logan-keede
Contributor

logan-keede commented May 23, 2025

In the first PR I don't expect to integrate this optimizer into the main flow yet; the behavior is tested through in-code tests instead of sqllogictests => this PR can be merged to main without breaking existing behavior.

Totally forgot that was possible. 😅

Also this is a very nice write up! Puts a lot of things into perspective.

@alamb
Contributor

alamb commented May 23, 2025

Let 3 optimizers exist temporarily, such as

This sounds like a nice plan. Perhaps in parallel different people could work on migrating logic from the existing rules into the new optimizer rule and removing the old one

@irenjj
Contributor

irenjj commented May 24, 2025

Hi @duongcongtoai , I created a tracking issue and broke down the task of building dependent joins for better collaboration: #16173. There might be some points I didn't consider, and if you have already implemented them, feel free to let me know so I can update the task list. I also implemented a draft simple rule to handle correlated scalar subqueries in WHERE clauses: #16174.
cc: @alamb @jayzhan211

@alamb
Contributor

alamb commented May 24, 2025

Please let me know when you have PRs ready for review or other items that I can try and help with.

@duongcongtoai
Contributor

This PR is ready for review; let me know your opinions.
I think once it is merged, it will unblock us to start implementing some simple decorrelation.
cc @irenjj @alamb @logan-keede

@duongcongtoai
Contributor

Hi everyone, I have 2 pending PRs that support this story; please help me take a look.

After merging them we can start implementing the steps for decorrelation.

@duongcongtoai
Contributor

There is also one thing I want to highlight: in DuckDB, a SubqueryExpr may result in 2 output exprs after decorrelation, because they want to support this query:
select * from outer where (1,2) in (select col1,col2 from inner)
https://github.com/duckdb/duckdb/pull/15259/files
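A small sketch of why two output expressions are needed: a tuple IN behaves like a semi join whose key has one column per tuple element, so the subquery side has to expose both col1 and col2. The function below generalizes the constant (1,2) to an outer row and ignores NULL semantics; `tuple_in_semi_join` is a made-up name for illustration.

```rust
use std::collections::HashSet;

/// Models: SELECT * FROM outer WHERE (a, b) IN (SELECT col1, col2 FROM inner)
/// as a semi join on the composite key (col1, col2). NULL handling is omitted.
fn tuple_in_semi_join(outer: &[(i32, i32)], inner: &[(i32, i32)]) -> Vec<(i32, i32)> {
    // Both subquery output columns are part of the build-side key.
    let build: HashSet<(i32, i32)> = inner.iter().copied().collect();
    outer.iter().copied().filter(|row| build.contains(row)).collect()
}
```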

6 participants