Skip to content

Commit 74178b4

Browse files
committed
Add Exchange before GroupId to improve Partial Aggregation
Based on: trinodb/trino@dc1d66fb co-authored-by: Piotr Findeisen <[email protected]> Based on: trinodb/trino@c573b34 co-authored-by: Lukasz Stec <[email protected]> Based on: trinodb/trino@29328d3 co-authored-by: praveenkrishna <[email protected]>
1 parent 7119aa2 commit 74178b4

15 files changed

+831
-16
lines changed

presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java

+10
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,7 @@ public final class SystemSessionProperties
328328
public static final String INCLUDE_VALUES_NODE_IN_CONNECTOR_OPTIMIZER = "include_values_node_in_connector_optimizer";
329329
public static final String SINGLE_NODE_EXECUTION_ENABLED = "single_node_execution_enabled";
330330
public static final String EXPRESSION_OPTIMIZER_NAME = "expression_optimizer_name";
331+
public static final String ADD_EXCHANGE_BELOW_PARTIAL_AGGREGATION_OVER_GROUP_ID = "add_exchange_below_partial_aggregation_over_group_id";
331332

332333
// TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future.
333334
public static final String NATIVE_AGGREGATION_SPILL_ALL = "native_aggregation_spill_all";
@@ -1858,6 +1859,10 @@ public SystemSessionProperties(
18581859
EXPRESSION_OPTIMIZER_NAME,
18591860
"Configure which expression optimizer to use",
18601861
featuresConfig.getExpressionOptimizerName(),
1862+
false),
1863+
booleanProperty(ADD_EXCHANGE_BELOW_PARTIAL_AGGREGATION_OVER_GROUP_ID,
1864+
"Enable adding an exchange below partial aggregation over a GroupId node to improve partial aggregation performance",
1865+
featuresConfig.getAddExchangeBelowPartialAggregationOverGroupId(),
18611866
false));
18621867
}
18631868

@@ -3164,4 +3169,9 @@ public static String getExpressionOptimizerName(Session session)
31643169
{
31653170
return session.getSystemProperty(EXPRESSION_OPTIMIZER_NAME, String.class);
31663171
}
3172+
3173+
/**
 * Reads the {@code add_exchange_below_partial_aggregation_over_group_id} system session
 * property from the given session.
 *
 * @param session the session whose system properties are consulted
 * @return the value of the property, fetched as a {@code Boolean} and unboxed
 */
public static boolean isEnabledAddExchangeBelowGroupId(Session session)
3174+
{
3175+
return session.getSystemProperty(ADD_EXCHANGE_BELOW_PARTIAL_AGGREGATION_OVER_GROUP_ID, Boolean.class);
3176+
}
31673177
}

presto-main/src/main/java/com/facebook/presto/cost/TaskCountEstimator.java

+8
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package com.facebook.presto.cost;
1515

16+
import com.facebook.presto.Session;
1617
import com.facebook.presto.execution.scheduler.NodeSchedulerConfig;
1718
import com.facebook.presto.metadata.InternalNode;
1819
import com.facebook.presto.metadata.InternalNodeManager;
@@ -22,6 +23,8 @@
2223
import java.util.Set;
2324
import java.util.function.IntSupplier;
2425

26+
import static com.facebook.presto.SystemSessionProperties.getHashPartitionCount;
27+
import static java.lang.Math.min;
2528
import static java.lang.Math.toIntExact;
2629
import static java.util.Objects.requireNonNull;
2730

@@ -54,4 +57,9 @@ public int estimateSourceDistributedTaskCount()
5457
{
5558
return numberOfNodes.getAsInt();
5659
}
60+
61+
/**
 * Estimates the task count for a hash-distributed stage as the smaller of the value
 * supplied by {@code numberOfNodes} and the session's hash partition count.
 *
 * @param session the session from which the hash partition count is read
 * @return {@code min(numberOfNodes.getAsInt(), getHashPartitionCount(session))}
 */
public int estimateHashedTaskCount(Session session)
62+
{
63+
// The estimate is capped by both the node supplier and the session's hash partition count.
return min(numberOfNodes.getAsInt(), getHashPartitionCount(session));
64+
}
5765
}

presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java

+14
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,7 @@ public class FeaturesConfig
297297
private boolean singleNodeExecutionEnabled;
298298
private boolean nativeExecutionScaleWritersThreadsEnabled;
299299
private String expressionOptimizerName = DEFAULT_EXPRESSION_OPTIMIZER_NAME;
300+
private boolean addExchangeBelowPartialAggregationOverGroupId;
300301

