Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class InternalSourceSplitMetricGroup extends ProxyMetricGroup<MetricGroup
private static final long SPLIT_NOT_STARTED = -1L;
private long splitStartTime = SPLIT_NOT_STARTED;
private final MetricGroup splitWatermarkMetricGroup;
private final String splitId;

private InternalSourceSplitMetricGroup(
MetricGroup parentMetricGroup,
Expand All @@ -56,6 +57,7 @@ private InternalSourceSplitMetricGroup(
Gauge<Long> currentWatermark) {
super(parentMetricGroup);
this.clock = clock;
this.splitId = splitId;
splitWatermarkMetricGroup = parentMetricGroup.addGroup(SPLIT, splitId).addGroup(WATERMARK);
pausedTimePerSecond =
splitWatermarkMetricGroup.gauge(
Expand All @@ -75,20 +77,13 @@ private InternalSourceSplitMetricGroup(
MetricNames.SPLIT_CURRENT_WATERMARK, currentWatermark);
}

public static InternalSourceSplitMetricGroup wrap(
OperatorMetricGroup operatorMetricGroup, String splitId, Gauge<Long> currentWatermark) {
return new InternalSourceSplitMetricGroup(
operatorMetricGroup, SystemClock.getInstance(), splitId, currentWatermark);
}

@VisibleForTesting
public static InternalSourceSplitMetricGroup mock(
MetricGroup metricGroup, String splitId, Gauge<Long> currentWatermark) {
return new InternalSourceSplitMetricGroup(
metricGroup, SystemClock.getInstance(), splitId, currentWatermark);
}

@VisibleForTesting
public static InternalSourceSplitMetricGroup wrap(
OperatorMetricGroup operatorMetricGroup,
Clock clock,
Expand Down Expand Up @@ -118,7 +113,7 @@ public void markPaused() {
// If a split got paused it means it emitted records,
// hence it shouldn't be considered idle anymore
markNotIdle();
LOG.warn("Split marked paused while still idle");
LOG.warn("[{}] Split marked paused while still idle", splitId);
}
this.pausedTimePerSecond.markStart();
}
Expand All @@ -129,7 +124,7 @@ public void markIdle() {
// If a split is marked idle, it has no records to emit.
// hence it shouldn't be considered paused anymore
markNotPaused();
LOG.warn("Split marked idle while still paused");
LOG.warn("[{}] Split marked idle while still paused", splitId);
}
this.idleTimePerSecond.markStart();
}
Expand Down Expand Up @@ -208,4 +203,10 @@ public void onSplitFinished() {
public MetricGroup getSplitWatermarkMetricGroup() {
return splitWatermarkMetricGroup;
}

@VisibleForTesting
public void updateTimers() {
this.idleTimePerSecond.update();
this.pausedTimePerSecond.update();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
private final List<SplitT> splitsToInitializeOutput = new ArrayList<>();

private final Set<String> currentlyPausedSplits = new HashSet<>();
private final Set<String> currentlyIdleSplits = new HashSet<>();

private boolean waitingForCheckpoint;

Expand Down Expand Up @@ -395,6 +396,7 @@ protected InternalSourceSplitMetricGroup getOrCreateSplitMetricGroup(String spli
InternalSourceSplitMetricGroup splitMetricGroup =
InternalSourceSplitMetricGroup.wrap(
getMetricGroup(),
processingTimeService.getClock(),
splitId,
() -> sampledSplitWatermarks.get(splitId).getLatest());
splitMetricGroup.markSplitStart();
Expand Down Expand Up @@ -781,6 +783,13 @@ public void updateCurrentEffectiveWatermark(long watermark) {
public void updateCurrentSplitWatermark(String splitId, long watermark) {
WatermarkSampler splitWatermarkSampler = checkNotNull(sampledSplitWatermarks.get(splitId));
splitWatermarkSampler.addLatest(watermark);
if (!currentlyIdleSplits.contains(splitId)) {
maybePauseSplit(splitId);
}
}

private void maybePauseSplit(String splitId) {
Copy link
Contributor

@davidradl davidradl Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the method name is strange - I was expecting it to start with a verb like the other methods. Maybe `updateCurrentSplitPausedWatermark

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method checks if the split watermark is too advanced and if yes calls pauseOrResumeSplit for it, unless already paused.
I am not aware of a "paused watermark" concept, but open to other suggestions on how to name this method.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the maybePauseSplit decsribes pretty well what's happening. Also there are plenty of other cases with similar naming convention (method named maybeACTION).

Re updateCurrentSplitPausedWatermark:

  • I presume you ment updateCurrentPausedSplitWatermark, but in this case "Paused watermark" is still confusing as pointed by @Efrat19 . Even more correct would have been updateCurrentlyPausedSplit...
  • but updateCurrentlyPausedSplit doesn't capture that currently paused split split might not be updated
  • and updateCurrentlyPausedSplit is basically as synonym of pauseSplits

So:

  • maybePauseSplit and maybeUpdateCurrentlyPausedSplit are both technically correct maybePauseSplits imo sounds better, so I would keep it as is.

WatermarkSampler splitWatermarkSampler = checkNotNull(sampledSplitWatermarks.get(splitId));
long oldestSampledWatermark = splitWatermarkSampler.getOldestSample();
// oldestSampledWatermark can be only updated after adding new latest if sampling capacity
// is 0, but we still need to handle that
Expand All @@ -793,10 +802,22 @@ public void updateCurrentSplitWatermark(String splitId, long watermark) {

@Override
public void updateCurrentSplitIdle(String splitId, boolean idle) {
final InternalSourceSplitMetricGroup splitMetricGroup =
this.getOrCreateSplitMetricGroup(splitId);
if (idle == currentlyIdleSplits.contains(splitId)) {
return;
}
if (idle) {
this.getOrCreateSplitMetricGroup(splitId).markIdle();
LOG.info("[{}] Marking split idle", splitId);
currentlyIdleSplits.add(splitId);
splitMetricGroup.markIdle();
} else {
this.getOrCreateSplitMetricGroup(splitId).markNotIdle();
LOG.info("[{}] Marking split not idle", splitId);
currentlyIdleSplits.remove(splitId);
splitMetricGroup.markNotIdle();
// Since we skipped alignment check
// for this split while it was idle:
maybePauseSplit(splitId);
}
}

Expand All @@ -805,6 +826,7 @@ public void splitFinished(String splitId) {
getOrCreateSplitMetricGroup(splitId).onSplitFinished();
this.splitMetricGroups.remove(splitId);
sampledSplitWatermarks.remove(splitId);
currentlyIdleSplits.remove(splitId);
}

/**
Expand All @@ -818,6 +840,9 @@ private void checkSplitWatermarkAlignment() {
Collection<String> splitsToResume = new ArrayList<>();
sampledSplitWatermarks.forEach(
(splitId, splitWatermarks) -> {
if (currentlyIdleSplits.contains(splitId)) {
return;
}
if (splitWatermarks.getOldestSample() > currentMaxDesiredWatermark) {
splitsToPause.add(splitId);
} else if (currentlyPausedSplits.contains(splitId)) {
Expand All @@ -836,10 +861,11 @@ private void pauseOrResumeSplits(
Collection<String> splitsToPause, Collection<String> splitsToResume) {
try {
LOG.info(
"pauseOrResumeSplits [splitsToPause={}][splitsToResume={}]"
"pauseOrResumeSplits [splitsToPause={}][splitsToResume={}][idleSplits={}]"
+ "[currentMaxDesiredWatermark={}][latestWatermark={}][oldestWatermark={}]",
splitsToPause,
splitsToResume,
currentlyIdleSplits,
currentMaxDesiredWatermark,
sampledLatestWatermark.getLatest(),
sampledLatestWatermark.getOldestSample());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ Licensed to the Apache Software Foundation (ASF) under one
import static org.apache.flink.configuration.PipelineOptions.WATERMARK_ALIGNMENT_BUFFER_SIZE;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;

/** Unit test for split alignment in {@link SourceOperator}. */
class SourceOperatorSplitWatermarkAlignmentTest {
Expand Down Expand Up @@ -501,6 +503,72 @@ void testStateReportingForSingleSplitWatermarkAlignmentAndIdleness() throws Exce
assertThat(operator.getSplitMetricGroup(split0.splitId()).isActive()).isTrue();
}

@Test
void testAlignmentCheckIsDeferredForIdleSplits() throws Exception {
final long idleTimeout = 100;
final MockSourceReader sourceReader =
new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, true, true);
final TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
// Split states math assumes non-negative time
processingTimeService.setCurrentTime(0);
final SourceOperator<Integer, MockSourceSplit> operator =
createAndOpenSourceOperatorWithIdleness(
sourceReader, processingTimeService, idleTimeout);

final MockSourceSplit split0 = new MockSourceSplit(0, 0, 10);
final int allowedWatermark4 = 4;
final int allowedWatermark7 = 7;
split0.addRecord(5);
split0.addRecord(6);
split0.addRecord(7);
split0.addRecord(8);
operator.handleOperatorEvent(
new AddSplitEvent<>(Arrays.asList(split0), new MockSourceSplitSerializer()));
final CollectingDataOutput<Integer> actualOutput = new CollectingDataOutput<>();

// Emit enough record to fill the sampler buffer
operator.emitNext(actualOutput);
operator.emitNext(actualOutput);
operator.emitNext(actualOutput);
sampleAllWatermarks(processingTimeService);

// Transition the split to idle state:
for (int i = 0; i < 10; i++) {
processingTimeService.advance(idleTimeout);
}
assertThat(operator.getSplitMetricGroup(split0.splitId()).isIdle()).isTrue();

// Alignment check fires but doesn't pause the idle split
operator.handleOperatorEvent(new WatermarkAlignmentEvent(allowedWatermark4));
assertThat(operator.getSplitMetricGroup(split0.splitId()).isIdle()).isTrue();
Comment on lines +541 to +543
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we assert that the split hasn't been paused?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


// While the split is idle, we advance the allowed watermark to keep the source active
operator.handleOperatorEvent(new WatermarkAlignmentEvent(allowedWatermark7));
sampleAllWatermarks(processingTimeService);
// The split is still idle:
assertThat(operator.getSplitMetricGroup(split0.splitId()).isIdle()).isTrue();

// updating timers values manually (in reality this is done by ViewUpdater)
operator.getSplitMetricGroup(split0.splitId()).updateTimers();
// Ensure the idle timer ticked, but not pause timer
assertNotEquals(
0L, operator.getSplitMetricGroup(split0.splitId()).getAccumulatedIdleTime());
assertEquals(0L, operator.getSplitMetricGroup(split0.splitId()).getAccumulatedPausedTime());

// The split emits a record to break out of idleness
operator.emitNext(actualOutput);
sampleAllWatermarks(processingTimeService);

// The split is marked not idle, then immediately paused by the deferred alignment check
assertThat(operator.getSplitMetricGroup(split0.splitId()).isPaused()).isTrue();

// Make pause timer tick
processingTimeService.advance(10);
operator.getSplitMetricGroup(split0.splitId()).updateTimers();
assertNotEquals(
0L, operator.getSplitMetricGroup(split0.splitId()).getAccumulatedPausedTime());
}

private void sampleAllWatermarks(TestProcessingTimeService timeService) throws Exception {
sampleWatermarks(timeService, WATERMARK_ALIGNMENT_BUFFER_SIZE.defaultValue());
}
Expand Down