Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,19 @@
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.resourcehints.ResourceHint;
import org.apache.beam.sdk.transforms.resourcehints.ResourceHints;
import org.apache.beam.sdk.transforms.windowing.AfterAll;
import org.apache.beam.sdk.transforms.windowing.AfterEach;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.Never;
import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger;
import org.apache.beam.sdk.transforms.windowing.TriggerVisitor;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.AppliedCombineFn;
import org.apache.beam.sdk.util.CoderUtils;
Expand Down Expand Up @@ -134,6 +146,77 @@ public class DataflowPipelineTranslator {
private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineTranslator.class);
private static final ObjectMapper MAPPER = new ObjectMapper();

/**
 * A {@link TriggerVisitor} that answers whether a trigger tree is compatible with combiner
 * lifting.
 *
 * <p>Leaf triggers answer directly. Composite triggers are compatible only when every one of
 * their sub-triggers is compatible, so a single incompatible leaf anywhere in the tree makes the
 * whole tree incompatible.
 */
private static class TriggerCombinerLiftingCompatibility implements TriggerVisitor<Boolean> {

  // ---- Leaf triggers that are compatible with combiner lifting. ----

  @Override
  public Boolean visit(DefaultTrigger trigger) {
    return Boolean.TRUE;
  }

  @Override
  public Boolean visit(AfterWatermark.FromEndOfWindow trigger) {
    return Boolean.TRUE;
  }

  @Override
  public Boolean visit(Never.NeverTrigger trigger) {
    return Boolean.TRUE;
  }

  @Override
  public Boolean visit(AfterProcessingTime trigger) {
    return Boolean.TRUE;
  }

  @Override
  public Boolean visit(AfterSynchronizedProcessingTime trigger) {
    return Boolean.TRUE;
  }

  // ---- Leaf triggers that are NOT compatible with combiner lifting. ----

  @Override
  public Boolean visit(ReshuffleTrigger<?> trigger) {
    return Boolean.FALSE;
  }

  @Override
  public Boolean visit(AfterPane trigger) {
    // Combiner lifting not supported for count triggers.
    return Boolean.FALSE;
  }

  // ---- Composite triggers: compatible only if all of their sub-triggers are. ----

  @Override
  public Boolean visit(AfterWatermark.AfterWatermarkEarlyAndLate trigger) {
    return trigger.subTriggers().stream().allMatch(subTrigger -> subTrigger.accept(this));
  }

  @Override
  public Boolean visit(AfterFirst trigger) {
    return trigger.subTriggers().stream().allMatch(subTrigger -> subTrigger.accept(this));
  }

  @Override
  public Boolean visit(AfterAll trigger) {
    return trigger.subTriggers().stream().allMatch(subTrigger -> subTrigger.accept(this));
  }

  @Override
  public Boolean visit(AfterEach trigger) {
    return trigger.subTriggers().stream().allMatch(subTrigger -> subTrigger.accept(this));
  }

  @Override
  public Boolean visit(Repeatedly trigger) {
    return trigger.subTriggers().stream().allMatch(subTrigger -> subTrigger.accept(this));
  }

  @Override
  public Boolean visit(OrFinallyTrigger trigger) {
    return trigger.subTriggers().stream().allMatch(subTrigger -> subTrigger.accept(this));
  }
}

