From 6a72d731cbe656c895bbaf06b16cdc4880cf0dd4 Mon Sep 17 00:00:00 2001 From: Bernardo Botella Corbi Date: Tue, 11 Feb 2025 15:53:07 -0800 Subject: [PATCH] Some refactors and improvements. --- .../base/CassandraStorageOperations.java | 61 --------- .../GossipDependentStorageJmxOperations.java | 6 - .../adapters/base/StorageJmxOperations.java | 11 -- build.gradle | 4 +- .../sidecar/common/ApiEndpointsV1.java | 3 - client/build.gradle | 2 +- .../sidecar/client/SidecarClient.java | 40 +++--- conf/sidecar.yaml | 6 + gradle.properties | 4 +- integration-framework/build.gradle | 1 + .../distributed/impl/CassandraCluster.java | 2 + .../testing/TemporaryCqlSessionProvider.java | 2 + server-common/build.gradle | 2 +- .../common/server/StorageOperations.java | 15 --- server/build.gradle | 5 +- .../config/SidecarClientConfiguration.java | 20 ++- .../yaml/SidecarClientConfigurationImpl.java | 20 ++- .../CassandraClientTokenRingProvider.java | 28 +++-- .../coordination/CassandraInstance.java | 26 ---- .../SidecarHttpHealthProvider.java | 43 +++---- ...java => SidecarPeerHealthMonitorTask.java} | 51 +++----- .../coordination/TokenRingProvider.java | 28 +---- .../routes/SidecarPeersHealthHandler.java | 103 --------------- .../cassandra/sidecar/server/MainModule.java | 42 +------ .../cassandra/sidecar/server/Server.java | 41 +----- .../sidecar/utils/SidecarClientProvider.java | 35 +++--- .../sidecar/testing/IntegrationTestBase.java | 11 -- .../config/SidecarConfigurationTest.java | 8 +- .../config/sidecar_down_detector_config.yaml | 118 ------------------ 29 files changed, 139 insertions(+), 599 deletions(-) delete mode 100644 server/src/main/java/org/apache/cassandra/sidecar/coordination/CassandraInstance.java rename server/src/main/java/org/apache/cassandra/sidecar/coordination/{SidecarPeerHealthMonitor.java => SidecarPeerHealthMonitorTask.java} (74%) delete mode 100644 server/src/main/java/org/apache/cassandra/sidecar/routes/SidecarPeersHealthHandler.java delete mode 100644 server/src/test/resources/config/sidecar_down_detector_config.yaml diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java index 261b71e52..b67eb211f 100644 --- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java +++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java @@ -22,10 +22,8 @@ import java.net.UnknownHostException; import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ExecutionException; import org.slf4j.Logger; @@ -37,7 +35,6 @@ import org.apache.cassandra.sidecar.common.server.StorageOperations; import org.apache.cassandra.sidecar.common.server.cluster.locator.Partitioners; import org.apache.cassandra.sidecar.common.server.data.Name; -import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName; import org.apache.cassandra.sidecar.common.server.dns.DnsResolver; import org.apache.cassandra.sidecar.common.server.exceptions.NodeBootstrappingException; import org.apache.cassandra.sidecar.common.server.exceptions.SnapshotAlreadyExistsException; @@ -45,9 +42,7 @@ import org.jetbrains.annotations.Nullable; import static java.util.Objects.requireNonNull; -import static org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME; import static org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME; -import static org.apache.cassandra.sidecar.common.utils.Preconditions.checkArgument; /** * An implementation of the {@link StorageOperations} that interfaces with Cassandra 4.0 and later @@ -258,60 +253,4 @@ public String clusterName() return jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME) .getClusterName(); } - - @Override - public Set naturalEndpointsInLocalDatacenter(List tableNameList, - String localDatacenter, - Set tokens) throws UnknownHostException - { - requireNonNull(tableNameList, "tableNameList must be non-null"); - requireNonNull(localDatacenter, "localDatacenter must be non-null"); - requireNonNull(tokens, "tokens must be non-null"); - checkArgument(!tableNameList.isEmpty(), "tableNameList must be non-empty"); - checkArgument(!tokens.isEmpty(), "tokens must be non-empty"); - - Set nonLocalEndpoints = new HashSet<>(); - Set naturalEndpoints = new HashSet<>(); - StorageJmxOperations storageOps = initializeStorageOps(); - EndpointSnitchJmxOperations epSnitchInfo = initializeEndpointProxy(); - for (QualifiedTableName name : tableNameList) - { - for (String token : tokens) - { - List naturalEndpointsWithPort = storageOps.getNaturalEndpointsWithPort(name.keyspace(), name.tableName(), token); - for (String endpoint : naturalEndpointsWithPort) - { - if (naturalEndpoints.contains(endpoint) || - nonLocalEndpoints.contains(endpoint)) - { - continue; // We already have checked this endpoint - } - - String datacenter = epSnitchInfo.getDatacenter(endpoint); - - if (localDatacenter.equals(datacenter)) - { - naturalEndpoints.add(endpoint); - } - else - { - nonLocalEndpoints.add(endpoint); // avoid checking the datacenter information from the endpoint snitch again - } - } - } - } - - return naturalEndpoints; - } - - protected EndpointSnitchJmxOperations initializeEndpointProxy() - { - return jmxClient.proxy(EndpointSnitchJmxOperations.class, ENDPOINT_SNITCH_INFO_OBJ_NAME); - } - - protected StorageJmxOperations initializeStorageOps() - { - return new GossipDependentStorageJmxOperations(jmxClient.proxy(StorageJmxOperations.class, - STORAGE_SERVICE_OBJ_NAME)); - } } diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java index f3d2fbc7b..7d2a5831a 100644 --- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java +++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java @@ -122,12 +122,6 @@ public Map, List> getRangeToEndpointWithPortMap(String keys return delegate.getRangeToEndpointWithPortMap(keyspace); } - @Override - public List getNaturalEndpointsWithPort(String keyspaceName, String cf, String key) - { - return delegate.getNaturalEndpointsWithPort(keyspaceName, cf, key); - } - @Override public Map, List> getPendingRangeToEndpointWithPortMap(String keyspace) { diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java index d4e0ba76b..0b8cd45db 100644 --- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java +++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java @@ -127,17 +127,6 @@ public interface StorageJmxOperations */ Map, List> getRangeToEndpointWithPortMap(String keyspace); - /** - * This method returns the N endpoints that are responsible for storing the - * specified key i.e for replication. - * - * @param keyspaceName keyspace name - * @param cf Column family name - * @param key Key for which we need to find the endpoint return value - - * the endpoint responsible for this key - */ - List getNaturalEndpointsWithPort(String keyspaceName, String cf, String key); - /** * Retrieve the list of pending node endpoints by token range for the given keyspace * diff --git a/build.gradle b/build.gradle index 20d5c4d62..599f90b14 100644 --- a/build.gradle +++ b/build.gradle @@ -102,8 +102,6 @@ allprojects { shouldRunAfter(tasks.withType(Checkstyle)) shouldRunAfter(tasks.withType(RatTask)) } - - ext.dependencyLocation = (System.getenv("CASSANDRA_DEP_DIR") ?: "${rootDir}/dtest-jars") + "/" } group 'org.apache.cassandra' @@ -241,7 +239,7 @@ subprojects { } test { - jvmArgs "-XX:+MaxFDLimit" + jvmArgs "-XX:-MaxFDLimit" finalizedBy jacocoTestReport // report is always generated after tests run } } diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java index 6b6a15136..511a5daa4 100644 --- a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java @@ -29,7 +29,6 @@ public final class ApiEndpointsV1 public static final String HEALTH = "/__health"; public static final String CASSANDRA = "/cassandra"; - public static final String PEERS = "/peers"; public static final String NATIVE = "/native"; public static final String JMX = "/jmx"; @@ -63,8 +62,6 @@ public final class ApiEndpointsV1 public static final String CASSANDRA_NATIVE_HEALTH_ROUTE = API_V1 + CASSANDRA + NATIVE + HEALTH; public static final String CASSANDRA_JMX_HEALTH_ROUTE = API_V1 + CASSANDRA + JMX + HEALTH; - public static final String SIDECAR_PEERS_HEALTH_ROUTE = API_V1 + PEERS + HEALTH; - @Deprecated // NOTE: Uses singular forms of "keyspace" and "table" public static final String DEPRECATED_SNAPSHOTS_ROUTE = API_V1 + "/keyspace/" + KEYSPACE_PATH_PARAM + "/table/" + TABLE_PATH_PARAM + diff --git a/client/build.gradle b/client/build.gradle index 2e5846664..194c6b663 100644 --- a/client/build.gradle +++ b/client/build.gradle @@ -43,7 +43,7 @@ repositories { test { useJUnitPlatform() if (Os.isFamily(Os.FAMILY_MAC)) { - jvmArgs "-XX:+MaxFDLimit" + jvmArgs "-XX:-MaxFDLimit" } testLogging { events "passed", "skipped", "failed" diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java index dd177d819..ca595a8a6 100644 --- a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java +++ b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java @@ -35,7 +35,6 @@ import org.apache.cassandra.sidecar.client.retry.RunnableOnStatusCodeRetryPolicy; import org.apache.cassandra.sidecar.client.selection.InstanceSelectionPolicy; import org.apache.cassandra.sidecar.client.selection.RandomInstanceSelectionPolicy; -import org.apache.cassandra.sidecar.client.selection.SingleInstanceSelectionPolicy; import org.apache.cassandra.sidecar.common.request.AbortRestoreJobRequest; import org.apache.cassandra.sidecar.common.request.CreateRestoreJobRequest; import org.apache.cassandra.sidecar.common.request.CreateRestoreJobSliceRequest; @@ -114,21 +113,6 @@ public CompletableFuture sidecarHealth() .build()); } - /** - * Executes the Sidecar health request against a single sidecar instance and retries to confirm - * the Sidecar is DOWN for extended period of time. - * - * @return a completable future of the Sidecar health response - */ - public CompletableFuture sidecarInstanceHealth(SidecarInstance instance, RetryPolicy retryPolicy) - { - return executor.executeRequestAsync(requestBuilder() - .sidecarHealthRequest() - .instanceSelectionPolicy(new SingleInstanceSelectionPolicy(instance)) - .retryPolicy(retryPolicy) - .build()); - } - /** * Executes the Cassandra health request using the configured selection policy and with no retries * @@ -240,6 +224,7 @@ public CompletableFuture gossipInfo() /** * Executes the GET gossip health request using the default retry policy and configured selection policy + * * @param instance the instance where the request will be executed * @return a completable future with gossip health response */ @@ -527,27 +512,28 @@ public CompletableFuture cleanUploadSession(SidecarInstance instance, Stri /** * Lists CDC commit logs in CDC directory for an instance + * * @param sidecarInstance instance on which the CDC commit logs are to be listed * @return a completable future with List of cdc commitLogs on the requested instance */ public CompletableFuture listCdcSegments(SidecarInstance sidecarInstance) { return executor.executeRequestAsync(requestBuilder() - .singleInstanceSelectionPolicy(sidecarInstance) - .request(new ListCdcSegmentsRequest()) - .build()); + .singleInstanceSelectionPolicy(sidecarInstance) + .request(new ListCdcSegmentsRequest()) + .build()); } /** * Streams CDC commit log segments from the requested instance. - * + *

* Streams the specified {@code range} of a CDC CommitLog from the given instance and the * stream is consumed by the {@link StreamConsumer consumer}. * * @param sidecarInstance instance on which the CDC commit logs are to be streamed - * @param segment segment file name - * @param range range of the file to be streamed - * @param streamConsumer object that consumes the stream + * @param segment segment file name + * @param range range of the file to be streamed + * @param streamConsumer object that consumes the stream */ public void streamCdcSegments(SidecarInstance sidecarInstance, String segment, @@ -555,9 +541,9 @@ public void streamCdcSegments(SidecarInstance sidecarInstance, StreamConsumer streamConsumer) { executor.streamRequest(requestBuilder() - .singleInstanceSelectionPolicy(sidecarInstance) - .request(new StreamCdcSegmentRequest(segment, range)) - .build(), streamConsumer); + .singleInstanceSelectionPolicy(sidecarInstance) + .request(new StreamCdcSegmentRequest(segment, range)) + .build(), streamConsumer); } /** @@ -697,6 +683,7 @@ public CompletableFuture listOperationalJobs(Sideca /** * Executes the streams stats request using the default retry policy and configured selection policy + * * @param instance the instance where the request will be executed * @return a completable future of the connected client stats */ @@ -710,6 +697,7 @@ public CompletableFuture streamsStats(SidecarInstance insta /** * Executes the node decommission request using the default retry policy and configured selection policy + * * @param instance the instance where the request will be executed * @return a completable future of the jobs list */ diff --git a/conf/sidecar.yaml b/conf/sidecar.yaml index 8f790b8fc..bb610fa41 100644 --- a/conf/sidecar.yaml +++ b/conf/sidecar.yaml @@ -235,6 +235,12 @@ healthcheck: initial_delay: 0ms execute_interval: 30s +sidecar_peer_health: + enabled: true + execute_interval: 30s + health_check_retries: 5 + health_check_retry_delay: 10s + metrics: registry_name: cassandra_sidecar vertx: diff --git a/gradle.properties b/gradle.properties index 054db3b96..c14be2269 100644 --- a/gradle.properties +++ b/gradle.properties @@ -34,6 +34,4 @@ commonsCodecVersion=1.16.1 boringSslVersion=2.0.61.Final # If running MacOS then you need to increase the max # open FD limit -org.gradle.jvmargs=-XX:+MaxFDLimit - -assertjCoreVersion=3.24.2 +org.gradle.jvmargs=-XX:-MaxFDLimit diff --git a/integration-framework/build.gradle b/integration-framework/build.gradle index 400c0b686..04d370dda 100644 --- a/integration-framework/build.gradle +++ b/integration-framework/build.gradle @@ -67,6 +67,7 @@ dependencies { implementation(group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.14.3') api(testFixtures(project(path: ":test-common"))) + testImplementation(platform("org.junit:junit-bom:${project.junitVersion}")) testImplementation('org.junit.jupiter:junit-jupiter') testImplementation("org.assertj:assertj-core:${assertjCoreVersion}") diff --git a/integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java b/integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java index 0042cb6b6..fc082a464 100644 --- a/integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java +++ b/integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java @@ -47,6 +47,7 @@ import org.apache.cassandra.testing.IClusterExtension; import org.apache.cassandra.testing.Partitioner; import org.apache.cassandra.testing.TestTokenSupplier; +import org.jetbrains.annotations.NotNull; import static org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack; import static org.apache.cassandra.distributed.shared.NetworkTopology.networkTopology; @@ -269,6 +270,7 @@ public Stream stream(String s, String s1) return delegate.stream(s, s1); } + @NotNull @Override public Iterator iterator() { diff --git a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TemporaryCqlSessionProvider.java b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TemporaryCqlSessionProvider.java index 0690579d1..61fb09d5b 100644 --- a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TemporaryCqlSessionProvider.java +++ b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TemporaryCqlSessionProvider.java @@ -36,6 +36,7 @@ import com.datastax.driver.core.policies.ReconnectionPolicy; import org.apache.cassandra.sidecar.common.server.CQLSessionProvider; import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; +import org.jetbrains.annotations.NotNull; import static org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException.Service.CQL; @@ -59,6 +60,7 @@ public TemporaryCqlSessionProvider(List contactPoints, NettyO this.contactPoints = contactPoints; } + @NotNull @Override public synchronized Session get() { diff --git a/server-common/build.gradle b/server-common/build.gradle index c14554505..755b3dd11 100644 --- a/server-common/build.gradle +++ b/server-common/build.gradle @@ -39,7 +39,7 @@ sourceCompatibility = JavaVersion.VERSION_11 test { useJUnitPlatform() if (Os.isFamily(Os.FAMILY_MAC)) { - jvmArgs "-XX:+MaxFDLimit" + jvmArgs "-XX:-MaxFDLimit" } maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1 reports { diff --git a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java index 5471ef3d6..0a5c16cdf 100644 --- a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java +++ b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java @@ -22,13 +22,11 @@ import java.net.UnknownHostException; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ExecutionException; import org.apache.cassandra.sidecar.common.response.RingResponse; import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse; import org.apache.cassandra.sidecar.common.server.data.Name; -import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -128,17 +126,4 @@ default void outOfRangeDataCleanup(@NotNull String keyspace, @NotNull String tab * @return the name of the cluster */ String clusterName(); - - /** - * Returns the set of natural replication endpoints in the local datacenter. - * - * @param tableNameList a list of qualified table names to use for retrieving the natural endpoints - * @param localDatacenter the name of the datacenter used to filter endpoints that are in the same datacenter - * @param tokens a set of tokens managed by the instance - * @return the set of natural replication endpoints in the local datacenter - */ - Set naturalEndpointsInLocalDatacenter(List tableNameList, - String localDatacenter, - Set tokens) throws UnknownHostException; - } diff --git a/server/build.gradle b/server/build.gradle index 0cb9b2559..fc53937fa 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -140,7 +140,6 @@ dependencies { testImplementation('org.mockito:mockito-core:4.10.0') testImplementation('org.mockito:mockito-inline:4.10.0') testImplementation("io.vertx:vertx-junit5:${project.vertxVersion}") -// implementation(testFixtures(project(":client"))) testImplementation(testFixtures(project(":client-common"))) testImplementation(testFixtures(project(":server-common"))) testImplementation(testFixtures(project(":test-common"))) @@ -199,7 +198,7 @@ test { // see the integrationTest task useJUnitPlatform() if (Os.isFamily(Os.FAMILY_MAC)) { - jvmArgs "-XX:+MaxFDLimit" + jvmArgs "-XX:-MaxFDLimit" } reports { junitXml.setRequired(true) @@ -222,7 +221,7 @@ tasks.register("containerTest", Test) { jvmArgs(project.ext.JDK11_OPTIONS) println("JVM arguments for $project.name are $allJvmArgs") } else { - jvmArgs '-XX:+MaxFDLimit' + jvmArgs '-XX:-MaxFDLimit' } useJUnitPlatform() diff --git a/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarClientConfiguration.java b/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarClientConfiguration.java index 317a7881b..076e82158 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarClientConfiguration.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarClientConfiguration.java @@ -19,6 +19,7 @@ package org.apache.cassandra.sidecar.config; import org.apache.cassandra.sidecar.client.SidecarClientConfig; +import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration; /** * Configuration for sidecar client @@ -30,11 +31,6 @@ public interface SidecarClientConfiguration extends SidecarClientConfig */ boolean useSsl(); - /** - * @return {@code true} if OpenSSL is preferred when available, {@code false} to use JDK's SSL implementation - */ - boolean preferOpenSSL(); - /** * @return {@code true} if the keystore is configured, and the {@link KeyStoreConfiguration#path()} and * {@link KeyStoreConfiguration#password()} parameters are provided @@ -64,14 +60,14 @@ default boolean isTruststoreConfigured() KeyStoreConfiguration truststore(); /** - * @return the client request timeout value in milliseconds for the connection to be established + * @return the client request timeout value for the connection to be established */ - long requestTimeoutMillis(); + MillisecondBoundConfiguration requestTimeout(); /** - * @return the client idle timeout in milliseconds before the connection is considered as stale + * @return the client idle timeout before the connection is considered as stale */ - long requestIdleTimeoutMillis(); + MillisecondBoundConfiguration requestIdleTimeout(); // Pooling options @@ -81,10 +77,10 @@ default boolean isTruststoreConfigured() int connectionPoolMaxSize(); /** - * @return the connection pool cleaner period in milliseconds, a non-positive value disables expiration checks - * and connections will remain in the pool until they are closed. + * @return the connection pool cleaner period, a non-positive value disables expiration checks and connections + * will remain in the pool until they are closed. */ - long connectionPoolCleanerPeriodMillis(); + MillisecondBoundConfiguration connectionPoolCleanerPeriod(); /** * Return the configured number of event-loop the pool use. diff --git a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarClientConfigurationImpl.java b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarClientConfigurationImpl.java index f8b97ba67..859a4f11a 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarClientConfigurationImpl.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarClientConfigurationImpl.java @@ -20,6 +20,7 @@ import java.time.Duration; +import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration; import org.apache.cassandra.sidecar.config.KeyStoreConfiguration; import org.apache.cassandra.sidecar.config.SidecarClientConfiguration; @@ -29,19 +30,12 @@ */ public class SidecarClientConfigurationImpl implements SidecarClientConfiguration { - @Override public boolean useSsl() { return false; } - @Override - public boolean preferOpenSSL() - { - return false; - } - @Override public KeyStoreConfiguration keystore() { @@ -55,15 +49,15 @@ public KeyStoreConfiguration truststore() } @Override - public long requestTimeoutMillis() + public MillisecondBoundConfiguration requestTimeout() { - return 0; + return null; } @Override - public long requestIdleTimeoutMillis() + public MillisecondBoundConfiguration requestIdleTimeout() { - return 0; + return null; } @Override @@ -73,9 +67,9 @@ public int connectionPoolMaxSize() } @Override - public long connectionPoolCleanerPeriodMillis() + public MillisecondBoundConfiguration connectionPoolCleanerPeriod() { - return 0; + return null; } @Override diff --git a/server/src/main/java/org/apache/cassandra/sidecar/coordination/CassandraClientTokenRingProvider.java b/server/src/main/java/org/apache/cassandra/sidecar/coordination/CassandraClientTokenRingProvider.java index 9aeb1805d..65026310b 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/coordination/CassandraClientTokenRingProvider.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/coordination/CassandraClientTokenRingProvider.java @@ -30,7 +30,6 @@ import com.google.inject.Inject; import com.google.inject.Singleton; import org.apache.cassandra.sidecar.client.SidecarInstance; -import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.cluster.InstancesMetadata; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; import org.apache.cassandra.sidecar.cluster.locator.LocalTokenRangesProvider; @@ -179,7 +178,7 @@ public static Map>>> assignedRangesOf perDcHosts.computeIfAbsent(host.getDatacenter(), (dc) -> new ArrayList<>()) .add(new CassandraInstance(tokenToString(minToken), getIpFromHost(dnsResolver, host))); } - perDcHosts.forEach((dc, hosts) -> hosts.sort(Comparator.comparing(o -> new BigInteger(o.token())))); + perDcHosts.forEach((dc, hosts) -> hosts.sort(Comparator.comparing(o -> new BigInteger(o.token)))); return perDcHosts.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> calculateTokenRanges(partitioner, e.getValue()))); @@ -203,7 +202,7 @@ protected static Map>> calculateTokenRanges(Parti // RingTopologyRefresher.calculate... return calculateTokenRanges(sortedPerDcHosts, 1, partitioner) .entries().stream() - .collect(Collectors.groupingBy(e -> e.getKey().nodeName(), Collectors.mapping(Map.Entry::getValue, Collectors.toList()))); + .collect(Collectors.groupingBy(e -> e.getKey().node, Collectors.mapping(Map.Entry::getValue, Collectors.toList()))); } private Map localInstanceIds() @@ -220,14 +219,14 @@ private Map localInstanceIds() .collect(Collectors.toMap(entry -> ipToHost.get(entry.getKey()), entry -> localIps.get(entry.getKey()))); } - private Metadata validatedMetadata(List localInstances) + private Metadata validatedMetadata(List localInstances) { if (localInstances.isEmpty()) { LOGGER.warn("No local instances found"); throw new RuntimeException("No local instances found"); } - return firstAvailableOperationFromDelegate(CassandraAdapterDelegate::metadata); + return fetcher.callOnFirstAvailableInstance(instanceMetadata -> instanceMetadata.delegate().metadata()); } private static Map>> perKeySpaceTokenRangesOfAllInstances(final Metadata metadata) @@ -264,8 +263,8 @@ public static Multimap> calculateTokenRange { CassandraInstance instance = instances.get(index); int disjointReplica = (instances.size() + index - replicationFactor) % instances.size(); - BigInteger rangeStart = new BigInteger((instances.get(disjointReplica)).token()); - BigInteger rangeEnd = new BigInteger(instance.token()); + BigInteger rangeStart = new BigInteger((instances.get(disjointReplica)).token); + BigInteger rangeEnd = new BigInteger(instance.token); if (rangeStart.compareTo(rangeEnd) >= 0) { tokenRanges.put(instance, Range.openClosed(rangeStart, partitioner.maximumToken().toBigInteger())); @@ -279,4 +278,19 @@ public static Multimap> calculateTokenRange } return tokenRanges; } + + /** + * Class to encapsule Cassandra instance data + */ + private static class CassandraInstance + { + private final String token; + private final String node; + + public CassandraInstance(String token, String node) + { + this.token = token; + this.node = node; + } + } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/coordination/CassandraInstance.java b/server/src/main/java/org/apache/cassandra/sidecar/coordination/CassandraInstance.java deleted file mode 100644 index 84c06a5da..000000000 --- a/server/src/main/java/org/apache/cassandra/sidecar/coordination/CassandraInstance.java +++ /dev/null @@ -1,26 +0,0 @@ -package org.apache.cassandra.sidecar.coordination; - -/** - * Class to encapsule Cassandra instance data - */ -public class CassandraInstance -{ - private final String token; - private final String node; - - public CassandraInstance(String token, String node) - { - this.token = token; - this.node = node; - } - - public String token() - { - return token; - } - - public String nodeName() - { - return node; - } -} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarHttpHealthProvider.java b/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarHttpHealthProvider.java index 8b613809c..114d987eb 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarHttpHealthProvider.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarHttpHealthProvider.java @@ -18,16 +18,15 @@ package org.apache.cassandra.sidecar.coordination; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.concurrent.CompletableFuture; import com.google.inject.Inject; import com.google.inject.Singleton; import io.vertx.core.Future; import org.apache.cassandra.sidecar.client.SidecarClient; import org.apache.cassandra.sidecar.client.SidecarInstance; -import org.apache.cassandra.sidecar.client.SidecarInstanceImpl; import org.apache.cassandra.sidecar.client.retry.BasicRetryPolicy; +import org.apache.cassandra.sidecar.common.response.HealthResponse; import org.apache.cassandra.sidecar.config.SidecarConfiguration; import org.apache.cassandra.sidecar.config.SidecarPeerHealthConfiguration; import org.apache.cassandra.sidecar.utils.SidecarClientProvider; @@ -40,21 +39,14 @@ @Singleton public class SidecarHttpHealthProvider implements SidecarPeerHealthProvider { - private static final Logger LOGGER = LoggerFactory.getLogger(SidecarHttpHealthProvider.class); - - private final SidecarClient client; private final SidecarPeerHealthConfiguration config; + private final SidecarClientProvider clientProvider; @Inject public SidecarHttpHealthProvider(SidecarConfiguration sidecarConfiguration, SidecarClientProvider clientProvider) { - config = sidecarConfiguration.sidecarPeerHealthConfiguration(); - client = clientProvider.get(); - } - - protected BasicRetryPolicy retryPolicy() - { - return new BasicRetryPolicy(config.healthCheckRetries(), config.healthCheckRetryDelay().toMillis()); + this.config = sidecarConfiguration.sidecarPeerHealthConfiguration(); + this.clientProvider = clientProvider; } @Override @@ -62,16 +54,25 @@ public Future health(SidecarInstance instance) { try { - BasicRetryPolicy retryPolicy = retryPolicy(); - return Future.fromCompletionStage(client.sidecarInstanceHealth(new SidecarInstanceImpl(instance.hostname(), instance.port()), retryPolicy)) - .compose(healthResponse -> { - if (healthResponse.isOk()) - return Future.succeededFuture(Health.OK); - return Future.succeededFuture(Health.DOWN); - }); - } catch (Exception e) + SidecarClient client = clientProvider.get(); + CompletableFuture healthRequest = client.executeRequestAsync(client.requestBuilder() + .singleInstanceSelectionPolicy(instance) + .retryPolicy(retryPolicy()) + .sidecarHealthRequest() + .build()); + return Future.fromCompletionStage(healthRequest) + .map(healthResponse -> healthResponse.isOk() + ? Health.OK + : Health.DOWN); + } + catch (Exception e) { return Future.succeededFuture(Health.DOWN); } } + + protected BasicRetryPolicy retryPolicy() + { + return new BasicRetryPolicy(config.healthCheckRetries(), config.healthCheckRetryDelay().toMillis()); + } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarPeerHealthMonitor.java b/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarPeerHealthMonitorTask.java similarity index 74% rename from server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarPeerHealthMonitor.java rename to server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarPeerHealthMonitorTask.java index a0d82781d..de25edcfe 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarPeerHealthMonitor.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/coordination/SidecarPeerHealthMonitorTask.java @@ -24,7 +24,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -import com.google.common.collect.ImmutableSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,9 +52,9 @@ * listeners when other Sidecar(s) goes DOWN or OK. */ @Singleton -public class SidecarPeerHealthMonitor implements PeriodicTask +public class SidecarPeerHealthMonitorTask implements PeriodicTask { - private static final Logger LOGGER = LoggerFactory.getLogger(SidecarPeerHealthMonitor.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SidecarPeerHealthMonitorTask.class); private final Vertx vertx; private final SidecarPeerHealthConfiguration config; @@ -65,11 +64,11 @@ public class SidecarPeerHealthMonitor implements PeriodicTask private final Map status = new ConcurrentHashMap<>(); @Inject - public SidecarPeerHealthMonitor(Vertx vertx, - SidecarConfiguration sidecarConfiguration, - SidecarPeerProvider sidecarPeerProvider, - SidecarPeerHealthProvider healthProvider, - SidecarInstanceCodecs sidecarInstanceCodecs) + public SidecarPeerHealthMonitorTask(Vertx vertx, + SidecarConfiguration sidecarConfiguration, + SidecarPeerProvider sidecarPeerProvider, + SidecarPeerHealthProvider healthProvider, + SidecarInstanceCodecs sidecarInstanceCodecs) { this.vertx = vertx; this.config = sidecarConfiguration.sidecarPeerHealthConfiguration(); @@ -78,11 +77,6 @@ public SidecarPeerHealthMonitor(Vertx vertx, vertx.eventBus().registerDefaultCodec(InstanceMetadataImpl.class, sidecarInstanceCodecs); } - public Set sidecarPeers() - { - return ImmutableSet.copyOf(status.keySet()); - } - @NotNull public SidecarPeerHealthProvider.Health status(InstanceMetadata instance) { @@ -125,25 +119,19 @@ protected Future run() return Future.succeededFuture(); } - List>> futures = + List> futures = sidecarPeers.stream() - .map(instance -> { - Future future = healthProvider.health(instance) - .onSuccess(health -> { - updateHealth(instance, health); - }) - .onFailure(throwable -> { - LOGGER.error("Failed to run health check, marking instance as DOWN host={} port={}", - instance.hostname(), instance.port(), throwable); - markDown(instance); - }); - return Future.succeededFuture(future); - } - - ).collect(Collectors.toList()); + .map(instance -> healthProvider.health(instance) + .onSuccess(health -> updateHealth(instance, health)) + .onFailure(throwable -> { + LOGGER.error("Failed to run health check, marking instance as DOWN host={} port={}", + instance.hostname(), instance.port(), throwable); + markDown(instance); + })) + .collect(Collectors.toList()); return Future.all(futures) - .onComplete( f -> { + .onComplete(f -> { if (f.succeeded()) { status.keySet().retainAll(sidecarPeers); @@ -199,9 +187,4 @@ public Map getStatus() { return status; } - - public SidecarPeerHealthProvider.Health getInstanceStatus(SidecarInstance sidecarInstance) - { - return status.get(sidecarInstance); - } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/coordination/TokenRingProvider.java b/server/src/main/java/org/apache/cassandra/sidecar/coordination/TokenRingProvider.java index 4c0b820ce..0ac611694 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/coordination/TokenRingProvider.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/coordination/TokenRingProvider.java @@ -20,8 +20,10 @@ import org.apache.cassandra.sidecar.cluster.InstancesMetadata; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; import org.apache.cassandra.sidecar.common.response.NodeSettings; +import org.apache.cassandra.sidecar.common.server.StorageOperations; import org.apache.cassandra.sidecar.common.server.cluster.locator.Partitioner; import org.apache.cassandra.sidecar.common.server.cluster.locator.Partitioners; +import org.apache.cassandra.sidecar.common.server.data.Name; import org.apache.cassandra.sidecar.common.server.dns.DnsResolver; import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; @@ -95,30 +97,10 @@ public Map>> getPrimaryTokenRanges(@Nullable Stri @Nullable public String localDc() { - NodeSettings nodeSettings = firstAvailableOperationFromDelegate(CassandraAdapterDelegate::nodeSettings); - return nodeSettings == null ? null : nodeSettings.datacenter(); + NodeSettings nodeSettings = fetcher.callOnFirstAvailableInstance(instance-> instance.delegate().nodeSettings()); + return nodeSettings.datacenter(); } - // TODO: shared - O firstAvailableOperationFromDelegate(Function mapper) - { - for (InstanceMetadata instance : instancesMetadata.instances()) - { - try - { - CassandraAdapterDelegate delegate = instance.delegate(); - return mapper.apply(delegate); - } - catch (CassandraUnavailableException exception) - { - // no-op; try the next instance - LOGGER.debug("CassandraAdapterDelegate is not available for instance. instance={}", instance, exception); - } - } - return null; - } - - /** * Returns the partitioner * Ex: RandomPartitioner, Murmur3Partitioner @@ -127,7 +109,7 @@ O firstAvailableOperationFromDelegate(Function */ public Partitioner partitioner() { - String[] tokens = fetcher.anyInstance().delegate().nodeSettings().partitioner().split("\\."); + String[] tokens = fetcher.callOnFirstAvailableInstance(instance -> instance.delegate().nodeSettings().partitioner().split("\\.")); return Partitioners.from(tokens[tokens.length - 1]); } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/routes/SidecarPeersHealthHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/routes/SidecarPeersHealthHandler.java deleted file mode 100644 index da83d07d9..000000000 --- a/server/src/main/java/org/apache/cassandra/sidecar/routes/SidecarPeersHealthHandler.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.sidecar.routes; - -import java.util.Map; -import java.util.stream.Collectors; - -import com.google.inject.Inject; -import com.google.inject.Singleton; -import io.netty.handler.codec.http.HttpResponseStatus; -import io.vertx.core.http.HttpHeaders; -import io.vertx.core.http.HttpServerRequest; -import io.vertx.core.json.Json; -import io.vertx.core.json.JsonObject; -import io.vertx.core.net.SocketAddress; -import io.vertx.ext.web.RoutingContext; -import org.apache.cassandra.sidecar.client.SidecarInstance; -import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; -import org.apache.cassandra.sidecar.concurrent.ExecutorPools; -import org.apache.cassandra.sidecar.coordination.SidecarPeerHealthMonitor; -import org.apache.cassandra.sidecar.coordination.SidecarPeerHealthProvider; -import org.apache.cassandra.sidecar.utils.CassandraInputValidator; -import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; - -import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.JMX; -import static org.apache.cassandra.sidecar.server.MainModule.NOT_OK_STATUS; -import static org.apache.cassandra.sidecar.server.MainModule.OK_STATUS; - -/** - * Provides a simple REST endpoint to determine if a Cassandra node is available - */ -@Singleton -public class SidecarPeersHealthHandler extends AbstractHandler -{ - private final SidecarPeerHealthMonitor sidecarPeerHealthMonitor; - /** - * Constructs a handler with the provided {@code metadataFetcher} - * - * @param metadataFetcher the interface to retrieve instance metadata - * @param executorPools the executor pools for blocking executions - * @param validator a validator instance to validate Cassandra-specific input - */ - @Inject - public SidecarPeersHealthHandler(InstanceMetadataFetcher metadataFetcher, - ExecutorPools executorPools, - CassandraInputValidator validator, - SidecarPeerHealthMonitor sidecarPeerHealthMonitor) - { - super(metadataFetcher, executorPools, validator); - this.sidecarPeerHealthMonitor = sidecarPeerHealthMonitor; - } - - /** - * Handles the request with the parameters for this request. - * - * @param context the request context - * @param httpRequest the {@link HttpServerRequest} object - * @param host the host where this request is intended for - * @param remoteAddress the address where the request originates - * @param request the request object - */ - @Override - protected void handleInternal(RoutingContext context, - HttpServerRequest httpRequest, - String host, - SocketAddress remoteAddress, - Void request) - { - - Map status = sidecarPeerHealthMonitor.getStatus(); - - Map responseBody = status.entrySet().stream().collect(Collectors.toMap(e -> e.getKey().hostname(), e -> { - JsonObject value = new JsonObject(); - value.put("port", e.getKey().port()); - value.put("status", e.getValue()); - return value; - })); - - context.json(responseBody); - } - - @Override - protected Void extractParamsOrThrow(RoutingContext context) - { - return null; - } -} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java b/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java index 61321573a..a76cd56f0 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java @@ -73,7 +73,6 @@ import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadataImpl; import org.apache.cassandra.sidecar.cluster.locator.CachedLocalTokenRanges; import org.apache.cassandra.sidecar.cluster.locator.LocalTokenRangesProvider; -import org.apache.cassandra.sidecar.codecs.SidecarInstanceCodecs; import org.apache.cassandra.sidecar.common.ApiEndpointsV1; import org.apache.cassandra.sidecar.common.server.CQLSessionProvider; import org.apache.cassandra.sidecar.common.server.JmxClient; @@ -101,7 +100,7 @@ import org.apache.cassandra.sidecar.coordination.InnerDcTokenAdjacentPeerProvider; import org.apache.cassandra.sidecar.coordination.MostReplicatedKeyspaceTokenZeroElectorateMembership; import org.apache.cassandra.sidecar.coordination.SidecarHttpHealthProvider; -import org.apache.cassandra.sidecar.coordination.SidecarPeerHealthMonitor; +import org.apache.cassandra.sidecar.coordination.SidecarPeerHealthMonitorTask; import org.apache.cassandra.sidecar.coordination.SidecarPeerHealthProvider; import org.apache.cassandra.sidecar.coordination.SidecarPeerProvider; import org.apache.cassandra.sidecar.datahub.EmitterFactory; @@ -139,7 +138,6 @@ import org.apache.cassandra.sidecar.routes.RingHandler; import org.apache.cassandra.sidecar.routes.RoutingOrder; import org.apache.cassandra.sidecar.routes.SchemaHandler; -import org.apache.cassandra.sidecar.routes.SidecarPeersHealthHandler; import org.apache.cassandra.sidecar.routes.StreamSSTableComponentHandler; import org.apache.cassandra.sidecar.routes.StreamStatsHandler; import org.apache.cassandra.sidecar.routes.TimeSkewHandler; @@ -162,7 +160,6 @@ import org.apache.cassandra.sidecar.routes.sstableuploads.SSTableUploadHandler; import org.apache.cassandra.sidecar.routes.validations.ValidateTableExistenceHandler; import org.apache.cassandra.sidecar.tasks.PeriodicTaskExecutor; -import org.apache.cassandra.sidecar.utils.CassandraInputValidator; import org.apache.cassandra.sidecar.utils.CassandraVersionProvider; import org.apache.cassandra.sidecar.utils.DigestAlgorithmProvider; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; @@ -329,7 +326,6 @@ public Router vertxRouter(Vertx vertx, ChainAuthHandler chainAuthHandler, Supplier protectedRouteBuilderFactory, CassandraHealthHandler cassandraHealthHandler, - SidecarPeersHealthHandler sidecarPeersHealthHandler, StreamSSTableComponentHandler streamSSTableComponentHandler, FileStreamHandler fileStreamHandler, ClearSnapshotHandler clearSnapshotHandler, @@ -407,9 +403,6 @@ public Router vertxRouter(Vertx vertx, router.get(ApiEndpointsV1.CASSANDRA_JMX_HEALTH_ROUTE) .handler(cassandraHealthHandler); - router.get(ApiEndpointsV1.SIDECAR_PEERS_HEALTH_ROUTE) - .handler(sidecarPeersHealthHandler); - // Node settings endpoint is not Access protected. Any user who can log in into Cassandra is able to view // node settings information. Since sidecar and cassandra share list of authenticated identities, sidecar's // authenticated users can also read node settings information. @@ -775,34 +768,6 @@ public SidecarPeerProvider sidecarPeerProvider(InstancesMetadata instancesMetada return new InnerDcTokenAdjacentPeerProvider(instancesMetadata, cassandraClientTokenRingProvider, configuration.serviceConfiguration(), dnsResolver); } - @Provides - @Singleton - public SidecarPeerHealthMonitor sidecarPeerHealthMonitor(Vertx vertx, - SidecarConfiguration sidecarConfiguration, - SidecarPeerProvider sidecarPeerProvider, - SidecarPeerHealthProvider healthProvider, - SidecarInstanceCodecs sidecarInstanceCodecs) - { - return new SidecarPeerHealthMonitor(vertx, - sidecarConfiguration, - sidecarPeerProvider, - healthProvider, - sidecarInstanceCodecs); - } - - @Provides - @Singleton - public SidecarPeersHealthHandler sidecarPeersHealthHandler(InstanceMetadataFetcher metadataFetcher, - ExecutorPools executorPools, - CassandraInputValidator validator, - SidecarPeerHealthMonitor sidecarPeerHealthMonitor) - { - return new SidecarPeersHealthHandler(metadataFetcher, - executorPools, - validator, - sidecarPeerHealthMonitor); - } - @Provides @Singleton public RestoreJobsSchema restoreJobsSchema(SidecarConfiguration configuration) @@ -934,14 +899,13 @@ public PeriodicTaskExecutor periodicTaskExecutor(Vertx vertx, ClusterLease clusterLease, ClusterLeaseClaimTask clusterLeaseClaimTask, SchemaReportingTask schemaReportingTask, - SidecarPeerHealthMonitor sidecarPeerHealthMonitor - ) + SidecarPeerHealthMonitorTask sidecarPeerHealthMonitorTask) { PeriodicTaskExecutor periodicTaskExecutor = new PeriodicTaskExecutor(executorPools, clusterLease); vertx.eventBus().localConsumer(ON_CASSANDRA_CQL_READY.address(), ignored -> { periodicTaskExecutor.schedule(clusterLeaseClaimTask); - periodicTaskExecutor.schedule(sidecarPeerHealthMonitor); + periodicTaskExecutor.schedule(sidecarPeerHealthMonitorTask); }); vertx.eventBus().localConsumer(ON_ALL_CASSANDRA_CQL_READY.address(), message -> periodicTaskExecutor.schedule(schemaReportingTask)); diff --git a/server/src/main/java/org/apache/cassandra/sidecar/server/Server.java b/server/src/main/java/org/apache/cassandra/sidecar/server/Server.java index 72c6f784a..65e480c3b 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/server/Server.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/server/Server.java @@ -49,13 +49,10 @@ import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.cluster.InstancesMetadata; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; -import org.apache.cassandra.sidecar.codecs.SidecarInstanceCodecs; import org.apache.cassandra.sidecar.common.utils.Preconditions; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; import org.apache.cassandra.sidecar.config.SidecarConfiguration; import org.apache.cassandra.sidecar.config.SslConfiguration; -import org.apache.cassandra.sidecar.coordination.SidecarPeerHealthProvider; -import org.apache.cassandra.sidecar.coordination.SidecarPeerProvider; import org.apache.cassandra.sidecar.metrics.SidecarMetrics; import org.apache.cassandra.sidecar.tasks.HealthCheckPeriodicTask; import org.apache.cassandra.sidecar.tasks.KeyStoreCheckPeriodicTask; @@ -81,9 +78,6 @@ public class Server protected final PeriodicTaskExecutor periodicTaskExecutor; protected final HttpServerOptionsProvider optionsProvider; protected final SidecarMetrics metrics; - protected final SidecarPeerProvider sidecarPeerProvider; - protected final SidecarPeerHealthProvider sidecarPeerHealthProvider; - protected final SidecarInstanceCodecs sidecarInstanceCodecs; protected final List deployedServerVerticles = new CopyOnWriteArrayList<>(); // Keeps track of all the Cassandra instance identifiers where CQL is ready private final Set cqlReadyInstanceIds = Collections.synchronizedSet(new HashSet<>()); @@ -96,10 +90,7 @@ public Server(Vertx vertx, ExecutorPools executorPools, PeriodicTaskExecutor periodicTaskExecutor, HttpServerOptionsProvider optionsProvider, - SidecarMetrics metrics, - SidecarPeerProvider sidecarPeerProvider, - SidecarPeerHealthProvider sidecarPeerHealthProvider, - SidecarInstanceCodecs sidecarInstanceCodecs) + SidecarMetrics metrics) { this.vertx = vertx; this.executorPools = executorPools; @@ -109,9 +100,6 @@ public Server(Vertx vertx, this.periodicTaskExecutor = periodicTaskExecutor; this.optionsProvider = optionsProvider; this.metrics = metrics; - this.sidecarPeerProvider = sidecarPeerProvider; - this.sidecarPeerHealthProvider = sidecarPeerHealthProvider; - this.sidecarInstanceCodecs = sidecarInstanceCodecs; } /** @@ -156,16 +144,6 @@ public Future stop(String deploymentId) .onSuccess(v -> LOGGER.info("Successfully stopped Cassandra Sidecar")); } - /** - * Undeploy the server deployment, stopping all the {@link ServerVerticle verticles}. - * - * @return a future completed with the result - */ - public Future stop() - { - return stop(getDeploymentId()); - } - /** * Stops the {@link Vertx} instance and release any resources held by it. * @@ -329,13 +307,7 @@ protected Future scheduleInternalPeriodicTasks(String deploymentId) instancesMetadata, executorPools, metrics)); - - if (sidecarConfiguration.sidecarClientConfiguration().isKeystoreConfigured()) - { - periodicTaskExecutor.schedule(new KeyStoreCheckPeriodicTask(vertx, - this, - sidecarConfiguration.sslConfiguration())); - } + maybeScheduleKeyStoreCheckPeriodicTask(); MessageConsumer cqlReadyConsumer = vertx.eventBus().localConsumer(ON_CASSANDRA_CQL_READY.address()); cqlReadyConsumer.handler(message -> onCqlReady(cqlReadyConsumer, message)); @@ -399,13 +371,4 @@ protected void notifyAllCassandraCqlAreReady() vertx.eventBus().publish(ON_ALL_CASSANDRA_CQL_READY.address(), allReadyMessage); } - - - /** - * Gets the server vertx Deployment Id to be used at stop time. - */ - private String getDeploymentId() - { - return vertx.deploymentIDs().stream().findFirst().orElseThrow(); - } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/utils/SidecarClientProvider.java b/server/src/main/java/org/apache/cassandra/sidecar/utils/SidecarClientProvider.java index 4bcce746e..f39158c10 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/utils/SidecarClientProvider.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/utils/SidecarClientProvider.java @@ -45,7 +45,6 @@ import org.apache.cassandra.sidecar.config.SidecarClientConfiguration; import org.apache.cassandra.sidecar.config.SidecarConfiguration; import org.apache.cassandra.sidecar.config.SslConfiguration; -import org.apache.cassandra.sidecar.tasks.PeriodicTaskExecutor; /** * Provider class for retrieving the singleton {@link SidecarClient} instance @@ -85,41 +84,41 @@ private SidecarClient initializeSidecarClient() WebClient webClient = WebClient.wrap(httpClient, webClientOptions); HttpClientConfig httpClientConfig = new HttpClientConfig.Builder<>() - .ssl(webClientOptions().isSsl()) - .timeoutMillis(clientConfig.requestTimeoutMillis()) - .idleTimeoutMillis((int) clientConfig.requestIdleTimeoutMillis()) - .userAgent("cassandra-sidecar/" + sidecarVersionProvider.sidecarVersion()) - .build(); + .ssl(webClientOptions().isSsl()) + .timeoutMillis(clientConfig.requestTimeout().toMillis()) + .idleTimeoutMillis(clientConfig.requestIdleTimeout().toIntMillis()) + .userAgent("cassandra-sidecar/" + sidecarVersionProvider.sidecarVersion()) + .build(); VertxHttpClient vertxHttpClient = new VertxHttpClient(vertx, webClient, httpClientConfig); RetryPolicy defaultRetryPolicy = new ExponentialBackoffRetryPolicy(clientConfig.maxRetries(), - clientConfig.retryDelayMillis(), - clientConfig.maxRetryDelayMillis()); + clientConfig.retryDelayMillis(), + clientConfig.maxRetryDelayMillis()); VertxRequestExecutor requestExecutor = new VertxRequestExecutor(vertxHttpClient); SidecarInstance instance = new SidecarInstanceImpl(webClientOptions.getDefaultHost(), webClientOptions.getDefaultPort()); ArrayList instances = new ArrayList<>(); instances.add(instance); SimpleSidecarInstancesProvider instancesProvider = new SimpleSidecarInstancesProvider(instances); return new SidecarClient(instancesProvider, - requestExecutor, - clientConfig, - defaultRetryPolicy); + requestExecutor, + clientConfig, + defaultRetryPolicy); } private WebClientOptions webClientOptions() { WebClientOptions options = new WebClientOptions(); options.getPoolOptions() - .setCleanerPeriod((int) clientConfig.connectionPoolCleanerPeriodMillis()) - .setEventLoopSize(clientConfig.connectionPoolEventLoopSize()) - .setHttp1MaxSize(10) - .setMaxWaitQueueSize(clientConfig.connectionPoolMaxWaitQueueSize()); + .setCleanerPeriod(clientConfig.connectionPoolCleanerPeriod().toIntMillis()) + .setEventLoopSize(clientConfig.connectionPoolEventLoopSize()) + .setHttp1MaxSize(clientConfig.connectionPoolMaxSize()) + .setMaxWaitQueueSize(clientConfig.connectionPoolMaxWaitQueueSize()); - boolean useSsl = false; + boolean useSsl = clientConfig.useSsl(); if (sslConfig.isKeystoreConfigured()) { options.setKeyStoreOptions(new JksOptions().setPath(sslConfig.keystore().path()) - .setPassword(sslConfig.keystore().password())); + .setPassword(sslConfig.keystore().password())); if (sslConfig.preferOpenSSL() && OpenSSLEngineOptions.isAvailable()) { LOGGER.info("Using OpenSSL for encryption in Webclient Options"); @@ -135,7 +134,7 @@ private WebClientOptions webClientOptions() if (sslConfig.truststore() != null && sslConfig.truststore().isConfigured()) { options.setTrustStoreOptions(new JksOptions().setPath(sslConfig.truststore().path()) - .setPassword(sslConfig.truststore().password())); + .setPassword(sslConfig.truststore().password())); } options.setSsl(useSsl); diff --git a/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java b/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java index 4c32500fb..614cb4454 100644 --- a/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java +++ b/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java @@ -183,17 +183,6 @@ void setup(AbstractCassandraTestContext cassandraTestContext, TestInfo testInfo) }) .onFailure(context::failNow); - server.start() - .onSuccess(s -> { - sidecarTestContext.registerInstanceConfigListener(this::healthCheck); - if (!sidecarTestContext.isClusterBuilt()) - { - // Give everything a moment to get started and connected - vertx.setTimer(TimeUnit.SECONDS.toMillis(1), id1 -> context.completeNow()); - } - }) - .onFailure(context::failNow); - context.awaitCompletion(5, TimeUnit.SECONDS); } diff --git a/server/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java b/server/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java index 7930e7ac3..563e8eae4 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java @@ -296,8 +296,12 @@ void testMetricsAllowedWithDefaultRegexFilter() @Test void testSidecarPeerHealthConfiguration() throws IOException { - Path yamlPath = yaml("config/sidecar_down_detector_config.yaml"); - SidecarConfigurationImpl sidecarConfiguration = SidecarConfigurationImpl.readYamlConfiguration(yamlPath); + String yaml = "sidecar_peer_health:\n" + + " enabled: false\n" + + " execute_interval: 1s\n" + + " health_check_retries: 1\n" + + " health_check_retry_delay: 2s"; + SidecarConfigurationImpl sidecarConfiguration = SidecarConfigurationImpl.readYamlConfiguration(yaml); assertThat(sidecarConfiguration).isNotNull(); SidecarPeerHealthConfiguration config = sidecarConfiguration.sidecarPeerHealthConfiguration(); assertThat(config).isNotNull(); diff --git a/server/src/test/resources/config/sidecar_down_detector_config.yaml b/server/src/test/resources/config/sidecar_down_detector_config.yaml deleted file mode 100644 index 6dc23eecd..000000000 --- a/server/src/test/resources/config/sidecar_down_detector_config.yaml +++ /dev/null @@ -1,118 +0,0 @@ -# -# Cassandra SideCar configuration file -# -cassandra: - host: localhost - port: 9042 - username: cassandra - password: cassandra - data_dirs: - - /ccm/test/node1/data0 - - /ccm/test/node1/data1 - staging_dir: /ccm/test/node1/sstable-staging - jmx_host: 127.0.0.1 - jmx_port: 7199 - jmx_role: controlRole - jmx_role_password: controlPassword - jmx_ssl_enabled: true - -sidecar: - host: 0.0.0.0 - port: 0 # bind sever to the first available port - request_idle_timeout_millis: 300000 # this field expects integer value - request_timeout_millis: 300000 - tcp_keep_alive: false - accept_backlog: 1024 - server_verticle_instances: 2 - throttle: - stream_requests_per_sec: 5000 - timeout_sec: 10 - traffic_shaping: - inbound_global_bandwidth_bps: 500 - outbound_global_bandwidth_bps: 1500 - peak_outbound_global_bandwidth_bps: 2000 - max_delay_to_wait_millis: 2500 - check_interval_for_stats_millis: 3000 - sstable_upload: - concurrent_upload_limit: 80 - min_free_space_percent: 10 - # file_permissions: "rw-r--r--" # when not specified, the default file permissions are owner read & write, group & others read - allowable_time_skew_in_minutes: 60 - sstable_import: - poll_interval_millis: 100 - cache: - expire_after_access_millis: 7200000 # 2 hours - maximum_size: 10000 - sstable_snapshot: - snapshot_list_cache: - expire_after_access_millis: 350 - maximum_size: 450 - worker_pools: - service: - name: "sidecar-worker-pool" - size: 20 - max_execution_time_millis: 60000 # 60 seconds - internal: - name: "sidecar-internal-worker-pool" - size: 20 - max_execution_time_millis: 900000 # 15 minutes - jmx: - max_retries: 42 - retry_delay_millis: 1234 - schema: - is_enabled: false - keyspace: sidecar_internal - replication_strategy: SimpleStrategy - replication_factor: 1 - -vertx: - filesystem_options: - classpath_resolving_enabled: true - file_cache_dir: /path/to/vertx/cache - file_caching_enabled: true - -# -# Enable SSL configuration (Disabled by default) -# -# ssl: -# enabled: true -# use_openssl: true -# handshake_timeout_sec: 10 -# client_auth: NONE # valid options are NONE, REQUEST, REQUIRED -# accepted_protocols: -# - TLSv1.2 -# - TLSv1.3 -# cipher_suites: [] -# keystore: -# type: PKCS12 -# path: "path/to/keystore.p12" -# password: password -# check_interval_sec: 300 -# truststore: -# path: "path/to/truststore.p12" -# password: password - - -healthcheck: - initial_delay_millis: 100 - poll_freq_millis: 30000 - -sidecar_peer_health: - enabled: false - frequency_millis: 1000 - health_check_retries: 1 - health_check_retry_delay_millis: 2000 - -cassandra_input_validation: - forbidden_keyspaces: - - system_schema - - system_traces - - system_distributed - - system - - system_auth - - system_views - - system_virtual_schema - allowed_chars_for_directory: "[a-zA-Z][a-zA-Z0-9_]{0,47}" - allowed_chars_for_quoted_name: "[a-zA-Z_0-9]{1,48}" - allowed_chars_for_component_name: "[a-zA-Z0-9_-]+(.db|.cql|.json|.crc32|TOC.txt)" - allowed_chars_for_restricted_component_name: "[a-zA-Z0-9_-]+(.db|TOC.txt)" \ No newline at end of file