Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,48 @@ public enum DefaultDriverOption implements DriverOption {
* <p>Value type: {@link java.util.List List}&#60;{@link String}&#62;
*/
LOAD_BALANCING_DC_FAILOVER_PREFERRED_REMOTE_DCS(
"advanced.load-balancing-policy.dc-failover.preferred-remote-dcs");
"advanced.load-balancing-policy.dc-failover.preferred-remote-dcs"),

/**
* The largest latency that we expect to record for requests.
*
* <p>Value-type: {@link java.time.Duration Duration}
*/
METRICS_SESSION_SEND_LATENCY_HIGHEST("advanced.metrics.session.send-latency.highest-latency"),
/**
* The shortest latency that we expect to record for requests.
*
* <p>Value-type: {@link java.time.Duration Duration}
*/
METRICS_SESSION_SEND_LATENCY_LOWEST("advanced.metrics.session.send-latency.lowest-latency"),
/**
* The number of significant decimal digits to which internal structures will maintain for
* requests.
*
* <p>Value-type: int
*/
METRICS_SESSION_SEND_LATENCY_DIGITS("advanced.metrics.session.send-latency.significant-digits"),
/**
* The interval at which percentile data is refreshed for requests.
*
* <p>Value-type: {@link java.time.Duration Duration}
*/
METRICS_SESSION_SEND_LATENCY_INTERVAL("advanced.metrics.session.send-latency.refresh-interval"),
/**
* Optional service-level objectives to meet, as a list of latencies to track.
*
* <p>Value-type: List of {@link java.time.Duration Duration}
*/
METRICS_SESSION_SEND_LATENCY_SLO("advanced.metrics.session.send-latency.slo"),
/**
* Optional list of percentiles to publish for send-latency metric. Produces an additional time
* series for each requested percentile. This percentile is computed locally, and so can't be
* aggregated with percentiles computed across other dimensions (e.g. in a different instance).
*
* <p>Value type: {@link java.util.List List}&#60;{@link Double}&#62;
*/
METRICS_SESSION_SEND_LATENCY_PUBLISH_PERCENTILES(
"advanced.metrics.session.send-latency.publish-percentiles");

