Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PaxosCleanupLocalCoordinator wait for transaction timeout before repa… #4001

Merged
merged 1 commit into from
Apr 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
5.1
* Fix Paxos repair interrupts running transactions (CASSANDRA-20469)
* Various fixes in constraint framework (CASSANDRA-20481)
* Add support in CAS for -= on numeric types, and fixed improper handling of empty bytes which led to NPE (CASSANDRA-20477)
* Do not fail to start a node with materialized views after they are turned off in config (CASSANDRA-20452)
Expand Down
2 changes: 2 additions & 0 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1467,4 +1467,6 @@ public enum CQLStartTime
// 3.x Cassandra Driver has its "read" timeout set to 12 seconds, default matches this.
public DurationSpec.LongMillisecondsBound native_transport_timeout = new DurationSpec.LongMillisecondsBound("12s");
public boolean enforce_native_deadline_for_hints = false;

// When true (the default), PaxosCleanupLocalCoordinator may sleep before repairing an uncommitted
// key so that an operation possibly still in flight for that ballot can finish first.
// Read through DatabaseDescriptor.getPaxosRepairRaceWait().
public boolean paxos_repair_race_wait = true;
}
20 changes: 15 additions & 5 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.RateLimiter;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -105,13 +104,13 @@
import org.apache.cassandra.locator.EndpointSnitchInfo;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Locator;
import org.apache.cassandra.locator.LocationInfo;
import org.apache.cassandra.locator.InitialLocationProvider;
import org.apache.cassandra.locator.LocationInfo;
import org.apache.cassandra.locator.Locator;
import org.apache.cassandra.locator.NodeAddressConfig;
import org.apache.cassandra.locator.NodeProximity;
import org.apache.cassandra.locator.ReconnectableSnitchHelper;
import org.apache.cassandra.locator.SeedProvider;
import org.apache.cassandra.locator.NodeProximity;
import org.apache.cassandra.locator.SnitchAdapter;
import org.apache.cassandra.security.AbstractCryptoProvider;
import org.apache.cassandra.security.EncryptionContext;
Expand All @@ -129,8 +128,8 @@
import static org.apache.cassandra.config.CassandraRelevantProperties.ALLOCATE_TOKENS_FOR_KEYSPACE;
import static org.apache.cassandra.config.CassandraRelevantProperties.ALLOW_UNLIMITED_CONCURRENT_VALIDATIONS;
import static org.apache.cassandra.config.CassandraRelevantProperties.AUTO_BOOTSTRAP;
import static org.apache.cassandra.config.CassandraRelevantProperties.CONFIG_LOADER;
import static org.apache.cassandra.config.CassandraRelevantProperties.CHRONICLE_ANALYTICS_DISABLE;
import static org.apache.cassandra.config.CassandraRelevantProperties.CONFIG_LOADER;
import static org.apache.cassandra.config.CassandraRelevantProperties.DISABLE_STCS_IN_L0;
import static org.apache.cassandra.config.CassandraRelevantProperties.INITIAL_TOKEN;
import static org.apache.cassandra.config.CassandraRelevantProperties.IO_NETTY_TRANSPORT_ESTIMATE_SIZE_ON_SUBMIT;
Expand Down Expand Up @@ -5574,4 +5573,15 @@ public static void setPurgeableTobmstonesMetricGranularity(Config.TombstonesMetr
{
conf.tombstone_read_purgeable_metric_granularity = granularity;
}

/**
 * Reports whether the {@code paxos_repair_race_wait} setting is currently enabled.
 *
 * @return the current value of {@code conf.paxos_repair_race_wait}
 */
public static boolean getPaxosRepairRaceWait()
{
    final boolean raceWaitEnabled = conf.paxos_repair_race_wait;
    return raceWaitEnabled;
}

/**
 * Overwrites the {@code paxos_repair_race_wait} setting on the loaded configuration.
 * <p>
 * NOTE(review): although annotated {@code @VisibleForTesting}, this setter is also invoked by
 * {@code StorageService.setPaxosRepairRaceWait}, so it is not used by tests only.
 *
 * @param paxosRepairRaceWait the new value to store in {@code conf.paxos_repair_race_wait}
 */
@VisibleForTesting
public static void setPaxosRepairRaceWait(boolean paxosRepairRaceWait)
{
conf.paxos_repair_race_wait = paxosRepairRaceWait;
}
}
14 changes: 13 additions & 1 deletion src/java/org/apache/cassandra/service/StorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,11 @@
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.LocalStrategy;
import org.apache.cassandra.locator.MetaStrategy;
import org.apache.cassandra.locator.NodeProximity;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.RangesByEndpoint;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.Replicas;
import org.apache.cassandra.locator.NodeProximity;
import org.apache.cassandra.locator.SnitchAdapter;
import org.apache.cassandra.locator.SystemReplicas;
import org.apache.cassandra.metrics.Sampler;
Expand Down Expand Up @@ -5541,4 +5541,16 @@ public void setPrioritizeSAIOverLegacyIndex(boolean value)
{
DatabaseDescriptor.setPrioritizeSAIOverLegacyIndex(value);
}

/**
 * Updates the Paxos repair race wait flag by delegating to
 * {@link DatabaseDescriptor#setPaxosRepairRaceWait(boolean)}.
 *
 * @param paxosRepairRaceWait the new value of the flag
 */