private static byte[] serializeWindowingStrategy(
WindowingStrategy<?, ?> windowingStrategy, PipelineOptions options) {
try {
Expand Down Expand Up @@ -970,8 +1053,8 @@ private <K, V> void groupByKeyHelper(
&& windowingStrategy.getWindowFn().assignsToOneWindow();
if (isStreaming) {
allowCombinerLifting &= transform.fewKeys();
// TODO: Allow combiner lifting on the non-default trigger, as appropriate.
allowCombinerLifting &= (windowingStrategy.getTrigger() instanceof DefaultTrigger);
allowCombinerLifting &=
windowingStrategy.getTrigger().accept(new TriggerCombinerLiftingCompatibility());
}
stepContext.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING, !allowCombinerLifting);
stepContext.addInput(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.dataflow;

import static org.apache.beam.runners.dataflow.util.Structs.getBoolean;
import static org.apache.beam.runners.dataflow.util.Structs.getString;
import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -89,11 +90,13 @@
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
Expand All @@ -110,7 +113,17 @@
import org.apache.beam.sdk.transforms.resourcehints.ResourceHints;
import org.apache.beam.sdk.transforms.resourcehints.ResourceHintsOptions;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.AfterAll;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Never;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.DoFnInfo;
Expand Down Expand Up @@ -235,6 +248,131 @@ private static DataflowPipelineOptions buildPipelineOptions() throws IOException
return options;
}

/**
 * Builds a streaming pipeline that windows an unbounded input with {@code trigger} (discarding
 * fired panes) ahead of a global count, translates it, and asserts that at least one translated
 * step has {@code PropertyNames.DISALLOW_COMBINER_LIFTING} set.
 *
 * @param trigger the trigger to install on the window transform
 * @throws Exception if building the options or translating the pipeline fails
 */
private void testTriggerCombinerLiftingDisabled(Trigger trigger) throws Exception {
  DataflowPipelineOptions options = buildPipelineOptions();
  options.setRunner(DataflowRunner.class);
  options.as(StreamingOptions.class).setStreaming(true);

  Pipeline pipeline = Pipeline.create(options);
  pipeline.traverseTopologically(new RecordingPipelineVisitor());
  SdkComponents sdkComponents = createSdkComponents(options);

  pipeline
      .apply("create", Create.of(1, 2, 3, 4).withCoder(VarIntCoder.of()))
      .setIsBoundedInternal(IsBounded.UNBOUNDED)
      .apply("window", Window.<Integer>configure().triggering(trigger).discardingFiredPanes())
      .apply("count", Combine.globally(Count.<Integer>combineFn()).withoutDefaults());

  RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);

  DataflowPipelineOptions translatorOptions =
      PipelineOptionsFactory.as(DataflowPipelineOptions.class);
  translatorOptions.setStreaming(true);
  DataflowPipelineTranslator translator =
      DataflowPipelineTranslator.fromOptions(translatorOptions);

  DataflowRunner runner = DataflowRunner.fromOptions(options);
  JobSpecification jobSpecification =
      translator.translate(
          pipeline, pipelineProto, sdkComponents, runner, Collections.emptyList());

  // Inspect every step (no early exit) and remember whether any of them disallows lifting.
  boolean liftingDisallowedSomewhere = false;
  for (Step step : jobSpecification.getJob().getSteps()) {
    liftingDisallowedSomewhere |=
        getBoolean(step.getProperties(), PropertyNames.DISALLOW_COMBINER_LIFTING, false);
  }
  assertTrue(liftingDisallowedSomewhere);
}

/** A repeated element-count trigger must disable combiner lifting. */
@Test
public void testRepeatedCountTriggerDisablesCombinerLifting() throws Exception {
  // `throws Exception` already covers IOException, so it is not listed separately.
  testTriggerCombinerLiftingDisabled(Repeatedly.forever(AfterPane.elementCountAtLeast(1)));
}

/** An element-count trigger used for early firings must disable combiner lifting. */
@Test
public void testEarlyCountTriggerDisablesCombinerLifting() throws Exception {
  // `throws Exception` already covers IOException, so it is not listed separately.
  testTriggerCombinerLiftingDisabled(
      AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)));
}

/** An element-count trigger nested inside {@code AfterFirst} must disable combiner lifting. */
@Test
public void testAfterFirstCountTriggerDisablesCombinerLifting() throws Exception {
  // `throws Exception` already covers IOException, so it is not listed separately.
  testTriggerCombinerLiftingDisabled(
      Repeatedly.forever(AfterFirst.of(Never.ever(), AfterPane.elementCountAtLeast(1))));
}

/** An element-count trigger nested inside {@code AfterAll} must disable combiner lifting. */
@Test
public void testAfterAllCountTriggerDisablesCombinerLifting() throws Exception {
  // `throws Exception` already covers IOException, so it is not listed separately.
  testTriggerCombinerLiftingDisabled(
      Repeatedly.forever(AfterAll.of(Never.ever(), AfterPane.elementCountAtLeast(1))));
}

/**
 * Translates a streaming pipeline with three windowed global counts (default trigger,
 * end-of-window watermark trigger, and a watermark trigger with processing-time early and late
 * firings) and asserts that no translated step has
 * {@code PropertyNames.DISALLOW_COMBINER_LIFTING} set.
 */
@Test
public void testCombinerLiftingEnabled() throws Exception {
  // `throws Exception` already covers IOException, so it is not listed separately.
  DataflowPipelineOptions options = buildPipelineOptions();
  options.setRunner(DataflowRunner.class);
  options.as(StreamingOptions.class).setStreaming(true);

  Pipeline pipeline = Pipeline.create(options);
  pipeline.traverseTopologically(new RecordingPipelineVisitor());
  SdkComponents sdkComponents = createSdkComponents(options);

  PCollection<Integer> input =
      pipeline.apply("create", Create.of(1, 2, 3, 4).withCoder(VarIntCoder.of()));

  // Branch 1: fixed windows with the default trigger. The input is marked unbounded here,
  // before any of the branches is applied.
  input
      .setIsBoundedInternal(IsBounded.UNBOUNDED)
      .apply(
          "window1",
          Window.<Integer>into(FixedWindows.of(Duration.millis(1)))
              .triggering(DefaultTrigger.of())
              .discardingFiredPanes())
      .apply("count", Combine.globally(Count.<Integer>combineFn()).withoutDefaults());

  // Branch 2: plain end-of-window watermark trigger.
  input
      .apply(
          "window2",
          Window.<Integer>configure()
              .triggering(AfterWatermark.pastEndOfWindow())
              .discardingFiredPanes())
      .apply("count2", Combine.globally(Count.<Integer>combineFn()).withoutDefaults());

  // Branch 3: watermark trigger with processing-time early firings and synchronized
  // processing-time late firings.
  input
      .apply(
          "window3",
          Window.<Integer>configure()
              .triggering(
                  AfterWatermark.pastEndOfWindow()
                      .withEarlyFirings(
                          AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO))
                      .withLateFirings(AfterSynchronizedProcessingTime.ofFirstElement()))
              .discardingFiredPanes())
      .apply("count3", Combine.globally(Count.<Integer>combineFn()).withoutDefaults());

  RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);

  DataflowPipelineOptions translatorOptions =
      PipelineOptionsFactory.as(DataflowPipelineOptions.class);
  translatorOptions.setStreaming(true);
  DataflowPipelineTranslator translator =
      DataflowPipelineTranslator.fromOptions(translatorOptions);

  JobSpecification jobSpecification =
      translator.translate(
          pipeline,
          pipelineProto,
          sdkComponents,
          DataflowRunner.fromOptions(options),
          Collections.emptyList());

  // Inspect every step and remember whether any of them disallows lifting.
  boolean foundDisable = false;
  for (Step step : jobSpecification.getJob().getSteps()) {
    foundDisable |=
        getBoolean(step.getProperties(), PropertyNames.DISALLOW_COMBINER_LIFTING, false);
  }
  assertFalse(foundDisable);
}