private final String path;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,10 @@ protected static void fillWithDriverDefaults(OptionsMap map) {
map.put(TypedDriverOption.METRICS_SESSION_CQL_REQUESTS_LOWEST, Duration.ofMillis(1));
map.put(TypedDriverOption.METRICS_SESSION_CQL_REQUESTS_DIGITS, 3);
map.put(TypedDriverOption.METRICS_SESSION_CQL_REQUESTS_INTERVAL, Duration.ofMinutes(5));
map.put(TypedDriverOption.METRICS_SESSION_SEND_LATENCY_HIGHEST, Duration.ofSeconds(3));
map.put(TypedDriverOption.METRICS_SESSION_SEND_LATENCY_LOWEST, Duration.ofMillis(1));
map.put(TypedDriverOption.METRICS_SESSION_SEND_LATENCY_DIGITS, 3);
map.put(TypedDriverOption.METRICS_SESSION_SEND_LATENCY_INTERVAL, Duration.ofMinutes(5));
map.put(TypedDriverOption.METRICS_SESSION_THROTTLING_HIGHEST, Duration.ofSeconds(3));
map.put(TypedDriverOption.METRICS_SESSION_THROTTLING_LOWEST, Duration.ofMillis(1));
map.put(TypedDriverOption.METRICS_SESSION_THROTTLING_DIGITS, 3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -902,6 +902,37 @@ public String toString() {
DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_PREFERRED_REMOTE_DCS,
GenericType.listOf(String.class));

/** The largest latency that we expect to record for requests. */
public static final TypedDriverOption<Duration> METRICS_SESSION_SEND_LATENCY_HIGHEST =
new TypedDriverOption<>(
DefaultDriverOption.METRICS_SESSION_SEND_LATENCY_HIGHEST, GenericType.DURATION);
/** The shortest latency that we expect to record for requests. */
public static final TypedDriverOption<Duration> METRICS_SESSION_SEND_LATENCY_LOWEST =
new TypedDriverOption<>(
DefaultDriverOption.METRICS_SESSION_SEND_LATENCY_LOWEST, GenericType.DURATION);
/** Optional service-level objectives to meet, as a list of latencies to track. */
public static final TypedDriverOption<List<Duration>> METRICS_SESSION_SEND_LATENCY_SLO =
new TypedDriverOption<>(
DefaultDriverOption.METRICS_SESSION_SEND_LATENCY_SLO,
GenericType.listOf(GenericType.DURATION));
/** Optional pre-defined percentile of send-latency to publish, as a list of percentiles . */
public static final TypedDriverOption<List<Double>>
METRICS_SESSION_SEND_LATENCY_PUBLISH_PERCENTILES =
new TypedDriverOption<>(
DefaultDriverOption.METRICS_SESSION_SEND_LATENCY_PUBLISH_PERCENTILES,
GenericType.listOf(GenericType.DOUBLE));
/**
* The number of significant decimal digits to which internal structures will maintain for
* requests.
*/
public static final TypedDriverOption<Integer> METRICS_SESSION_SEND_LATENCY_DIGITS =
new TypedDriverOption<>(
DefaultDriverOption.METRICS_SESSION_SEND_LATENCY_DIGITS, GenericType.INTEGER);
/** The interval at which percentile data is refreshed for requests. */
public static final TypedDriverOption<Duration> METRICS_SESSION_SEND_LATENCY_INTERVAL =
new TypedDriverOption<>(
DefaultDriverOption.METRICS_SESSION_SEND_LATENCY_INTERVAL, GenericType.DURATION);

private static Iterable<TypedDriverOption<?>> introspectBuiltInValues() {
try {
ImmutableList.Builder<TypedDriverOption<?>> result = ImmutableList.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public enum DefaultSessionMetric implements SessionMetric {
THROTTLING_QUEUE_SIZE("throttling.queue-size"),
THROTTLING_ERRORS("throttling.errors"),
CQL_PREPARED_CACHE_SIZE("cql-prepared-cache-size"),
SEND_LATENCY("send-latency"),
;

private static final Map<String, DefaultSessionMetric> BY_PATH = sortByPath();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ private void write(ChannelHandlerContext ctx, RequestMessage message, ChannelPro
message.request);

inFlight.put(streamId, message.responseCallback);
message.responseCallback.onRequestSent(frame);
ChannelFuture writeFuture = ctx.write(frame, promise);
writeFuture.addListener(
future -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,18 @@ public interface ResponseCallback {
*/
void onFailure(Throwable error);

/**
* Reports the request frame to be sent on the current connection.
*
* <p>This is called every time just before the request is written to a connection (and therefore
* might multiple times in case of retries).
*
* <p>The default implementation does nothing.
*/
default void onRequestSent(Frame frame) {
// nothing to do
}

/**
* Reports the stream id used for the request on the current connection.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ private class NodeResponseCallback
implements ResponseCallback, GenericFutureListener<Future<java.lang.Void>> {

private final long nodeStartTimeNanos = System.nanoTime();
private long sendStartTimeNanos = NANOTIME_NOT_MEASURED_YET;
private final Statement<?> statement;
private final Node node;
private final Queue<Node> queryPlan;
Expand Down Expand Up @@ -491,7 +492,15 @@ private NodeResponseCallback(
this.logPrefix = logPrefix + "|" + execution;
}

// this gets invoked once the write completes.
@Override
public void onRequestSent(Frame frame) {
if (sessionMetricUpdater.isEnabled(
DefaultSessionMetric.SEND_LATENCY, executionProfile.getName())) {
sendStartTimeNanos = System.nanoTime();
}
}

// this gets invoked once the write request completes.
@Override
public void operationComplete(Future<java.lang.Void> future) throws Exception {
if (!future.isSuccess()) {
Expand Down Expand Up @@ -521,6 +530,14 @@ public void operationComplete(Future<java.lang.Void> future) throws Exception {
}
} else {
LOG.trace("[{}] Request sent on {}", logPrefix, channel);
if (sendStartTimeNanos != NANOTIME_NOT_MEASURED_YET) {
// only if send latency metric is enabled
sessionMetricUpdater.updateTimer(
DefaultSessionMetric.SEND_LATENCY,
executionProfile.getName(),
System.nanoTime() - sendStartTimeNanos,
TimeUnit.NANOSECONDS);
}
if (result.isDone()) {
// If the handler completed since the last time we checked, cancel directly because we
// don't know if cancelScheduledTasks() has run yet
Expand Down Expand Up @@ -729,7 +746,7 @@ private void processErrorResponse(Error errorMessage) {
trackNodeError(node, illegalStateException, NANOTIME_NOT_MEASURED_YET);
setFinalError(statement, illegalStateException, node, execution);
}
LOG.trace("[{}] Reprepare sucessful, retrying", logPrefix);
LOG.trace("[{}] Reprepare successful, retrying", logPrefix);
sendRequest(statement, node, queryPlan, execution, retryCount, false);
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ public DropwizardSessionMetricUpdater(
DefaultDriverOption.METRICS_SESSION_CQL_REQUESTS_HIGHEST,
DefaultDriverOption.METRICS_SESSION_CQL_REQUESTS_DIGITS,
DefaultDriverOption.METRICS_SESSION_CQL_REQUESTS_INTERVAL);
initializeHdrTimer(
DefaultSessionMetric.SEND_LATENCY,
profile,
DefaultDriverOption.METRICS_SESSION_SEND_LATENCY_HIGHEST,
DefaultDriverOption.METRICS_SESSION_SEND_LATENCY_DIGITS,
DefaultDriverOption.METRICS_SESSION_SEND_LATENCY_INTERVAL);
initializeHdrTimer(
DefaultSessionMetric.THROTTLING_DELAY,
profile,
Expand Down
15 changes: 15 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1500,6 +1500,9 @@ datastax-java-driver {
# retry.
// cql-requests,

# The throughput and latency percentiles of requests sent over the network (exposed as a Timer).
// send-latency,

# The number of CQL requests that timed out -- that is, the session.execute() call failed
# with a DriverTimeoutException (exposed as a Counter).
// cql-client-timeouts,
Expand Down Expand Up @@ -1625,6 +1628,18 @@ datastax-java-driver {
// publish-percentiles = [ 0.75, 0.95, 0.99 ]
}

# Required: if the 'send-latency' metric is enabled, and Dropwizard or Micrometer is used.
# Modifiable at runtime: no
# Overridable in a profile: no
send-latency {
highest-latency = 3 seconds
lowest-latency = 1 millisecond
significant-digits = 3
refresh-interval = 5 minutes
// slo = [ 100 milliseconds, 500 milliseconds, 1 second ]
// publish-percentiles = [ 0.75, 0.95, 0.99 ]
}

# Required: if the 'throttling.delay' metric is enabled, and Dropwizard or Micrometer is used.
# Modifiable at runtime: no
# Overridable in a profile: no
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ protected void assertMetricsPresent(CqlSession session) {
assertThat((Integer) ((Gauge<?>) m).getValue()).isEqualTo(3);
break;
case CQL_REQUESTS:
case SEND_LATENCY:
assertThat(m).isInstanceOf(Timer.class);
await().untilAsserted(() -> assertThat(((Timer) m).getCount()).isEqualTo(30));
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.util.concurrent.TimeUnit;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;

Expand Down Expand Up @@ -77,6 +78,9 @@ protected void assertMetricsPresent(CqlSession session) {
MetricIdGenerator metricIdGenerator =
((InternalDriverContext) session.getContext()).getMetricIdGenerator();

double cqlRequestsTotal = 0;
double sendLatencyTotal = 0;

for (DefaultSessionMetric metric : ENABLED_SESSION_METRICS) {
MetricId id = metricIdGenerator.sessionMetricId(metric);
Iterable<Tag> tags = MicrometerTags.toMicrometerTags(id.getTags());
Expand All @@ -89,7 +93,15 @@ protected void assertMetricsPresent(CqlSession session) {
break;
case CQL_REQUESTS:
assertThat(m).isInstanceOf(Timer.class);
await().untilAsserted(() -> assertThat(((Timer) m).count()).isEqualTo(30));
Timer tr = (Timer) m;
await().untilAsserted(() -> assertThat(tr.count()).isEqualTo(30));
cqlRequestsTotal = tr.totalTime(TimeUnit.NANOSECONDS);
break;
case SEND_LATENCY:
assertThat(m).isInstanceOf(Timer.class);
Timer tl = (Timer) m;
await().untilAsserted(() -> assertThat(tl.count()).isEqualTo(30));
sendLatencyTotal = tl.totalTime(TimeUnit.NANOSECONDS);
break;
case CQL_PREPARED_CACHE_SIZE:
assertThat(m).isInstanceOf(Gauge.class);
Expand All @@ -116,6 +128,8 @@ protected void assertMetricsPresent(CqlSession session) {
}
}

assertThat(sendLatencyTotal).isLessThanOrEqualTo(cqlRequestsTotal);

for (Node node : session.getMetadata().getNodes().values()) {

for (DefaultNodeMetric metric : ENABLED_NODE_METRICS) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.datastax.oss.driver.internal.metrics.microprofile.MicroProfileTags;
import com.datastax.oss.simulacron.common.cluster.ClusterSpec;
import io.smallrye.metrics.MetricsRegistryImpl;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.eclipse.microprofile.metrics.Counter;
Expand Down Expand Up @@ -81,6 +82,9 @@ protected void assertMetricsPresent(CqlSession session) {
MetricIdGenerator metricIdGenerator =
((InternalDriverContext) session.getContext()).getMetricIdGenerator();

Duration cqlRequestsDuration = null;
Duration sendLatencyDuration = null;

for (DefaultSessionMetric metric : ENABLED_SESSION_METRICS) {
MetricId metricId = metricIdGenerator.sessionMetricId(metric);
Tag[] tags = MicroProfileTags.toMicroProfileTags(metricId.getTags());
Expand All @@ -94,7 +98,15 @@ protected void assertMetricsPresent(CqlSession session) {
break;
case CQL_REQUESTS:
assertThat(m).isInstanceOf(Timer.class);
await().untilAsserted(() -> assertThat(((Timer) m).getCount()).isEqualTo(30));
Timer tr = (Timer) m;
await().untilAsserted(() -> assertThat(tr.getCount()).isEqualTo(30));
cqlRequestsDuration = tr.getElapsedTime();
break;
case SEND_LATENCY:
assertThat(m).isInstanceOf(Timer.class);
Timer tl = (Timer) m;
await().untilAsserted(() -> assertThat(tl.getCount()).isEqualTo(30));
sendLatencyDuration = tl.getElapsedTime();
break;
case CQL_PREPARED_CACHE_SIZE:
assertThat(m).isInstanceOf(Gauge.class);
Expand All @@ -121,6 +133,10 @@ protected void assertMetricsPresent(CqlSession session) {
}
}

assertThat(sendLatencyDuration).isNotNull();
assertThat(cqlRequestsDuration).isNotNull();
assertThat(sendLatencyDuration.toNanos()).isLessThanOrEqualTo(cqlRequestsDuration.toNanos());

for (Node node : session.getMetadata().getNodes().values()) {

for (DefaultNodeMetric metric : ENABLED_NODE_METRICS) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public MicrometerSessionMetricUpdater(

initializeTimer(DefaultSessionMetric.CQL_REQUESTS, profile);
initializeTimer(DefaultSessionMetric.THROTTLING_DELAY, profile);
initializeTimer(DefaultSessionMetric.SEND_LATENCY, profile);
initializeTimer(DseSessionMetric.CONTINUOUS_CQL_REQUESTS, profile);
initializeTimer(DseSessionMetric.GRAPH_REQUESTS, profile);
}
Expand Down Expand Up @@ -149,6 +150,25 @@ protected Timer.Builder configureTimer(Timer.Builder builder, SessionMetric metr

configurePercentilesPublishIfDefined(
builder, profile, DseDriverOption.METRICS_SESSION_GRAPH_REQUESTS_PUBLISH_PERCENTILES);
} else if (metric == DefaultSessionMetric.SEND_LATENCY) {
builder
.minimumExpectedValue(
profile.getDuration(DefaultDriverOption.METRICS_SESSION_SEND_LATENCY_LOWEST))
.maximumExpectedValue(
profile.getDuration(DefaultDriverOption.METRICS_SESSION_SEND_LATENCY_HIGHEST))
.serviceLevelObjectives(
profile.isDefined(DefaultDriverOption.METRICS_SESSION_SEND_LATENCY_SLO)
? profile
.getDurationList(DefaultDriverOption.METRICS_SESSION_SEND_LATENCY_SLO)
.toArray(new Duration[0])
: null)
.percentilePrecision(
profile.isDefined(DefaultDriverOption.METRICS_SESSION_SEND_LATENCY_DIGITS)
? profile.getInt(DefaultDriverOption.METRICS_SESSION_SEND_LATENCY_DIGITS)
: null);

configurePercentilesPublishIfDefined(
builder, profile, DefaultDriverOption.METRICS_SESSION_SEND_LATENCY_PUBLISH_PERCENTILES);
}
return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,14 @@ public static Object[][] timerMetrics() {
DefaultDriverOption.METRICS_SESSION_CQL_REQUESTS_SLO,
DefaultDriverOption.METRICS_SESSION_CQL_REQUESTS_PUBLISH_PERCENTILES,
},
{
DefaultSessionMetric.SEND_LATENCY,
DefaultDriverOption.METRICS_SESSION_SEND_LATENCY_LOWEST,
DefaultDriverOption.METRICS_SESSION_SEND_LATENCY_HIGHEST,
DefaultDriverOption.METRICS_SESSION_SEND_LATENCY_DIGITS,
DefaultDriverOption.METRICS_SESSION_SEND_LATENCY_SLO,
DefaultDriverOption.METRICS_SESSION_SEND_LATENCY_PUBLISH_PERCENTILES,
},
{
DseSessionMetric.GRAPH_REQUESTS,
DseDriverOption.METRICS_SESSION_GRAPH_REQUESTS_LOWEST,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public MicroProfileSessionMetricUpdater(

initializeTimer(DefaultSessionMetric.CQL_REQUESTS, profile);
initializeTimer(DefaultSessionMetric.THROTTLING_DELAY, profile);
initializeTimer(DefaultSessionMetric.SEND_LATENCY, profile);
initializeTimer(DseSessionMetric.CONTINUOUS_CQL_REQUESTS, profile);
initializeTimer(DseSessionMetric.GRAPH_REQUESTS, profile);
}
Expand Down