Skip to content

Commit

Permalink
[FLINK-37024][task] Make cancel watchdog cover tasks stuck in DEPLOYI…
Browse files Browse the repository at this point in the history
…NG state (#25915)

* [FLINK-37024][task] Make cancel watchdog cover tasks stuck in DEPLOYING state
  • Loading branch information
X-czh authored Jan 10, 2025
1 parent 89c7c2b commit 37c70c4
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1203,12 +1203,19 @@ void cancelOrFailAndCancelInvokableInternal(ExecutionState targetState, Throwabl
return;
}

if (current == ExecutionState.DEPLOYING || current == ExecutionState.CREATED) {
if (current == ExecutionState.CREATED) {
if (transitionState(current, targetState, cause)) {
// if we manage this state transition, then the invokable gets never called
// we need not call cancel on it
return;
}
} else if (current == ExecutionState.DEPLOYING) {
if (transitionState(current, targetState, cause)) {
// task may hang on the invokable constructor or static code
// we need watchdog to ensure the task does not remain hanging
startTaskCancellationWatchDog();
return;
}
} else if (current == ExecutionState.INITIALIZING
|| current == ExecutionState.RUNNING) {
if (transitionState(current, targetState, cause)) {
Expand Down Expand Up @@ -1271,29 +1278,7 @@ void cancelOrFailAndCancelInvokableInternal(ExecutionState targetState, Throwabl
FatalExitExceptionHandler.INSTANCE);
interruptingThread.start();

// if a cancellation timeout is set, the watchdog thread kills the process
// if graceful cancellation does not succeed
if (taskCancellationTimeout > 0) {
Runnable cancelWatchdog =
new TaskCancelerWatchDog(
taskInfo,
executingThread,
taskManagerActions,
taskCancellationTimeout,
jobId);

Thread watchDogThread =
new Thread(
executingThread.getThreadGroup(),
cancelWatchdog,
String.format(
"Cancellation Watchdog for %s (%s).",
taskNameWithSubtask, executionId));
watchDogThread.setDaemon(true);
watchDogThread.setUncaughtExceptionHandler(
FatalExitExceptionHandler.INSTANCE);
watchDogThread.start();
}
startTaskCancellationWatchDog();
}
return;
}
Expand All @@ -1306,6 +1291,31 @@ void cancelOrFailAndCancelInvokableInternal(ExecutionState targetState, Throwabl
}
}

private void startTaskCancellationWatchDog() {
// if a cancellation timeout is set, the watchdog thread kills the process
// if graceful cancellation does not succeed
if (taskCancellationTimeout > 0) {
Runnable cancelWatchdog =
new TaskCancelerWatchDog(
taskInfo,
executingThread,
taskManagerActions,
taskCancellationTimeout,
jobId);

Thread watchDogThread =
new Thread(
executingThread.getThreadGroup(),
cancelWatchdog,
String.format(
"Cancellation Watchdog for %s (%s).",
taskNameWithSubtask, executionId));
watchDogThread.setDaemon(true);
watchDogThread.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
watchDogThread.start();
}
}

// ------------------------------------------------------------------------
// Partition State Listeners
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,36 @@ public void testWatchDogInterruptsTask() throws Exception {
task.getExecutingThread().join();
}

/**
* Tests that interrupt happens via watch dog if canceller is stuck in cancel. Task cancellation
* blocks the task canceller. Interrupt after cancel via cancellation watch dog.
*/
@Test
public void testWatchDogThrowFatalErrorOnTaskStuckInInstantiation() throws Exception {
final InterruptOnFatalErrorTaskManagerActions taskManagerActions =
new InterruptOnFatalErrorTaskManagerActions();

final Configuration config = new Configuration();
config.set(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, Duration.ofMillis(5));
config.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, Duration.ofMillis(1000L));

final Task task =
createTaskBuilder()
.setInvokable(InvokableBlockingInInstantiation.class)
.setTaskManagerConfig(config)
.setTaskManagerActions(taskManagerActions)
.build(Executors.directExecutor());
taskManagerActions.setExecutingThread(task.getExecutingThread());

task.startTaskThread();
InvokableBlockingInInstantiation.await();
task.cancelExecution();
task.getExecutingThread().join();

// Expect fatal error to recover
assertTrue(taskManagerActions.hasFatalError());
}

/**
* The 'invoke' method holds a lock (trigger awaitLatch after acquisition) and cancel cannot
* complete because it also tries to acquire the same lock. This is resolved by the watch dog,
Expand Down Expand Up @@ -1284,6 +1314,26 @@ public void notifyFatalError(String message, Throwable cause) {
}
}

/** Customized TaskManagerActions that interrupts task thread on fatal error. */
private static class InterruptOnFatalErrorTaskManagerActions extends NoOpTaskManagerActions {
private boolean fatalError = false;
private Thread executingThread;

@Override
public void notifyFatalError(String message, Throwable cause) {
fatalError = true;
executingThread.interrupt();
}

public boolean hasFatalError() {
return fatalError;
}

public void setExecutingThread(Thread executingThread) {
this.executingThread = executingThread;
}
}

// ------------------------------------------------------------------------
// helper functions
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -1571,6 +1621,30 @@ public void invoke() {
public void cancel() {}
}

/** {@link AbstractInvokable} which blocks in instantiation. */
public static final class InvokableBlockingInInstantiation extends AbstractInvokable {
/** Declared static, otherwise there's no way to access it when blocking in constructor. */
static final OneShotLatch AWAIT_LATCH = new OneShotLatch();

public InvokableBlockingInInstantiation(Environment environment)
throws InterruptedException {
super(environment);
while (true) {
synchronized (this) {
AWAIT_LATCH.trigger();
wait();
}
}
}

@Override
public void invoke() {}

static void await() throws InterruptedException {
AWAIT_LATCH.await();
}
}

// ------------------------------------------------------------------------
// test exceptions
// ------------------------------------------------------------------------
Expand Down

0 comments on commit 37c70c4

Please sign in to comment.