Skip to content

Commit

Permalink
Some refactors and improvements.
Browse files Browse the repository at this point in the history
  • Loading branch information
bbotella committed Feb 11, 2025
1 parent 9d8eb11 commit 6a72d73
Show file tree
Hide file tree
Showing 29 changed files with 139 additions and 599 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.slf4j.Logger;
Expand All @@ -37,17 +35,14 @@
import org.apache.cassandra.sidecar.common.server.StorageOperations;
import org.apache.cassandra.sidecar.common.server.cluster.locator.Partitioners;
import org.apache.cassandra.sidecar.common.server.data.Name;
import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
import org.apache.cassandra.sidecar.common.server.exceptions.NodeBootstrappingException;
import org.apache.cassandra.sidecar.common.server.exceptions.SnapshotAlreadyExistsException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import static java.util.Objects.requireNonNull;
import static org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME;
import static org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME;
import static org.apache.cassandra.sidecar.common.utils.Preconditions.checkArgument;

/**
* An implementation of the {@link StorageOperations} that interfaces with Cassandra 4.0 and later
Expand Down Expand Up @@ -258,60 +253,4 @@ public String clusterName()
return jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME)
.getClusterName();
}

@Override
public Set<String> naturalEndpointsInLocalDatacenter(List<QualifiedTableName> tableNameList,
String localDatacenter,
Set<String> tokens) throws UnknownHostException
{
requireNonNull(tableNameList, "tableNameList must be non-null");
requireNonNull(localDatacenter, "localDatacenter must be non-null");
requireNonNull(tokens, "tokens must be non-null");
checkArgument(!tableNameList.isEmpty(), "tableNameList must be non-empty");
checkArgument(!tokens.isEmpty(), "tokens must be non-empty");

Set<String> nonLocalEndpoints = new HashSet<>();
Set<String> naturalEndpoints = new HashSet<>();
StorageJmxOperations storageOps = initializeStorageOps();
EndpointSnitchJmxOperations epSnitchInfo = initializeEndpointProxy();
for (QualifiedTableName name : tableNameList)
{
for (String token : tokens)
{
List<String> naturalEndpointsWithPort = storageOps.getNaturalEndpointsWithPort(name.keyspace(), name.tableName(), token);
for (String endpoint : naturalEndpointsWithPort)
{
if (naturalEndpoints.contains(endpoint) ||
nonLocalEndpoints.contains(endpoint))
{
continue; // We already have checked this endpoint
}

String datacenter = epSnitchInfo.getDatacenter(endpoint);

if (localDatacenter.equals(datacenter))
{
naturalEndpoints.add(endpoint);
}
else
{
nonLocalEndpoints.add(endpoint); // avoid checking the datacenter information from the endpoint snitch again
}
}
}
}

return naturalEndpoints;
}

protected EndpointSnitchJmxOperations initializeEndpointProxy()
{
return jmxClient.proxy(EndpointSnitchJmxOperations.class, ENDPOINT_SNITCH_INFO_OBJ_NAME);
}

protected StorageJmxOperations initializeStorageOps()
{
return new GossipDependentStorageJmxOperations(jmxClient.proxy(StorageJmxOperations.class,
STORAGE_SERVICE_OBJ_NAME));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,6 @@ public Map<List<String>, List<String>> getRangeToEndpointWithPortMap(String keys
return delegate.getRangeToEndpointWithPortMap(keyspace);
}

@Override
public List<String> getNaturalEndpointsWithPort(String keyspaceName, String cf, String key)
{
return delegate.getNaturalEndpointsWithPort(keyspaceName, cf, key);
}

@Override
public Map<List<String>, List<String>> getPendingRangeToEndpointWithPortMap(String keyspace)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,17 +127,6 @@ public interface StorageJmxOperations
*/
Map<List<String>, List<String>> getRangeToEndpointWithPortMap(String keyspace);

/**
* This method returns the N endpoints that are responsible for storing the
* specified key i.e for replication.
*
* @param keyspaceName keyspace name
* @param cf Column family name
* @param key Key for which we need to find the endpoint return value -
* the endpoint responsible for this key
*/
List<String> getNaturalEndpointsWithPort(String keyspaceName, String cf, String key);