301302
public enum PartitioningPrecisionStrategy
302303
{
@@ -2945,4 +2946,17 @@ public boolean isExcludeInvalidWorkerSessionProperties()
29452946
{
29462947
return this.setExcludeInvalidWorkerSessionProperties;
29472948
}
2949+
2950+
/**
 * Sets the flag bound to the
 * {@code optimizer.add-exchange-below-partial-aggregation-over-group-id} configuration key.
 *
 * @param addExchangeBelowPartialAggregationOverGroupId the new value of the flag
 * @return this {@code FeaturesConfig}, so that setter calls can be chained
 */
@Config("optimizer.add-exchange-below-partial-aggregation-over-group-id")
2951+
@ConfigDescription("Enable adding an exchange below partial aggregation over a GroupId node to improve partial aggregation performance")
2952+
public FeaturesConfig setAddExchangeBelowPartialAggregationOverGroupId(boolean addExchangeBelowPartialAggregationOverGroupId)
2953+
{
2954+
this.addExchangeBelowPartialAggregationOverGroupId = addExchangeBelowPartialAggregationOverGroupId;
2955+
return this;
2956+
}
2957+
2958+
/**
 * Returns the current value of the {@code addExchangeBelowPartialAggregationOverGroupId} flag.
 *
 * @return the stored flag value
 */
// NOTE(review): this boolean accessor uses a "get" prefix, while the neighbouring
// isExcludeInvalidWorkerSessionProperties() uses "is" - confirm which convention the class follows.
public boolean getAddExchangeBelowPartialAggregationOverGroupId()
2959+
{
2960+
return addExchangeBelowPartialAggregationOverGroupId;
2961+
}
29482962
}

presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java

+18-3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.facebook.presto.cost.CostComparator;
1919
import com.facebook.presto.cost.StatsCalculator;
2020
import com.facebook.presto.cost.TaskCountEstimator;
21+
import com.facebook.presto.execution.TaskManagerConfig;
2122
import com.facebook.presto.metadata.Metadata;
2223
import com.facebook.presto.split.PageSourceManager;
2324
import com.facebook.presto.split.SplitManager;
@@ -27,6 +28,7 @@
2728
import com.facebook.presto.sql.planner.iterative.IterativeOptimizer;
2829
import com.facebook.presto.sql.planner.iterative.Rule;
2930
import com.facebook.presto.sql.planner.iterative.properties.LogicalPropertiesProviderImpl;
31+
import com.facebook.presto.sql.planner.iterative.rule.AddExchangesBelowPartialAggregationOverGroupIdRuleSet;
3032
import com.facebook.presto.sql.planner.iterative.rule.AddIntermediateAggregations;
3133
import com.facebook.presto.sql.planner.iterative.rule.AddNotNullFiltersToJoinNode;
3234
import com.facebook.presto.sql.planner.iterative.rule.CombineApproxPercentileFunctions;
@@ -222,7 +224,8 @@ public PlanOptimizers(
222224
TaskCountEstimator taskCountEstimator,
223225
PartitioningProviderManager partitioningProviderManager,
224226
FeaturesConfig featuresConfig,
225-
ExpressionOptimizerManager expressionOptimizerManager)
227+
ExpressionOptimizerManager expressionOptimizerManager,
228+
TaskManagerConfig taskManagerConfig)
226229
{
227230
this(metadata,
228231
sqlParser,
@@ -238,7 +241,8 @@ public PlanOptimizers(
238241
taskCountEstimator,
239242
partitioningProviderManager,
240243
featuresConfig,
241-
expressionOptimizerManager);
244+
expressionOptimizerManager,
245+
taskManagerConfig);
242246
}
243247

244248
@PostConstruct
@@ -270,7 +274,8 @@ public PlanOptimizers(
270274
TaskCountEstimator taskCountEstimator,
271275
PartitioningProviderManager partitioningProviderManager,
272276
FeaturesConfig featuresConfig,
273-
ExpressionOptimizerManager expressionOptimizerManager)
277+
ExpressionOptimizerManager expressionOptimizerManager,
278+
TaskManagerConfig taskManagerConfig)
274279
{
275280
this.exporter = exporter;
276281
ImmutableList.Builder<PlanOptimizer> builder = ImmutableList.builder();
@@ -820,6 +825,7 @@ public PlanOptimizers(
820825

821826
if (!noExchange) {
822827
builder.add(new ReplicateSemiJoinInDelete()); // Must run before AddExchanges
828+
823829
builder.add(new IterativeOptimizer(
824830
metadata,
825831
ruleStats,
@@ -830,6 +836,7 @@ public PlanOptimizers(
830836
// Must run before AddExchanges and after ReplicateSemiJoinInDelete
831837
// to avoid temporarily having an invalid plan
832838
new DetermineSemiJoinDistributionType(costComparator, taskCountEstimator))));
839+
833840
builder.add(new RandomizeNullKeyInOuterJoin(metadata.getFunctionAndTypeManager(), statsCalculator),
834841
new PruneUnreferencedOutputs(),
835842
new IterativeOptimizer(
@@ -841,6 +848,7 @@ public PlanOptimizers(
841848
new PruneRedundantProjectionAssignments(),
842849
new InlineProjections(metadata.getFunctionAndTypeManager()),
843850
new RemoveRedundantIdentityProjections())));
851+
844852
builder.add(new ShardJoins(metadata, metadata.getFunctionAndTypeManager(), statsCalculator),
845853
new PruneUnreferencedOutputs());
846854
builder.add(
@@ -914,6 +922,13 @@ public PlanOptimizers(
914922
ImmutableSet.of(
915923
new PruneJoinColumns())));
916924

925+
builder.add(new IterativeOptimizer(
926+
metadata,
927+
ruleStats,
928+
statsCalculator,
929+
costCalculator,
930+
new AddExchangesBelowPartialAggregationOverGroupIdRuleSet(taskCountEstimator, taskManagerConfig, metadata, sqlParser).rules()));
931+
917932
builder.add(new IterativeOptimizer(
918933
metadata,
919934
ruleStats,

0 commit comments

Comments
 (0)