31 changes: 31 additions & 0 deletions src/main/java/redis/clients/jedis/MultiClusterClientConfig.java
@@ -63,6 +63,9 @@ public static interface StrategySupplier {
private static final List<Class<? extends Throwable>> FALLBACK_EXCEPTIONS_DEFAULT = Arrays
.asList(CallNotPermittedException.class, ConnectionFailoverException.class);

private static final long FAILBACK_CHECK_INTERVAL_DEFAULT = 5000; // 5 seconds
private static final long GRACE_PERIOD_DEFAULT = 10000; // 10 seconds

private final ClusterConfig[] clusterConfigs;

//////////// Retry Config - https://resilience4j.readme.io/docs/retry ////////////
@@ -152,6 +155,12 @@ public static interface StrategySupplier {
/** Whether failback is supported by client */
private boolean isFailbackSupported;

/** Interval in milliseconds to wait before attempting failback to a recovered cluster */
private long failbackCheckInterval;

/** Grace period in milliseconds to keep clusters disabled after they become unhealthy */
private long gracePeriod;

public MultiClusterClientConfig(ClusterConfig[] clusterConfigs) {
this.clusterConfigs = clusterConfigs;
}
@@ -225,6 +234,14 @@ public boolean isFailbackSupported() {
return isFailbackSupported;
}

public long getFailbackCheckInterval() {
return failbackCheckInterval;
}

public long getGracePeriod() {
return gracePeriod;
}

public static Builder builder(ClusterConfig[] clusterConfigs) {
return new Builder(clusterConfigs);
}
@@ -358,6 +375,8 @@ public static class Builder {

private boolean retryOnFailover = false;
private boolean isFailbackSupported = true;
private long failbackCheckInterval = FAILBACK_CHECK_INTERVAL_DEFAULT;
private long gracePeriod = GRACE_PERIOD_DEFAULT;

public Builder(ClusterConfig[] clusterConfigs) {

@@ -461,6 +480,16 @@ public Builder failbackSupported(boolean supported) {
return this;
}

public Builder failbackCheckInterval(long failbackCheckInterval) {
this.failbackCheckInterval = failbackCheckInterval;
return this;
}

public Builder gracePeriod(long gracePeriod) {
this.gracePeriod = gracePeriod;
return this;
}

public MultiClusterClientConfig build() {
MultiClusterClientConfig config = new MultiClusterClientConfig(this.clusterConfigs);

@@ -488,6 +517,8 @@ public MultiClusterClientConfig build() {

config.retryOnFailover = this.retryOnFailover;
config.isFailbackSupported = this.isFailbackSupported;
config.failbackCheckInterval = this.failbackCheckInterval;
config.gracePeriod = this.gracePeriod;

return config;
}
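For context, a minimal sketch of how a caller might wire up the two new builder options. The two-endpoint setup below is illustrative only: the hostnames are placeholders and the two-argument ClusterConfig constructor is assumed from the existing public API, not introduced by this diff.

import redis.clients.jedis.DefaultJedisClientConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.MultiClusterClientConfig;
import redis.clients.jedis.MultiClusterClientConfig.ClusterConfig;

public class FailbackConfigExample {
    public static void main(String[] args) {
        // Two endpoints in priority order; values are placeholders.
        ClusterConfig[] clusterConfigs = new ClusterConfig[] {
            new ClusterConfig(new HostAndPort("redis-east", 6379), DefaultJedisClientConfig.builder().build()),
            new ClusterConfig(new HostAndPort("redis-west", 6379), DefaultJedisClientConfig.builder().build()) };

        MultiClusterClientConfig config = MultiClusterClientConfig.builder(clusterConfigs)
            .failbackSupported(true)         // automatic failback stays enabled (the default)
            .failbackCheckInterval(5000)     // look for a better healthy cluster every 5 s
            .gracePeriod(10000)              // keep a failed cluster out of rotation for 10 s
            .build();
    }
}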
@@ -6,6 +6,7 @@
import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider;
import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider.Cluster;
import redis.clients.jedis.util.IOUtils;

/**
@@ -46,16 +47,24 @@ protected void clusterFailover(CircuitBreaker circuitBreaker) {
// Transitions state machine to a FORCED_OPEN state, stopping state transition, metrics and
// event publishing.
// To recover/transition from this forced state the user will need to manually failback

Cluster activeCluster = provider.getCluster();
// This should never happen in theory: a failover is always initiated via the active cluster's breaker
if (activeCluster.getCircuitBreaker() != circuitBreaker) throw new IllegalStateException(
"A circuit breaker failover can be triggered only by the active cluster!");

activeCluster.setGracePeriod();
circuitBreaker.transitionToForcedOpenState();

// Iterating the active cluster will allow subsequent calls to executeCommand() to use the next
// cluster's connection pool - according to the configuration's prioritization/order/weight
// int activeMultiClusterIndex = provider.incrementActiveMultiClusterIndex1();
provider.iterateActiveCluster();
if (provider.iterateActiveCluster() != null) {

// Implementation is optionally provided during configuration. Typically, used for
// activeMultiClusterIndex persistence or custom logging
provider.runClusterFailoverPostProcessor(provider.getCluster());
// Implementation is optionally provided during configuration. Typically, used for
// activeMultiClusterIndex persistence or custom logging
provider.runClusterFailoverPostProcessor(provider.getCluster());
}
}
// This check relies on the fact that many failover attempts can hit the same CB; only the first
// one will trigger a failover and transition the CB to FORCED_OPEN.
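The failover above leans on resilience4j's manual state transitions: the breaker of the failed cluster is pinned FORCED_OPEN so it stops serving, and it is only released once the grace period has passed (see the Cluster.isCBForcedOpen() change further down). A standalone sketch of that mechanism outside Jedis, with a made-up breaker name:

import io.github.resilience4j.circuitbreaker.CircuitBreaker;

public class ForcedOpenSketch {
    public static void main(String[] args) {
        CircuitBreaker cb = CircuitBreaker.ofDefaults("failed-cluster");

        // Failover: pin the breaker open; no calls are permitted and no state transitions,
        // metrics or events happen for this breaker until it is explicitly released.
        cb.transitionToForcedOpenState();
        System.out.println(cb.getState()); // FORCED_OPEN

        // Failback: once the grace period has elapsed, the provider releases the breaker
        // by transitioning it back to CLOSED, making the cluster eligible again.
        cb.transitionToClosedState();
        System.out.println(cb.getState()); // CLOSED
    }
}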
@@ -16,6 +16,10 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import java.util.function.Consumer;
import java.util.function.Predicate;

@@ -82,6 +86,12 @@ public class MultiClusterPooledConnectionProvider implements ConnectionProvider

private HealthStatusManager healthStatusManager = new HealthStatusManager();

// Failback mechanism fields
private final ScheduledExecutorService failbackScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "failback-scheduler");
t.setDaemon(true);
return t;
});
// Store retry and circuit breaker configs for dynamic cluster addition/removal
private RetryConfig retryConfig;
private CircuitBreakerConfig circuitBreakerConfig;
@@ -151,6 +161,13 @@ public MultiClusterPooledConnectionProvider(MultiClusterClientConfig multiCluste
/// --- ///

this.fallbackExceptionList = multiClusterClientConfig.getFallbackExceptionList();

// Start periodic failback checker
if (multiClusterClientConfig.isFailbackSupported()) {
long failbackInterval = multiClusterClientConfig.getFailbackCheckInterval();
failbackScheduler.scheduleAtFixedRate(this::periodicFailbackCheck, failbackInterval, failbackInterval,
TimeUnit.MILLISECONDS);
}
}

/**
@@ -194,6 +211,7 @@ public void remove(Endpoint endpoint) {
if (multiClusterMap.size() < 2) {
throw new JedisValidationException("Cannot remove the last remaining endpoint");
}
log.debug("Removing endpoint {}", endpoint);

activeClusterIndexLock.lock();
try {
@@ -251,7 +269,6 @@ private void addClusterInternal(MultiClusterClientConfig multiClusterClientConfi
circuitBreakerEventPublisher.onError(event -> log.error(String.valueOf(event)));
circuitBreakerEventPublisher.onFailureRateExceeded(event -> log.error(String.valueOf(event)));
circuitBreakerEventPublisher.onSlowCallRateExceeded(event -> log.error(String.valueOf(event)));
circuitBreakerEventPublisher.onStateTransition(event -> log.warn(String.valueOf(event)));

ConnectionPool pool;
if (poolConfig != null) {
@@ -281,20 +298,51 @@ private void handleStatusChange(HealthStatusChangeEvent eventArgs) {

clusterWithHealthChange.setHealthStatus(newStatus);

if (newStatus.isHealthy()) {
if (clusterWithHealthChange.isFailbackSupported() && activeCluster != clusterWithHealthChange) {
// lets check if weighted switching is possible
Map.Entry<Endpoint, Cluster> failbackCluster = findWeightedHealthyClusterToIterate();
if (failbackCluster == clusterWithHealthChange
&& clusterWithHealthChange.getWeight() > activeCluster.getWeight()) {
setActiveCluster(clusterWithHealthChange, false);
if (!newStatus.isHealthy()) {
// Handle failover if this was the active cluster
if (clusterWithHealthChange == activeCluster) {
clusterWithHealthChange.setGracePeriod();
if (iterateActiveCluster() != null) {
this.runClusterFailoverPostProcessor(activeCluster);
}
}
} else if (clusterWithHealthChange == activeCluster) {
if (iterateActiveCluster() != null) {
this.runClusterFailoverPostProcessor(activeCluster);
}
}

/**
* Periodic failback checker - runs at configured intervals to check for failback opportunities
*/
private void periodicFailbackCheck() {
// Find the best candidate cluster for failback
Cluster bestCandidate = null;
float bestWeight = activeCluster.getWeight();

for (Map.Entry<Endpoint, Cluster> entry : multiClusterMap.entrySet()) {
Cluster cluster = entry.getValue();

// Skip if this is already the active cluster
if (cluster == activeCluster) {
continue;
}

// Skip if cluster is not healthy
if (!cluster.isHealthy()) {
continue;
}

// This cluster is a valid candidate
if (cluster.getWeight() > bestWeight) {
bestCandidate = cluster;
bestWeight = cluster.getWeight();
}
}

// Perform failback if we found a better candidate
if (bestCandidate != null) {
log.info("Performing failback from {} to {} (higher weight cluster available)",
activeCluster.getCircuitBreaker().getName(), bestCandidate.getCircuitBreaker().getName());
setActiveCluster(bestCandidate, true);
}
}
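To make the selection rule above concrete: only a healthy, non-active cluster whose weight is strictly greater than the active cluster's weight is chosen, so clusters with equal or lower weight never trigger a failback. A standalone sketch of the same comparison, with made-up cluster names and weights:

import java.util.LinkedHashMap;
import java.util.Map;

public class FailbackSelectionSketch {
    public static void main(String[] args) {
        float activeWeight = 0.5f;                       // weight of the cluster currently serving traffic
        Map<String, Float> healthyStandbys = new LinkedHashMap<>();
        healthyStandbys.put("cluster-low", 0.3f);        // lower weight: never a failback target
        healthyStandbys.put("cluster-preferred", 1.0f);  // higher weight: chosen once healthy again

        String bestCandidate = null;
        float bestWeight = activeWeight;
        for (Map.Entry<String, Float> entry : healthyStandbys.entrySet()) {
            if (entry.getValue() > bestWeight) {         // strictly greater: ties keep the active cluster
                bestCandidate = entry.getKey();
                bestWeight = entry.getValue();
            }
        }
        System.out.println(bestCandidate);               // prints: cluster-preferred
    }
}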

public Endpoint iterateActiveCluster() {
@@ -397,7 +445,21 @@ private boolean setActiveCluster(Cluster cluster, boolean validateConnection) {

@Override
public void close() {
activeCluster.getConnectionPool().close();
// Shutdown the failback scheduler
failbackScheduler.shutdown();
try {
if (!failbackScheduler.awaitTermination(1, TimeUnit.SECONDS)) {
failbackScheduler.shutdownNow();
}
} catch (InterruptedException e) {
failbackScheduler.shutdownNow();
Thread.currentThread().interrupt();
}

// Close all cluster connection pools
for (Cluster cluster : multiClusterMap.values()) {
cluster.getConnectionPool().close();
}
}

@Override
@@ -425,26 +487,21 @@ public Cluster getCluster() {
}

@VisibleForTesting
public Cluster getCluster(Endpoint multiClusterIndex) {
return multiClusterMap.get(multiClusterIndex);
public Cluster getCluster(Endpoint endpoint) {
return multiClusterMap.get(endpoint);
}

public CircuitBreaker getClusterCircuitBreaker() {
return activeCluster.getCircuitBreaker();
}

public CircuitBreaker getClusterCircuitBreaker(int multiClusterIndex) {
return activeCluster.getCircuitBreaker();
}

/**
* Indicates the final cluster/database endpoint (connection pool), according to the pre-configured list provided at
* startup via the MultiClusterClientConfig, is unavailable and therefore no further failover is possible. Users can
* manually failback to an available cluster
*/
public boolean canIterateOnceMore() {
Map.Entry<Endpoint, Cluster> e = findWeightedHealthyClusterToIterate();

return e != null;
}

@@ -472,6 +529,9 @@ public static class Cluster {
private MultiClusterClientConfig multiClusterClientConfig;
private boolean disabled = false;

// Grace period tracking
private volatile long gracePeriodEndsAt = 0;

public Cluster(ConnectionPool connectionPool, Retry retry, CircuitBreaker circuitBreaker, float weight,
MultiClusterClientConfig multiClusterClientConfig) {
this.connectionPool = connectionPool;
@@ -513,11 +573,14 @@ public float getWeight() {
}

public boolean isCBForcedOpen() {
if (circuitBreaker.getState() == State.FORCED_OPEN && !isInGracePeriod()) {
circuitBreaker.transitionToClosedState();
}
return circuitBreaker.getState() == CircuitBreaker.State.FORCED_OPEN;
}

public boolean isHealthy() {
return healthStatus.isHealthy() && !isCBForcedOpen() && !disabled;
return healthStatus.isHealthy() && !isCBForcedOpen() && !disabled && !isInGracePeriod();
}

public boolean retryOnFailover() {
@@ -532,6 +595,20 @@ public void setDisabled(boolean disabled) {
this.disabled = disabled;
}

/**
* Checks if the cluster is currently in grace period
*/
public boolean isInGracePeriod() {
return System.currentTimeMillis() < gracePeriodEndsAt;
}

/**
* Sets the grace period for this cluster
*/
public void setGracePeriod() {
gracePeriodEndsAt = System.currentTimeMillis() + multiClusterClientConfig.getGracePeriod();
}

/**
* Whether failback is supported by client
*/
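The grace-period bookkeeping added to Cluster reduces to a volatile end timestamp compared against the wall clock; while that window is open the cluster reports itself unhealthy even if health checks already pass, which is what prevents failback flapping. A minimal standalone sketch of the pattern (class and method names here are illustrative, not the Jedis API):

public class GracePeriodSketch {
    private volatile long gracePeriodEndsAt = 0L;        // epoch millis; 0 means no grace period active
    private final long gracePeriodMillis;

    GracePeriodSketch(long gracePeriodMillis) {
        this.gracePeriodMillis = gracePeriodMillis;
    }

    void startGracePeriod() {
        gracePeriodEndsAt = System.currentTimeMillis() + gracePeriodMillis;
    }

    boolean inGracePeriod() {
        return System.currentTimeMillis() < gracePeriodEndsAt;
    }

    public static void main(String[] args) throws InterruptedException {
        GracePeriodSketch cluster = new GracePeriodSketch(200);   // 200 ms for the demo; the new default is 10 s
        cluster.startGracePeriod();
        System.out.println(cluster.inGracePeriod());              // true: excluded from failback for now
        Thread.sleep(250);
        System.out.println(cluster.inGracePeriod());              // false: eligible again on the next periodic check
    }
}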