Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CASSSIDECAR-206: Add Peer Health Monitor #189

Open
wants to merge 16 commits into
base: trunk
Choose a base branch
from
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
1.0.0
-----
* Add Token Ring Peer Health monitor (CASSSIDECAR-206)
* Adapt to cluster topology change for restore jobs (CASSSIDECAR-185)
* Fix PeriodicTaskExecutor double execution due to race from reschedule (CASSSIDECAR-210)
* Upgrade Netty to 4.1.118.Final and Vert.x to 4.5.13 Version (CASSSIDECAR-207)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.cassandra.sidecar.client;
package org.apache.cassandra.sidecar.common.client;

/**
* Holds information about the Cassandra Sidecar Instance host and port
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.cassandra.sidecar.client;
package org.apache.cassandra.sidecar.common.client;

import java.util.Objects;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import java.util.concurrent.CompletableFuture;

import org.apache.cassandra.sidecar.common.client.SidecarInstance;

/**
* An interface to represent an HTTP client
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.List;
import java.util.Map;

import org.apache.cassandra.sidecar.common.client.SidecarInstance;

/**
* Represents the HTTP response received from the remote Sidecar service
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.List;
import java.util.Map;

import org.apache.cassandra.sidecar.common.client.SidecarInstance;

/**
* A simple implementation of the {@link HttpResponse}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.cassandra.sidecar.client.retry.RetryPolicy;
import org.apache.cassandra.sidecar.client.selection.InstanceSelectionPolicy;
import org.apache.cassandra.sidecar.client.selection.SingleInstanceSelectionPolicy;
import org.apache.cassandra.sidecar.common.client.SidecarInstance;
import org.apache.cassandra.sidecar.common.request.CassandraJmxHealthRequest;
import org.apache.cassandra.sidecar.common.request.CassandraNativeHealthRequest;
import org.apache.cassandra.sidecar.common.request.CleanSSTableUploadSessionRequest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.slf4j.LoggerFactory;

import io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.cassandra.sidecar.common.client.SidecarInstance;
import org.apache.cassandra.sidecar.common.request.Request;
import org.apache.cassandra.sidecar.common.request.ResponseBytesDecoder;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.cassandra.sidecar.client.retry.RunnableOnStatusCodeRetryPolicy;
import org.apache.cassandra.sidecar.client.selection.InstanceSelectionPolicy;
import org.apache.cassandra.sidecar.client.selection.RandomInstanceSelectionPolicy;
import org.apache.cassandra.sidecar.common.client.SidecarInstance;
import org.apache.cassandra.sidecar.common.request.AbortRestoreJobRequest;
import org.apache.cassandra.sidecar.common.request.CreateRestoreJobRequest;
import org.apache.cassandra.sidecar.common.request.CreateRestoreJobSliceRequest;
Expand Down Expand Up @@ -224,6 +225,7 @@ public CompletableFuture<GossipInfoResponse> gossipInfo()

/**
* Executes the GET gossip health request using the default retry policy and configured selection policy
*
* @param instance the instance where the request will be executed
* @return a completable future with gossip health response
*/
Expand Down Expand Up @@ -511,37 +513,38 @@ public CompletableFuture<Void> cleanUploadSession(SidecarInstance instance, Stri

/**
* Lists CDC commit logs in CDC directory for an instance
*
* @param sidecarInstance instance on which the CDC commit logs are to be listed
* @return a completable future with List of cdc commitLogs on the requested instance
*/
public CompletableFuture<ListCdcSegmentsResponse> listCdcSegments(SidecarInstance sidecarInstance)
{
return executor.executeRequestAsync(requestBuilder()
.singleInstanceSelectionPolicy(sidecarInstance)
.request(new ListCdcSegmentsRequest())
.build());
.singleInstanceSelectionPolicy(sidecarInstance)
.request(new ListCdcSegmentsRequest())
.build());
}

/**
* Streams CDC commit log segments from the requested instance.
*
* <p>
* Streams the specified {@code range} of a CDC CommitLog from the given instance and the
* stream is consumed by the {@link StreamConsumer consumer}.
*
* @param sidecarInstance instance on which the CDC commit logs are to be streamed
* @param segment segment file name
* @param range range of the file to be streamed
* @param streamConsumer object that consumes the stream
* @param segment segment file name
* @param range range of the file to be streamed
* @param streamConsumer object that consumes the stream
*/
public void streamCdcSegments(SidecarInstance sidecarInstance,
String segment,
HttpRange range,
StreamConsumer streamConsumer)
{
executor.streamRequest(requestBuilder()
.singleInstanceSelectionPolicy(sidecarInstance)
.request(new StreamCdcSegmentRequest(segment, range))
.build(), streamConsumer);
.singleInstanceSelectionPolicy(sidecarInstance)
.request(new StreamCdcSegmentRequest(segment, range))
.build(), streamConsumer);
}