@Override
public void setPaxosRepairRaceWait(boolean paxosRepairRaceWait)
{
DatabaseDescriptor.setPaxosRepairRaceWait(paxosRepairRaceWait);
}

/**
 * Reads the Paxos repair race wait flag from {@link DatabaseDescriptor}.
 *
 * @return the value reported by {@link DatabaseDescriptor#getPaxosRepairRaceWait()}
 */
@Override
public boolean getPaxosRepairRaceWait()
{
    final boolean raceWaitEnabled = DatabaseDescriptor.getPaxosRepairRaceWait();
    return raceWaitEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1356,4 +1356,8 @@ public void enableAuditLog(String loggerName, String includedKeyspaces, String e

boolean getPrioritizeSAIOverLegacyIndex();
void setPrioritizeSAIOverLegacyIndex(boolean value);

/**
 * Sets the Paxos repair race wait flag.
 *
 * @param paxosRepairRaceWait the new value of the flag; the parameter name matches the
 *                            implementing method and the corresponding getter
 */
void setPaxosRepairRaceWait(boolean paxosRepairRaceWait);

/**
 * @return the current value of the Paxos repair race wait flag
 */
boolean getPaxosRepairRaceWait();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.ConcurrentHashMap;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -41,9 +42,15 @@
import org.apache.cassandra.service.paxos.PaxosState;
import org.apache.cassandra.service.paxos.uncommitted.UncommittedPaxosKey;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.concurrent.AsyncFuture;

import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.cassandra.config.DatabaseDescriptor.getCasContentionTimeout;
import static org.apache.cassandra.config.DatabaseDescriptor.getWriteRpcTimeout;
import static org.apache.cassandra.service.paxos.cleanup.PaxosCleanupSession.TIMEOUT_NANOS;

public class PaxosCleanupLocalCoordinator extends AsyncFuture<PaxosCleanupResponse>
Expand Down Expand Up @@ -134,16 +141,18 @@ private void scheduleKeyRepairsOrFinish()
return;
}

long txnTimeoutMicros = Math.max(getCasContentionTimeout(MICROSECONDS), getWriteRpcTimeout(MICROSECONDS));
boolean waitForCoordinator = DatabaseDescriptor.getPaxosRepairRaceWait();
while (inflight.size() < parallelism && uncommittedIter.hasNext())
repairKey(uncommittedIter.next());
repairKey(uncommittedIter.next(), txnTimeoutMicros, waitForCoordinator);

}

if (inflight.isEmpty())
finish();
}

private boolean repairKey(UncommittedPaxosKey uncommitted)
private boolean repairKey(UncommittedPaxosKey uncommitted, long txnTimeoutMicros, boolean waitForCoordinator)
{
logger.trace("repairing {}", uncommitted);
Preconditions.checkState(!inflight.containsKey(uncommitted.getKey()));
Expand All @@ -154,6 +163,9 @@ private boolean repairKey(UncommittedPaxosKey uncommitted)
if (consistency == null)
return false;

if (waitForCoordinator)
maybeWaitForOriginalCoordinator(uncommitted, txnTimeoutMicros);

inflight.put(uncommitted.getKey(), tableRepairs.startOrGetOrQueue(uncommitted.getKey(), uncommitted.ballot(), uncommitted.getConsistencyLevel(), table, result -> {
if (result.wasSuccessful())
onKeyFinish(uncommitted.getKey());
Expand All @@ -163,6 +175,24 @@ private boolean repairKey(UncommittedPaxosKey uncommitted)
return true;
}

/**
 * Wait to repair things that are still potentially executing at the original coordinator to avoid
 * causing timeouts. This should only have to happen at most a few times when the repair starts.
 * <p>
 * The age of the ballot is estimated as the difference between the local wall clock and the ballot's
 * timestamp. If that age is below {@code txnTimeoutMicros}, the calling thread sleeps uninterruptibly
 * for the remainder. The sleep never exceeds {@code txnTimeoutMicros}: a ballot stamped ahead of the
 * local clock is treated as having been created "now" rather than extending the wait by the skew.
 *
 * @param uncommitted      the uncommitted key whose ballot timestamp is inspected
 * @param txnTimeoutMicros the transaction timeout, in microseconds, to wait out
 */
private static void maybeWaitForOriginalCoordinator(UncommittedPaxosKey uncommitted, long txnTimeoutMicros)
{
    long nowMicros = MILLISECONDS.toMicros(Clock.Global.currentTimeMillis());
    long ballotElapsedMicros = nowMicros - uncommitted.ballot().unixMicros();

    // A negative age means the ballot timestamp is ahead of the local clock.
    if (ballotElapsedMicros < -SECONDS.toMicros(1))
        logger.warn("Encountered ballot that is more than 1 second in the future, is there a clock sync issue? {}", uncommitted.ballot());

    if (ballotElapsedMicros >= txnTimeoutMicros)
        return;

    // Clamp the age at zero so clock skew cannot inflate the sleep beyond the timeout itself;
    // the ballot already exists, so waiting one full timeout from now is always sufficient.
    long sleepMicros = txnTimeoutMicros - Math.max(0L, ballotElapsedMicros);
    logger.info("Paxos auto repair encountered a potentially in progress ballot, sleeping {}us to allow the in flight operation to finish", sleepMicros);
    Uninterruptibles.sleepUninterruptibly(sleepMicros, MICROSECONDS);
}

private synchronized void onKeyFinish(DecoratedKey key)
{
if (!inflight.containsKey(key))
Expand Down