Skip to content

Commit 6b0426c

Browse files
Izeren authored and rkhachatryan committed
[FLINK-38114] Allow asynchronous offloading of TaskRestore
1 parent bf1cd86 commit 6b0426c

File tree

15 files changed

+698
-201
lines changed

15 files changed

+698
-201
lines changed

flink-core/src/test/java/org/apache/flink/testutils/TestingUtils.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.UUID;
3232
import java.util.concurrent.Executors;
3333
import java.util.concurrent.ScheduledExecutorService;
34+
import java.util.concurrent.ThreadFactory;
3435
import java.util.function.Predicate;
3536

3637
import static org.assertj.core.api.Assertions.assertThat;
@@ -53,10 +54,38 @@ public static Duration infiniteDuration() {
5354
return Duration.ofDays(365L);
5455
}
5556

57+
// To make debugging logs easier, we use a custom thread factory that names the thread.
58+
static class JmMainSingleThreadPoolFactory implements ThreadFactory {
59+
public Thread newThread(final Runnable r) {
60+
return new Thread(r, "jm-main-thread");
61+
}
62+
}
63+
64+
// To make debugging logs easier, we use a custom thread factory that names the thread.
65+
static class AsyncSingleThreadPoolFactory implements ThreadFactory {
66+
public Thread newThread(final Runnable r) {
67+
return new Thread(r, "async-single-thread-pool");
68+
}
69+
}
70+
5671
public static TestExecutorExtension<ScheduledExecutorService> defaultExecutorExtension() {
5772
return new TestExecutorExtension<>(Executors::newSingleThreadScheduledExecutor);
5873
}
5974

75+
public static TestExecutorExtension<ScheduledExecutorService> jmMainThreadExecutorExtension() {
76+
return new TestExecutorExtension<>(
77+
() ->
78+
Executors.newSingleThreadScheduledExecutor(
79+
new JmMainSingleThreadPoolFactory()));
80+
}
81+
82+
public static TestExecutorExtension<ScheduledExecutorService> jmAsyncThreadExecutorExtension() {
83+
return new TestExecutorExtension<>(
84+
() ->
85+
Executors.newSingleThreadScheduledExecutor(
86+
new AsyncSingleThreadPoolFactory()));
87+
}
88+
6089
public static TestExecutorResource<ScheduledExecutorService> defaultExecutorResource() {
6190
return new TestExecutorResource<>(Executors::newSingleThreadScheduledExecutor);
6291
}

flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,12 @@ public Offloaded(PermanentBlobKey serializedValueKey) {
118118
*/
119119
private transient TaskInformation taskInformation;
120120

121+
/**
 * Serialized information to restore the task, either carried inline or offloaded to the blob
 * store. This can be null if there is no state to restore.
 */
122+
@Nullable private final MaybeOffloaded<JobManagerTaskRestore> serializedTaskRestore;
123+
124+
/**
 * Deserialized information to restore the task. This is null if there is no state to restore,
 * or if the serialized form has not been deserialized yet.
 */
125+
@Nullable private transient JobManagerTaskRestore taskRestore;
126+
121127
/**
122128
* The ID referencing the job this task belongs to.
123129
*
@@ -138,16 +144,13 @@ public Offloaded(PermanentBlobKey serializedValueKey) {
138144
/** The list of consumed intermediate result partitions. */
139145
private final List<InputGateDeploymentDescriptor> inputGates;
140146

141-
/** Information to restore the task. This can be null if there is no state to restore. */
142-
@Nullable private final JobManagerTaskRestore taskRestore;
143-
144147
public TaskDeploymentDescriptor(
145148
JobID jobId,
146149
MaybeOffloaded<JobInformation> serializedJobInformation,
147150
MaybeOffloaded<TaskInformation> serializedTaskInformation,
148151
ExecutionAttemptID executionAttemptId,
149152
AllocationID allocationId,
150-
@Nullable JobManagerTaskRestore taskRestore,
153+
@Nullable MaybeOffloaded<JobManagerTaskRestore> serializedTaskRestore,
151154
List<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors,
152155
List<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors) {
153156

@@ -159,7 +162,7 @@ public TaskDeploymentDescriptor(
159162
this.executionId = Preconditions.checkNotNull(executionAttemptId);
160163
this.allocationId = Preconditions.checkNotNull(allocationId);
161164

162-
this.taskRestore = taskRestore;
165+
this.serializedTaskRestore = serializedTaskRestore;
163166

164167
this.producedPartitions = Preconditions.checkNotNull(resultPartitionDeploymentDescriptors);
165168
this.inputGates = Preconditions.checkNotNull(inputGateDeploymentDescriptors);
@@ -241,8 +244,16 @@ public List<InputGateDeploymentDescriptor> getInputGates() {
241244
}
242245

243246
@Nullable
244-
public JobManagerTaskRestore getTaskRestore() {
245-
return taskRestore;
247+
public JobManagerTaskRestore getTaskRestore() throws IOException, ClassNotFoundException {
248+
if (taskRestore != null || serializedTaskRestore == null) {
249+
return taskRestore;
250+
}
251+
if (serializedTaskRestore instanceof NonOffloaded) {
252+
NonOffloaded<JobManagerTaskRestore> taskRestore =
253+
(NonOffloaded<JobManagerTaskRestore>) serializedTaskRestore;
254+
return taskRestore.serializedValue.deserializeValue(getClass().getClassLoader());
255+
}
256+
throw new IllegalStateException("Trying to work with offloaded serialized task restore.");
246257
}
247258

248259
public AllocationID getAllocationId() {
@@ -311,6 +322,21 @@ public void loadBigData(
311322
this.taskInformation = taskInformation.deepCopy();
312323
}
313324

325+
if (serializedTaskRestore == null) {
326+
this.taskRestore = null;
327+
} else if (serializedTaskRestore instanceof Offloaded) {
328+
final PermanentBlobKey blobKey =
329+
((Offloaded<JobManagerTaskRestore>) serializedTaskRestore).serializedValueKey;
330+
331+
Preconditions.checkNotNull(blobService);
332+
333+
final File dataFile = blobService.getFile(jobId, blobKey);
334+
taskRestore =
335+
InstantiationUtil.deserializeObject(
336+
new BufferedInputStream(Files.newInputStream(dataFile.toPath())),
337+
getClass().getClassLoader());
338+
}
339+
314340
for (InputGateDeploymentDescriptor inputGate : inputGates) {
315341
inputGate.tryLoadAndDeserializeShuffleDescriptors(
316342
blobService, jobId, shuffleDescriptorsCache);

flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public MaybeOffloaded<JobInformation> getSerializedJobInformation() {
112112
public TaskDeploymentDescriptor createDeploymentDescriptor(
113113
Execution execution,
114114
AllocationID allocationID,
115-
@Nullable JobManagerTaskRestore taskRestore,
115+
@Nullable Either<SerializedValue<JobManagerTaskRestore>, PermanentBlobKey> taskRestore,
116116
Collection<ResultPartitionDeploymentDescriptor> producedPartitions)
117117
throws IOException, ClusterDatasetCorruptedException {
118118
final ExecutionVertex executionVertex = execution.getVertex();
@@ -124,7 +124,7 @@ public TaskDeploymentDescriptor createDeploymentDescriptor(
124124
executionVertex.getJobVertex().getTaskInformationOrBlobKey()),
125125
execution.getAttemptId(),
126126
allocationID,
127-
taskRestore,
127+
taskRestore != null ? getSerializedTaskRestore(taskRestore) : null,
128128
new ArrayList<>(producedPartitions),
129129
createInputGateDeploymentDescriptors(executionVertex));
130130
}
@@ -295,6 +295,13 @@ private static MaybeOffloaded<TaskInformation> getSerializedTaskInformation(
295295
: new TaskDeploymentDescriptor.Offloaded<>(taskInfo.right());
296296
}
297297

298+
private static MaybeOffloaded<JobManagerTaskRestore> getSerializedTaskRestore(
299+
Either<SerializedValue<JobManagerTaskRestore>, PermanentBlobKey> either) {
300+
return either.isLeft()
301+
? new TaskDeploymentDescriptor.NonOffloaded<>(either.left())
302+
: new TaskDeploymentDescriptor.Offloaded<>(either.right());
303+
}
304+
298305
public static ShuffleDescriptor getConsumedPartitionShuffleDescriptor(
299306
IntermediateResultPartition consumedPartition,
300307
PartitionLocationConstraint partitionDeploymentConstraint,

0 commit comments

Comments
 (0)