/**
* Retrieve the list of pending node endpoints by token range for the given keyspace
*
Expand Down
4 changes: 1 addition & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ allprojects {
shouldRunAfter(tasks.withType(Checkstyle))
shouldRunAfter(tasks.withType(RatTask))
}

ext.dependencyLocation = (System.getenv("CASSANDRA_DEP_DIR") ?: "${rootDir}/dtest-jars") + "/"
}

group 'org.apache.cassandra'
Expand Down Expand Up @@ -241,7 +239,7 @@ subprojects {
}

test {
jvmArgs "-XX:+MaxFDLimit"
jvmArgs "-XX:-MaxFDLimit"
finalizedBy jacocoTestReport // report is always generated after tests run
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ public final class ApiEndpointsV1

public static final String HEALTH = "/__health";
public static final String CASSANDRA = "/cassandra";
public static final String PEERS = "/peers";

public static final String NATIVE = "/native";
public static final String JMX = "/jmx";
Expand Down Expand Up @@ -63,8 +62,6 @@ public final class ApiEndpointsV1
public static final String CASSANDRA_NATIVE_HEALTH_ROUTE = API_V1 + CASSANDRA + NATIVE + HEALTH;
public static final String CASSANDRA_JMX_HEALTH_ROUTE = API_V1 + CASSANDRA + JMX + HEALTH;

public static final String SIDECAR_PEERS_HEALTH_ROUTE = API_V1 + PEERS + HEALTH;

@Deprecated // NOTE: Uses singular forms of "keyspace" and "table"
public static final String DEPRECATED_SNAPSHOTS_ROUTE = API_V1 + "/keyspace/" + KEYSPACE_PATH_PARAM +
"/table/" + TABLE_PATH_PARAM +
Expand Down
2 changes: 1 addition & 1 deletion client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ repositories {
test {
useJUnitPlatform()
if (Os.isFamily(Os.FAMILY_MAC)) {
jvmArgs "-XX:+MaxFDLimit"
jvmArgs "-XX:-MaxFDLimit"
}
testLogging {
events "passed", "skipped", "failed"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.cassandra.sidecar.client.retry.RunnableOnStatusCodeRetryPolicy;
import org.apache.cassandra.sidecar.client.selection.InstanceSelectionPolicy;
import org.apache.cassandra.sidecar.client.selection.RandomInstanceSelectionPolicy;
import org.apache.cassandra.sidecar.client.selection.SingleInstanceSelectionPolicy;
import org.apache.cassandra.sidecar.common.request.AbortRestoreJobRequest;
import org.apache.cassandra.sidecar.common.request.CreateRestoreJobRequest;
import org.apache.cassandra.sidecar.common.request.CreateRestoreJobSliceRequest;
Expand Down Expand Up @@ -114,21 +113,6 @@ public CompletableFuture<HealthResponse> sidecarHealth()
.build());
}

/**
* Executes the Sidecar health request against a single sidecar instance and retries to confirm
* the Sidecar is DOWN for extended period of time.
*
* @return a completable future of the Sidecar health response
*/
public CompletableFuture<HealthResponse> sidecarInstanceHealth(SidecarInstance instance, RetryPolicy retryPolicy)
{
return executor.executeRequestAsync(requestBuilder()
.sidecarHealthRequest()
.instanceSelectionPolicy(new SingleInstanceSelectionPolicy(instance))
.retryPolicy(retryPolicy)
.build());
}

/**
* Executes the Cassandra health request using the configured selection policy and with no retries
*
Expand Down Expand Up @@ -240,6 +224,7 @@ public CompletableFuture<GossipInfoResponse> gossipInfo()

/**
* Executes the GET gossip health request using the default retry policy and configured selection policy
*
* @param instance the instance where the request will be executed
* @return a completable future with gossip health response
*/
Expand Down Expand Up @@ -527,37 +512,38 @@ public CompletableFuture<Void> cleanUploadSession(SidecarInstance instance, Stri

/**
* Lists CDC commit logs in CDC directory for an instance
*
* @param sidecarInstance instance on which the CDC commit logs are to be listed
* @return a completable future with List of cdc commitLogs on the requested instance
*/
public CompletableFuture<ListCdcSegmentsResponse> listCdcSegments(SidecarInstance sidecarInstance)
{
return executor.executeRequestAsync(requestBuilder()
.singleInstanceSelectionPolicy(sidecarInstance)
.request(new ListCdcSegmentsRequest())
.build());
.singleInstanceSelectionPolicy(sidecarInstance)
.request(new ListCdcSegmentsRequest())
.build());
}

/**
* Streams CDC commit log segments from the requested instance.
*
* <p>
* Streams the specified {@code range} of a CDC CommitLog from the given instance and the
* stream is consumed by the {@link StreamConsumer consumer}.
*
* @param sidecarInstance instance on which the CDC commit logs are to be streamed
* @param segment segment file name
* @param range range of the file to be streamed
* @param streamConsumer object that consumes the stream
* @param segment segment file name
* @param range range of the file to be streamed
* @param streamConsumer object that consumes the stream
*/
public void streamCdcSegments(SidecarInstance sidecarInstance,
String segment,
HttpRange range,
StreamConsumer streamConsumer)
{
executor.streamRequest(requestBuilder()
.singleInstanceSelectionPolicy(sidecarInstance)
.request(new StreamCdcSegmentRequest(segment, range))
.build(), streamConsumer);
.singleInstanceSelectionPolicy(sidecarInstance)
.request(new StreamCdcSegmentRequest(segment, range))
.build(), streamConsumer);
}

