CASSSIDECAR-210: Fix PeriodicTaskExecutor double execution due to race from reschedule #192

Merged
merged 6 commits on Feb 14, 2025
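As the diff below shows, the double execution came from the reschedule path: a task could call periodicTaskExecutor.reschedule(this) from inside its own run, and that unschedule-then-schedule pair could interleave with the executor's own scheduling of the next run, so two timer chains for the same task could end up live. This change removes that path: tasks now return ScheduleDecision.RESCHEDULE instead, and PeriodicTaskExecutor tracks in-flight runs in an activeRuns map so a racing start attempt is dropped. The following is a minimal, self-contained sketch of that deduplication guard, with hypothetical names and plain java.util.concurrent types standing in for the Vert.x Promise/Future used by Sidecar; it illustrates the pattern rather than reproducing the Sidecar code.

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class SingleActiveRunGuard
{
    // one entry per task key while a run is in flight, mirroring the activeRuns map in the diff
    private final Map<String, CompletableFuture<Void>> activeRuns = new ConcurrentHashMap<>();

    /**
     * Runs the body only if no run for the same key is currently active.
     * Returns true if this call actually started the run.
     */
    public boolean runIfIdle(String key, Runnable body)
    {
        CompletableFuture<Void> myRun = new CompletableFuture<>();
        // computeIfAbsent is atomic: exactly one racing caller wins the slot for this key
        if (activeRuns.computeIfAbsent(key, k -> myRun) != myRun)
        {
            return false; // another run is already in flight; drop this start attempt
        }
        try
        {
            body.run();
        }
        finally
        {
            myRun.complete(null);   // lets an unschedule caller wait for the in-flight run
            activeRuns.remove(key); // free the slot so the next scheduled run can start
        }
        return true;
    }
}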
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,5 +1,6 @@
1.0.0
-----
* Fix PeriodicTaskExecutor double execution due to race from reschedule (CASSSIDECAR-210)
* Upgrade Netty to 4.1.118.Final and Vert.x to 4.5.13 Version (CASSSIDECAR-207)
* Fix missing field for INDEX_STATUS in GossipInfo (CASSSIDECAR-195)
* Add feature level permissions to Sidecar (CASSSIDECAR-193)
@@ -143,7 +143,8 @@ public ScheduleDecision scheduleDecision()
@Override
public DurationSpec delay()
{
// delay value is re-evaluated when rescheduling
// The delay value is evaluated when scheduling the next run;
// see org.apache.cassandra.sidecar.tasks.PeriodicTaskExecutor.executeAndScheduleNext
return hasInflightJobs()
? restoreJobConfig.jobDiscoveryActiveLoopDelay()
: restoreJobConfig.jobDiscoveryIdleLoopDelay();
@@ -205,7 +206,6 @@ private void executeInternal()
"inflightJobsCount={} delayMs={} jobDiscoveryRecencyDays={}",
inflightJobsCount, delay(), jobDiscoveryRecencyDays);

boolean hasInflightJobBefore = hasInflightJobs();
// reset in-flight jobs
inflightJobsCount = 0;
RunContext context = new RunContext();
@@ -232,13 +232,6 @@ private void executeInternal()
"abortedJobs={}",
inflightJobsCount, jobDiscoveryRecencyDays, context.expiredJobs, context.abortedJobs);
metrics.activeJobs.metric.setValue(inflightJobsCount);

boolean hasInflightJobsNow = hasInflightJobs();
// need to update delay time; reschedule self
if (hasInflightJobBefore != hasInflightJobsNow)
{
periodicTaskExecutor.reschedule(this);
}
}

@Override
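The hunks above belong to the restore job discovery task: its delay() switches between an active and an idle loop delay depending on in-flight jobs, and with this change the value is simply re-read by the executor when the next run is scheduled, so the explicit reschedule call is no longer needed. Below is a small sketch of that idea under assumed delay values, using a plain ScheduledExecutorService rather than the Sidecar executor; names are hypothetical.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AdaptiveDelayLoop
{
    // assumed values, for illustration only
    private static final long ACTIVE_LOOP_DELAY_MILLIS = 1_000;
    private static final long IDLE_LOOP_DELAY_MILLIS = 30_000;

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger inflightJobs = new AtomicInteger();

    // re-evaluated for every scheduling decision, mirroring the delay() override above
    private long delayMillis()
    {
        return inflightJobs.get() > 0 ? ACTIVE_LOOP_DELAY_MILLIS : IDLE_LOOP_DELAY_MILLIS;
    }

    public void scheduleNext()
    {
        scheduler.schedule(() -> {
            runOnce();
            scheduleNext(); // delayMillis() is read again for the next run, after runOnce() updated inflightJobs
        }, delayMillis(), TimeUnit.MILLISECONDS);
    }

    private void runOnce()
    {
        // discovery work would go here; inflightJobs is updated as jobs are found or finished
    }
}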
@@ -19,7 +19,6 @@
package org.apache.cassandra.sidecar.tasks;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -30,6 +29,7 @@
import io.vertx.core.Closeable;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
import org.apache.cassandra.sidecar.coordination.ClusterLease;
@@ -50,12 +50,12 @@
public class PeriodicTaskExecutor implements Closeable
{
private static final Logger LOGGER = LoggerFactory.getLogger(PeriodicTaskExecutor.class);
private static final long PLACEHOLDER_TIMER_ID = -1L; // used when the run start immediately, not scheduled via a timer
private static final long RUN_NOW_TIMER_ID = -1L; // used when the run starts immediately, not scheduled via a timer
private static final long UNSCHEDULED_STATE_TIMER_ID = -2L; // used when the task is unscheduled

// keep track of the timerIds in order to cancel them when closing/unscheduling
private final Map<PeriodicTaskKey, Long> timerIds = new ConcurrentHashMap<>();
private final Map<PeriodicTaskKey, Future<Void>> activeRuns = new ConcurrentHashMap<>();
private final Set<PeriodicTaskKey> poisonPilledTasks = ConcurrentHashMap.newKeySet();
private final TaskExecutorPool internalPool;
private final ClusterLease clusterLease;

@@ -81,18 +81,35 @@ public void schedule(PeriodicTask task)
private void schedule(PeriodicTaskKey key, long priorExecDurationMillis, long delayMillis, long execCount)
{
long actualDelayMillis = delayMillis - priorExecDurationMillis;
AtomicBoolean runImmediately = new AtomicBoolean(actualDelayMillis <= 0);
timerIds.compute(key, (k, v) -> {
AtomicBoolean runNow = new AtomicBoolean(actualDelayMillis <= 0);
timerIds.compute(key, (k, tid) -> {
// The periodic task has been scheduled already. Exit early to avoid scheduling a duplicate
if (v != null && execCount == 0)
if (tid != null && execCount == 0)
{
LOGGER.debug("Task is already scheduled. task='{}'", key);
runImmediately.set(false);
return v;
runNow.set(false);
return tid;
}

// Clean up the unscheduled task entry from the map
if (tid != null && tid == UNSCHEDULED_STATE_TIMER_ID) // at this step, execCount != 0
{
LOGGER.debug("Task is now unscheduled. task='{}' execCount={}", key, execCount);
runNow.set(false);
return null; // remove the entry from the map, since it is unscheduled
}

if (tid == null && execCount != 0)
{
LOGGER.info("The executor is closed or the task is already unscheduled. " +
"Avoid scheduling more runs." +
"tid=null task='{}' execCount={}", key, execCount);
runNow.set(false);
return null;
}

LOGGER.debug("Scheduling task {}. task='{}' execCount={}",
runImmediately.get() ? "immediately" : "in " + actualDelayMillis + " milliseconds",
runNow.get() ? "immediately" : "in " + actualDelayMillis + " milliseconds",
key, execCount);

try
@@ -106,15 +123,15 @@ private void schedule(PeriodicTaskKey key, long priorExecDurationMillis, long de

// If run immediately, do not execute within the compute block.
// Return the placeholder timer ID, and execute after exiting the compute block.
if (runImmediately.get())
if (runNow.get())
{
return PLACEHOLDER_TIMER_ID; // use the placeholder ID, since this run is not scheduled as a timer
return RUN_NOW_TIMER_ID; // use the placeholder ID, since this run is not scheduled as a timer
}
// Schedule and update the timer id
return internalPool.setTimer(delayMillis, timerId -> executeAndScheduleNext(key, execCount));
});

if (runImmediately.get())
if (runNow.get())
{
executeAndScheduleNext(key, execCount);
}
@@ -130,92 +147,99 @@ private void schedule(PeriodicTaskKey key, long priorExecDurationMillis, long de
*/
private void executeAndScheduleNext(PeriodicTaskKey key, long execCount)
{
Promise<Void> runPromise = Promise.promise();
if (activeRuns.computeIfAbsent(key, k -> runPromise.future()) != runPromise)
{
LOGGER.debug("already active. task='{}' execCount={}", key, execCount);
return;
}
long startTime = System.nanoTime();
internalPool.<Void>executeBlocking(promise -> executeInternal(promise, key, execCount), false)
.onComplete(ignored -> {
LOGGER.debug("Task run finishes. task='{}' execCount={}", key, execCount);
// schedule the next run iff the task is not killed
if (poisonPilledTasks.remove(key))
internalPool.<ScheduleDecision>executeBlocking(promise -> executeInternal(promise, key, execCount), false)
.onComplete(outcome -> {
LOGGER.debug("Task run finishes. task='{}' outcome={} execCount={}", key, outcome, execCount);
runPromise.complete(); // mark the completion, regardless of the result from last run
activeRuns.remove(key);

DurationSpec delay;
long priorExecutionDurationMillis;
if (outcome.result() == ScheduleDecision.RESCHEDULE)
{
// timerId might get populated after unschedule due to race.
// Have another attempt to clean up here.
timerIds.remove(key);
LOGGER.debug("Avoid scheduling the next run, and remove it from poisonPilledTasks. task='{}' execCount={}",
key, execCount);
return;
priorExecutionDurationMillis = 0;
delay = key.task.initialDelay();
}
long priorExecutionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
schedule(key, priorExecutionDurationMillis, key.task.delay().to(TimeUnit.MILLISECONDS), execCount + 1);
else
{
priorExecutionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
delay = key.task.delay();
}

schedule(key, priorExecutionDurationMillis, delay.to(TimeUnit.MILLISECONDS), execCount + 1);
});
}

/**
* Unschedule and close the {@link PeriodicTask} iff it has been scheduled.
*
* @param task the task to unschedule
* @return future of task unscheduling
*/
public void unschedule(PeriodicTask task)
{
unschedule(task, true);
}

/**
* Unschedule the {@link PeriodicTask} iff it has been scheduled.
*
* @param task the {@link PeriodicTask} to unschedule
* @param shouldCloseTask indicate whether {@link PeriodicTask#close()} should be called
* @return a future of unschedule
*/
private Future<Void> unschedule(PeriodicTask task, boolean shouldCloseTask)
public Future<Void> unschedule(PeriodicTask task)
{
PeriodicTaskKey key = new PeriodicTaskKey(task);
Long timerId = timerIds.remove(key);
AtomicBoolean alreadyUnscheduled = new AtomicBoolean(false);
Long timerId = timerIds.computeIfPresent(key, (k, tid) -> {
alreadyUnscheduled.set(tid == UNSCHEDULED_STATE_TIMER_ID);
if (tid > 0)
{
// Best-effort on cancelling the timer
internalPool.cancelTimer(tid);
}
return UNSCHEDULED_STATE_TIMER_ID;
});
if (timerId == null)
{
return Future.failedFuture("No such PeriodicTask: " + key);
LOGGER.debug("No such task to unschedule. task='{}'", key);
return Future.failedFuture("No such task to unschedule");
}

LOGGER.debug("Unscheduling task. task='{}' timerId={}", key, timerId);
// always insert a poison pill when unscheduling an existing task,
// and conditionally remove when the timer can be cancelled
poisonPilledTasks.add(key);
// if timer is not started, it can be cancelled
if (timerId != PLACEHOLDER_TIMER_ID && internalPool.cancelTimer(timerId))
if (alreadyUnscheduled.get())
{
poisonPilledTasks.remove(key);
LOGGER.debug("Task is already unscheduled. task='{}'", key);
return Future.failedFuture("Task is already unscheduled");
}

// The current run might have started already.
// If so, a non-null activeRun should be retrieved and a poison pill
// is placed to avoid further scheduling.
// Reschedule should only happen after the activeRun finishes.
Future<Void> unscheduleFuture = activeRuns.getOrDefault(key, Future.succeededFuture());
return shouldCloseTask
? unscheduleFuture.onComplete(ignored -> task.close())
: unscheduleFuture;
}

LOGGER.debug("Unscheduling task. task='{}' timerId={}", key, timerId);

/**
* Reschedules the provided {@code task}.
* <p>The difference from {@link #unschedule(PeriodicTask)} then {@link #schedule(PeriodicTask)}
* is that the {@link PeriodicTask} in question is not closed when unscheduling
*
* @param task the task to reschedule
*/
public void reschedule(PeriodicTask task)
{
unschedule(task, false)
.onComplete(ignored -> schedule(task));
// The current run might have started already.
// If so, close the task once the run is completed; the task entry is removed on the next schedule.
// Otherwise, the task entry should be removed here, as there are no more schedules.
boolean removeEntry = !activeRuns.containsKey(key);
return activeRuns
.getOrDefault(key, Future.succeededFuture())
.andThen(ignored -> {
try
{
task.close();
}
catch (Throwable cause)
{
// just log any error while closing and continue
LOGGER.warn("Failed to close task during unscheduling. task='{}'", key, cause);
}
if (removeEntry)
{
timerIds.remove(key);
}
});
}

@Override
public void close(Promise<Void> completion)
{
LOGGER.info("Closing...");
try
{
timerIds.values().forEach(internalPool::cancelTimer);
timerIds.keySet().forEach(key -> key.task.close());
timerIds.keySet().forEach(key -> unschedule(key.task));
timerIds.clear();
completion.complete();
}
@@ -225,46 +249,28 @@ public void close(Promise<Void> completion)
}
}

private void executeInternal(Promise<Void> promise, PeriodicTaskKey key, long execCount)
private void executeInternal(Promise<ScheduleDecision> promise, PeriodicTaskKey key, long execCount)
{
PeriodicTask periodicTask = key.task;
switch (consolidateScheduleDecision(periodicTask))
{
case SKIP:
LOGGER.trace("Skip executing task. task='{}' execCount={}", key, execCount);
promise.tryComplete();
return;

case EXECUTE:
break;

case RESCHEDULE:
default:
LOGGER.debug("Reschedule the task. task='{}' execCount={}", key, execCount);
reschedule(periodicTask);
promise.tryComplete();
return;
}

// Leverage a separate promise, taskRunPromise, to ensure the activeRun removal
// happens before the completion of the promise of executeBlocking and its subsequent calls
Promise<Void> taskRunPromise = Promise.promise();
Future<Void> futureResult = taskRunPromise.future().onComplete(res -> {
LOGGER.debug("Removing task from active runs. task='{}' execCount={}", key, execCount);
activeRuns.remove(key);
// finally complete the promise of executeBlocking
promise.handle(res);
});
try
ScheduleDecision scheduleDecision = consolidateScheduleDecision(periodicTask);
LOGGER.debug("{} task. task='{}' execCount={}", scheduleDecision, key, execCount);
if (scheduleDecision == ScheduleDecision.EXECUTE)
{
LOGGER.debug("Executing task. task='{}' execCount={}", key, execCount);
activeRuns.put(key, futureResult);
periodicTask.execute(taskRunPromise);
Promise<Void> taskRunPromise = Promise.promise();
taskRunPromise.future().onComplete(ignored -> promise.tryComplete(ScheduleDecision.EXECUTE));
try
{
periodicTask.execute(taskRunPromise);
}
catch (Throwable throwable)
{
LOGGER.warn("Periodic task failed to execute. task='{}' execCount={}", periodicTask.name(), execCount, throwable);
taskRunPromise.tryFail(throwable);
}
}
catch (Throwable throwable)
else
{
LOGGER.warn("Periodic task failed to execute. task='{}' execCount={}", periodicTask.name(), execCount, throwable);
taskRunPromise.tryFail(throwable);
promise.tryComplete(scheduleDecision);
}
}

@@ -336,8 +342,8 @@ Map<PeriodicTaskKey, Long> timerIds()
}

@VisibleForTesting
Set<PeriodicTaskKey> poisonPilledTasks()
Map<PeriodicTaskKey, Future<Void>> activeRuns()
{
return poisonPilledTasks;
return activeRuns;
}
}
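The reworked unschedule above first flips the task entry to the UNSCHEDULED_STATE_TIMER_ID sentinel (cancelling any pending timer best-effort), then closes the task only after the in-flight run, if any, has completed. A condensed sketch of that ordering, again with hypothetical names and CompletableFuture in place of the Vert.x Future, and without the timer-cancellation detail:

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class UnscheduleSketch
{
    private static final long UNSCHEDULED = -2L; // sentinel, mirrors UNSCHEDULED_STATE_TIMER_ID

    private final Map<String, Long> timerIds = new ConcurrentHashMap<>();
    private final Map<String, CompletableFuture<Void>> activeRuns = new ConcurrentHashMap<>();

    public CompletableFuture<Void> unschedule(String key, AutoCloseable task)
    {
        // Atomically flip the entry to the sentinel so no further run gets scheduled for this key.
        // (The real code also cancels the pending Vert.x timer at this point.)
        Long timerId = timerIds.computeIfPresent(key, (k, tid) -> UNSCHEDULED);
        if (timerId == null)
        {
            return CompletableFuture.failedFuture(new IllegalStateException("No such task: " + key));
        }
        // Close the task only after the in-flight run, if any, has finished
        return activeRuns.getOrDefault(key, CompletableFuture.completedFuture(null))
                         .whenComplete((ignored, failure) -> {
                             try
                             {
                                 task.close();
                             }
                             catch (Exception e)
                             {
                                 // best effort: the real code logs the failure and continues
                             }
                         });
    }
}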
@@ -24,17 +24,17 @@
public enum ScheduleDecision
{
/**
* Execute the upcoming run
* Execute the current run and continue to schedule the next run in {@link PeriodicTask#delay()}
*/
EXECUTE,

/**
* Skip the upcoming run, but do not change the schedule
* Skip the current run and continue to schedule the next run in {@link PeriodicTask#delay()}
*/
SKIP,

/**
* Skip the upcoming run and reschedule the {@link PeriodicTask}
* Similar to {@link #SKIP}. Skip the current run, but schedule the next run in {@link PeriodicTask#initialDelay()}
*/
RESCHEDULE;
}
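With these updated semantics the executor owns the schedule entirely: EXECUTE and SKIP both lead to the next run after delay(), while RESCHEDULE skips the body and falls back to initialDelay(), replacing the old explicit reschedule call. A self-contained sketch of that interpretation loop, using a local enum and a plain ScheduledExecutorService as stand-ins for the Sidecar types:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class ScheduleDecisionSketch
{
    enum Decision { EXECUTE, SKIP, RESCHEDULE } // stands in for ScheduleDecision

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final long initialDelayMillis;
    private final long delayMillis;

    ScheduleDecisionSketch(long initialDelayMillis, long delayMillis)
    {
        this.initialDelayMillis = initialDelayMillis;
        this.delayMillis = delayMillis;
    }

    void schedule(Supplier<Decision> decision, Runnable body, long currentDelayMillis)
    {
        scheduler.schedule(() -> {
            Decision d = decision.get();
            if (d == Decision.EXECUTE)
            {
                body.run(); // only EXECUTE runs the task body; SKIP and RESCHEDULE do not
            }
            // RESCHEDULE falls back to the initial delay; EXECUTE and SKIP keep the regular delay
            long nextDelay = d == Decision.RESCHEDULE ? initialDelayMillis : delayMillis;
            schedule(decision, body, nextDelay);
        }, currentDelayMillis, TimeUnit.MILLISECONDS);
    }
}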