Skip to content

Commit

Permalink
Refactor and improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
bbotella committed Feb 12, 2025
1 parent 6a72d73 commit 50d464d
Show file tree
Hide file tree
Showing 16 changed files with 361 additions and 129 deletions.
13 changes: 13 additions & 0 deletions conf/sidecar.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,19 @@ sidecar_peer_health:
health_check_retries: 5
health_check_retry_delay: 10s

sidecar_client:
use_ssl: true
request_timeout: 1s
request_idle_timeout: 1s
connection_pool_max_size: 10
connection_pool_clearing_period: 10s
connection_pool_event_loop_size: 10
connection_pool_max_wait_queue_size: 10
max_retries: 3
retry_delay: 1s
max_retry_delay: 2s


metrics:
registry_name: cassandra_sidecar
vertx:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,6 @@ public TestSidecarHostInfo(IInstance instance, Server sidecarServer, int port)
this.port = port;
}

public IInstance getInstance()
{
return instance;
}

public Server getServer()
{
return sidecarServer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -76,6 +77,7 @@
import org.apache.cassandra.sidecar.common.server.JmxClient;
import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
import org.apache.cassandra.sidecar.common.server.utils.DriverUtils;
import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
import org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
import org.apache.cassandra.sidecar.common.server.utils.SidecarVersionProvider;
import org.apache.cassandra.sidecar.common.server.utils.ThrowableUtils;
Expand All @@ -84,16 +86,21 @@
import org.apache.cassandra.sidecar.config.S3ClientConfiguration;
import org.apache.cassandra.sidecar.config.S3ProxyConfiguration;
import org.apache.cassandra.sidecar.config.ServiceConfiguration;
import org.apache.cassandra.sidecar.config.SidecarClientConfiguration;
import org.apache.cassandra.sidecar.config.SidecarConfiguration;
import org.apache.cassandra.sidecar.config.SidecarPeerHealthConfiguration;
import org.apache.cassandra.sidecar.config.SslConfiguration;
import org.apache.cassandra.sidecar.config.yaml.KeyStoreConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.S3ClientConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.SchemaKeyspaceConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.SidecarClientConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.SidecarPeerHealthConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.SslConfigurationImpl;
import org.apache.cassandra.sidecar.coordination.CassandraClientTokenRingProvider;
import org.apache.cassandra.sidecar.coordination.SidecarHttpHealthProvider;
import org.apache.cassandra.sidecar.coordination.SidecarPeerHealthMonitorTask;
import org.apache.cassandra.sidecar.coordination.SidecarPeerProvider;
import org.apache.cassandra.sidecar.metrics.instance.InstanceHealthMetrics;
import org.apache.cassandra.sidecar.server.MainModule;
Expand Down Expand Up @@ -163,6 +170,8 @@ public abstract class SharedClusterIntegrationTestBase
protected MtlsTestHelper mtlsTestHelper;
protected IsolatedDTestClassLoaderWrapper classLoaderWrapper;
protected Injector sidecarServerInjector;
protected HashMap<Server, String> serverDeploymentIds;
protected HashMap<Server, SidecarPeerHealthMonitorTask> peerHealthMonitors;

static
{
Expand All @@ -181,6 +190,9 @@ protected void setup() throws Exception
classLoaderWrapper = new IsolatedDTestClassLoaderWrapper();
classLoaderWrapper.initializeDTestJarClassLoader(testVersion, TestVersion.class);

serverDeploymentIds = new HashMap<>();
peerHealthMonitors = new HashMap<>();

beforeClusterProvisioning();
cluster = provisionClusterWithRetries(this.testVersion);
assertThat(cluster).isNotNull();
Expand Down Expand Up @@ -372,17 +384,22 @@ protected Server startSidecarWithInstances(Iterable<? extends IInstance> instanc
{
sidecarServerInjector = Guice.createInjector(Modules.override(new MainModule()).with(Modules.override(testModule).with(customModule)));
}
else {
else
{
sidecarServerInjector = Guice.createInjector(Modules.override(new MainModule()).with(testModule));
}


Server sidecarServer = sidecarServerInjector.getInstance(Server.class);
SidecarPeerHealthMonitorTask peerHealthMonitorTask = sidecarServerInjector.getInstance(SidecarPeerHealthMonitorTask.class);
sidecarServer.start()
.onSuccess(s -> context.completeNow())
.onSuccess(deploymentId -> {
serverDeploymentIds.put(sidecarServer, deploymentId);
peerHealthMonitors.put(sidecarServer, peerHealthMonitorTask);
context.completeNow();
})
.onFailure(context::failNow);

context.awaitCompletion(5, TimeUnit.SECONDS);
assertThat(context.awaitCompletion(5, TimeUnit.SECONDS)).isTrue();
return sidecarServer;
}

Expand Down Expand Up @@ -661,10 +678,13 @@ public SidecarConfiguration sidecarConfiguration(SidecarPeerHealthConfiguration
5242880, DEFAULT_API_CALL_TIMEOUT,
buildTestS3ProxyConfig());

SidecarClientConfiguration sidecarClientConfiguration = new SidecarClientConfigurationImpl();

SidecarConfigurationImpl.Builder builder = SidecarConfigurationImpl.builder()
.serviceConfiguration(conf)
.sslConfiguration(sslConfiguration)
.s3ClientConfiguration(s3ClientConfig)
.sidecarClientConfiguration(sidecarClientConfiguration)
.sidecarPeerHealthConfiguration(sidecarPeerHealthConfiguration);
if (configurationOverrides != null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.client.HttpResponse;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
Expand All @@ -14,6 +12,8 @@
import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.SidecarPeerHealthConfigurationImpl;
import org.apache.cassandra.sidecar.coordination.SidecarPeerHealthMonitorTask;
import org.apache.cassandra.sidecar.coordination.SidecarPeerHealthProvider;
import org.apache.cassandra.sidecar.server.Server;
import org.apache.cassandra.sidecar.testing.InnerDcTokenAdjacentPeerTestProvider.TestSidecarHostInfo;
import org.apache.cassandra.sidecar.testing.QualifiedName;
Expand All @@ -30,9 +30,7 @@
import java.util.function.Function;
import java.util.function.Supplier;

import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

class SidecarPeerDownDetectorIntegrationTest extends SharedClusterSidecarIntegrationTestBase
Expand All @@ -53,13 +51,13 @@ protected void startSidecar(ICluster<? extends IInstance> cluster) throws Interr
{
Supplier<List<TestSidecarHostInfo>> supplier = () -> sidecarServerList;
PeersModule peersModule = new PeersModule(supplier);
for (IInstance instance : cluster)
for (IInstance cassandraInstance : cluster)
{
// Provider of a list of Sidecar servers
LOGGER.info("Starting Sidecar instance for Cassandra instance {}",
instance.config().num());
Server server = startSidecarWithInstances(List.of(instance), peersModule);
sidecarServerList.add(new TestSidecarHostInfo(instance, server, server.actualPort()));
cassandraInstance.config().num());
Server server = startSidecarWithInstances(List.of(cassandraInstance), peersModule);
sidecarServerList.add(new TestSidecarHostInfo(cassandraInstance, server, server.actualPort()));
}

assertThat(sidecarServerList.size()).as("Each Cassandra Instance will be managed by a single Sidecar instance")
Expand Down Expand Up @@ -100,8 +98,8 @@ public SidecarPeerHealthConfiguration sidecarPeerHealthConfiguration()
void stopSidecarInstanceForTest(int instanceId) throws Exception
{
assertThat(sidecarServerList).isNotEmpty();
TestSidecarHostInfo server = sidecarServerList.get(instanceId);
server.getServer().stop().toCompletionStage().toCompletableFuture().get(30, TimeUnit.SECONDS);
Server server = sidecarServerList.get(instanceId).getServer();
server.stop(serverDeploymentIds.get(server)).toCompletionStage().toCompletableFuture().get(30, TimeUnit.SECONDS);
}

void startSidecarInstanceForTest(int instanceId) throws Exception
Expand Down Expand Up @@ -140,34 +138,37 @@ protected Function<SidecarConfigurationImpl.Builder, SidecarConfigurationImpl.Bu
}

@Test
void oneBuddyDownTest() throws Exception
void onePeerDownTest() throws Exception
{
HttpResponse<Buffer> response = getBlocking(trustedClient().get(server.actualPort(), "localhost", "/api/v1/peers/__health")
.send());
assertTrue(response.bodyAsJsonObject().isEmpty());
Thread.sleep(10000);
response = getBlocking(trustedClient().get(server.actualPort(), "localhost", "/api/v1/peers/__health")
.send());

assertEquals("OK", response.bodyAsJsonObject().getJsonObject("localhost2").getString("status"));

SidecarPeerHealthMonitorTask monitor = peerHealthMonitors.get(sidecarServerList.get(0).getServer());
// Monitor hasn't had time to perform checks
assertTrue(monitor.getStatus().isEmpty());
Thread.sleep(5000);
// After some time, peer is up
checkHostUp(monitor, "localhost2");
stopSidecarInstanceForTest(1);

Thread.sleep(10000);

response = getBlocking(trustedClient().get(server.actualPort(), "localhost", "/api/v1/peers/__health")
.send());

assertEquals("DOWN", response.bodyAsJsonObject().getJsonObject("localhost2").getString("status"));

Thread.sleep(5000);
// After killing peer sidecar instance, monitor catches up and the host is down
checkHostDown(monitor, "localhost2");
startSidecarInstanceForTest(1);
Thread.sleep(5000);
// After restarting peer sidecar instance, monitor catches up and the host is up
checkHostUp(monitor, "localhost2");
}

Thread.sleep(10000);
/**
 * Checks the peer identified by {@code hostname} against the {@code OK} health status.
 *
 * @param monitor  the peer health monitor whose status map is inspected
 * @param hostname the hostname of the peer to look up
 * @return the result of {@code checkHostStatus} for the {@code OK} status
 */
private boolean checkHostUp(SidecarPeerHealthMonitorTask monitor, String hostname)
{
    final SidecarPeerHealthProvider.Health expectedStatus = SidecarPeerHealthProvider.Health.OK;
    return checkHostStatus(monitor, hostname, expectedStatus);
}

response = getBlocking(trustedClient().get(server.actualPort(), "localhost", "/api/v1/peers/__health")
.send());
/**
 * Checks the peer identified by {@code hostname} against the {@code DOWN} health status.
 *
 * @param monitor  the peer health monitor whose status map is inspected
 * @param hostname the hostname of the peer to look up
 * @return the result of {@code checkHostStatus} for the {@code DOWN} status
 */
private boolean checkHostDown(SidecarPeerHealthMonitorTask monitor, String hostname)
{
    final SidecarPeerHealthProvider.Health expectedStatus = SidecarPeerHealthProvider.Health.DOWN;
    return checkHostStatus(monitor, hostname, expectedStatus);
}

assertEquals("OK", response.bodyAsJsonObject().getJsonObject("localhost2").getString("status"));
/**
 * Asserts that the monitor currently reports {@code status} for the peer whose hostname
 * equals {@code hostname}.
 *
 * <p>Callers in this test invoke the helper as a statement and discard the returned boolean,
 * so a plain comparison would let a mismatch pass silently. The comparison is therefore
 * performed as an assertion: a mismatch fails the test instead of being ignored.
 *
 * @param monitor  the peer health monitor whose status map is inspected
 * @param hostname the hostname of the peer to look up
 * @param status   the health status the peer is expected to have
 * @return {@code true} when the recorded status equals {@code status}; a mismatch never returns
 * @throws java.util.NoSuchElementException if the monitor has no entry for {@code hostname}
 * @throws AssertionError if the recorded status differs from {@code status}
 */
private boolean checkHostStatus(SidecarPeerHealthMonitorTask monitor, String hostname, SidecarPeerHealthProvider.Health status)
{
    assertThat(monitor.getStatus()
                      .entrySet()
                      .stream()
                      .filter(e -> e.getKey().hostname().equals(hostname))
                      .findAny()
                      .orElseThrow()
                      .getValue())
    .as("Health status reported by the monitor for peer %s", hostname)
    .isEqualTo(status);
    // Reaching this point means the assertion above held.
    return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public InstanceMetadataImpl transform(InstanceMetadataImpl instance)
@Override
public String name()
{
return "SidecarInstance";
return "PeerInstance";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,41 +24,13 @@
/**
* Configuration for sidecar client
*/
public interface SidecarClientConfiguration extends SidecarClientConfig
public interface SidecarClientConfiguration
{
/**
* @return {@code true} if SSL should be used for Sidecar client connections
*/
boolean useSsl();

/**
* @return {@code true} if the keystore is configured, and the {@link KeyStoreConfiguration#path()} and
* {@link KeyStoreConfiguration#password()} parameters are provided
*/
default boolean isKeystoreConfigured()
{
return keystore() != null && keystore().isConfigured();
}

/**
* @return the configuration for the keystore
*/
KeyStoreConfiguration keystore();

/**
* @return {@code true} if the truststore is configured, and the {@link KeyStoreConfiguration#path()} and
* {@link KeyStoreConfiguration#password()} parameters are provided
*/
default boolean isTruststoreConfigured()
{
return truststore() != null && truststore().isConfigured();
}

/**
* @return the configuration for the truststore
*/
KeyStoreConfiguration truststore();

/**
* @return the client request timeout value for the connection to be established
*/
Expand Down Expand Up @@ -99,4 +71,19 @@ default boolean isTruststoreConfigured()
* a ConnectionPoolTooBusyException. If the value is set to a negative number then the queue will be unbounded.
*/
int connectionPoolMaxWaitQueueSize();

/**
* @return the maximum number of retries for a failed call.
*/
int maxRetries();

/**
* @return the delay between retries.
*/
MillisecondBoundConfiguration retryDelay();

/**
* @return the max delay between retries.
*/
MillisecondBoundConfiguration maxRetryDelay();
}
Loading

0 comments on commit 50d464d

Please sign in to comment.