/**
Expand Down Expand Up @@ -697,6 +683,7 @@ public CompletableFuture<ListOperationalJobsResponse> listOperationalJobs(Sideca

/**
* Executes the streams stats request using the default retry policy and configured selection policy
*
* @param instance the instance where the request will be executed
* @return a completable future of the connected client stats
*/
Expand All @@ -710,6 +697,7 @@ public CompletableFuture<StreamStatsResponse> streamsStats(SidecarInstance insta

/**
* Executes the node decommission request using the default retry policy and configured selection policy
*
* @param instance the instance where the request will be executed
* @return a completable future of the jobs list
*/
Expand Down
6 changes: 6 additions & 0 deletions conf/sidecar.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@ healthcheck:
initial_delay: 0ms
execute_interval: 30s

sidecar_peer_health:
enabled: true
execute_interval: 30s
health_check_retries: 5
health_check_retry_delay: 10s

metrics:
registry_name: cassandra_sidecar
vertx:
Expand Down
4 changes: 1 addition & 3 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,4 @@ commonsCodecVersion=1.16.1
boringSslVersion=2.0.61.Final
# If running MacOS then you need to increase the max
# open FD limit
org.gradle.jvmargs=-XX:+MaxFDLimit

assertjCoreVersion=3.24.2
org.gradle.jvmargs=-XX:-MaxFDLimit
1 change: 1 addition & 0 deletions integration-framework/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ dependencies {
implementation(group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.14.3')

api(testFixtures(project(path: ":test-common")))

testImplementation(platform("org.junit:junit-bom:${project.junitVersion}"))
testImplementation('org.junit.jupiter:junit-jupiter')
testImplementation("org.assertj:assertj-core:${assertjCoreVersion}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.cassandra.testing.IClusterExtension;
import org.apache.cassandra.testing.Partitioner;
import org.apache.cassandra.testing.TestTokenSupplier;
import org.jetbrains.annotations.NotNull;

import static org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack;
import static org.apache.cassandra.distributed.shared.NetworkTopology.networkTopology;
Expand Down Expand Up @@ -269,6 +270,7 @@ public Stream<I> stream(String s, String s1)
return delegate.stream(s, s1);
}

@NotNull
@Override
public Iterator<I> iterator()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.datastax.driver.core.policies.ReconnectionPolicy;
import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException;
import org.jetbrains.annotations.NotNull;

import static org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException.Service.CQL;

Expand All @@ -59,6 +60,7 @@ public TemporaryCqlSessionProvider(List<InetSocketAddress> contactPoints, NettyO
this.contactPoints = contactPoints;
}

@NotNull
@Override
public synchronized Session get()
{
Expand Down
2 changes: 1 addition & 1 deletion server-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ sourceCompatibility = JavaVersion.VERSION_11
test {
useJUnitPlatform()
if (Os.isFamily(Os.FAMILY_MAC)) {
jvmArgs "-XX:+MaxFDLimit"
jvmArgs "-XX:-MaxFDLimit"
}
maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1
reports {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.apache.cassandra.sidecar.common.response.RingResponse;
import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse;
import org.apache.cassandra.sidecar.common.server.data.Name;
import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -128,17 +126,4 @@ default void outOfRangeDataCleanup(@NotNull String keyspace, @NotNull String tab
* @return the name of the cluster
*/
String clusterName();

/**
* Returns the set of natural replication endpoints in the local datacenter.
*
* @param tableNameList a list of qualified table names to use for retrieving the natural endpoints
* @param localDatacenter the name of the datacenter used to filter endpoints that are in the same datacenter
* @param tokens a set of tokens managed by the instance
* @return the set of natural replication endpoints in the local datacenter
*/
Set<String> naturalEndpointsInLocalDatacenter(List<QualifiedTableName> tableNameList,
String localDatacenter,
Set<String> tokens) throws UnknownHostException;

}
5 changes: 2 additions & 3 deletions server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ dependencies {
testImplementation('org.mockito:mockito-core:4.10.0')
testImplementation('org.mockito:mockito-inline:4.10.0')
testImplementation("io.vertx:vertx-junit5:${project.vertxVersion}")
// implementation(testFixtures(project(":client")))
testImplementation(testFixtures(project(":client-common")))
testImplementation(testFixtures(project(":server-common")))
testImplementation(testFixtures(project(":test-common")))
Expand Down Expand Up @@ -199,7 +198,7 @@ test {
// see the integrationTest task
useJUnitPlatform()
if (Os.isFamily(Os.FAMILY_MAC)) {
jvmArgs "-XX:+MaxFDLimit"
jvmArgs "-XX:-MaxFDLimit"
}
reports {
junitXml.setRequired(true)
Expand All @@ -222,7 +221,7 @@ tasks.register("containerTest", Test) {
jvmArgs(project.ext.JDK11_OPTIONS)
println("JVM arguments for $project.name are $allJvmArgs")
} else {
jvmArgs '-XX:+MaxFDLimit'
jvmArgs '-XX:-MaxFDLimit'
}

useJUnitPlatform()
Expand Down
Loading

0 comments on commit 6a72d73

Please sign in to comment.