[FLINK-37021][state/forst] Implement fast cp/restore for ForSt StateBackend #25924

Closed
FsCheckpointStorageAccess.java
@@ -18,7 +18,6 @@

package org.apache.flink.runtime.state.filesystem;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
@@ -123,15 +122,18 @@ public FsCheckpointStorageAccess(

// ------------------------------------------------------------------------

@VisibleForTesting
Path getCheckpointsDirectory() {
public Path getCheckpointsDirectory() {
return checkpointsDirectory;
}

public Path getSharedStateDirectory() {
return sharedStateDirectory;
}

public FileSystem getFileSystem() {
return fileSystem;
}

// ------------------------------------------------------------------------
// CheckpointStorage implementation
// ------------------------------------------------------------------------
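The hunk above promotes getCheckpointsDirectory() from a @VisibleForTesting accessor to a public one and adds a public getFileSystem() accessor. A minimal sketch of what the widened surface exposes (the helper class below is illustrative only, not part of this PR):

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess;

final class CheckpointLayoutSketch {
    // Prints the durable checkpoint layout now reachable through public
    // accessors: the checkpoint root (previously test-only), the shared-state
    // directory (already public), and the backing FileSystem (new in this PR).
    static void describe(FsCheckpointStorageAccess access) {
        Path checkpointsDir = access.getCheckpointsDirectory();
        Path sharedDir = access.getSharedStateDirectory();
        FileSystem fs = access.getFileSystem();
        System.out.println(
                "checkpoints=" + checkpointsDir
                        + ", shared=" + sharedDir
                        + ", fs=" + fs.getUri());
    }
}
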
StateBackendTestV2Base.java
@@ -87,18 +87,23 @@
public abstract class StateBackendTestV2Base<B extends AbstractStateBackend> {

protected MockEnvironment env;
protected JobID jobID;

// lazily initialized stream storage
private CheckpointStreamFactory checkpointStreamFactory;

@BeforeEach
void before() throws Exception {
jobID = new JobID();
env = buildMockEnv();
}

private MockEnvironment buildMockEnv() throws Exception {
protected MockEnvironment buildMockEnv() throws Exception {
MockEnvironment mockEnvironment =
MockEnvironment.builder().setTaskStateManager(getTestTaskStateManager()).build();
MockEnvironment.builder()
.setTaskStateManager(getTestTaskStateManager())
.setJobID(jobID)
.build();
mockEnvironment.setCheckpointStorageAccess(getCheckpointStorageAccess());
return mockEnvironment;
}
@@ -114,6 +119,8 @@ void after() {

protected abstract ConfigurableStateBackend getStateBackend() throws Exception;

protected abstract void restoreJob() throws Exception;

protected CheckpointStorage getCheckpointStorage() throws Exception {
ConfigurableStateBackend stateBackend = getStateBackend();
if (stateBackend instanceof CheckpointStorage) {
@@ -127,14 +134,14 @@ protected CheckpointStorage getCheckpointStorage() {
}

protected CheckpointStorageAccess getCheckpointStorageAccess() throws Exception {
return getCheckpointStorage().createCheckpointStorage(new JobID());
return getCheckpointStorage().createCheckpointStorage(jobID);
}

protected CheckpointStreamFactory createStreamFactory() throws Exception {
if (checkpointStreamFactory == null) {
checkpointStreamFactory =
getCheckpointStorage()
.createCheckpointStorage(new JobID())
.createCheckpointStorage(jobID)
.resolveCheckpointStorageLocation(
1L, CheckpointStorageLocationReference.getDefault());
}
@@ -143,7 +150,11 @@ protected CheckpointStreamFactory createStreamFactory() throws Exception {
}

protected <K> AsyncKeyedStateBackend<K> createAsyncKeyedBackend(
TypeSerializer<K> keySerializer, KeyGroupRange keyGroupRange, Environment env)
int subtaskId,
int parallelism,
TypeSerializer<K> keySerializer,
KeyGroupRange keyGroupRange,
Environment env)
throws Exception {

env.setCheckpointStorageAccess(getCheckpointStorageAccess());
@@ -152,8 +163,8 @@ protected <K> AsyncKeyedStateBackend<K> createAsyncKeyedBackend(
.createAsyncKeyedStateBackend(
new KeyedStateBackendParametersImpl<>(
env,
new JobID(),
"test_op",
jobID,
String.format("test_op_%d_%d", subtaskId, parallelism),
keySerializer,
keyGroupRange.getNumberOfKeyGroups(),
keyGroupRange,
@@ -171,6 +182,8 @@ protected StateBackend.CustomInitializationMetrics getCustomInitializationMetric
}

protected <K> AsyncKeyedStateBackend<K> restoreAsyncKeyedBackend(
int subtaskId,
int parallelism,
TypeSerializer<K> keySerializer,
KeyGroupRange keyGroupRange,
List<KeyedStateHandle> state,
@@ -181,8 +194,8 @@ protected <K> AsyncKeyedStateBackend<K> restoreAsyncKeyedBackend(
.createAsyncKeyedStateBackend(
new KeyedStateBackendParametersImpl<>(
env,
new JobID(),
"test_op",
jobID,
String.format("test_op_%d_%d", subtaskId, parallelism),
keySerializer,
keyGroupRange.getNumberOfKeyGroups(),
keyGroupRange,
@@ -221,6 +234,8 @@ void testAsyncKeyedStateBackendSnapshot() throws Exception {
try {
backend =
createAsyncKeyedBackend(
0,
1,
IntSerializer.INSTANCE,
new KeyGroupRange(0, jobMaxParallelism - 1),
env);
@@ -307,10 +322,13 @@ void testAsyncKeyedStateBackendSnapshot() throws Exception {
assertThat(stateHandle).isNotNull();

backend = null;
restoreJob();

try {
backend =
restoreAsyncKeyedBackend(
0,
1,
IntSerializer.INSTANCE,
new KeyGroupRange(0, jobMaxParallelism - 1),
Collections.singletonList(stateHandle),
@@ -401,7 +419,8 @@ private void testKeyGroupSnapshotRestore(
maxParallelism * i / sourceParallelism,
maxParallelism * (i + 1) / sourceParallelism - 1);
AsyncKeyedStateBackend<Integer> backend =
createAsyncKeyedBackend(IntSerializer.INSTANCE, range, env);
createAsyncKeyedBackend(
i, sourceParallelism, IntSerializer.INSTANCE, range, env);
aec =
new AsyncExecutionController<>(
new SyncMailboxExecutor(),
@@ -426,7 +445,6 @@ private void testKeyGroupSnapshotRestore(
int keyInKeyGroup =
getKeyInKeyGroup(random, maxParallelism, KeyGroupRange.of(j, j));
RecordContext recordContext = aec.buildContext(keyInKeyGroup, keyInKeyGroup);
;
recordContext.retain();
aec.setCurrentContext(recordContext);
keyInKeyGroups.add(keyInKeyGroup);
@@ -469,6 +487,8 @@ private void testKeyGroupSnapshotRestore(
for (int i = 0; i < targetParallelism; ++i) {
AsyncKeyedStateBackend<Integer> backend =
restoreAsyncKeyedBackend(
i,
targetParallelism,
IntSerializer.INSTANCE,
keyGroupRangesRestore.get(i),
keyGroupStatesAfterDistribute.get(i),
@@ -522,7 +542,8 @@ void testKeyGroupedInternalPriorityQueueAddAll() throws Exception {
void testKeyGroupedInternalPriorityQueue(boolean addAll) throws Exception {
String fieldName = "key-grouped-priority-queue";
AsyncKeyedStateBackend<Integer> backend =
createAsyncKeyedBackend(IntSerializer.INSTANCE, new KeyGroupRange(0, 127), env);
createAsyncKeyedBackend(
0, 1, IntSerializer.INSTANCE, new KeyGroupRange(0, 127), env);
try {
KeyGroupedInternalPriorityQueue<TestType> priorityQueue =
backend.create(fieldName, new TestType.V1TestTypeSerializer());
@@ -578,7 +599,8 @@ void testValueStateWorkWithTtl() throws Exception {
TestAsyncFrameworkExceptionHandler testExceptionHandler =
new TestAsyncFrameworkExceptionHandler();
AsyncKeyedStateBackend<Long> backend =
createAsyncKeyedBackend(LongSerializer.INSTANCE, new KeyGroupRange(0, 127), env);
createAsyncKeyedBackend(
0, 1, LongSerializer.INSTANCE, new KeyGroupRange(0, 127), env);
AsyncExecutionController<Long> aec =
new AsyncExecutionController<>(
new SyncMailboxExecutor(),
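The base test now keeps a jobID field, makes buildMockEnv() protected, and requires subclasses to implement restoreJob(), which is invoked between taking a snapshot and restoring from it. A hypothetical subclass sketch (class name, backend choice, and body are assumptions, not code from this PR) showing one way the hook could simulate a job restart:

import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.state.forst.ForStStateBackend;

// Assumed to live in the same package as StateBackendTestV2Base so the
// inherited protected members (env, jobID, buildMockEnv) are accessible.
class ForStRestoreHookSketch extends StateBackendTestV2Base<ForStStateBackend> {

    @Override
    protected ConfigurableStateBackend getStateBackend() {
        // Assumption: a default ForStStateBackend suffices for the sketch; the
        // real subclass presumably also configures working/remote directories.
        return new ForStStateBackend();
    }

    @Override
    protected void restoreJob() throws Exception {
        // Simulate a restart of the same job: drop the old MockEnvironment and
        // build a fresh one. Because jobID is reused, the new environment
        // resolves the same checkpoint storage paths as the snapshot half of
        // the test, which is what a fast restore needs to find its files.
        if (env != null) {
            env.close();
        }
        env = buildMockEnv();
    }
}
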
ForStKeyedStateBackendBuilder.java
@@ -20,6 +20,7 @@

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputDeserializer;
@@ -41,7 +42,7 @@
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.state.forst.fs.ForStFlinkFileSystem;
import org.apache.flink.state.forst.datatransfer.ForStStateDataTransfer;
import org.apache.flink.state.forst.restore.ForStHeapTimersFullRestoreOperation;
import org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation;
import org.apache.flink.state.forst.restore.ForStNoneRestoreOperation;
@@ -65,7 +66,6 @@
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
@@ -133,6 +133,8 @@ public class ForStKeyedStateBackendBuilder<K>
private double overlapFractionThreshold = 0.5;
private boolean useIngestDbRestoreMode = false;

private RecoveryClaimMode recoveryClaimMode = RecoveryClaimMode.DEFAULT;

public ForStKeyedStateBackendBuilder(
String operatorIdentifier,
ClassLoader userCodeClassLoader,
@@ -194,6 +196,11 @@ ForStKeyedStateBackendBuilder<K> setRescalingUseDeleteFilesInRange(
return this;
}

ForStKeyedStateBackendBuilder<K> setRecoveryClaimMode(RecoveryClaimMode recoveryClaimMode) {
this.recoveryClaimMode = recoveryClaimMode;
return this;
}

@Override
public ForStKeyedStateBackend<K> build() throws BackendBuildingException {
ColumnFamilyHandle defaultColumnFamilyHandle = null;
@@ -241,6 +248,7 @@ public ForStKeyedStateBackend<K> build() throws BackendBuildingException {

try {
optionsContainer.prepareDirectories();
optionsContainer.buildDataTransferStrategy();
restoreOperation =
getForStRestoreOperation(
keyGroupPrefixBytes,
@@ -381,7 +389,8 @@ private ForStRestoreOperation getForStRestoreOperation(
restoreStateHandles, IncrementalRemoteKeyedStateHandle.class),
overlapFractionThreshold,
useIngestDbRestoreMode,
rescalingUseDeleteFilesInRange);
rescalingUseDeleteFilesInRange,
recoveryClaimMode);
} else if (priorityQueueConfig.getPriorityQueueStateType()
== ForStStateBackend.PriorityQueueStateType.HEAP) {
// Note: This branch can be touched after ForSt Support canonical savepoint,
@@ -421,12 +430,11 @@ private ForStRestoreOperation getForStRestoreOperation(
@Nonnull
SortedMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>>
uploadedStateHandles,
long lastCompletedCheckpointId)
throws IOException {

ForStFlinkFileSystem forStFs = optionsContainer.getFileSystem();
long lastCompletedCheckpointId) {
ForStStateDataTransfer stateTransfer =
new ForStStateDataTransfer(ForStStateDataTransfer.DEFAULT_THREAD_NUM, forStFs);
new ForStStateDataTransfer(
ForStStateDataTransfer.DEFAULT_THREAD_NUM,
optionsContainer.getDataTransferStrategy());

if (enableIncrementalCheckpointing) {
return new ForStIncrementalSnapshotStrategy<>(
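The builder now carries a RecoveryClaimMode (default RecoveryClaimMode.DEFAULT), threads it into the incremental restore operation, and hands ForStStateDataTransfer a DataTransferStrategy from the options container instead of a raw ForStFlinkFileSystem. The usual reasoning behind such a knob, as an illustrative sketch (not code from this PR):

import org.apache.flink.core.execution.RecoveryClaimMode;

final class ClaimModeSketch {
    // In CLAIM mode Flink takes ownership of the retained checkpoint, so a
    // fast restore may reference or link its files in place; in NO_CLAIM the
    // files still belong to the previous owner and must be duplicated before
    // the backend can safely depend on them.
    static boolean mayReuseCheckpointFilesInPlace(RecoveryClaimMode mode) {
        return mode == RecoveryClaimMode.CLAIM;
    }

    public static void main(String[] args) {
        System.out.println(mayReuseCheckpointFilesInPlace(RecoveryClaimMode.CLAIM));    // true
        System.out.println(mayReuseCheckpointFilesInPlace(RecoveryClaimMode.NO_CLAIM)); // false
    }
}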