19
19
20
20
use crate :: optimizer:: ApplyOrder ;
21
21
use crate :: { OptimizerConfig , OptimizerRule } ;
22
- use datafusion_common:: Result ;
22
+ use datafusion_common:: tree_node:: Transformed ;
23
+ use datafusion_common:: { internal_err, Result } ;
24
+ use datafusion_expr:: utils:: conjunction;
23
25
use datafusion_expr:: {
24
- and , logical_plan:: Filter , logical_plan:: JoinType , Expr , ExprSchemable , LogicalPlan ,
26
+ logical_plan:: Filter , logical_plan:: JoinType , Expr , ExprSchemable , LogicalPlan ,
25
27
} ;
26
28
use std:: sync:: Arc ;
27
29
@@ -32,24 +34,34 @@ use std::sync::Arc;
32
34
#[ derive( Default ) ]
33
35
pub struct FilterNullJoinKeys { }
34
36
35
- impl FilterNullJoinKeys {
36
- pub const NAME : & ' static str = "filter_null_join_keys" ;
37
- }
38
-
39
37
impl OptimizerRule for FilterNullJoinKeys {
40
38
fn try_optimize (
41
39
& self ,
42
- plan : & LogicalPlan ,
43
- config : & dyn OptimizerConfig ,
40
+ _plan : & LogicalPlan ,
41
+ _config : & dyn OptimizerConfig ,
44
42
) -> Result < Option < LogicalPlan > > {
43
+ internal_err ! ( "Should have called FilterNullJoinKeys::rewrite" )
44
+ }
45
+
46
+ fn supports_rewrite ( & self ) -> bool {
47
+ true
48
+ }
49
+
50
+ fn apply_order ( & self ) -> Option < ApplyOrder > {
51
+ Some ( ApplyOrder :: BottomUp )
52
+ }
53
+
54
+ fn rewrite (
55
+ & self ,
56
+ plan : LogicalPlan ,
57
+ config : & dyn OptimizerConfig ,
58
+ ) -> Result < Transformed < LogicalPlan > > {
45
59
if !config. options ( ) . optimizer . filter_null_join_keys {
46
- return Ok ( None ) ;
60
+ return Ok ( Transformed :: no ( plan ) ) ;
47
61
}
48
62
49
63
match plan {
50
- LogicalPlan :: Join ( join) if join. join_type == JoinType :: Inner => {
51
- let mut join = join. clone ( ) ;
52
-
64
+ LogicalPlan :: Join ( mut join) if join. join_type == JoinType :: Inner => {
53
65
let left_schema = join. left . schema ( ) ;
54
66
let right_schema = join. right . schema ( ) ;
55
67
@@ -69,29 +81,22 @@ impl OptimizerRule for FilterNullJoinKeys {
69
81
if !left_filters. is_empty ( ) {
70
82
let predicate = create_not_null_predicate ( left_filters) ;
71
83
join. left = Arc :: new ( LogicalPlan :: Filter ( Filter :: try_new (
72
- predicate,
73
- join. left . clone ( ) ,
84
+ predicate, join. left ,
74
85
) ?) ) ;
75
86
}
76
87
if !right_filters. is_empty ( ) {
77
88
let predicate = create_not_null_predicate ( right_filters) ;
78
89
join. right = Arc :: new ( LogicalPlan :: Filter ( Filter :: try_new (
79
- predicate,
80
- join. right . clone ( ) ,
90
+ predicate, join. right ,
81
91
) ?) ) ;
82
92
}
83
- Ok ( Some ( LogicalPlan :: Join ( join) ) )
93
+ Ok ( Transformed :: yes ( LogicalPlan :: Join ( join) ) )
84
94
}
85
- _ => Ok ( None ) ,
95
+ _ => Ok ( Transformed :: no ( plan ) ) ,
86
96
}
87
97
}
88
-
89
98
fn name ( & self ) -> & str {
90
- Self :: NAME
91
- }
92
-
93
- fn apply_order ( & self ) -> Option < ApplyOrder > {
94
- Some ( ApplyOrder :: BottomUp )
99
+ "filter_null_join_keys"
95
100
}
96
101
}
97
102
@@ -100,11 +105,9 @@ fn create_not_null_predicate(filters: Vec<Expr>) -> Expr {
100
105
. into_iter ( )
101
106
. map ( |c| Expr :: IsNotNull ( Box :: new ( c) ) )
102
107
. collect ( ) ;
103
- // combine the IsNotNull expressions with AND
104
- not_null_exprs
105
- . iter ( )
106
- . skip ( 1 )
107
- . fold ( not_null_exprs[ 0 ] . clone ( ) , |a, b| and ( a, b. clone ( ) ) )
108
+
109
+ // directly unwrap since it should always have a value
110
+ conjunction ( not_null_exprs) . unwrap ( )
108
111
}
109
112
110
113
#[ cfg( test) ]
0 commit comments