Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-37024][task] Make cancel watchdog cover tasks stuck in DEPLOYING state #25915

Merged
merged 2 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1203,12 +1203,19 @@ void cancelOrFailAndCancelInvokableInternal(ExecutionState targetState, Throwabl
return;
}

if (current == ExecutionState.DEPLOYING || current == ExecutionState.CREATED) {
if (current == ExecutionState.CREATED) {
if (transitionState(current, targetState, cause)) {
// if we manage this state transition, then the invokable gets never called
// we need not call cancel on it
return;
}
} else if (current == ExecutionState.DEPLOYING) {
if (transitionState(current, targetState, cause)) {
// task may hang on the invokable constructor or static code
// we need watchdog to ensure the task does not remain hanging
startTaskCancellationWatchDog();
return;
}
} else if (current == ExecutionState.INITIALIZING
|| current == ExecutionState.RUNNING) {
if (transitionState(current, targetState, cause)) {
Expand Down Expand Up @@ -1271,29 +1278,7 @@ void cancelOrFailAndCancelInvokableInternal(ExecutionState targetState, Throwabl
FatalExitExceptionHandler.INSTANCE);
interruptingThread.start();

// if a cancellation timeout is set, the watchdog thread kills the process
// if graceful cancellation does not succeed
if (taskCancellationTimeout > 0) {
Runnable cancelWatchdog =
new TaskCancelerWatchDog(
taskInfo,
executingThread,
taskManagerActions,
taskCancellationTimeout,
jobId);

Thread watchDogThread =
new Thread(
executingThread.getThreadGroup(),
cancelWatchdog,
String.format(
"Cancellation Watchdog for %s (%s).",
taskNameWithSubtask, executionId));
watchDogThread.setDaemon(true);
watchDogThread.setUncaughtExceptionHandler(
FatalExitExceptionHandler.INSTANCE);
watchDogThread.start();
}
startTaskCancellationWatchDog();
}
return;
}
Expand All @@ -1306,6 +1291,31 @@ void cancelOrFailAndCancelInvokableInternal(ExecutionState targetState, Throwabl
}
}

private void startTaskCancellationWatchDog() {
// if a cancellation timeout is set, the watchdog thread kills the process
// if graceful cancellation does not succeed
if (taskCancellationTimeout > 0) {
Runnable cancelWatchdog =
new TaskCancelerWatchDog(
taskInfo,
executingThread,
taskManagerActions,
taskCancellationTimeout,
jobId);

Thread watchDogThread =
new Thread(
executingThread.getThreadGroup(),
cancelWatchdog,
String.format(
"Cancellation Watchdog for %s (%s).",
taskNameWithSubtask, executionId));
watchDogThread.setDaemon(true);
watchDogThread.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
watchDogThread.start();
}
}

// ------------------------------------------------------------------------
// Partition State Listeners
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,36 @@ public void testWatchDogInterruptsTask() throws Exception {
task.getExecutingThread().join();
}

/**
* Tests that interrupt happens via watch dog if canceller is stuck in cancel. Task cancellation
* blocks the task canceller. Interrupt after cancel via cancellation watch dog.
*/
@Test
public void testWatchDogThrowFatalErrorOnTaskStuckInInstantiation() throws Exception {
final InterruptOnFatalErrorTaskManagerActions taskManagerActions =
new InterruptOnFatalErrorTaskManagerActions();

final Configuration config = new Configuration();
config.set(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, Duration.ofMillis(5));
config.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, Duration.ofMillis(1000L));

final Task task =
createTaskBuilder()
.setInvokable(InvokableBlockingInInstantiation.class)
.setTaskManagerConfig(config)
.setTaskManagerActions(taskManagerActions)
.build(Executors.directExecutor());
taskManagerActions.setExecutingThread(task.getExecutingThread());

task.startTaskThread();
InvokableBlockingInInstantiation.await();
task.cancelExecution();
task.getExecutingThread().join();

// Expect fatal error to recover
assertTrue(taskManagerActions.hasFatalError());
}

/**
* The 'invoke' method holds a lock (trigger awaitLatch after acquisition) and cancel cannot
* complete because it also tries to acquire the same lock. This is resolved by the watch dog,
Expand Down Expand Up @@ -1284,6 +1314,26 @@ public void notifyFatalError(String message, Throwable cause) {
}
}

/** Customized TaskManagerActions that interrupts task thread on fatal error. */
private static class InterruptOnFatalErrorTaskManagerActions extends NoOpTaskManagerActions {
private boolean fatalError = false;
private Thread executingThread;

@Override
public void notifyFatalError(String message, Throwable cause) {
fatalError = true;
executingThread.interrupt();
}

public boolean hasFatalError() {
return fatalError;
}

public void setExecutingThread(Thread executingThread) {
this.executingThread = executingThread;
}
}

// ------------------------------------------------------------------------
// helper functions
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -1571,6 +1621,30 @@ public void invoke() {
public void cancel() {}
}

/** {@link AbstractInvokable} which blocks in instantiation. */
public static final class InvokableBlockingInInstantiation extends AbstractInvokable {
/** Declared static, otherwise there's no way to access it when blocking in constructor. */
static final OneShotLatch AWAIT_LATCH = new OneShotLatch();

public InvokableBlockingInInstantiation(Environment environment)
throws InterruptedException {
super(environment);
while (true) {
synchronized (this) {
AWAIT_LATCH.trigger();
wait();
}
}
}

@Override
public void invoke() {}

static void await() throws InterruptedException {
AWAIT_LATCH.await();
}
}

// ------------------------------------------------------------------------
// test exceptions
// ------------------------------------------------------------------------
Expand Down