CASSSIDECAR-210: Fix PeriodicTaskExecutor double execution due to race from reschedule #192

Merged 6 commits on Feb 14, 2025
Changes from 1 commit
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,5 +1,6 @@
 1.0.0
 -----
+* Fix PeriodicTaskExecutor double execution due to race from reschedule (CASSSIDECAR-210)
 * Upgrade Netty to 4.1.118.Final and Vert.x to 4.5.13 Version (CASSSIDECAR-207)
 * Fix missing field for INDEX_STATUS in GossipInfo (CASSSIDECAR-195)
 * Add feature level permissions to Sidecar (CASSSIDECAR-193)
@@ -143,7 +143,8 @@ public ScheduleDecision scheduleDecision()
@Override
public DurationSpec delay()
{
-// delay value is re-evaluated when rescheduling
+// The delay value is evaluated on scheduling the next run
+// see org.apache.cassandra.sidecar.tasks.PeriodicTaskExecutor.executeAndScheduleNext
return hasInflightJobs()
? restoreJobConfig.jobDiscoveryActiveLoopDelay()
: restoreJobConfig.jobDiscoveryIdleLoopDelay();
@@ -205,7 +206,6 @@ private void executeInternal()
"inflightJobsCount={} delayMs={} jobDiscoveryRecencyDays={}",
inflightJobsCount, delay(), jobDiscoveryRecencyDays);

-boolean hasInflightJobBefore = hasInflightJobs();
// reset in-flight jobs
inflightJobsCount = 0;
RunContext context = new RunContext();
@@ -232,13 +232,6 @@ private void executeInternal()
"abortedJobs={}",
inflightJobsCount, jobDiscoveryRecencyDays, context.expiredJobs, context.abortedJobs);
metrics.activeJobs.metric.setValue(inflightJobsCount);

-boolean hasInflightJobsNow = hasInflightJobs();
-// need to update delay time; reschedule self
-if (hasInflightJobBefore != hasInflightJobsNow)
-{
-periodicTaskExecutor.reschedule(this);
-}
}

@Override
@@ -19,7 +19,6 @@
package org.apache.cassandra.sidecar.tasks;

import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -30,6 +29,7 @@
import io.vertx.core.Closeable;
import io.vertx.core.Future;
import io.vertx.core.Promise;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
import org.apache.cassandra.sidecar.coordination.ClusterLease;
@@ -50,12 +50,12 @@
public class PeriodicTaskExecutor implements Closeable
{
private static final Logger LOGGER = LoggerFactory.getLogger(PeriodicTaskExecutor.class);
-private static final long PLACEHOLDER_TIMER_ID = -1L; // used when the run starts immediately, not scheduled via a timer
+private static final long RUN_NOW_TIMER_ID = -1L; // used when the run starts immediately, not scheduled via a timer
+private static final long UNSCHEDULED_STATE_TIMER_ID = -2L; // used when the task is unscheduled

// keep track of the timerIds in order to cancel them when closing/unscheduling
private final Map<PeriodicTaskKey, Long> timerIds = new ConcurrentHashMap<>();
private final Map<PeriodicTaskKey, Future<Void>> activeRuns = new ConcurrentHashMap<>();
-private final Set<PeriodicTaskKey> poisonPilledTasks = ConcurrentHashMap.newKeySet();
private final TaskExecutorPool internalPool;
private final ClusterLease clusterLease;

@@ -81,18 +81,35 @@ public void schedule(PeriodicTask task)
private void schedule(PeriodicTaskKey key, long priorExecDurationMillis, long delayMillis, long execCount)
{
long actualDelayMillis = delayMillis - priorExecDurationMillis;
-AtomicBoolean runImmediately = new AtomicBoolean(actualDelayMillis <= 0);
-timerIds.compute(key, (k, v) -> {
+AtomicBoolean runNow = new AtomicBoolean(actualDelayMillis <= 0);
+timerIds.compute(key, (k, tid) -> {
// The periodic task has been scheduled already. Exit early and avoid scheduling the duplication
-if (v != null && execCount == 0)
+if (tid != null && execCount == 0)
{
LOGGER.debug("Task is already scheduled. task='{}'", key);
-runImmediately.set(false);
-return v;
+runNow.set(false);
+return tid;
}

+// Cleanup the unscheduled task from map
+if (tid != null && tid == UNSCHEDULED_STATE_TIMER_ID) // at this step, execCount != 0
+{
+LOGGER.debug("Task is now unscheduled. task='{}' execCount={}", key, execCount);
+runNow.set(false);
+return null; // remove the entry from the map, since it is unscheduled
+}

+if (tid == null && execCount != 0)
+{
+LOGGER.info("The executor is closed or the task is already unscheduled. " +
+"Avoid scheduling more runs. " +
+"tid=null task='{}' execCount={}", key, execCount);
+runNow.set(false);
+return null;
+}

LOGGER.debug("Scheduling task {}. task='{}' execCount={}",
-runImmediately.get() ? "immediately" : "in " + actualDelayMillis + " milliseconds",
+runNow.get() ? "immediately" : "in " + actualDelayMillis + " milliseconds",
key, execCount);

try
@@ -106,15 +123,15 @@ private void schedule(PeriodicTaskKey key, long priorExecDurationMillis, long de

// If run immediately, do not execute within the compute block.
// Return the placeholder timer ID, and execute after exiting the compute block.
-if (runImmediately.get())
+if (runNow.get())
{
-return PLACEHOLDER_TIMER_ID; // use the placeholder ID, since this run is not scheduled as a timer
+return RUN_NOW_TIMER_ID; // use the placeholder ID, since this run is not scheduled as a timer
}
// Schedule and update the timer id
return internalPool.setTimer(delayMillis, timerId -> executeAndScheduleNext(key, execCount));
});

-if (runImmediately.get())
+if (runNow.get())
{
executeAndScheduleNext(key, execCount);
}
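Editor's note: the compute-based deduplication in the hunk above can be illustrated with a minimal, self-contained sketch. This is not the Sidecar code (which keys on PeriodicTaskKey and arms timers via a Vert.x executor pool); the class name, String keys, and the counter standing in for setTimer are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified model of the schedule() compute block: the decision to arm a
// timer, run immediately, or bail out on a duplicate is made atomically
// inside ConcurrentHashMap.compute, so two concurrent first-time schedule()
// calls for the same key cannot both start a run.
public class AtomicScheduleSketch
{
    static final long RUN_NOW_TIMER_ID = -1L; // placeholder id for an immediate run

    final ConcurrentHashMap<String, Long> timerIds = new ConcurrentHashMap<>();
    long nextTimerId = 1; // stand-in for real timer ids returned by setTimer

    /** @return true iff this call decided to run the task immediately */
    boolean schedule(String key, long delayMillis, long execCount)
    {
        AtomicBoolean runNow = new AtomicBoolean(delayMillis <= 0);
        timerIds.compute(key, (k, tid) -> {
            // first-time schedule (execCount == 0) that finds an existing entry:
            // a duplicate schedule attempt, keep the existing timer
            if (tid != null && execCount == 0)
            {
                runNow.set(false);
                return tid;
            }
            if (runNow.get())
            {
                // run after exiting the compute block; store the placeholder id
                return RUN_NOW_TIMER_ID;
            }
            // stand-in for internalPool.setTimer(delayMillis, ...)
            return nextTimerId++;
        });
        return runNow.get();
    }
}
```

The key point mirrored from the diff: the immediate run itself happens outside the compute block (to avoid executing under the map's lock), but the decision to run is made inside it.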
@@ -130,22 +147,33 @@ private void schedule(PeriodicTaskKey key, long priorExecDurationMillis, long de
*/
private void executeAndScheduleNext(PeriodicTaskKey key, long execCount)
{
+Promise<Void> runPromise = Promise.promise();
+if (activeRuns.computeIfAbsent(key, k -> runPromise.future()) != runPromise)
+{
+LOGGER.debug("already active. task='{}' execCount={}", key, execCount);
+return;
+}
long startTime = System.nanoTime();
-internalPool.<Void>executeBlocking(promise -> executeInternal(promise, key, execCount), false)
-.onComplete(ignored -> {
-LOGGER.debug("Task run finishes. task='{}' execCount={}", key, execCount);
-// schedule the next run iff the task is not killed
-if (poisonPilledTasks.remove(key))
-{
-// timerId might get populated after unschedule due to race.
-// Have another attempt to clean up here.
-timerIds.remove(key);
-LOGGER.debug("Avoid scheduling the next run, and remove it from poisonPilledTasks. task='{}' execCount={}",
-key, execCount);
-return;
-}
-long priorExecutionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
-schedule(key, priorExecutionDurationMillis, key.task.delay().to(TimeUnit.MILLISECONDS), execCount + 1);
+internalPool.<Result>executeBlocking(promise -> executeInternal(promise, key, execCount), false)
+.onComplete(outcome -> {
+LOGGER.debug("Task run finishes. task='{}' outcome={} execCount={}", key, outcome, execCount);
+runPromise.complete(); // mark the completion, regardless of the result from last run
+activeRuns.remove(key);
+
+DurationSpec delay;
+long priorExecutionDurationMillis;
+if (outcome.result() == Result.RESCHEDULED)
+{
+priorExecutionDurationMillis = 0;
+delay = key.task.initialDelay();
+}
+else
+{
+priorExecutionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
+delay = key.task.delay();
+}
+
+schedule(key, priorExecutionDurationMillis, delay.to(TimeUnit.MILLISECONDS), execCount + 1);
});
}

@@ -155,67 +183,62 @@ private void executeAndScheduleNext(PeriodicTaskKey key, long execCount)
* @param task the task to unschedule
*/
public void unschedule(PeriodicTask task)
-{
-unschedule(task, true);
-}
-
-/**
- * Unschedule the {@link PeriodicTask} iff it has been scheduled.
- *
- * @param task the {@link PeriodicTask} to unschedule
- * @param shouldCloseTask indicate whether {@link PeriodicTask#close()} should be called
- * @return a future of unschedule
- */
-private Future<Void> unschedule(PeriodicTask task, boolean shouldCloseTask)
{
PeriodicTaskKey key = new PeriodicTaskKey(task);
-Long timerId = timerIds.remove(key);
+AtomicBoolean alreadyUnscheduled = new AtomicBoolean(false);
+Long timerId = timerIds.computeIfPresent(key, (k, tid) -> {
+alreadyUnscheduled.set(tid == UNSCHEDULED_STATE_TIMER_ID);
+if (tid > 0)
+{
+// Best-effort on cancelling the timer
+internalPool.cancelTimer(tid);
+}
+return UNSCHEDULED_STATE_TIMER_ID;
+});
if (timerId == null)
{
-return Future.failedFuture("No such PeriodicTask: " + key);
+LOGGER.debug("No such task. task='{}'", key);
+return;
}

-LOGGER.debug("Unscheduling task. task='{}' timerId={}", key, timerId);
-// always insert a poison pill when unscheduling an existing task,
-// and conditionally remove when the timer can be cancelled
-poisonPilledTasks.add(key);
-// if timer is not started, it can be cancelled
-if (timerId != PLACEHOLDER_TIMER_ID && internalPool.cancelTimer(timerId))
+if (alreadyUnscheduled.get())
{
-poisonPilledTasks.remove(key);
+LOGGER.debug("Task is already unscheduled. task='{}'", key);
+return;
}

-// The current run might have started already.
-// If so, a non-null activeRun should be retrieved and a poison pill
-// is placed to avoid further scheduling.
-// Reschedule should only happen after the activeRun finishes.
-Future<Void> unscheduleFuture = activeRuns.getOrDefault(key, Future.succeededFuture());
-return shouldCloseTask
-? unscheduleFuture.onComplete(ignored -> task.close())
-: unscheduleFuture;
-}
-
-/**
- * Reschedules the provided {@code task}.
- * <p>The difference from {@link #unschedule(PeriodicTask)} then {@link #schedule(PeriodicTask)}
- * is that the {@link PeriodicTask} in question is not closed when unscheduling
- *
- * @param task the task to reschedule
- */
-public void reschedule(PeriodicTask task)
-{
-unschedule(task, false)
-.onComplete(ignored -> schedule(task));
+LOGGER.debug("Unscheduling task. task='{}' timerId={}", key, timerId);
+
+// The current run might have started already.
+// If so, close the task once the run is completed; the task entry is removed on the next schedule.
+// Otherwise, the task entry should be removed here, as there are no more schedules.
+boolean removeEntry = !activeRuns.containsKey(key);
+activeRuns
+.getOrDefault(key, Future.succeededFuture())
+.andThen(ignored -> {
+try
+{
+task.close();
+}
+catch (Throwable cause)
+{
+// just log any error while closing and continue
+LOGGER.warn("Failed to close task during unscheduling. task='{}'", key, cause);
+}
+if (removeEntry)
+{
+timerIds.remove(key);
+}
+});
}
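Editor's note: the sentinel-based unschedule above can be reduced to a small sketch. The names below are hypothetical (String keys, a no-op cancelTimer); the real code operates on PeriodicTaskKey and delegates cancellation to the executor pool.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified model of the new unschedule(): the map entry is atomically
// flipped to the UNSCHEDULED sentinel inside computeIfPresent, so a
// concurrent schedule() that observes the sentinel knows not to arm
// another timer, and a second unschedule() becomes a no-op.
public class SentinelUnschedule
{
    static final long UNSCHEDULED_STATE_TIMER_ID = -2L;

    final ConcurrentHashMap<String, Long> timerIds = new ConcurrentHashMap<>();

    /** @return true iff this call performed the unschedule (false if absent or already unscheduled) */
    boolean unschedule(String key)
    {
        AtomicBoolean alreadyUnscheduled = new AtomicBoolean(false);
        Long newValue = timerIds.computeIfPresent(key, (k, tid) -> {
            alreadyUnscheduled.set(tid == UNSCHEDULED_STATE_TIMER_ID);
            if (tid > 0)
            {
                cancelTimer(tid); // best-effort cancel of a real timer id
            }
            return UNSCHEDULED_STATE_TIMER_ID;
        });
        return newValue != null && !alreadyUnscheduled.get();
    }

    void cancelTimer(long tid)
    {
        // no-op in this sketch; the real code delegates to the executor pool
    }
}
```

Compared with the old `timerIds.remove(key)` plus poison-pill set, the sentinel keeps all unschedule state in one map, which is what lets schedule() and unschedule() agree atomically.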

@Override
public void close(Promise<Void> completion)
{
LOGGER.info("Closing...");
try
{
-timerIds.values().forEach(internalPool::cancelTimer);
-timerIds.keySet().forEach(key -> key.task.close());
+timerIds.keySet().forEach(key -> unschedule(key.task));
timerIds.clear();
completion.complete();
}
@@ -225,7 +248,7 @@ public void close(Promise<Void> completion)
}
}

-private void executeInternal(Promise<Void> promise, PeriodicTaskKey key, long execCount)
+private void executeInternal(Promise<Result> promise, PeriodicTaskKey key, long execCount)
{
PeriodicTask periodicTask = key.task;
switch (consolidateScheduleDecision(periodicTask))
@@ -240,31 +263,22 @@ private void executeInternal(Promise<Void> promise, PeriodicTaskKey key, long ex

case RESCHEDULE:
default:
-LOGGER.debug("Reschedule the task. task='{}' execCount={}", key, execCount);
-reschedule(periodicTask);
-promise.tryComplete();
+LOGGER.debug("Rescheduling the task. task='{}' execCount={}", key, execCount);
+promise.tryComplete(Result.RESCHEDULED);
Contributor Author: This is the fix. Do not reschedule, but just signal the decision and update the delay when scheduling a new run.
return;
}

-// Leverage a separate promise, taskRunPromise, to ensure the activeRun removal
-// happens before the completion of the promise of executeBlocking and its subsequent calls
Promise<Void> taskRunPromise = Promise.promise();
-Future<Void> futureResult = taskRunPromise.future().onComplete(res -> {
-LOGGER.debug("Removing task from active runs. task='{}' execCount={}", key, execCount);
-activeRuns.remove(key);
-// finally complete the promise of executeBlocking
-promise.handle(res);
-});
+taskRunPromise.future().onSuccess(ignored -> promise.tryComplete());
try
{
LOGGER.debug("Executing task. task='{}' execCount={}", key, execCount);
-activeRuns.put(key, futureResult);
periodicTask.execute(taskRunPromise);
}
catch (Throwable throwable)
{
LOGGER.warn("Periodic task failed to execute. task='{}' execCount={}", periodicTask.name(), execCount, throwable);
taskRunPromise.tryFail(throwable);
+promise.tryFail(throwable);
}
}
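Editor's note: the activeRuns guard used by executeAndScheduleNext can be illustrated in isolation. This sketch is hypothetical (String keys, CompletableFuture in place of Vert.x's Promise/Future); it only models the dedup mechanism, not the scheduling logic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Simplified model of the activeRuns map: a run proceeds only if its own
// future won the computeIfAbsent race, so at most one run per key is
// active even if two timers fire for the same task concurrently.
public class ActiveRunGuard
{
    final ConcurrentHashMap<String, CompletableFuture<Void>> activeRuns = new ConcurrentHashMap<>();

    /** @return true iff the caller may execute the run for this key */
    boolean tryBegin(String key, CompletableFuture<Void> runFuture)
    {
        // If another run is already registered, computeIfAbsent returns
        // that run's future instead of ours and we must back off.
        return activeRuns.computeIfAbsent(key, k -> runFuture) == runFuture;
    }

    void finish(String key, CompletableFuture<Void> runFuture)
    {
        runFuture.complete(null); // completion is observed by unschedule() waiters
        activeRuns.remove(key);
    }
}
```

This is the same identity-comparison trick the diff uses with `activeRuns.computeIfAbsent(key, k -> runPromise.future()) != runPromise`, relying on the winning future being the caller's own instance.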

@@ -329,15 +343,18 @@ public String toString()
}
}

-@VisibleForTesting
-Map<PeriodicTaskKey, Long> timerIds()
+enum Result
{
-return timerIds;
+/**
+ * Tells the run completion handler whether task is rescheduled.
+ * If it is rescheduled, the delay is adjusted to the initial delay of the task
+ */
+RESCHEDULED
Contributor: Can we have a Result that doesn't mean RESCHEDULED, like COMPLETED? The fact that we call Promise#tryComplete with either RESCHEDULED or nothing at all (essentially, null) seems really odd, even if we never really check for COMPLETED.

Contributor Author: COMPLETED is unnecessary. When the handler at onComplete is invoked, we know that it is completed.

Contributor Author: We can probably rename the enum. It is not really the result of execution. Looking for suggestions.

Contributor: Yeah - the biggest thing to me was that this enum is used as the type of an AsyncResult and we set the result to RESCHEDULED or nothing at all, which was just jarring when I was looking at the code.

What do you think of actually using the ScheduleDecision enum here, and getting rid of Result altogether? Then each branch of the executeInternal method would actually be able to indicate which decision was made - we end up only using RESCHEDULE right now, but at least there isn't an odd mix of promise.tryComplete() and promise.tryComplete(Result.RESCHEDULE), which didn't make a lot of sense to me - if the promise is a Promise<Result>, it should really be completed with something other than null... using Promise<ScheduleDecision> makes more sense.

Contributor Author: yep. ScheduleDecision makes better sense.
}

@VisibleForTesting
-Set<PeriodicTaskKey> poisonPilledTasks()
+Map<PeriodicTaskKey, Long> timerIds()
{
-return poisonPilledTasks;
+return timerIds;
}
}
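Editor's note: the essence of the fix, as described in the author's review comment, is that a run signals its decision and the single scheduling path picks the next delay; nothing reschedules from inside a run. A minimal sketch of that delay selection follows. The class, the enum values other than RESCHEDULE, and the method are hypothetical names for illustration.

```java
// Hypothetical sketch of the fix: the run completes with a decision, and
// one code path maps the decision to the next delay. A RESCHEDULE decision
// resets to the initial delay and ignores the prior execution duration,
// mirroring the diff's executeAndScheduleNext completion handler.
public class DecisionDelaySketch
{
    enum ScheduleDecision { EXECUTE, SKIP, RESCHEDULE }

    static long nextDelayMillis(ScheduleDecision decision,
                                long priorExecDurationMillis,
                                long regularDelayMillis,
                                long initialDelayMillis)
    {
        if (decision == ScheduleDecision.RESCHEDULE)
        {
            return initialDelayMillis; // restart from the initial delay
        }
        // otherwise deduct the time the run itself took
        return Math.max(0, regularDelayMillis - priorExecDurationMillis);
    }
}
```

Because only one completion handler ever calls schedule(), the old race (the in-run reschedule() and the timer-driven next run both scheduling) cannot occur.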