Push Dynamic Join Predicates into Scan ("Sideways Information Passing", etc) #7955

Open
Tracked by #15512 ...
alamb opened this issue Oct 27, 2023 · 14 comments
Labels
enhancement New feature or request

Comments

@alamb
Contributor

alamb commented Oct 27, 2023

Is your feature request related to a problem or challenge?

If we want to make DataFusion the engine of choice for fast OLAP processing, eventually we will need to make joins faster. In addition to making sure the join order is not disastrous (e.g. #7949), we can consider other advanced OLAP techniques to improve joins (especially in queries with multiple joins).

Describe the solution you'd like

I would like to propose we look into pushing "join predicates" into scans (which I know of as "sideways information passing").

As an example, consider the joins from TPCH Q17

select
  sum(l_extendedprice) / 7.0 as avg_yearly
from
  part, lineitem
where
  p_partkey = l_partkey
  and p_brand = 'Brand#23'
  and p_container = 'MED BOX'
  and l_quantity < (
    select 0.2 * avg(l_quantity) from lineitem where l_partkey = p_partkey
  );

The first join should look like the plan below. Note that there are no predicates on the lineitem table (the big one), so all the filtering happens in the join. This is bad because the scan can't apply optimizations like "late materialization" and instead must decode all 60M values of the selected columns, even though very few (2044!) are actually used.

                          │                                                         
                          │                                                         
           2044 Rows      │                                                         
                          │                                                         
                          ▼                                                         
                 ┌────────────────┐                                                 
                 │    HashJoin    │                                                 
                 │   p_partkey =  │                                                 
                 │   l_partkey    │                                                 
                 └──┬─────────┬───┘                     This scan decodes 60M values
   2M Rows          │         │             60M Rows         of l_quantity and      
           ┌────────┘         └─────────┐               l_extendedprice, even though
           │                            │               all but 2044 are filtered by
           ▼                            ▼                         the join          
 ┌──────────────────┐        ┌─────────────────────┐                                
 │Scan: part        │        │Scan: lineitem       │                  │             
 │projection:       │        │projection:          │                                
 │  p_partkey       │        │  l_quantity,        │                  │             
 │filters:          │        │  l_extendedprice,   │◀─ ─ ─ ─ ─ ─ ─ ─ ─              
 │  p_brand = ..    │        │  l_partkey          │                                
 │  p_container = ..│        │filters:             │                                
 │                  │        │  NONE               │                                
 └──────────────────┘        └─────────────────────┘                                

The idea is to push the join predicate into the scan, by creating something that acts like l_partkey IN (...) and can be applied during the scan.


                               1. The HashJoin completely reads the build                        
                               side before starting the probe side.                              
                                                                                                 
                               Thus, all 2M known matching values of                             
                         │     l_partkey are in a hash table prior to                            
                         │     scanning lineitem                                                 
          2044 Rows      │                                                                       
                         │                           │                                           
                         ▼                                                                       
                ┌────────────────┐                   │                                           
                │    HashJoin    │                                                               
                │   p_partkey =  │◀─ ─ ─ ─ ─ ─ ─ ─ ─ ┘                                           
                │   l_partkey    │                                                               
                └──┬─────────┬───┘                                                               
                   │         │             60M Rows                                              
          ┌────────┘         └────────────┐                  The idea is to introduce a filter   
          │                               │                  that is effectively "l_partkey IN   
          ▼                               ▼                  (HASH TABLE)" or something similar  
┌──────────────────┐        ┌──────────────────────────┐     that is applied during the scan     
│Scan: part        │        │Scan: lineitem            │┌ ─ ─                                    
│projection:       │        │projection:               │     If the scan can avoid decoding      
│  p_partkey       │        │  l_quantity,             ││    l_quantity and l_extended that do   
│filters:          │        │  l_extendedprice,        │     not match, there is significant     
│  p_brand = ..    │        │  l_partkey               ││    savings                             
│  p_container = ..│        │filters:                  │                                         
│                  │        │  l_partkey IN (....)   ◀─│┘                                        
└──────────────────┘        └──────────────────────────┘                                         
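
To make the idea concrete, here is a minimal sketch in plain Rust (standard library only, not DataFusion code). It assumes a simplified columnar layout and hypothetical names (`build_key_set`, `scan_with_key_filter`, `LineItem`): the build side yields a set of surviving `p_partkey` values, and the probe-side scan checks the key column first so the other columns are only materialized for matching rows.

```rust
use std::collections::HashSet;

/// A materialized probe-side row of `lineitem` (hypothetical, simplified layout).
#[derive(Debug, Clone, PartialEq)]
pub struct LineItem {
    pub l_partkey: i64,
    pub l_quantity: f64,
    pub l_extendedprice: f64,
}

/// Build phase: collect the join keys that survived the build-side filters.
pub fn build_key_set(build_keys: &[i64]) -> HashSet<i64> {
    build_keys.iter().copied().collect()
}

/// Probe-side "scan": inspect the key column first and only materialize the
/// remaining columns for rows whose key appears in the build-side set.
/// All three column slices are assumed to have the same length.
pub fn scan_with_key_filter(
    partkeys: &[i64],
    quantities: &[f64],
    prices: &[f64],
    keys: &HashSet<i64>,
) -> Vec<LineItem> {
    partkeys
        .iter()
        .enumerate()
        // Acts like `l_partkey IN (HASH TABLE)` applied during the scan.
        .filter(|(_, k)| keys.contains(*k))
        // Only rows that pass the filter touch the other columns.
        .map(|(i, k)| LineItem {
            l_partkey: *k,
            l_quantity: quantities[i],
            l_extendedprice: prices[i],
        })
        .collect()
}
```

In a real Parquet scan the benefit would come from skipping the decoding of `l_quantity` and `l_extendedprice` pages entirely; the indexing above only stands in for that step.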

In a query with a single selective join (one that filters out many values), the savings are likely minimal, as they depend on how much materialization (decoding) work can be avoided. The only scan that does late materialization in DataFusion at the time of writing is ParquetExec.

However, in a query with multiple selective joins the savings become much more pronounced, because we avoid creating intermediate join outputs that are then filtered out by joins later in the plan.

For example:

    Pass down in multiple joins                                                                 
                                                                                                
 While this doesn't happen in TPCH                                                              
Q17 (the subquery has no predicates)                                                            
 the SIPS approach can be even more                                                             
 effective with multiple selective                                                              
               joins                  │                                                         
                                      │                                                         
                                      │             Filters on both join keys can be applied    
                                      │             at this level, which can be even more       
                                      ▼             effective as it avoids the work to create   
                             ┌────────────────┐     the intermediate output of HashJoin(2)   ─ ┐
                             │  HashJoin (1)  │     which is then filtered by HashJoin(1)       
                             │     d1.key =   │                                                │
                             │    f.d1_key    │                                                 
                             └──┬─────────┬───┘                                                │
                                │         │                                                     
                     ┌──────────┘         └────────────┐                                       │
                     │                                 │                                        
                     ▼                                 ▼                                       │
           ┌──────────────────┐               ┌────────────────┐                                
           │Scan: D1          │               │  HashJoin (2)  │                               │
           │filters:          │               │     d2.key =   │                                
           │  ...             │               │    f.d2_key    │                               │
           └──────────────────┘               └───┬─────────┬──┘                                
                                                  │         │                                  │
                                      ┌───────────┘         └─────────────┐                     
                                      │                                   │                    │
                                      ▼                                   ▼                     
                             ┌────────────────┐                ┌─────────────────────┐         │
                             │Scan: D2        │                │Scan: F              │          
                             │filters:        │                │filters:             │         │
                             │  ...           │                │  f.d1_key IN (...)  │◀ ─ ─ ─ ─ 
                             └────────────────┘                │  f.d2_key IN (...)  │          
                                                               │                     │          
                                                               └─────────────────────┘          

Describe alternatives you've considered

Some version of this technique is described in "Bloom Filter Joins" in Spark: https://issues.apache.org/jira/browse/SPARK-32268

Building a separate Bloom filter has the nice property that it can be distributed across a networked cluster. However, the overhead of creating the Bloom filter would likely be non-trivial.
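
For reference, a Bloom filter over the build-side keys could look roughly like the sketch below. This is a minimal illustration using only the Rust standard library; the `BloomFilter` type, its sizing, and the double-hashing scheme are assumptions made for the example, not what Spark or DataFusion implement.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// A small Bloom filter: no false negatives, some false positives.
pub struct BloomFilter {
    bits: Vec<u64>,
    num_bits: u64,
    num_hashes: u32,
}

impl BloomFilter {
    /// `num_bits` is rounded up to at least 64 so the bit array is never empty.
    pub fn new(num_bits: u64, num_hashes: u32) -> Self {
        let num_bits = num_bits.max(64);
        let words = ((num_bits + 63) / 64) as usize;
        Self { bits: vec![0; words], num_bits, num_hashes }
    }

    /// Derive two base hashes; the i-th probe position is `a + i * b`
    /// (double hashing), which avoids running `num_hashes` separate hashers.
    fn hash_pair<T: Hash>(item: &T) -> (u64, u64) {
        let mut h1 = DefaultHasher::new();
        item.hash(&mut h1);
        let a = h1.finish();
        let mut h2 = DefaultHasher::new();
        a.hash(&mut h2);
        0x9E37_79B9_7F4A_7C15u64.hash(&mut h2);
        // Force the step to be odd so it is never zero.
        let b = h2.finish() | 1;
        (a, b)
    }

    fn bit_index(&self, a: u64, b: u64, i: u32) -> u64 {
        a.wrapping_add(b.wrapping_mul(i as u64)) % self.num_bits
    }

    /// Called once per build-side join key.
    pub fn insert<T: Hash>(&mut self, item: &T) {
        let (a, b) = Self::hash_pair(item);
        for i in 0..self.num_hashes {
            let idx = self.bit_index(a, b, i);
            self.bits[(idx / 64) as usize] |= 1u64 << (idx % 64);
        }
    }

    /// Called per probe-side key; `false` means the key definitely has no match.
    pub fn might_contain<T: Hash>(&self, item: &T) -> bool {
        let (a, b) = Self::hash_pair(item);
        (0..self.num_hashes).all(|i| {
            let idx = self.bit_index(a, b, i);
            self.bits[(idx / 64) as usize] & (1u64 << (idx % 64)) != 0
        })
    }
}
```

The build cost mentioned above corresponds to the `insert` loop: it runs once for every build-side key, on top of inserting that key into the join hash table.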

Additional context

See a description of how DataFusion HashJoins work here: #7953

Here is a paper that describes industrial experience with SIPS techniques: https://15721.courses.cs.cmu.edu/spring2020/papers/13-execution/shrinivas-icde2013.pdf

@alamb alamb added the enhancement New feature or request label Oct 27, 2023
@alamb
Contributor Author

alamb commented Oct 27, 2023

cc @sunchao @viirya and @kazuyukitanimura, to whom I mentioned this technique the other day

@acking-you
Contributor

I'm very curious about how this kind of graph is drawn😯

@alamb
Contributor Author

alamb commented Oct 29, 2023

I'm very curious about how this kind of graph is drawn😯

@acking-you I draw it by hand using https://monodraw.helftone.com/ (and it unfortunately takes me a long time)

@devinjdangelo has suggested https://asciiflow.com/#/ works for making something quick in the browser

@westonpace pointed out that if you're writing on GitHub then you can use mermaid syntax in comments / issues / PRs and it will render automatically: https://github.blog/2022-02-14-include-diagrams-markdown-files-mermaid/

@alamb alamb changed the title Push Join Predicates into Scan ("Sideways Information Passing", etc) Push Dynamic Join Predicates into Scan ("Sideways Information Passing", etc) Nov 30, 2023
@ahirner
Contributor

ahirner commented Jul 12, 2024

From: #9963

TakeExec (index lookup) -- really like an indexed scan somehow

I wonder if TakeExec or something quite similar could also be used for dynamic join predicates?

@westonpace
Member

I wonder if TakeExec or something quite similar could also be used for dynamic join predicates?

If there is a secondary index on l_partkey then I think a TakeExec could be useful. Otherwise there is no way to know the row offsets and a filtered scan is probably the best you can do.

@Lordworms
Contributor

Interested in this one!

@alamb
Contributor Author

alamb commented Aug 14, 2024

Thanks @Lordworms -- I am hoping someone else can step up and help you with this. I just don't have time to help with a project to improve join performance at this time.

@alamb
Contributor Author

alamb commented Sep 9, 2024

I believe DuckDB just announced support for this feature in 1.1: https://duckdb.org/2024/09/09/announcing-duckdb-110.html#dynamic-filter-pushdown-from-joins

@adriangb
Contributor

I have a PR up for doing something similar for TopK sorts (ORDER BY col LIMIT 10) in #15301. I think we should be able to re-use that work for this change, at which point it would just be a question of implementing a DynamicFilterSource for the join ExecutionPlans. Does that sound right to folks here? I want to make sure we don't land something that is almost what is needed here but not quite.

@xudong963
Member

Thank you, @adriangb! Basically, they're similar. A join runtime filter can take several forms, such as min-max, inlist, or bloom filter, all of which can be built from the build side of the hash join.

Supporting the min-max runtime filter should be a good start.
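
A min-max runtime filter is simple enough to sketch in a few lines. The following is a hypothetical illustration in plain Rust (the `MinMaxFilter` type and its methods are made up for this example and are not DataFusion APIs): the filter is computed from the build-side keys and can then prune either single values or whole chunks whose min/max statistics fall outside the range.

```rust
/// Range of join keys seen on the build side (hypothetical helper type).
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct MinMaxFilter {
    pub min: i64,
    pub max: i64,
}

impl MinMaxFilter {
    /// Returns `None` for an empty build side, in which case an inner join
    /// produces no rows at all.
    pub fn from_build_keys(keys: &[i64]) -> Option<Self> {
        let min = *keys.iter().min()?;
        let max = *keys.iter().max()?;
        Some(Self { min, max })
    }

    /// Row-level check: equivalent to `key >= min AND key <= max`.
    pub fn may_match_value(&self, v: i64) -> bool {
        self.min <= v && v <= self.max
    }

    /// Chunk-level check against min/max statistics (for example, the
    /// statistics a Parquet row group carries): the chunk can be skipped
    /// when the two ranges do not overlap.
    pub fn may_match_range(&self, chunk_min: i64, chunk_max: i64) -> bool {
        chunk_max >= self.min && chunk_min <= self.max
    }
}
```

The range check is cheap but only selective when the build-side keys are clustered; widely scattered keys produce a range that prunes very little, which is where inlist or bloom filters would help.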

@adriangb
Contributor

I think that since #15301 pushes an arbitrary Arc<dyn PhysicalExpr> down, min/max, inlist, etc. should all be doable 😄

@xudong963
Member

I think that since #15301 pushes an arbitrary Arc<dyn PhysicalExpr> down, min/max, inlist, etc. should all be doable 😄

Yes, I noticed the DynamicFilterSource trait.

@mbutrovich
Contributor

So now that #15568 is in, what is a reasonable approach to do SIP with bloom filters?

  • Modify HashJoinExec to build a bloom filter on the build side, and when complete call DynamicFilterPhysicalExpr::update
  • How do we represent the bloom filter test as an expression? Is it a new PhysicalExpr, or do we define a ScalarUDF for bloom_filter_contains(bloom_filter, input) and use a ScalarFunctionExpr? I'm not convinced of the ScalarUDF approach for two reasons: 1) taking the bloom filter bytes as an arg would require construction/validation of a bloom filter from those bytes at every invocation, right? 2) I'm not sure we want to expose this as a SQL function.

I'm interested to start tackling this in pieces next week, but I also suspect others have thoughts (maybe even progress) in this area already.

@adriangb
Contributor

Modify HashJoinExec to build a bloom filter on the build side, and when complete call DynamicFilterPhysicalExpr::update

Pretty much: once HashJoinExec has completed the build side it builds a bloom filter and wraps it up in a PhysicalExpr.
I don't think you need to use a DynamicFilterPhysicalExpr since once you build the hash table / bloom filter it's never updated. The reason to have DynamicFilterPhysicalExpr is e.g. in a TopK operator, where with every RecordBatch you process you gain new information to do more selective pruning (new heap values). So I'd think you could just pass the hardcoded PhysicalExpr directly?
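
To illustrate the difference, here is a rough sketch in plain Rust of why the TopK case needs a filter that can change over time. The `SharedThreshold` and `TopK` types are hypothetical stand-ins written for this example (an ascending `ORDER BY col LIMIT k` over `i64` values); they are not the DynamicFilterPhysicalExpr API.

```rust
use std::collections::BinaryHeap;
use std::sync::{Arc, RwLock};

/// A filter shared between the TopK operator (writer) and the scan (reader).
#[derive(Clone, Default)]
pub struct SharedThreshold(Arc<RwLock<Option<i64>>>);

impl SharedThreshold {
    pub fn update(&self, v: i64) {
        *self.0.write().unwrap() = Some(v);
    }

    /// Scan-side check: once a threshold is known, only strictly smaller
    /// values can still enter the top k.
    pub fn may_qualify(&self, v: i64) -> bool {
        match *self.0.read().unwrap() {
            Some(t) => v < t,
            None => true,
        }
    }
}

/// Keeps the k smallest values seen so far in a max-heap.
pub struct TopK {
    k: usize,
    heap: BinaryHeap<i64>,
    filter: SharedThreshold,
}

impl TopK {
    pub fn new(k: usize, filter: SharedThreshold) -> Self {
        Self { k, heap: BinaryHeap::new(), filter }
    }

    /// Every batch can tighten the threshold, so the filter is updated
    /// repeatedly while the query runs.
    pub fn push_batch(&mut self, batch: &[i64]) {
        for &v in batch {
            if self.heap.len() < self.k {
                self.heap.push(v);
            } else if let Some(&worst) = self.heap.peek() {
                if v < worst {
                    self.heap.pop();
                    self.heap.push(v);
                }
            }
        }
        if self.heap.len() == self.k {
            if let Some(&worst) = self.heap.peek() {
                self.filter.update(worst);
            }
        }
    }

    pub fn into_sorted(self) -> Vec<i64> {
        self.heap.into_sorted_vec()
    }
}
```

A hash join, by contrast, would produce its filter exactly once, when the build side finishes, so no shared mutable state of this kind is needed after that point.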

To your second point: one thing to consider is if you want this to be compatible with any sort of distributed query execution.
If you do then one of two things would need to happen:

  1. Teach the serialization how to serialize the bloom filter PhysicalExpr -> in practice update the protobuf serialization to match against it.
  2. Implement PhysicalExpr::snapshot() to convert it into an InList or something.

Given all of this my recommendation would be to build a dedicated BloomFilter PhysicalExpr and teach DataFusion how to serialize it to ProtoBuf. Then don't use DynamicFilterPhysicalExpr or PhysicalExpr::snapshot().
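
As a rough idea of what the serialization side involves, the sketch below round-trips a bloom filter's state through bytes. The `BloomBits` type and its byte layout (a little-endian `u32` hash count followed by little-endian `u64` words) are assumptions for illustration only; the real encoding would be whatever the protobuf schema defines.

```rust
/// The state a bloom filter expression would need to ship to another node
/// (hypothetical type for this example).
#[derive(Debug, Clone, PartialEq)]
pub struct BloomBits {
    pub num_hashes: u32,
    pub words: Vec<u64>,
}

impl BloomBits {
    /// Encode as: 4 bytes of `num_hashes`, then 8 bytes per bit-array word.
    pub fn to_bytes(&self) -> Vec<u8> {
        let mut out = Vec::with_capacity(4 + 8 * self.words.len());
        out.extend_from_slice(&self.num_hashes.to_le_bytes());
        for w in &self.words {
            out.extend_from_slice(&w.to_le_bytes());
        }
        out
    }

    /// Decode and validate once, when the plan is deserialized, rather than
    /// on every evaluation of the expression.
    pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
        if bytes.len() < 4 || (bytes.len() - 4) % 8 != 0 {
            return None;
        }
        let mut head = [0u8; 4];
        head.copy_from_slice(&bytes[..4]);
        let num_hashes = u32::from_le_bytes(head);
        let words = bytes[4..]
            .chunks_exact(8)
            .map(|chunk| {
                let mut buf = [0u8; 8];
                buf.copy_from_slice(chunk);
                u64::from_le_bytes(buf)
            })
            .collect();
        Some(Self { num_hashes, words })
    }
}
```

Holding the decoded filter inside a dedicated expression means the validation in `from_bytes` happens once per plan, which sidesteps the per-invocation construction concern raised about the ScalarUDF approach.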

The nice thing about this is that the new bloom filter expr could be re-used in other places, e.g. InList could build a bloom filter instead of a HashSet for pre-filtering (I think that's what it currently does).