// Test that the transform names for Storage Write API for streaming pipelines are what we expect
// them to be. This is required since the Windmill backend expects the step to contain that name.
// For a more stable solution, we should use URN, but that is not currently used in the legacy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
return deadline;
}

/**
 * Double-dispatch hook: passes this trigger to {@code visitor.visit} and returns whatever the
 * visitor produces.
 */
@Override
public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
return visitor.visit(this);
}

@Override
protected OnceTrigger getContinuationTrigger(List<Trigger> continuationTriggers) {
return new AfterAll(continuationTriggers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,19 @@ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {

/**
 * Reports whether this trigger may finish, which is the case only when every sub-trigger may
 * finish.
 */
@Override
public boolean mayFinish() {
  // Single return: a second, unreachable return statement would not compile.
  return subTriggers.stream().allMatch(Trigger::mayFinish);
}

/**
 * Builds the continuation trigger: an {@code AfterFirst} over the given continuation triggers,
 * repeated forever.
 */
@Override
protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
  AfterFirst firstOfContinuations = new AfterFirst(continuationTriggers);
  return Repeatedly.forever(firstOfContinuations);
}

/**
 * Double-dispatch hook: passes this trigger to {@code visitor.visit} and returns whatever the
 * visitor produces.
 */
@Override
public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
return visitor.visit(this);
}

@Override
public String toString() {
StringBuilder builder = new StringBuilder("AfterEach.inOrder(");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
return deadline;
}

/**
 * Double-dispatch hook: passes this trigger to {@code visitor.visit} and returns whatever the
 * visitor produces.
 */
@Override
public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
return visitor.visit(this);
}

@Override
protected OnceTrigger getContinuationTrigger(List<Trigger> continuationTriggers) {
return new AfterFirst(continuationTriggers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
return BoundedWindow.TIMESTAMP_MAX_VALUE;
}

/**
 * Double-dispatch hook: passes this trigger to {@code visitor.visit} and returns whatever the
 * visitor produces.
 */
@Override
public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
return visitor.visit(this);
}

@Override
protected OnceTrigger getContinuationTrigger(List<Trigger> continuationTriggers) {
return AfterPane.elementCountAtLeast(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
return BoundedWindow.TIMESTAMP_MAX_VALUE;
}

/**
 * Double-dispatch hook: passes this trigger to {@code visitor.visit} and returns whatever the
 * visitor produces.
 */
@Override
public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
return visitor.visit(this);
}

@Override
protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
return AfterSynchronizedProcessingTime.ofFirstElement();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
return BoundedWindow.TIMESTAMP_MAX_VALUE;
}

/**
 * Double-dispatch hook: passes this trigger to {@code visitor.visit} and returns whatever the
 * visitor produces.
 */
@Override
public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
return visitor.visit(this);
}

@Override
protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ public AfterWatermarkEarlyAndLate withLateFirings(OnceTrigger lateTrigger) {
return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger);
}

/**
 * Double-dispatch hook: passes this trigger to {@code visitor.visit} and returns whatever the
 * visitor produces.
 */
@Override
public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
return visitor.visit(this);
}

@Override
public Trigger getContinuationTrigger() {
return new AfterWatermarkEarlyAndLate(
Expand Down Expand Up @@ -177,6 +182,11 @@ protected FromEndOfWindow getContinuationTrigger(List<Trigger> continuationTrigg
return this;
}

/**
 * Double-dispatch hook: passes this trigger to {@code visitor.visit} and returns whatever the
 * visitor produces.
 */
@Override
public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
return visitor.visit(this);
}

@Override
public String toString() {
return TO_STRING;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ public boolean isCompatible(Trigger other) {
return other instanceof DefaultTrigger;
}

/**
 * Double-dispatch hook: passes this trigger to {@code visitor.visit} and returns whatever the
 * visitor produces.
 */
@Override
public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
return visitor.visit(this);
}

@Override
protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
return this;
Expand Down
Loading
Loading