CASSSIDECAR-210: Fix PeriodicTaskExecutor double execution due to race from reschedule (#192)

Patch by Yifan Cai; Reviewed by Doug Rohrer, Francisco Guerrero for CASSSIDECAR-210
yifan-c authored Feb 14, 2025
1 parent d5d8f9c commit 0712bf6
Showing 6 changed files with 313 additions and 216 deletions.
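For context, here is a minimal, hypothetical sketch of the kind of race the commit title describes, written with plain java.util.concurrent timers rather than the Vert.x pool the executor actually uses; class and method names are illustrative only. If the end-of-run handler schedules the next run while a concurrent reschedule (unschedule followed by schedule) also schedules one, two timers can stay alive for the same task and it runs twice per period.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified executor used only to illustrate the race
final class RacyRescheduleSketch
{
    private final ScheduledExecutorService timers = Executors.newScheduledThreadPool(2);
    private final Map<String, ScheduledFuture<?>> timerIds = new ConcurrentHashMap<>();

    void schedule(String key, Runnable task, long delayMillis)
    {
        // The end of each run schedules the next one and overwrites the stored timer
        timerIds.put(key, timers.schedule(() -> {
            task.run();
            schedule(key, task, delayMillis);
        }, delayMillis, TimeUnit.MILLISECONDS));
    }

    void reschedule(String key, Runnable task, long delayMillis)
    {
        ScheduledFuture<?> timer = timerIds.remove(key);
        if (timer != null)
        {
            timer.cancel(false); // cancellation fails if the run is already in flight
        }
        // If the in-flight run also schedules its next run, two timers are now live
        // for the same key and the task executes twice per period
        schedule(key, task, delayMillis);
    }
}

The patch removes the public reschedule path entirely and funnels every schedule and unschedule decision through a single timerIds.compute block with sentinel timer IDs, plus an activeRuns map that deduplicates concurrent runs.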
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,5 +1,6 @@
1.0.0
-----
* Fix PeriodicTaskExecutor double execution due to race from reschedule (CASSSIDECAR-210)
* Upgrade Netty to 4.1.118.Final and Vert.x to 4.5.13 Version (CASSSIDECAR-207)
* Fix missing field for INDEX_STATUS in GossipInfo (CASSSIDECAR-195)
* Add feature level permissions to Sidecar (CASSSIDECAR-193)
@@ -143,7 +143,8 @@ public ScheduleDecision scheduleDecision()
@Override
public DurationSpec delay()
{
// delay value is re-evaluated when rescheduling
// The delay value is evaluated on scheduling the next run
// see org.apache.cassandra.sidecar.tasks.PeriodicTaskExecutor.executeAndScheduleNext
return hasInflightJobs()
? restoreJobConfig.jobDiscoveryActiveLoopDelay()
: restoreJobConfig.jobDiscoveryIdleLoopDelay();
@@ -205,7 +206,6 @@ private void executeInternal()
"inflightJobsCount={} delayMs={} jobDiscoveryRecencyDays={}",
inflightJobsCount, delay(), jobDiscoveryRecencyDays);

boolean hasInflightJobBefore = hasInflightJobs();
// reset in-flight jobs
inflightJobsCount = 0;
RunContext context = new RunContext();
@@ -232,13 +232,6 @@ private void executeInternal()
"abortedJobs={}",
inflightJobsCount, jobDiscoveryRecencyDays, context.expiredJobs, context.abortedJobs);
metrics.activeJobs.metric.setValue(inflightJobsCount);

boolean hasInflightJobsNow = hasInflightJobs();
// need to update delay time; reschedule self
if (hasInflightJobBefore != hasInflightJobsNow)
{
periodicTaskExecutor.reschedule(this);
}
}

@Override
@@ -19,7 +19,6 @@
package org.apache.cassandra.sidecar.tasks;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -30,6 +29,7 @@
import io.vertx.core.Closeable;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
import org.apache.cassandra.sidecar.coordination.ClusterLease;
@@ -50,12 +50,12 @@
public class PeriodicTaskExecutor implements Closeable
{
private static final Logger LOGGER = LoggerFactory.getLogger(PeriodicTaskExecutor.class);
private static final long PLACEHOLDER_TIMER_ID = -1L; // used when the run starts immediately, not scheduled via a timer
private static final long RUN_NOW_TIMER_ID = -1L; // used when the run starts immediately, not scheduled via a timer
private static final long UNSCHEDULED_STATE_TIMER_ID = -2L; // used when the task is unscheduled

// keep track of the timerIds in order to cancel them when closing/unscheduling
private final Map<PeriodicTaskKey, Long> timerIds = new ConcurrentHashMap<>();
private final Map<PeriodicTaskKey, Future<Void>> activeRuns = new ConcurrentHashMap<>();
private final Set<PeriodicTaskKey> poisonPilledTasks = ConcurrentHashMap.newKeySet();
private final TaskExecutorPool internalPool;
private final ClusterLease clusterLease;

@@ -81,18 +81,35 @@ public void schedule(PeriodicTask task)
private void schedule(PeriodicTaskKey key, long priorExecDurationMillis, long delayMillis, long execCount)
{
long actualDelayMillis = delayMillis - priorExecDurationMillis;
AtomicBoolean runImmediately = new AtomicBoolean(actualDelayMillis <= 0);
timerIds.compute(key, (k, v) -> {
AtomicBoolean runNow = new AtomicBoolean(actualDelayMillis <= 0);
timerIds.compute(key, (k, tid) -> {
// The periodic task has been scheduled already. Exit early to avoid scheduling a duplicate
if (v != null && execCount == 0)
if (tid != null && execCount == 0)
{
LOGGER.debug("Task is already scheduled. task='{}'", key);
runImmediately.set(false);
return v;
runNow.set(false);
return tid;
}

// Clean up the unscheduled task from the map
if (tid != null && tid == UNSCHEDULED_STATE_TIMER_ID) // at this step, execCount != 0
{
LOGGER.debug("Task is now unscheduled. task='{}' execCount={}", key, execCount);
runNow.set(false);
return null; // remove the entry from the map, since it is unscheduled
}

if (tid == null && execCount != 0)
{
LOGGER.info("The executor is closed or the task is already unscheduled. " +
"Avoid scheduling more runs." +
"tid=null task='{}' execCount={}", key, execCount);
runNow.set(false);
return null;
}

LOGGER.debug("Scheduling task {}. task='{}' execCount={}",
runImmediately.get() ? "immediately" : "in " + actualDelayMillis + " milliseconds",
runNow.get() ? "immediately" : "in " + actualDelayMillis + " milliseconds",
key, execCount);

try
@@ -106,15 +123,15 @@ private void schedule(PeriodicTaskKey key, long priorExecDurationMillis, long de

// If run immediately, do not execute within the compute block.
// Return the placeholder timer ID, and execute after exiting the compute block.
if (runImmediately.get())
if (runNow.get())
{
return PLACEHOLDER_TIMER_ID; // use the placeholder ID, since this run is not scheduled as a timer
return RUN_NOW_TIMER_ID; // use the placeholder ID, since this run is not scheduled as a timer
}
// Schedule and update the timer id
return internalPool.setTimer(delayMillis, timerId -> executeAndScheduleNext(key, execCount));
});

if (runImmediately.get())
if (runNow.get())
{
executeAndScheduleNext(key, execCount);
}
@@ -130,92 +147,99 @@ private void schedule(PeriodicTaskKey key, long priorExecDurationMillis, long de
*/
private void executeAndScheduleNext(PeriodicTaskKey key, long execCount)
{
Promise<Void> runPromise = Promise.promise();
if (activeRuns.computeIfAbsent(key, k -> runPromise.future()) != runPromise)
{
LOGGER.debug("already active. task='{}' execCount={}", key, execCount);
return;
}
long startTime = System.nanoTime();
internalPool.<Void>executeBlocking(promise -> executeInternal(promise, key, execCount), false)
.onComplete(ignored -> {
LOGGER.debug("Task run finishes. task='{}' execCount={}", key, execCount);
// schedule the next run iff the task is not killed
if (poisonPilledTasks.remove(key))
internalPool.<ScheduleDecision>executeBlocking(promise -> executeInternal(promise, key, execCount), false)
.onComplete(outcome -> {
LOGGER.debug("Task run finishes. task='{}' outcome={} execCount={}", key, outcome, execCount);
runPromise.complete(); // mark the completion, regardless of the result from last run
activeRuns.remove(key);

DurationSpec delay;
long priorExecutionDurationMillis;
if (outcome.result() == ScheduleDecision.RESCHEDULE)
{
// timerId might get populated after unschedule due to race.
// Have another attempt to clean up here.
timerIds.remove(key);
LOGGER.debug("Avoid scheduling the next run, and remove it from poisonPilledTasks. task='{}' execCount={}",
key, execCount);
return;
priorExecutionDurationMillis = 0;
delay = key.task.initialDelay();
}
long priorExecutionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
schedule(key, priorExecutionDurationMillis, key.task.delay().to(TimeUnit.MILLISECONDS), execCount + 1);
else
{
priorExecutionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
delay = key.task.delay();
}

schedule(key, priorExecutionDurationMillis, delay.to(TimeUnit.MILLISECONDS), execCount + 1);
});
}

/**
* Unschedule and close the {@link PeriodicTask} iff it has been scheduled.
*
* @param task the task to unschedule
* @return a future that completes once the task has been unscheduled and closed
*/
public void unschedule(PeriodicTask task)
{
unschedule(task, true);
}

/**
* Unschedule the {@link PeriodicTask} iff it has been scheduled.
*
* @param task the {@link PeriodicTask} to unschedule
* @param shouldCloseTask indicate whether {@link PeriodicTask#close()} should be called
* @return a future of unschedule
*/
private Future<Void> unschedule(PeriodicTask task, boolean shouldCloseTask)
public Future<Void> unschedule(PeriodicTask task)
{
PeriodicTaskKey key = new PeriodicTaskKey(task);
Long timerId = timerIds.remove(key);
AtomicBoolean alreadyUnscheduled = new AtomicBoolean(false);
Long timerId = timerIds.computeIfPresent(key, (k, tid) -> {
alreadyUnscheduled.set(tid == UNSCHEDULED_STATE_TIMER_ID);
if (tid > 0)
{
// Best-effort on cancelling the timer
internalPool.cancelTimer(tid);
}
return UNSCHEDULED_STATE_TIMER_ID;
});
if (timerId == null)
{
return Future.failedFuture("No such PeriodicTask: " + key);
LOGGER.debug("No such task to unschedule. task='{}'", key);
return Future.failedFuture("No such task to unschedule");
}

LOGGER.debug("Unscheduling task. task='{}' timerId={}", key, timerId);
// always insert a poison pill when unscheduling an existing task,
// and conditionally remove when the timer can be cancelled
poisonPilledTasks.add(key);
// if timer is not started, it can be cancelled
if (timerId != PLACEHOLDER_TIMER_ID && internalPool.cancelTimer(timerId))
if (alreadyUnscheduled.get())
{
poisonPilledTasks.remove(key);
LOGGER.debug("Task is already unscheduled. task='{}'", key);
return Future.failedFuture("Task is already unscheduled");
}

// The current run might have started already.
// If so, a non-null activeRun should be retrieved and a poison pill
// is placed to avoid further scheduling.
// Reschedule should only happen after the activeRun finishes.
Future<Void> unscheduleFuture = activeRuns.getOrDefault(key, Future.succeededFuture());
return shouldCloseTask
? unscheduleFuture.onComplete(ignored -> task.close())
: unscheduleFuture;
}

LOGGER.debug("Unscheduling task. task='{}' timerId={}", key, timerId);

/**
* Reschedules the provided {@code task}.
* <p>The difference from {@link #unschedule(PeriodicTask)} then {@link #schedule(PeriodicTask)}
* is that the {@link PeriodicTask} in question is not closed when unscheduling
*
* @param task the task to reschedule
*/
public void reschedule(PeriodicTask task)
{
unschedule(task, false)
.onComplete(ignored -> schedule(task));
// The current run might have started already.
// If so, close the task once the run is completed; the task entry is removed on the next schedule.
// Otherwise, the task entry should be removed here, as there are no more schedules.
boolean removeEntry = !activeRuns.containsKey(key);
return activeRuns
.getOrDefault(key, Future.succeededFuture())
.andThen(ignored -> {
try
{
task.close();
}
catch (Throwable cause)
{
// just log any error while closing and continue
LOGGER.warn("Failed to close task during unscheduling. task='{}'", key, cause);
}
if (removeEntry)
{
timerIds.remove(key);
}
});
}

@Override
public void close(Promise<Void> completion)
{
LOGGER.info("Closing...");
try
{
timerIds.values().forEach(internalPool::cancelTimer);
timerIds.keySet().forEach(key -> key.task.close());
timerIds.keySet().forEach(key -> unschedule(key.task));
timerIds.clear();
completion.complete();
}
@@ -225,46 +249,28 @@ public void close(Promise<Void> completion)
}
}

private void executeInternal(Promise<Void> promise, PeriodicTaskKey key, long execCount)
private void executeInternal(Promise<ScheduleDecision> promise, PeriodicTaskKey key, long execCount)
{
PeriodicTask periodicTask = key.task;
switch (consolidateScheduleDecision(periodicTask))
{
case SKIP:
LOGGER.trace("Skip executing task. task='{}' execCount={}", key, execCount);
promise.tryComplete();
return;

case EXECUTE:
break;

case RESCHEDULE:
default:
LOGGER.debug("Reschedule the task. task='{}' execCount={}", key, execCount);
reschedule(periodicTask);
promise.tryComplete();
return;
}

// Leverage a separate promise, taskRunPromise, to ensure the activeRun removal
// happens before the completion of the promise of executeBlocking and its subsequent calls
Promise<Void> taskRunPromise = Promise.promise();
Future<Void> futureResult = taskRunPromise.future().onComplete(res -> {
LOGGER.debug("Removing task from active runs. task='{}' execCount={}", key, execCount);
activeRuns.remove(key);
// finally complete the promise of executeBlocking
promise.handle(res);
});
try
ScheduleDecision scheduleDecision = consolidateScheduleDecision(periodicTask);
LOGGER.debug("{} task. task='{}' execCount={}", scheduleDecision, key, execCount);
if (scheduleDecision == ScheduleDecision.EXECUTE)
{
LOGGER.debug("Executing task. task='{}' execCount={}", key, execCount);
activeRuns.put(key, futureResult);
periodicTask.execute(taskRunPromise);
Promise<Void> taskRunPromise = Promise.promise();
taskRunPromise.future().onComplete(ignored -> promise.tryComplete(ScheduleDecision.EXECUTE));
try
{
periodicTask.execute(taskRunPromise);
}
catch (Throwable throwable)
{
LOGGER.warn("Periodic task failed to execute. task='{}' execCount={}", periodicTask.name(), execCount, throwable);
taskRunPromise.tryFail(throwable);
}
}
catch (Throwable throwable)
else
{
LOGGER.warn("Periodic task failed to execute. task='{}' execCount={}", periodicTask.name(), execCount, throwable);
taskRunPromise.tryFail(throwable);
promise.tryComplete(scheduleDecision);
}
}

@@ -336,8 +342,8 @@ Map<PeriodicTaskKey, Long> timerIds()
}

@VisibleForTesting
Set<PeriodicTaskKey> poisonPilledTasks()
Map<PeriodicTaskKey, Future<Void>> activeRuns()
{
return poisonPilledTasks;
return activeRuns;
}
}
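A hedged usage sketch, not part of the patch: it assumes PeriodicTask sits next to PeriodicTaskExecutor in org.apache.cassandra.sidecar.tasks and leans only on what the diff above shows, namely that unschedule() is now public, returns a Future<Void> that completes after any in-flight run has finished and the task has been closed, and fails fast ("No such task to unschedule", "Task is already unscheduled") when there is nothing to do.

import io.vertx.core.Future;
import org.apache.cassandra.sidecar.tasks.PeriodicTask;
import org.apache.cassandra.sidecar.tasks.PeriodicTaskExecutor;

final class UnscheduleUsageSketch
{
    // Hypothetical helper: stop a task and report how the unschedule resolved
    static Future<Void> stop(PeriodicTaskExecutor executor, PeriodicTask task)
    {
        return executor.unschedule(task)
                       // completes only after the in-flight run (if any) finished and close() ran
                       .onSuccess(v -> System.out.println("Unscheduled and closed: " + task.name()))
                       // e.g. "No such task to unschedule" or "Task is already unscheduled"
                       .onFailure(err -> System.out.println("Skipped: " + err.getMessage()));
    }
}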
@@ -24,17 +24,17 @@
public enum ScheduleDecision
{
/**
* Execute the upcoming run
* Execute the current run and continue to schedule the next run in {@link PeriodicTask#delay()}
*/
EXECUTE,

/**
* Skip the upcoming run, but do not change the schedule
* Skip the current run and continue to schedule the next run in {@link PeriodicTask#delay()}
*/
SKIP,

/**
* Skip the upcoming run and reschedule the {@link PeriodicTask}
* Similar to {@link #SKIP}. Skip the current run, but schedule the next run in {@link PeriodicTask#initialDelay()}
*/
RESCHEDULE;
}
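For illustration, a hedged sketch of a task written against this contract, in the spirit of the job-discovery loop earlier in the diff; the exact PeriodicTask method set (delay(), initialDelay(), scheduleDecision(), execute(Promise<Void>)) and the package locations are inferred from how this diff uses them, not confirmed by it. The task no longer calls back into the executor to reschedule itself: delay() is re-read by executeAndScheduleNext() whenever the next run is scheduled, and returning RESCHEDULE would additionally reset the cadence to initialDelay().

import io.vertx.core.Promise;
import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
import org.apache.cassandra.sidecar.tasks.PeriodicTask;
import org.apache.cassandra.sidecar.tasks.ScheduleDecision;

// Hypothetical task; only the names taken from the diff above are real
class PollingTaskSketch implements PeriodicTask
{
    private final DurationSpec activeLoopDelay;
    private final DurationSpec idleLoopDelay;
    private volatile boolean hasWork;

    PollingTaskSketch(DurationSpec activeLoopDelay, DurationSpec idleLoopDelay)
    {
        this.activeLoopDelay = activeLoopDelay;
        this.idleLoopDelay = idleLoopDelay;
    }

    @Override
    public ScheduleDecision scheduleDecision()
    {
        // EXECUTE and SKIP both keep the cadence defined by delay();
        // RESCHEDULE would reset it to initialDelay() instead
        return ScheduleDecision.EXECUTE;
    }

    @Override
    public DurationSpec delay()
    {
        // Re-evaluated by the executor each time it schedules the next run,
        // so no explicit reschedule is needed when the workload flips
        return hasWork ? activeLoopDelay : idleLoopDelay;
    }

    @Override
    public void execute(Promise<Void> promise)
    {
        hasWork = pollForWork(); // placeholder for the real periodic work
        promise.complete();
    }

    private boolean pollForWork()
    {
        return false;
    }
}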