Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] [native] Derive TableScan stream type as FIXED #24468

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -889,7 +889,7 @@ public PlanOptimizers(
// MergeJoinForSortedInputOptimizer can avoid the local exchange for a join operation.
// It must be placed after AddExchanges but before AddLocalExchanges, so that the JoinNode is
// replaced with a merge join before AddLocalExchanges runs and no extra local exchange is added.
builder.add(new MergeJoinForSortedInputOptimizer(metadata));
builder.add(new MergeJoinForSortedInputOptimizer(metadata, featuresConfig.isNativeExecutionEnabled()));

// Optimizers above this don't understand local exchanges, so be careful moving this.
builder.add(new AddLocalExchanges(metadata, featuresConfig.isNativeExecutionEnabled()));
Expand Down Expand Up @@ -958,7 +958,7 @@ public PlanOptimizers(
statsCalculator,
costCalculator,
ImmutableList.of(),
ImmutableSet.of(new RuntimeReorderJoinSides(metadata))));
ImmutableSet.of(new RuntimeReorderJoinSides(metadata, featuresConfig.isNativeExecutionEnabled()))));
this.runtimeOptimizers = runtimeBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public static Optional<JoinNode> createRuntimeSwappedJoinNode(
Metadata metadata,
Lookup lookup,
Session session,
PlanNodeIdAllocator idAllocator)
PlanNodeIdAllocator idAllocator,
boolean nativeExecution)
{
JoinNode swapped = joinNode.flipChildren();

Expand All @@ -76,7 +77,7 @@ public static Optional<JoinNode> createRuntimeSwappedJoinNode(
PlanNode resolvedSwappedLeft = lookup.resolve(newLeft);
if (resolvedSwappedLeft instanceof ExchangeNode && resolvedSwappedLeft.getSources().size() == 1) {
// Ensure the new probe after skipping the local exchange will satisfy the required probe side property
if (checkProbeSidePropertySatisfied(resolvedSwappedLeft.getSources().get(0), metadata, lookup, session)) {
if (checkProbeSidePropertySatisfied(resolvedSwappedLeft.getSources().get(0), metadata, lookup, session, nativeExecution)) {
newLeft = resolvedSwappedLeft.getSources().get(0);
// The HashGenerationOptimizer will generate hashVariables and append to the output layout of the nodes following the same order. Therefore,
// we use the index of the old hashVariable in the ExchangeNode output layout to retrieve the hashVariable from the new left node, and feed
Expand All @@ -100,7 +101,7 @@ public static Optional<JoinNode> createRuntimeSwappedJoinNode(
.map(EquiJoinClause::getRight)
.collect(toImmutableList());
PlanNode newRight = swapped.getRight();
if (!checkBuildSidePropertySatisfied(swapped.getRight(), buildJoinVariables, metadata, lookup, session)) {
if (!checkBuildSidePropertySatisfied(swapped.getRight(), buildJoinVariables, metadata, lookup, session, nativeExecution)) {
if (getTaskConcurrency(session) > 1) {
newRight = systemPartitionedExchange(
idAllocator.getNextId(),
Expand Down Expand Up @@ -132,7 +133,7 @@ public static Optional<JoinNode> createRuntimeSwappedJoinNode(
}

// Check if the new probe side after removing unnecessary local exchange is valid.
public static boolean checkProbeSidePropertySatisfied(PlanNode node, Metadata metadata, Lookup lookup, Session session)
public static boolean checkProbeSidePropertySatisfied(PlanNode node, Metadata metadata, Lookup lookup, Session session, boolean nativeExecution)
{
StreamPreferredProperties requiredProbeProperty;
if (isSpillEnabled(session) && isJoinSpillingEnabled(session)) {
Expand All @@ -141,7 +142,7 @@ public static boolean checkProbeSidePropertySatisfied(PlanNode node, Metadata me
else {
requiredProbeProperty = defaultParallelism(session);
}
StreamPropertyDerivations.StreamProperties nodeProperty = derivePropertiesRecursively(node, metadata, lookup, session);
StreamPropertyDerivations.StreamProperties nodeProperty = derivePropertiesRecursively(node, metadata, lookup, session, nativeExecution);
return requiredProbeProperty.isSatisfiedBy(nodeProperty);
}

Expand All @@ -151,7 +152,8 @@ private static boolean checkBuildSidePropertySatisfied(
List<VariableReferenceExpression> partitioningColumns,
Metadata metadata,
Lookup lookup,
Session session)
Session session,
boolean nativeExecution)
{
StreamPreferredProperties requiredBuildProperty;
if (getTaskConcurrency(session) > 1) {
Expand All @@ -160,21 +162,22 @@ private static boolean checkBuildSidePropertySatisfied(
else {
requiredBuildProperty = singleStream();
}
StreamPropertyDerivations.StreamProperties nodeProperty = derivePropertiesRecursively(node, metadata, lookup, session);
StreamPropertyDerivations.StreamProperties nodeProperty = derivePropertiesRecursively(node, metadata, lookup, session, nativeExecution);
return requiredBuildProperty.isSatisfiedBy(nodeProperty);
}

private static StreamPropertyDerivations.StreamProperties derivePropertiesRecursively(
PlanNode node,
Metadata metadata,
Lookup lookup,
Session session)
Session session,
boolean nativeExecution)
{
PlanNode actual = lookup.resolve(node);
List<StreamPropertyDerivations.StreamProperties> inputProperties = actual.getSources().stream()
.map(source -> derivePropertiesRecursively(source, metadata, lookup, session))
.map(source -> derivePropertiesRecursively(source, metadata, lookup, session, nativeExecution))
.collect(toImmutableList());
return StreamPropertyDerivations.deriveProperties(actual, inputProperties, metadata, session);
return StreamPropertyDerivations.deriveProperties(actual, inputProperties, metadata, session, nativeExecution);
}

public static boolean isBelowBroadcastLimit(PlanNode planNode, Rule.Context context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,12 @@ public class RuntimeReorderJoinSides
private static final Pattern<JoinNode> PATTERN = join();

private final Metadata metadata;
private final boolean nativeExecution;

public RuntimeReorderJoinSides(Metadata metadata)
public RuntimeReorderJoinSides(Metadata metadata, boolean nativeExecution)
{
this.metadata = requireNonNull(metadata, "metadata is null");
this.nativeExecution = nativeExecution;
}

@Override
Expand Down Expand Up @@ -97,7 +99,7 @@ public Result apply(JoinNode joinNode, Captures captures, Context context)
return Result.empty();
}

Optional<JoinNode> rewrittenNode = createRuntimeSwappedJoinNode(joinNode, metadata, context.getLookup(), context.getSession(), context.getIdAllocator());
Optional<JoinNode> rewrittenNode = createRuntimeSwappedJoinNode(joinNode, metadata, context.getLookup(), context.getSession(), context.getIdAllocator(), nativeExecution);
if (rewrittenNode.isPresent()) {
log.debug(format("Probe size: %.2f is smaller than Build size: %.2f => invoke runtime join swapping on JoinNode ID: %s.", leftOutputSizeInBytes, rightOutputSizeInBytes, joinNode.getId()));
return Result.ofPlanNode(rewrittenNode.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,7 @@ public PlanWithProperties visitIndexJoin(IndexJoinNode node, StreamPreferredProp
parentPreferences.constrainTo(node.getProbeSource().getOutputVariables()).withDefaultParallelism(session));

// The index source does not support local parallelism and must produce a single stream
StreamProperties indexStreamProperties = derivePropertiesRecursively(node.getIndexSource(), metadata, session);
StreamProperties indexStreamProperties = derivePropertiesRecursively(node.getIndexSource(), metadata, session, nativeExecution);
checkArgument(indexStreamProperties.getDistribution() == SINGLE, "index source must be single stream");
PlanWithProperties index = new PlanWithProperties(node.getIndexSource(), indexStreamProperties);

Expand Down Expand Up @@ -983,12 +983,12 @@ private PlanWithProperties rebaseAndDeriveProperties(PlanNode node, List<PlanWit

private PlanWithProperties deriveProperties(PlanNode result, StreamProperties inputProperties)
{
return new PlanWithProperties(result, StreamPropertyDerivations.deriveProperties(result, inputProperties, metadata, session));
return new PlanWithProperties(result, StreamPropertyDerivations.deriveProperties(result, inputProperties, metadata, session, nativeExecution));
}

private PlanWithProperties deriveProperties(PlanNode result, List<StreamProperties> inputProperties)
{
return new PlanWithProperties(result, StreamPropertyDerivations.deriveProperties(result, inputProperties, metadata, session));
return new PlanWithProperties(result, StreamPropertyDerivations.deriveProperties(result, inputProperties, metadata, session, nativeExecution));
}

private PlanWithProperties accept(PlanNode node, StreamPreferredProperties context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ public class MergeJoinForSortedInputOptimizer
implements PlanOptimizer
{
private final Metadata metadata;
private final boolean nativeExecution;
private boolean isEnabledForTesting;

public MergeJoinForSortedInputOptimizer(Metadata metadata)
public MergeJoinForSortedInputOptimizer(Metadata metadata, boolean nativeExecution)
{
this.metadata = requireNonNull(metadata, "metadata is null");
this.nativeExecution = nativeExecution;
}

@Override
Expand Down Expand Up @@ -139,8 +141,8 @@ public PlanNode visitJoin(JoinNode node, RewriteContext<Void> context)
private boolean meetsDataRequirement(PlanNode left, PlanNode right, JoinNode node)
{
// Acquire data properties for both left and right side
StreamPropertyDerivations.StreamProperties leftProperties = StreamPropertyDerivations.derivePropertiesRecursively(left, metadata, session);
StreamPropertyDerivations.StreamProperties rightProperties = StreamPropertyDerivations.derivePropertiesRecursively(right, metadata, session);
StreamPropertyDerivations.StreamProperties leftProperties = StreamPropertyDerivations.derivePropertiesRecursively(left, metadata, session, nativeExecution);
StreamPropertyDerivations.StreamProperties rightProperties = StreamPropertyDerivations.derivePropertiesRecursively(right, metadata, session, nativeExecution);

List<VariableReferenceExpression> leftJoinColumns = node.getCriteria().stream().map(EquiJoinClause::getLeft).collect(toImmutableList());
List<VariableReferenceExpression> rightJoinColumns = node.getCriteria().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.facebook.presto.spi.plan.WindowNode;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.sql.planner.optimizations.StreamPropertyDerivations.StreamProperties.StreamDistribution;
import com.facebook.presto.sql.planner.plan.ApplyNode;
import com.facebook.presto.sql.planner.plan.AssignUniqueId;
import com.facebook.presto.sql.planner.plan.EnforceSingleRowNode;
Expand All @@ -65,8 +66,6 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;

import javax.annotation.concurrent.Immutable;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -96,20 +95,20 @@ public final class StreamPropertyDerivations
{
// Private constructor prevents instantiation of this class.
private StreamPropertyDerivations() {}

public static StreamProperties derivePropertiesRecursively(PlanNode node, Metadata metadata, Session session)
public static StreamProperties derivePropertiesRecursively(PlanNode node, Metadata metadata, Session session, boolean nativeExecution)
{
List<StreamProperties> inputProperties = node.getSources().stream()
.map(source -> derivePropertiesRecursively(source, metadata, session))
.map(source -> derivePropertiesRecursively(source, metadata, session, nativeExecution))
.collect(toImmutableList());
return StreamPropertyDerivations.deriveProperties(node, inputProperties, metadata, session);
return StreamPropertyDerivations.deriveProperties(node, inputProperties, metadata, session, nativeExecution);
}

public static StreamProperties deriveProperties(PlanNode node, StreamProperties inputProperties, Metadata metadata, Session session)
public static StreamProperties deriveProperties(PlanNode node, StreamProperties inputProperties, Metadata metadata, Session session, boolean nativeExecution)
{
return deriveProperties(node, ImmutableList.of(inputProperties), metadata, session);
return deriveProperties(node, ImmutableList.of(inputProperties), metadata, session, nativeExecution);
}

public static StreamProperties deriveProperties(PlanNode node, List<StreamProperties> inputProperties, Metadata metadata, Session session)
public static StreamProperties deriveProperties(PlanNode node, List<StreamProperties> inputProperties, Metadata metadata, Session session, boolean nativeExecution)
{
requireNonNull(node, "node is null");
requireNonNull(inputProperties, "inputProperties is null");
Expand All @@ -127,7 +126,7 @@ public static StreamProperties deriveProperties(PlanNode node, List<StreamProper
metadata,
session);

StreamProperties result = node.accept(new Visitor(metadata, session), inputProperties)
StreamProperties result = node.accept(new Visitor(metadata, session, nativeExecution), inputProperties)
.withOtherActualProperties(otherProperties);

result.getPartitioningColumns().ifPresent(columns ->
Expand All @@ -147,11 +146,13 @@ private static class Visitor
{
private final Metadata metadata;
private final Session session;
private final boolean nativeExecution;

private Visitor(Metadata metadata, Session session)
private Visitor(Metadata metadata, Session session, boolean nativeExecution)
{
this.metadata = metadata;
this.session = session;
this.nativeExecution = nativeExecution;
}

@Override
Expand Down Expand Up @@ -291,13 +292,16 @@ public StreamProperties visitTableScan(TableScanNode node, List<StreamProperties
Optional<Set<VariableReferenceExpression>> streamPartitionSymbols = layout.getStreamPartitioningColumns()
.flatMap(columns -> getNonConstantVariables(columns, assignments, constants));

// Native execution creates a fixed number of drivers for TableScan pipelines
StreamDistribution streamDistribution = nativeExecution ? FIXED : MULTIPLE;

// If we are partitioned on the empty set, we must report the scan's stream distribution with
// unknown partitioning rather than a single stream, because the connector does not guarantee
// a single split in this case (since it might not understand that the value is a constant).
if (streamPartitionSymbols.isPresent() && streamPartitionSymbols.get().isEmpty()) {
return new StreamProperties(MULTIPLE, Optional.empty(), false);
return new StreamProperties(streamDistribution, Optional.empty(), false);
}
return new StreamProperties(MULTIPLE, streamPartitionSymbols, false);
return new StreamProperties(streamDistribution, streamPartitionSymbols, false);
}

private Optional<Set<VariableReferenceExpression>> getNonConstantVariables(Set<ColumnHandle> columnHandles, Map<ColumnHandle, VariableReferenceExpression> assignments, Set<ColumnHandle> globalConstants)
Expand Down Expand Up @@ -633,7 +637,6 @@ public StreamProperties visitRemoteSource(RemoteSourceNode node, List<StreamProp
}
}

@Immutable
public static final class StreamProperties
{
public enum StreamDistribution
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public PlanChecker(FeaturesConfig featuresConfig, boolean noExchange, PlanChecke
new TypeValidator(),
new VerifyOnlyOneOutputNode(),
new VerifyNoFilteredAggregations(),
new ValidateAggregationsWithDefaultValues(noExchange),
new ValidateStreamingAggregations(),
new ValidateAggregationsWithDefaultValues(noExchange, featuresConfig.isNativeExecutionEnabled()),
new ValidateStreamingAggregations(featuresConfig.isNativeExecutionEnabled()),
new VerifyNoIntermediateFormExpression(),
new VerifyProjectionLocality(),
new DynamicFiltersChecker(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,12 @@ public class ValidateAggregationsWithDefaultValues
implements Checker
{
private final boolean noExchange;
private final boolean nativeExecution;

public ValidateAggregationsWithDefaultValues(boolean noExchange)
public ValidateAggregationsWithDefaultValues(boolean noExchange, boolean nativeExecution)
{
this.noExchange = noExchange;
this.nativeExecution = nativeExecution;
}

@Override
Expand Down Expand Up @@ -121,7 +123,7 @@ public Optional<SeenExchanges> visitAggregation(AggregationNode node, Void conte
if (!seenExchanges.localRepartitionExchange) {
// No local repartition exchange between final and partial aggregation.
// Make sure that final aggregation operators are executed by single thread.
StreamProperties localProperties = StreamPropertyDerivations.derivePropertiesRecursively(node, metadata, session);
StreamProperties localProperties = StreamPropertyDerivations.derivePropertiesRecursively(node, metadata, session, nativeExecution);
checkArgument(localProperties.isSingleStream(),
"Final aggregation with default value not separated from partial aggregation by local hash exchange");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,31 @@
public class ValidateStreamingAggregations
implements Checker
{
private final boolean nativeExecution;

/**
 * Creates the checker.
 *
 * @param nativeExecution flag stored on this checker; it is handed to the plan visitor,
 *        which passes it on when deriving stream properties for an aggregation's source
 */
public ValidateStreamingAggregations(boolean nativeExecution)
{
this.nativeExecution = nativeExecution;
}

@Override
public void validate(PlanNode planNode, Session session, Metadata metadata, WarningCollector warningCollector)
{
planNode.accept(new Visitor(session, metadata), null);
planNode.accept(new Visitor(session, metadata, nativeExecution), null);
}

private static final class Visitor
extends InternalPlanVisitor<Void, Void>
{
private final Session session;
private final Metadata metadata;
private final boolean nativeExecution;

private Visitor(Session session, Metadata metadata)
private Visitor(Session session, Metadata metadata, boolean nativeExecution)
{
this.session = session;
this.metadata = metadata;
this.nativeExecution = nativeExecution;
}

@Override
Expand All @@ -73,7 +82,7 @@ public Void visitAggregation(AggregationNode node, Void context)
return null;
}

StreamProperties properties = derivePropertiesRecursively(node.getSource(), metadata, session);
StreamProperties properties = derivePropertiesRecursively(node.getSource(), metadata, session, nativeExecution);

List<LocalProperty<VariableReferenceExpression>> desiredProperties = ImmutableList.of(new GroupingProperty<>(node.getPreGroupedVariables()));
Iterator<Optional<LocalProperty<VariableReferenceExpression>>> matchIterator = LocalProperties.match(properties.getLocalProperties(), desiredProperties).iterator();
Expand Down
Loading
Loading