Skip to content

CASSANDRA-20499: add metrics about apply, fileSize and throttle to hinted handoff #4021

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion src/java/org/apache/cassandra/hints/HintVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.metrics.HintsServiceMetrics;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
Expand Down Expand Up @@ -101,7 +102,16 @@ else if (!StorageProxy.instance.appliesLocally(hint.mutation))
else
{
// the common path - the node is both the destination and a valid replica for the hint.
hint.applyFuture().addCallback(o -> respond(message), e -> logger.debug("Failed to apply hint", e));
hint.applyFuture().addCallback(
o -> {
HintsServiceMetrics.hintsApplySucceeded.mark();
respond(message);
},
e -> {
HintsServiceMetrics.hintsApplyFailed.mark();
logger.debug("Failed to apply hint", e);
}
);
}
}

Expand Down
16 changes: 12 additions & 4 deletions src/java/org/apache/cassandra/hints/HintsReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.cassandra.exceptions.UnknownTableException;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.metrics.HintsServiceMetrics;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.AbstractIterator;
Expand Down Expand Up @@ -231,8 +232,7 @@ private Hint computeNextInternal() throws IOException

private Hint readHint(int size) throws IOException
{
if (rateLimiter != null)
rateLimiter.acquire(size);
applyThrottleRateLimit(size);
input.limit(size);

Hint hint;
Expand Down Expand Up @@ -338,8 +338,7 @@ private ByteBuffer computeNextInternal() throws IOException

private ByteBuffer readBuffer(int size) throws IOException
{
if (rateLimiter != null)
rateLimiter.acquire(size);
applyThrottleRateLimit(size);
input.limit(size);

ByteBuffer buffer = Hint.serializer.readBufferIfLive(input, now, size, descriptor.messagingVersion());
Expand All @@ -364,4 +363,13 @@ private static boolean verifyAllZeros(ChecksummedDataInput input) throws IOExcep
}
return true;
}

private void applyThrottleRateLimit(int size)
{
if (rateLimiter != null)
{
rateLimiter.acquire(size);
HintsServiceMetrics.hintsThrottle.inc(size);
}
}
}
16 changes: 16 additions & 0 deletions src/java/org/apache/cassandra/hints/HintsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.metrics.HintsServiceMetrics;
import org.apache.cassandra.utils.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -93,6 +94,8 @@ public final class HintsService implements HintsServiceMBean

public final HintedHandoffMetrics metrics;

public final HintsServiceMetrics hintsServiceMetrics;

private HintsService()
{
this(FailureDetector.instance);
Expand Down Expand Up @@ -125,6 +128,8 @@ private HintsService()
triggerCleanupFuture = ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(cleanupTrigger, 1, 1, TimeUnit.HOURS);

metrics = new HintedHandoffMetrics();

hintsServiceMetrics = new HintsServiceMetrics(this);
}

private static ImmutableMap<String, Object> createDescriptorParams()
Expand Down Expand Up @@ -258,6 +263,17 @@ public long getTotalHintsSize(UUID hostId)
return store.getTotalFileSize();
}

/**
* Get the total hints file size of current node
*/
public long getTotalHintsSizeOfNode()
{
return catalog.stores()
.filter(Objects::nonNull)
.mapToLong(HintsStore::getTotalFileSize)
.sum();
}

/**
* Gracefully and blockingly shut down the service.
*
Expand Down
17 changes: 17 additions & 0 deletions src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.apache.cassandra.concurrent.ImmediateExecutor;
import org.apache.cassandra.hints.HintsService;
import org.apache.cassandra.locator.InetAddressAndPort;

import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
Expand All @@ -39,9 +42,17 @@ public final class HintsServiceMetrics

private static final MetricNameFactory factory = new DefaultNameFactory(TYPE_NAME);

// About the delivery of hints
public static final Meter hintsSucceeded = Metrics.meter(factory.createMetricName("HintsSucceeded"));
public static final Meter hintsFailed = Metrics.meter(factory.createMetricName("HintsFailed"));
public static final Meter hintsTimedOut = Metrics.meter(factory.createMetricName("HintsTimedOut"));
public final Gauge<Long> hintsFileSize;
// Corresponding to the hinted_handoff_throttle_in_kb configuration
public static final Counter hintsThrottle = Metrics.counter(factory.createMetricName("HintsThrottle"));

// About the apply/replay of hints
public static final Meter hintsApplySucceeded = Metrics.meter(factory.createMetricName("HintsApplySucceeded"));
public static final Meter hintsApplyFailed = Metrics.meter(factory.createMetricName("HintsApplyFailed"));

/** Histogram of all hint delivery delays */
private static final Histogram globalDelayHistogram = Metrics.histogram(factory.createMetricName("Hint_delays"), false);
Expand All @@ -51,6 +62,12 @@ public final class HintsServiceMetrics
.executor(ImmediateExecutor.INSTANCE)
.build(address -> Metrics.histogram(factory.createMetricName("Hint_delays-"+address.toString().replace(':', '.')), false));

