Skip to content

Commit f29dee6

Browse files
stczwdHyukjinKwon
stczwd
authored andcommitted
[SPARK-37933][SQL] Change the traversal method of V2ScanRelationPushDown push down rules
### What changes were proposed in this pull request? This pr is trying to change the traversal method of V2ScanRelationPushDown push down rules , which is more readable and easier to extend and add new rules. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? origin tests Closes apache#35242 from stczwd/SPARK-37933. Authored-by: stczwd <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]>
1 parent 3a45981 commit f29dee6

File tree

1 file changed

+13
-4
lines changed

1 file changed

+13
-4
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala

+13-4
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,17 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
3737
import DataSourceV2Implicits._
3838

3939
def apply(plan: LogicalPlan): LogicalPlan = {
40-
applyColumnPruning(
41-
applyLimit(pushDownAggregates(pushDownFilters(pushDownSample(createScanBuilder(plan))))))
40+
val pushdownRules = Seq[LogicalPlan => LogicalPlan] (
41+
createScanBuilder,
42+
pushDownSample,
43+
pushDownFilters,
44+
pushDownAggregates,
45+
pushDownLimits,
46+
pruneColumns)
47+
48+
pushdownRules.foldLeft(plan) { (newPlan, pushDownRule) =>
49+
pushDownRule(newPlan)
50+
}
4251
}
4352

4453
private def createScanBuilder(plan: LogicalPlan) = plan.transform {
@@ -222,7 +231,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
222231
Cast(aggAttribute, aggDataType)
223232
}
224233

225-
def applyColumnPruning(plan: LogicalPlan): LogicalPlan = plan.transform {
234+
def pruneColumns(plan: LogicalPlan): LogicalPlan = plan.transform {
226235
case ScanOperation(project, filters, sHolder: ScanBuilderHolder) =>
227236
// column pruning
228237
val normalizedProjects = DataSourceStrategy
@@ -308,7 +317,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
308317
case other => other
309318
}
310319

311-
def applyLimit(plan: LogicalPlan): LogicalPlan = plan.transform {
320+
def pushDownLimits(plan: LogicalPlan): LogicalPlan = plan.transform {
312321
case globalLimit @ Limit(IntegerLiteral(limitValue), child) =>
313322
val newChild = pushDownLimit(child, limitValue)
314323
val newLocalLimit = globalLimit.child.asInstanceOf[LocalLimit].withNewChildren(Seq(newChild))

0 commit comments

Comments
 (0)