Commit c12d73f

[FLINK-37605][runtime] Infer checkpoint id on endInput in sink
So far, we used a special value for the final checkpoint on endInput. However, as shown in the description of this ticket, final doesn't always mean final: multiple committables with EOI could be created at different times.

With this commit, we stop using a special value for such committables and instead infer the checkpoint id of the next checkpoint. Various factors influence the checkpoint id, but we can mostly ignore them all because we just need to pick a checkpoint id that is
- higher than all checkpoint ids of the previous, successful checkpoints of this attempt,
- higher than the checkpoint id of the restored checkpoint, and
- lower than any future checkpoint id.

Hence, we simply remember the last observed checkpoint id (initialized with max(0, restored id)) and use last id + 1 on endInput. Naturally, multiple endInput calls happening through restarts will result in unique checkpoint ids.

Note that checkpoints aborted before endInput may result in diverged checkpoint ids across subtasks. However, each of these ids satisfies the above requirements, and any id of endInput1 will be smaller than any id of endInput2. Thus, diverged checkpoint ids do not impact correctness at all.

(cherry picked from commit 9302545)
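The change boils down to the same small pattern in every affected operator: track the last observed checkpoint id and hand out last id + 1 on endInput. The sketch below is illustrative only, not the committed code; the class shell, method names, and the local INITIAL_CHECKPOINT_ID constant are assumptions, while the lastKnownCheckpointId field and the +1 rule mirror the diffs that follow.

import java.util.OptionalLong;

// Minimal sketch of the checkpoint-id inference, assuming a stripped-down operator shell.
final class CheckpointIdInferenceSketch {

    // Stand-in for CheckpointIDCounter.INITIAL_CHECKPOINT_ID; assumed to be 1 here.
    private static final long INITIAL_CHECKPOINT_ID = 1;

    // "Last observed checkpoint id", initialized so that the first endInput yields id 1.
    private long lastKnownCheckpointId = INITIAL_CHECKPOINT_ID - 1;

    // On restore, start from the restored checkpoint id so future ids stay above it.
    void initializeState(OptionalLong restoredCheckpointId) {
        restoredCheckpointId.ifPresent(id -> lastKnownCheckpointId = id);
    }

    // Every checkpoint the operator observes bumps the last known id.
    void onCheckpoint(long checkpointId) {
        lastKnownCheckpointId = Math.max(lastKnownCheckpointId, checkpointId);
    }

    // endInput no longer uses the EOI sentinel; it uses the next id after the last observed one.
    long checkpointIdForEndInput() {
        return lastKnownCheckpointId + 1;
    }
}

Under this sketch, restoring from checkpoint 17 and hitting endInput before any further checkpoint completes yields id 18, while a fresh attempt with no restored state yields id 1; both satisfy the requirements listed above.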
1 parent de906fa · commit c12d73f

11 files changed: +81 −175 lines

flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java

Lines changed: 9 additions & 3 deletions

@@ -34,6 +34,7 @@
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
@@ -106,6 +107,8 @@ public class CompactorOperator
     // submitted again while restoring
     private ListState<Map<Long, List<CompactorRequest>>> remainingRequestsState;

+    private long lastKnownCheckpointId = CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1;
+
     public CompactorOperator(
             FileCompactStrategy strategy,
             SimpleVersionedSerializer<FileSinkCommittable> committableSerializer,
@@ -136,15 +139,16 @@ public void processElement(StreamRecord<CompactorRequest> element) throws Except
     @Override
     public void endInput() throws Exception {
         // add collecting requests into the final snapshot
-        checkpointRequests.put(CommittableMessage.EOI, collectingRequests);
+        long checkpointId = lastKnownCheckpointId + 1;
+        checkpointRequests.put(checkpointId, collectingRequests);
         collectingRequests = new ArrayList<>();

         // submit all requests and wait until they are done
-        submitUntil(CommittableMessage.EOI);
+        submitUntil(checkpointId);
         assert checkpointRequests.isEmpty();

         getAllTasksFuture().join();
-        emitCompacted(CommittableMessage.EOI);
+        emitCompacted(checkpointId);
         assert compactingRequests.isEmpty();
     }

@@ -222,6 +226,8 @@ private void submitUntil(long checkpointId) {
     }

     private void emitCompacted(long checkpointId) throws Exception {
+        lastKnownCheckpointId = checkpointId;
+
         List<FileSinkCommittable> compacted = new ArrayList<>();
         Iterator<Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>> iter =
                 compactingRequests.iterator();

flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java

Lines changed: 5 additions & 1 deletion

@@ -28,8 +28,10 @@ public interface CommittableMessage<CommT> {
     /**
      * Special value for checkpointId for the end of input in case of batch commit or final
      * checkpoint.
+     *
+     * @deprecated the special value is not used anymore at all (remove with Flink 2.2)
      */
-    long EOI = Long.MAX_VALUE;
+    @Deprecated long EOI = Long.MAX_VALUE;

     /** The subtask that created this committable. */
     int getSubtaskId();
@@ -49,6 +51,8 @@ default OptionalLong getCheckpointId() {
     /**
      * Returns the checkpoint id or EOI if this message belong to the final checkpoint or the batch
      * commit.
+     *
+     * @deprecated the special value EOI is not used anymore
      */
     long getCheckpointIdOrEOI();
 }

flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java

Lines changed: 10 additions & 13 deletions

@@ -25,6 +25,7 @@
 import org.apache.flink.configuration.SinkOptions;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -51,7 +52,6 @@
 import java.util.Collections;
 import java.util.OptionalLong;

-import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static org.apache.flink.util.IOUtils.closeAll;
 import static org.apache.flink.util.Preconditions.checkNotNull;

@@ -76,11 +76,9 @@ class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage
     private SinkCommitterMetricGroup metricGroup;
     private Committer<CommT> committer;
     private CommittableCollector<CommT> committableCollector;
-    private long lastCompletedCheckpointId = -1;
+    private long lastCompletedCheckpointId = CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1;
     private int maxRetries;

-    private boolean endInput = false;
-
     /** The operator's state descriptor. */
     private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
             new ListStateDescriptor<>(
@@ -131,11 +129,11 @@ public void initializeState(StateInitializationContext context) throws Exception
                         getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
                         getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(),
                         metricGroup));
-        if (context.isRestored()) {
+        if (checkpointId.isPresent()) {
             committableCollectorState.get().forEach(cc -> committableCollector.merge(cc));
             lastCompletedCheckpointId = checkpointId.getAsLong();
             // try to re-commit recovered transactions as quickly as possible
-            commitAndEmitCheckpoints();
+            commitAndEmitCheckpoints(lastCompletedCheckpointId);
         }
     }

@@ -148,24 +146,23 @@ public void snapshotState(StateSnapshotContext context) throws Exception {

     @Override
     public void endInput() throws Exception {
-        endInput = true;
         if (!isCheckpointingEnabled || isBatchMode) {
             // There will be no final checkpoint, all committables should be committed here
-            commitAndEmitCheckpoints();
+            commitAndEmitCheckpoints(lastCompletedCheckpointId + 1);
         }
     }

     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         super.notifyCheckpointComplete(checkpointId);
-        lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
-        commitAndEmitCheckpoints();
+        commitAndEmitCheckpoints(Math.max(lastCompletedCheckpointId, checkpointId));
     }

-    private void commitAndEmitCheckpoints() throws IOException, InterruptedException {
-        long completedCheckpointId = endInput ? EOI : lastCompletedCheckpointId;
+    private void commitAndEmitCheckpoints(long checkpointId)
+            throws IOException, InterruptedException {
+        lastCompletedCheckpointId = checkpointId;
         for (CheckpointCommittableManager<CommT> checkpointManager :
-                committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) {
+                committableCollector.getCheckpointCommittablesUpTo(checkpointId)) {
             // ensure that all committables of the first checkpoint are fully committed before
             // attempting the next committable
             commitAndEmit(checkpointManager);

flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java

Lines changed: 9 additions & 49 deletions

@@ -23,7 +23,6 @@
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
 import org.apache.flink.api.connector.sink2.Sink;
@@ -52,8 +51,6 @@
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.UserCodeClassLoader;

-import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
-
 import javax.annotation.Nullable;

 import java.io.IOException;
@@ -62,6 +59,7 @@
 import java.util.List;
 import java.util.OptionalLong;

+import static org.apache.flink.runtime.checkpoint.CheckpointIDCounter.INITIAL_CHECKPOINT_ID;
 import static org.apache.flink.util.IOUtils.closeAll;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -91,13 +89,6 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
     @Nullable private final SimpleVersionedSerializer<CommT> committableSerializer;
     private final List<CommT> legacyCommittables = new ArrayList<>();

-    /**
-     * Used to remember that EOI has already happened so that we don't emit the last committables of
-     * the final checkpoints twice.
-     */
-    private static final ListStateDescriptor<Boolean> END_OF_INPUT_STATE_DESC =
-            new ListStateDescriptor<>("end_of_input_state", BooleanSerializer.INSTANCE);
-
     /** The runtime information of the input element. */
     private final Context<InputT> context;

@@ -115,10 +106,7 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
     private final MailboxExecutor mailboxExecutor;

     private boolean endOfInput = false;
-    /**
-     * Remembers the endOfInput state for (final) checkpoints iff the operator emits committables.
-     */
-    @Nullable private ListState<Boolean> endOfInputState;
+    private long lastKnownCheckpointId = INITIAL_CHECKPOINT_ID - 1;

     SinkWriterOperator(
             Sink<InputT> sink,
@@ -146,8 +134,10 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
     @Override
     public void initializeState(StateInitializationContext context) throws Exception {
         super.initializeState(context);
-        WriterInitContext initContext = createInitContext(context.getRestoredCheckpointId());
-        if (context.isRestored()) {
+        OptionalLong restoredCheckpointId = context.getRestoredCheckpointId();
+        WriterInitContext initContext = createInitContext(restoredCheckpointId);
+        if (restoredCheckpointId.isPresent()) {
+            lastKnownCheckpointId = restoredCheckpointId.getAsLong();
             if (committableSerializer != null) {
                 final ListState<List<CommT>> legacyCommitterState =
                         new SimpleVersionedListState<>(
@@ -161,41 +151,12 @@ public void initializeState(StateInitializationContext context) throws Exception
         }

         sinkWriter = writerStateHandler.createWriter(initContext, context);
-
-        if (emitDownstream) {
-            // Figure out if we have seen end of input before and if we can suppress creating
-            // transactions and sending them downstream to the CommitterOperator. We have the
-            // following
-            // cases:
-            // 1. state is empty:
-            // - First time initialization
-            // - Restoring from a previous version of Flink that didn't handle EOI
-            // - Upscaled from a final or regular checkpoint
-            // In all cases, we regularly handle EOI, potentially resulting in duplicate summaries
-            // that the CommitterOperator needs to handle.
-            // 2. state is not empty:
-            // - This implies Flink restores from a version that handles EOI.
-            // - If there is one entry, no rescaling happened (for this subtask), so if it's true,
-            // we recover from a final checkpoint (for this subtask) and can ignore another EOI
-            // else we have a regular checkpoint.
-            // - If there are multiple entries, Flink downscaled, and we need to check if all are
-            // true and do the same as above. As soon as one entry is false, we regularly start
-            // the writer and potentially emit duplicate summaries if we indeed recovered from a
-            // final checkpoint.
-            endOfInputState = context.getOperatorStateStore().getListState(END_OF_INPUT_STATE_DESC);
-            ArrayList<Boolean> previousState = Lists.newArrayList(endOfInputState.get());
-            endOfInput = !previousState.isEmpty() && !previousState.contains(false);
-        }
     }

     @Override
     public void snapshotState(StateSnapshotContext context) throws Exception {
         super.snapshotState(context);
         writerStateHandler.snapshotState(context.getCheckpointId());
-        if (endOfInputState != null) {
-            endOfInputState.clear();
-            endOfInputState.add(this.endOfInput);
-        }
     }

     @Override
@@ -225,17 +186,16 @@ public void processWatermark(Watermark mark) throws Exception {

     @Override
     public void endInput() throws Exception {
+        LOG.info("Received endInput");
         if (!endOfInput) {
             endOfInput = true;
-            if (endOfInputState != null) {
-                endOfInputState.add(true);
-            }
             sinkWriter.flush(true);
-            emitCommittables(CommittableMessage.EOI);
+            emitCommittables(lastKnownCheckpointId + 1);
         }
     }

     private void emitCommittables(long checkpointId) throws IOException, InterruptedException {
+        lastKnownCheckpointId = checkpointId;
         if (!emitDownstream) {
             // To support SinkV1 topologies with only a writer we have to call prepareCommit
             // although no committables are forwarded

flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java

Lines changed: 1 addition & 0 deletions

@@ -95,6 +95,7 @@ void addSummary(CommittableSummary<CommT> summary) {
                         summary.getSubtaskId(),
                         checkpointId,
                         metricGroup);
+        // Remove branch once CommittableMessage.EOI has been removed (earliest 2.2)
         if (checkpointId == CommittableMessage.EOI) {
             SubtaskCommittableManager<CommT> merged =
                     subtasksCommittableManagers.merge(

flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java

Lines changed: 0 additions & 11 deletions

@@ -33,7 +33,6 @@
 import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.TreeMap;
 import java.util.stream.Collectors;

@@ -49,7 +48,6 @@
  */
 @Internal
 public class CommittableCollector<CommT> {
-    private static final long EOI = Long.MAX_VALUE;
     /** Mapping of checkpoint id to {@link CheckpointCommittableManagerImpl}. */
     private final NavigableMap<Long, CheckpointCommittableManagerImpl<CommT>>
             checkpointCommittables;
@@ -143,15 +141,6 @@ public Collection<? extends CheckpointCommittableManager<CommT>> getCheckpointCo
         return new ArrayList<>(checkpointCommittables.headMap(checkpointId, true).values());
     }

-    /**
-     * Returns {@link CheckpointCommittableManager} belonging to the last input.
-     *
-     * @return {@link CheckpointCommittableManager}
-     */
-    public Optional<CheckpointCommittableManager<CommT>> getEndOfInputCommittable() {
-        return Optional.ofNullable(checkpointCommittables.get(EOI));
-    }
-
     /**
      * Returns whether all {@link CheckpointCommittableManager} currently hold by the collector are
      * either committed or failed.

flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java

Lines changed: 0 additions & 33 deletions

@@ -32,7 +32,6 @@
 import java.util.Collection;
 import java.util.List;

-import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static org.assertj.core.api.Assertions.assertThat;

 class GlobalCommitterOperatorTest {
@@ -138,38 +137,6 @@ void testStateRestore() throws Exception {
         }
     }

-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    void testCommitAllCommittablesOnFinalCheckpoint(boolean commitOnInput) throws Exception {
-        final MockCommitter committer = new MockCommitter();
-        final OneInputStreamOperatorTestHarness<CommittableMessage<Integer>, Void> testHarness =
-                createTestHarness(committer, commitOnInput);
-        testHarness.open();
-
-        final CommittableSummary<Integer> committableSummary =
-                new CommittableSummary<>(1, 2, EOI, 1, 1, 0);
-        testHarness.processElement(new StreamRecord<>(committableSummary));
-        final CommittableSummary<Integer> committableSummary2 =
-                new CommittableSummary<>(2, 2, EOI, 1, 1, 0);
-        testHarness.processElement(new StreamRecord<>(committableSummary2));
-
-        final CommittableWithLineage<Integer> first = new CommittableWithLineage<>(1, EOI, 1);
-        testHarness.processElement(new StreamRecord<>(first));
-        final CommittableWithLineage<Integer> second = new CommittableWithLineage<>(2, EOI, 2);
-        testHarness.processElement(new StreamRecord<>(second));
-
-        // commitOnInput implies that the global committer is not using notifyCheckpointComplete
-        if (commitOnInput) {
-            assertThat(committer.committed).containsExactly(1, 2);
-        } else {
-            assertThat(committer.committed).isEmpty();
-            testHarness.notifyOfCompletedCheckpoint(EOI);
-            assertThat(committer.committed).containsExactly(1, 2);
-        }
-
-        assertThat(testHarness.getOutput()).isEmpty();
-    }
-
     private OneInputStreamOperatorTestHarness<CommittableMessage<Integer>, Void> createTestHarness(
             Committer<Integer> committer, boolean commitOnInput) throws Exception {
         return new OneInputStreamOperatorTestHarness<>(