public HintsServiceMetrics(HintsService hintsService)
{
hintsFileSize = Metrics.register(factory.createMetricName("HintsFileSize"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like the only purpose of putting hintsService into the constructor is to get access to getTotalHintsSizeOfNode on it. We might go with parameter-less constructor instead and do HintsService.instance::getTotalHintsSizeOfNode()

hintsService::getTotalHintsSizeOfNode);
}

public static void updateDelayMetrics(InetAddressAndPort endpoint, long delay)
{
if (delay <= 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.cassandra.distributed.shared.Metrics;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.hints.Hint;
import org.apache.cassandra.hints.HintsService;
import org.apache.cassandra.metrics.HintsServiceMetrics;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.utils.concurrent.Future;
Expand Down Expand Up @@ -108,15 +109,24 @@ public void testHintsServiceMetrics() throws Exception
dropWritesForNode2.set(true);
for (int i = 0; i < NUM_ROWS / 2; i++)
coordinator.execute(withKeyspace("INSERT INTO %s.t (k, v) VALUES (?, ?)"), QUORUM, i, i);
// some hints have created for node1, so file size must be greater than 0
waitUntilAsserted(() -> assertThat(countHintsFileSize(node1)).isGreaterThan(0));
dropWritesForNode2.set(false);

// write the second half of the rows with the third node dropping mutations requests,
// so some hints will be created for that node
dropWritesForNode3.set(true);
for (int i = NUM_ROWS / 2; i < NUM_ROWS; i++)
coordinator.execute(withKeyspace("INSERT INTO %s.t (k, v) VALUES (?, ?)"), QUORUM, i, i);
// another hints have created for node1, so file size must be greater than 0
waitUntilAsserted(() -> assertThat(countHintsFileSize(node1)).isGreaterThan(0));
dropWritesForNode3.set(false);

// Hints Throttle happens in the delivery process, so must be greater than 0
waitUntilAsserted(() -> assertThat(countHintsThrottle(node1)).isGreaterThan(0));
waitUntilAsserted(() -> assertThat(countHintsApplySucceeded(node1)).isEqualTo(0));
waitUntilAsserted(() -> assertThat(countHintsApplyFailed(node1)).isEqualTo(0));

// wait until all the hints have been successfully applied to the nodes that have been dropping mutations
waitUntilAsserted(() -> assertThat(countRows(node2)).isEqualTo(countRows(node3)).isEqualTo(NUM_ROWS));

Expand All @@ -142,6 +152,13 @@ public void testHintsServiceMetrics() throws Exception
assertThat(countHintsSucceeded(node)).isEqualTo(0);
assertThat(countHintsFailed(node)).isEqualTo(0);
assertThat(countHintsTimedOut(node)).isEqualTo(0);

assertThat(countHintsFileSize(node)).isEqualTo(0);
assertThat(countHintsThrottle(node)).isEqualTo(0);
// node two and three must apply these hints which belongs to them, so must be greater than 0
assertThat(countHintsApplySucceeded(node)).isGreaterThan(0);
assertThat(countHintsApplyFailed(node)).isEqualTo(0);

assertThat(countGlobalDelays(node)).isEqualTo(0);
cluster.forEach(target -> assertThat(countEndpointDelays(node, target)).isEqualTo(0));
}
Expand Down Expand Up @@ -180,6 +197,29 @@ private static Long countHintsTimedOut(IInvokableInstance node)
return node.callOnInstance(() -> HintsServiceMetrics.hintsTimedOut.getCount());
}

private static Long countHintsFileSize(IInvokableInstance node)
{
return node.callOnInstance(() -> HintsService.instance.hintsServiceMetrics.hintsFileSize.getValue());
}

@SuppressWarnings("Convert2MethodRef")
private static Long countHintsApplySucceeded(IInvokableInstance node)
{
return node.callOnInstance(() -> HintsServiceMetrics.hintsApplySucceeded.getCount());
}

@SuppressWarnings("Convert2MethodRef")
private static Long countHintsApplyFailed(IInvokableInstance node)
{
return node.callOnInstance(() -> HintsServiceMetrics.hintsApplyFailed.getCount());
}

@SuppressWarnings("Convert2MethodRef")
private static Long countHintsThrottle(IInvokableInstance node)
{
return node.callOnInstance(() -> HintsServiceMetrics.hintsThrottle.getCount());
}

private static Long countGlobalDelays(IInvokableInstance node)
{
return getHistogramCount(node, "org.apache.cassandra.metrics.HintsService.Hint_delays");
Expand Down