/**
Expand Down Expand Up @@ -681,6 +684,7 @@ public CompletableFuture<ListOperationalJobsResponse> listOperationalJobs(Sideca

/**
* Executes the streams stats request using the default retry policy and configured selection policy
*
* @param instance the instance where the request will be executed
* @return a completable future of the connected client stats
*/
Expand All @@ -694,6 +698,7 @@ public CompletableFuture<StreamStatsResponse> streamsStats(SidecarInstance insta

/**
* Executes the node decommission request using the default retry policy and configured selection policy
*
* @param instance the instance where the request will be executed
* @return a completable future of the jobs list
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

import org.apache.cassandra.sidecar.common.client.SidecarInstance;
import org.apache.cassandra.sidecar.common.request.data.AbortRestoreJobRequestPayload;
import org.apache.cassandra.sidecar.common.request.data.CreateRestoreJobRequestPayload;
import org.apache.cassandra.sidecar.common.request.data.CreateSliceRequestPayload;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import java.util.List;

import org.apache.cassandra.sidecar.common.client.SidecarInstance;

/**
* A class that provides the list of {@link SidecarInstance}s. This class allows for statically or dynamically
* providing a list of instances. It is meant to support expansions and shrink of Cassandra clusters.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.Collections;
import java.util.List;

import org.apache.cassandra.sidecar.common.client.SidecarInstance;

/**
* A {@link SidecarInstancesProvider} that returns Sidecar instances from a list of instances.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.cassandra.sidecar.client.selection;

import org.apache.cassandra.sidecar.client.SidecarInstance;
import org.apache.cassandra.sidecar.common.client.SidecarInstance;

/**
* Defines the selection policy for an instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import java.util.Iterator;

import org.apache.cassandra.sidecar.client.SidecarInstance;
import org.apache.cassandra.sidecar.client.SidecarInstancesProvider;
import org.apache.cassandra.sidecar.common.client.SidecarInstance;
import org.jetbrains.annotations.NotNull;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.cassandra.sidecar.client.SidecarInstance;
import org.apache.cassandra.sidecar.client.SidecarInstancesProvider;
import org.apache.cassandra.sidecar.common.client.SidecarInstance;
import org.jetbrains.annotations.NotNull;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.Iterator;
import java.util.NoSuchElementException;

import org.apache.cassandra.sidecar.client.SidecarInstance;
import org.apache.cassandra.sidecar.common.client.SidecarInstance;
import org.jetbrains.annotations.NotNull;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.cassandra.sidecar.client;

import org.apache.cassandra.sidecar.common.client.SidecarInstance;
import org.apache.cassandra.sidecar.common.client.SidecarInstanceImpl;

/**
* Unit tests for the {@link SidecarInstanceImpl} class
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import org.apache.cassandra.sidecar.common.client.SidecarInstance;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatNullPointerException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

import org.junit.jupiter.api.Test;

import org.apache.cassandra.sidecar.common.client.SidecarInstanceImpl;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.apache.cassandra.sidecar.client.SidecarInstance;
import org.apache.cassandra.sidecar.client.SidecarInstancesProvider;
import org.apache.cassandra.sidecar.client.SimpleSidecarInstancesProvider;
import org.apache.cassandra.sidecar.common.client.SidecarInstance;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.apache.cassandra.sidecar.client.SidecarInstance;
import org.apache.cassandra.sidecar.client.SidecarInstancesProvider;
import org.apache.cassandra.sidecar.client.SimpleSidecarInstancesProvider;
import org.apache.cassandra.sidecar.common.client.SidecarInstance;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.apache.cassandra.sidecar.client.SidecarInstance;
import org.apache.cassandra.sidecar.common.client.SidecarInstance;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
import org.apache.cassandra.sidecar.client.retry.RetryAction;
import org.apache.cassandra.sidecar.client.retry.RetryPolicy;
import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
import org.apache.cassandra.sidecar.common.client.SidecarInstance;
import org.apache.cassandra.sidecar.common.client.SidecarInstanceImpl;
import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
import org.apache.cassandra.sidecar.common.data.RestoreJobSecrets;
import org.apache.cassandra.sidecar.common.request.ImportSSTableRequest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@
import org.apache.cassandra.sidecar.client.HttpClientConfig;
import org.apache.cassandra.sidecar.client.RequestContext;
import org.apache.cassandra.sidecar.client.RequestExecutor;
import org.apache.cassandra.sidecar.client.SidecarInstanceImpl;
import org.apache.cassandra.sidecar.client.SimpleSidecarInstancesProvider;
import org.apache.cassandra.sidecar.client.retry.NoRetryPolicy;
import org.apache.cassandra.sidecar.client.retry.RetryPolicy;
import org.apache.cassandra.sidecar.client.selection.InstanceSelectionPolicy;
import org.apache.cassandra.sidecar.client.selection.OrderedInstanceSelectionPolicy;
import org.apache.cassandra.sidecar.client.selection.RandomInstanceSelectionPolicy;
import org.apache.cassandra.sidecar.common.client.SidecarInstanceImpl;
import org.jetbrains.annotations.NotNull;

/**
Expand Down
30 changes: 30 additions & 0 deletions conf/sidecar.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,36 @@ healthcheck:
initial_delay: 0ms
execute_interval: 30s

# Sidecar Peer Health Monitor settings
# Enables a periodic task checking for the health of adjacent Sidecar peers in the token ring
sidecar_peer_health:
enabled: true
execute_interval: 30s
health_check_retries: 5
health_check_retry_delay: 10s

# Sidecar client settings used to interact with other sidecars
sidecar_client:
use_ssl: true
request_timeout: 1s
request_idle_timeout: 1s
connection_pool_max_size: 10
connection_pool_clearing_period: 10s
connection_pool_event_loop_size: 10
connection_pool_max_wait_queue_size: 10
max_retries: 3
retry_delay: 1s
max_retry_delay: 2s
ssl:
keystore:
type: PKCS12
path: path/to/keystore.p12
password: password
truststore:
type: PKCS12
path: path/to/keystore.p12
password: password

metrics:
registry_name: cassandra_sidecar
vertx:
Expand Down
Loading