Skip to content

Commit 1a5c2eb

Browse files
authored
[Improve][Zeta] Filter tasks and pipelines by state (#9926)
1 parent 10c6a6f commit 1a5c2eb

File tree

4 files changed: +181 additions, -17 deletions

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java

Lines changed: 31 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -49,6 +49,7 @@
4949
import org.slf4j.Logger;
5050
import org.slf4j.LoggerFactory;
5151

52+
import com.hazelcast.jet.datamodel.Tuple2;
5253
import com.hazelcast.map.IMap;
5354
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
5455
import lombok.Getter;
@@ -58,6 +59,7 @@
5859
import java.time.Instant;
5960
import java.util.ArrayDeque;
6061
import java.util.ArrayList;
62+
import java.util.HashMap;
6163
import java.util.HashSet;
6264
import java.util.List;
6365
import java.util.Map;
@@ -787,26 +789,45 @@ private Set<Long> getNotYetAcknowledgedTasks() {
787789
}
788790

789791
private Map<ActionStateKey, ActionState> getActionStates() {
790-
// TODO: some tasks have completed and will not submit state again.
791-
return plan.getPipelineActions().entrySet().stream()
792+
Map<ActionStateKey, Integer> pipelineActions = new HashMap<>(plan.getPipelineActions());
793+
Set<ActionStateKey> closedActionKeys =
794+
plan.getSubtaskActions().entrySet().stream()
795+
.filter(
796+
entry ->
797+
SeaTunnelTaskState.CLOSED.equals(
798+
this.pipelineTaskStatus.get(
799+
entry.getKey().getTaskID())))
800+
.flatMap(entry -> entry.getValue().stream().map(Tuple2::f0))
801+
.collect(Collectors.toSet());
802+
pipelineActions.keySet().removeAll(closedActionKeys);
803+
804+
return pipelineActions.entrySet().stream()
792805
.collect(
793806
Collectors.toMap(
794807
Map.Entry::getKey,
795808
entry -> new ActionState(entry.getKey(), entry.getValue())));
796809
}
797810

798811
private Map<Long, TaskStatistics> getTaskStatistics() {
799-
// TODO: some tasks have completed and don't need to be ack
800-
return this.pipelineTasks.entrySet().stream()
812+
Map<Long, Integer> tasks = new HashMap<>(this.pipelineTasks);
813+
for (Long taskId : this.pipelineTasks.keySet()) {
814+
if (SeaTunnelTaskState.CLOSED.equals(this.pipelineTaskStatus.get(taskId))) {
815+
tasks.remove(taskId);
816+
}
817+
}
818+
return tasks.entrySet().stream()
801819
.collect(
802820
Collectors.toMap(
803821
Map.Entry::getKey,
804822
entry -> new TaskStatistics(entry.getKey(), entry.getValue())));
805823
}
806824

807825
public InvocationFuture<?>[] triggerCheckpoint(CheckpointBarrier checkpointBarrier) {
808-
// TODO: some tasks have completed and don't need to trigger
809826
return plan.getStartingSubtasks().stream()
827+
.filter(
828+
taskLocation ->
829+
!SeaTunnelTaskState.CLOSED.equals(
830+
this.pipelineTaskStatus.get(taskLocation.getTaskID())))
810831
.map(
811832
taskLocation ->
812833
new CheckpointBarrierTriggerOperation(
@@ -1100,4 +1121,9 @@ public String getCheckpointStateImapKey() {
11001121
public PendingCheckpoint getSavepointPendingCheckpoint() {
11011122
return savepointPendingCheckpoint;
11021123
}
1124+
1125+
@VisibleForTesting
1126+
public Map<Long, SeaTunnelTaskState> getPipelineTaskStatus() {
1127+
return pipelineTaskStatus;
1128+
}
11031129
}

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,14 +159,24 @@ public Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> generate() {
159159
.getJobConfig()
160160
.getEnvOptions()
161161
.get(EnvCommonOptions.NODE_TAG_FILTER.key());
162-
// TODO Determine which tasks do not need to be restored according to state
163162
CopyOnWriteArrayList<PassiveCompletableFuture<PipelineStatus>>
164163
waitForCompleteBySubPlanList = new CopyOnWriteArrayList<>();
165164

165+
List<Pipeline> unclosedPipelines = new ArrayList<>();
166+
for (Pipeline pipeline : this.pipelines) {
167+
PipelineLocation pipelineLocation =
168+
new PipelineLocation(jobImmutableInformation.getJobId(), pipeline.getId());
169+
PipelineStatus pipelineStatus =
170+
(PipelineStatus) runningJobStateIMap.get(pipelineLocation);
171+
if (!PipelineStatus.FINISHED.equals(pipelineStatus)) {
172+
unclosedPipelines.add(pipeline);
173+
}
174+
}
175+
166176
Map<Integer, CheckpointPlan> checkpointPlans = new HashMap<>();
167-
final int totalPipelineNum = pipelines.size();
177+
final int totalPipelineNum = unclosedPipelines.size();
168178
Stream<SubPlan> subPlanStream =
169-
pipelines.stream()
179+
unclosedPipelines.stream()
170180
.map(
171181
pipeline -> {
172182
this.pipelineTasks.clear();

seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java

Lines changed: 123 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.seatunnel.engine.server.checkpoint;
1919

2020
import org.apache.seatunnel.common.utils.ReflectionUtils;
21-
import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
21+
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
2222
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
2323
import org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
2424
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
@@ -28,19 +28,28 @@
2828
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
2929
import org.apache.seatunnel.engine.server.execution.TaskLocation;
3030
import org.apache.seatunnel.engine.server.master.JobMaster;
31+
import org.apache.seatunnel.engine.server.task.operation.TaskOperation;
32+
import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
3133

3234
import org.junit.jupiter.api.Assertions;
3335
import org.junit.jupiter.api.Test;
3436
import org.mockito.MockedStatic;
3537
import org.mockito.Mockito;
3638

39+
import com.hazelcast.jet.datamodel.Tuple2;
40+
import com.hazelcast.map.IMap;
41+
import com.hazelcast.spi.impl.NodeEngine;
3742
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
3843

3944
import java.time.Instant;
4045
import java.util.ArrayList;
46+
import java.util.Arrays;
4147
import java.util.Collections;
4248
import java.util.HashMap;
49+
import java.util.HashSet;
50+
import java.util.List;
4351
import java.util.Map;
52+
import java.util.Set;
4453
import java.util.concurrent.ExecutionException;
4554
import java.util.concurrent.ExecutorService;
4655
import java.util.concurrent.Executors;
@@ -54,7 +63,7 @@ public class CheckpointCoordinatorTest
5463
extends AbstractSeaTunnelServerTest<CheckpointCoordinatorTest> {
5564

5665
@Test
57-
void testACKNotExistPendingCheckpoint() throws CheckpointStorageException {
66+
void testACKNotExistPendingCheckpoint() {
5867
CheckpointConfig checkpointConfig = new CheckpointConfig();
5968
checkpointConfig.setStorage(new CheckpointStorageConfig());
6069
Map<Integer, CheckpointPlan> planMap = new HashMap<>();
@@ -80,8 +89,7 @@ void testACKNotExistPendingCheckpoint() throws CheckpointStorageException {
8089

8190
@Test
8291
void testSchedulerThreadShouldNotBeInterruptedBeforeJobMasterCleaned()
83-
throws CheckpointStorageException, ExecutionException, InterruptedException,
84-
TimeoutException {
92+
throws ExecutionException, InterruptedException, TimeoutException {
8593
CheckpointConfig checkpointConfig = new CheckpointConfig();
8694
// quickly fail the checkpoint
8795
checkpointConfig.setCheckpointTimeout(5000);
@@ -122,8 +130,7 @@ protected void handleCheckpointError(int pipelineId, boolean neverRestore) {
122130

123131
@Test
124132
void testCheckpointContinuesWorkAfterClockDrift()
125-
throws CheckpointStorageException, ExecutionException, InterruptedException,
126-
TimeoutException {
133+
throws ExecutionException, InterruptedException, TimeoutException {
127134
CheckpointConfig checkpointConfig = new CheckpointConfig();
128135
checkpointConfig.setStorage(new CheckpointStorageConfig());
129136
checkpointConfig.setCheckpointTimeout(5000);
@@ -169,9 +176,7 @@ protected void handleCheckpointError(int pipelineId, boolean neverRestore) {
169176
}
170177

171178
@Test
172-
void testCheckpointMinPause()
173-
throws CheckpointStorageException, ExecutionException, InterruptedException,
174-
TimeoutException {
179+
void testCheckpointMinPause() {
175180
CheckpointConfig checkpointConfig = new CheckpointConfig();
176181
checkpointConfig.setStorage(new CheckpointStorageConfig());
177182
checkpointConfig.setCheckpointInterval(10000); // 10 seconds
@@ -280,4 +285,113 @@ public CheckpointCoordinator getCheckpointCoordinator(int pipelineId) {
280285
executorService.shutdownNow();
281286
}
282287
}
288+
289+
@Test
290+
void testFilteringClosedTasksAndActions() {
291+
CheckpointConfig checkpointConfig = new CheckpointConfig();
292+
checkpointConfig.setStorage(new CheckpointStorageConfig());
293+
Map<Integer, CheckpointPlan> planMap = new HashMap<>();
294+
planMap.put(1, CheckpointPlan.builder().pipelineId(1).build());
295+
TestCheckpointManager checkpointManager =
296+
new TestCheckpointManager(
297+
1L,
298+
nodeEngine,
299+
planMap,
300+
checkpointConfig,
301+
server.getCheckpointService().getCheckpointStorage(),
302+
instance.getExecutorService("test"),
303+
nodeEngine.getHazelcastInstance().getMap(IMAP_RUNNING_JOB_STATE));
304+
305+
TaskGroupLocation group1 = new TaskGroupLocation(1L, 1, 1);
306+
TaskLocation task1 = new TaskLocation(group1, 1, 1);
307+
TaskLocation task2 = new TaskLocation(group1, 2, 1);
308+
309+
ActionStateKey actionKey1 = new ActionStateKey("action1");
310+
ActionStateKey actionKey2 = new ActionStateKey("action2");
311+
312+
Map<TaskLocation, Set<Tuple2<ActionStateKey, Integer>>> subtaskActions = new HashMap<>();
313+
subtaskActions.put(task1, new HashSet<>(Arrays.asList(Tuple2.tuple2(actionKey1, 0))));
314+
subtaskActions.put(task2, new HashSet<>(Arrays.asList(Tuple2.tuple2(actionKey2, 0))));
315+
316+
Map<ActionStateKey, Integer> pipelineActions = new HashMap<>();
317+
pipelineActions.put(actionKey1, 1);
318+
pipelineActions.put(actionKey2, 1);
319+
320+
CheckpointPlan plan =
321+
CheckpointPlan.builder()
322+
.pipelineId(1)
323+
.pipelineSubtasks(new HashSet<>(Arrays.asList(task1, task2)))
324+
.startingSubtasks(new HashSet<>(Arrays.asList(task1, task2)))
325+
.subtaskActions(subtaskActions)
326+
.pipelineActions(pipelineActions)
327+
.build();
328+
329+
ExecutorService executor = Executors.newSingleThreadExecutor();
330+
CheckpointCoordinator coordinator =
331+
new CheckpointCoordinator(
332+
checkpointManager,
333+
null,
334+
checkpointConfig,
335+
1L,
336+
plan,
337+
null,
338+
null,
339+
executor,
340+
Mockito.mock(com.hazelcast.map.IMap.class),
341+
false);
342+
343+
Map<Long, SeaTunnelTaskState> taskStatus = coordinator.getPipelineTaskStatus();
344+
taskStatus.put(task1.getTaskID(), SeaTunnelTaskState.RUNNING);
345+
taskStatus.put(task2.getTaskID(), SeaTunnelTaskState.CLOSED);
346+
347+
Map<ActionStateKey, ActionState> actionStates =
348+
(Map<ActionStateKey, ActionState>)
349+
ReflectionUtils.invoke(coordinator, "getActionStates");
350+
Assertions.assertTrue(actionStates.containsKey(actionKey1));
351+
Assertions.assertFalse(actionStates.containsKey(actionKey2));
352+
353+
Map<Long, TaskStatistics> stats =
354+
(Map<Long, TaskStatistics>)
355+
ReflectionUtils.invoke(coordinator, "getTaskStatistics");
356+
Assertions.assertTrue(stats.containsKey(task1.getTaskID()));
357+
Assertions.assertFalse(stats.containsKey(task2.getTaskID()));
358+
359+
CheckpointBarrier barrier =
360+
new CheckpointBarrier(
361+
1L, System.currentTimeMillis(), CheckpointType.CHECKPOINT_TYPE);
362+
coordinator.triggerCheckpoint(barrier);
363+
Assertions.assertEquals(1, checkpointManager.operations.size());
364+
365+
executor.shutdownNow();
366+
}
367+
}
368+
369+
class TestCheckpointManager extends CheckpointManager {
370+
public List<TaskOperation> operations = new ArrayList<>();
371+
372+
public TestCheckpointManager(
373+
long jobId,
374+
NodeEngine nodeEngine,
375+
Map<Integer, CheckpointPlan> checkpointPlanMap,
376+
CheckpointConfig checkpointConfig,
377+
CheckpointStorage checkpointStorage,
378+
ExecutorService executorService,
379+
IMap<Object, Object> runningJobStateIMap) {
380+
super(
381+
jobId,
382+
false,
383+
nodeEngine,
384+
null,
385+
checkpointPlanMap,
386+
checkpointConfig,
387+
checkpointStorage,
388+
executorService,
389+
runningJobStateIMap);
390+
}
391+
392+
@Override
393+
protected InvocationFuture<?> sendOperationToMemberNode(TaskOperation operation) {
394+
this.operations.add(operation);
395+
return null;
396+
}
283397
}

seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
3333
import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
3434
import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
35+
import org.apache.seatunnel.engine.server.dag.physical.UnknownPhysicalPlanException;
3536
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
3637
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
3738
import org.apache.seatunnel.engine.server.service.slot.SlotService;
@@ -248,6 +249,19 @@ public void testCloseIdleTask() throws InterruptedException {
248249
assertCloseIdleTask(jobMaster);
249250
}
250251

252+
@Test
253+
void testFilteringFinishedPipelinesInPhysicalPlanGenerator() throws Exception {
254+
long jobId = instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME).newId();
255+
JobMaster jobMaster = newJobInstanceWithRunningState(jobId);
256+
257+
jobMaster
258+
.getRunningJobStateIMap()
259+
.put(new PipelineLocation(jobId, 1), PipelineStatus.FINISHED);
260+
Assertions.assertThrows(
261+
UnknownPhysicalPlanException.class,
262+
() -> jobMaster.init(System.currentTimeMillis(), false));
263+
}
264+
251265
private void assertCloseIdleTask(JobMaster jobMaster) {
252266
SlotService slotService = server.getSlotService();
253267
Assertions.assertEquals(4, slotService.getWorkerProfile().getAssignedSlots().length);

0 commit comments

Comments (0)