Skip to content

Commit 28e3d32

Browse files
authored
STAR-1340 Add more metrics for counters and batches (apache#453)
The patch adds 2 more metrics for counters: - the lock wait time, - the number of locks in a counter mutation. No tests for them as OSS doesn't test existing metrics either. It also adds the number of batches executed in the coordinator and tests them.
1 parent c006bfc commit 28e3d32

File tree

5 files changed

+354
-21
lines changed

5 files changed

+354
-21
lines changed

src/java/org/apache/cassandra/cql3/statements/BatchStatement.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.apache.cassandra.db.Slice;
5656
import org.apache.cassandra.db.Slices;
5757
import org.apache.cassandra.db.partitions.PartitionUpdate;
58+
import org.apache.cassandra.db.rows.Row;
5859
import org.apache.cassandra.db.rows.RowIterator;
5960
import org.apache.cassandra.exceptions.InvalidRequestException;
6061
import org.apache.cassandra.exceptions.RequestExecutionException;
@@ -453,21 +454,25 @@ private void executeWithoutConditions(List<? extends IMutation> mutations,
453454
verifyBatchSize(mutations, queryState);
454455
verifyBatchType(mutations, queryState);
455456

456-
updatePartitionsPerBatchMetrics(mutations.size());
457+
updatePerBatchMetrics(mutations);
457458

458459
boolean mutateAtomic = (isLogged() && mutations.size() > 1);
459460
StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic, queryStartNanoTime, queryState.getClientState());
460461
}
461462

462-
private void updatePartitionsPerBatchMetrics(int updatedPartitions)
463+
private void updatePerBatchMetrics(Collection<? extends IMutation> mutations)
463464
{
464-
if (isLogged()) {
465-
metrics.partitionsPerLoggedBatch.update(updatedPartitions);
466-
} else if (isCounter()) {
467-
metrics.partitionsPerCounterBatch.update(updatedPartitions);
468-
} else {
469-
metrics.partitionsPerUnloggedBatch.update(updatedPartitions);
465+
int nrUpdatedPartitions = mutations.size();
466+
int nrUpdatedColumns = 0;
467+
for (IMutation mutation : mutations)
468+
{
469+
for (PartitionUpdate update : mutation.getPartitionUpdates())
470+
{
471+
for (Row row : update)
472+
nrUpdatedColumns += row.columns().size();
473+
}
470474
}
475+
metrics.update(type, nrUpdatedPartitions, nrUpdatedColumns);
471476
}
472477

473478
private ResultMessage executeWithConditions(BatchQueryOptions options, QueryState state, long queryStartNanoTime)

src/java/org/apache/cassandra/db/CounterMutation.java

Lines changed: 67 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.concurrent.TimeUnit;
2323
import java.util.concurrent.locks.Lock;
2424

25+
import com.google.common.annotations.VisibleForTesting;
2526
import com.google.common.base.Function;
2627
import com.google.common.base.Objects;
2728
import com.google.common.collect.Iterables;
@@ -33,6 +34,7 @@
3334
import org.slf4j.LoggerFactory;
3435

3536
import com.codahale.metrics.Counter;
37+
import com.codahale.metrics.Histogram;
3638
import org.apache.cassandra.config.DatabaseDescriptor;
3739
import org.apache.cassandra.db.marshal.ByteBufferAccessor;
3840
import org.apache.cassandra.db.rows.*;
@@ -45,6 +47,7 @@
4547
import org.apache.cassandra.io.util.DataOutputPlus;
4648
import org.apache.cassandra.locator.AbstractReplicationStrategy;
4749
import org.apache.cassandra.metrics.DefaultNameFactory;
50+
import org.apache.cassandra.metrics.LatencyMetrics;
4851
import org.apache.cassandra.schema.TableId;
4952
import org.apache.cassandra.service.CacheService;
5053
import org.apache.cassandra.tracing.Tracing;
@@ -66,8 +69,31 @@ public class CounterMutation implements IMutation
6669

6770
public static final CounterMutationSerializer serializer = new CounterMutationSerializer();
6871

72+
/**
73+
* This metric tracks the number of timeouts that occurred because the locks could not be
74+
* acquired within DatabaseDescriptor.getCounterWriteRpcTimeout().
75+
*/
6976
public static final Counter lockTimeout = Metrics.counter(DefaultNameFactory.createMetricName("Counter", "lock_timeout", null));
7077

78+
/**
79+
* This metric tracks how long it took to acquire all the locks
80+
* that must be acquired before applying the counter mutation.
81+
*/
82+
public static final LatencyMetrics lockAcquireTime = new LatencyMetrics("Counter", "lock_acquire_time");
83+
84+
/**
85+
* This metric tracks the number of locks that must be acquired before applying the counter
86+
* mutation. A mutation normally has one partition only, unless it comes from a batch,
87+
* where the same partition key is used across different tables.
88+
* For each partition, we need to acquire one lock for each column on each row.
89+
* The locks are striped, see {@link CounterMutation#LOCKS} for details.
90+
*/
91+
public static final Histogram locksPerUpdate = Metrics.histogram(DefaultNameFactory
92+
.createMetricName("Counter",
93+
"locks_per_update",
94+
null),
95+
false);
96+
7197
private static final String LOCK_TIMEOUT_MESSAGE = "Failed to acquire locks for counter mutation on keyspace {} for longer than {} millis, giving up";
7298
private static final String LOCK_TIMEOUT_TRACE = "Failed to acquire locks for counter mutation for longer than {} millis, giving up";
7399

@@ -164,32 +190,60 @@ public void apply()
164190
applyCounterMutation();
165191
}
166192

167-
private void grabCounterLocks(Keyspace keyspace, List<Lock> locks) throws WriteTimeoutException
193+
private int countDistinctLocks(Iterable<Lock> sortedLocks)
194+
{
195+
Lock prev = null;
196+
int counter = 0;
197+
for(Lock l: sortedLocks)
198+
{
199+
if (prev != l)
200+
counter++;
201+
prev = l;
202+
}
203+
return counter;
204+
}
205+
206+
@VisibleForTesting
207+
public void grabCounterLocks(Keyspace keyspace, List<Lock> locks) throws WriteTimeoutException
168208
{
169209
long startTime = System.nanoTime();
170210

171211
AbstractReplicationStrategy replicationStrategy = keyspace.getReplicationStrategy();
172-
for (Lock lock : LOCKS.bulkGet(getCounterLockKeys()))
212+
Iterable<Lock> sortedLocks = LOCKS.bulkGet(getCounterLockKeys());
213+
locksPerUpdate.update(countDistinctLocks(sortedLocks));
214+
215+
try
173216
{
174-
long timeout = getTimeout(NANOSECONDS) - (System.nanoTime() - startTime);
175-
try
217+
for (Lock lock : sortedLocks)
176218
{
177-
if (!lock.tryLock(timeout, NANOSECONDS))
178-
handleLockTimeout(replicationStrategy);
179-
locks.add(lock);
180-
}
181-
catch (InterruptedException e)
182-
{
183-
handleLockTimeout(replicationStrategy);
219+
long timeout = getTimeout(NANOSECONDS) - (System.nanoTime() - startTime);
220+
try
221+
{
222+
if (!lock.tryLock(timeout, NANOSECONDS))
223+
handleLockTimeoutAndThrow(replicationStrategy);
224+
locks.add(lock);
225+
}
226+
catch (InterruptedException e)
227+
{
228+
handleLockTimeoutAndThrow(replicationStrategy);
229+
}
184230
}
185231
}
232+
finally
233+
{
234+
lockAcquireTime.addNano(System.nanoTime() - startTime);
235+
}
186236
}
187237

188-
private void handleLockTimeout(AbstractReplicationStrategy replicationStrategy)
238+
private void handleLockTimeoutAndThrow(AbstractReplicationStrategy replicationStrategy)
189239
{
190240
lockTimeout.inc();
191-
nospamLogger.error(LOCK_TIMEOUT_MESSAGE, getKeyspaceName(), DatabaseDescriptor.getCounterWriteRpcTimeout(MILLISECONDS));
241+
242+
nospamLogger.error(LOCK_TIMEOUT_MESSAGE,
243+
getKeyspaceName(),
244+
DatabaseDescriptor.getCounterWriteRpcTimeout(MILLISECONDS));
192245
Tracing.trace(LOCK_TIMEOUT_TRACE, DatabaseDescriptor.getCounterWriteRpcTimeout(MILLISECONDS));
246+
193247
throw new WriteTimeoutException(WriteType.COUNTER, consistency(), 0, consistency().blockFor(replicationStrategy));
194248
}
195249

src/java/org/apache/cassandra/metrics/BatchMetrics.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,64 @@
1717
*/
1818
package org.apache.cassandra.metrics;
1919

20+
import com.codahale.metrics.Counter;
2021
import com.codahale.metrics.Histogram;
22+
import org.apache.cassandra.cql3.statements.BatchStatement;
2123

2224
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
2325

2426
public class BatchMetrics
2527
{
2628
private static final MetricNameFactory factory = new DefaultNameFactory("Batch");
2729

30+
public final Counter numLoggedBatches;
31+
public final Counter numUnloggedBatches;
32+
public final Counter numCounterBatches;
33+
2834
public final Histogram partitionsPerLoggedBatch;
2935
public final Histogram partitionsPerUnloggedBatch;
3036
public final Histogram partitionsPerCounterBatch;
3137

38+
public final Histogram columnsPerLoggedBatch;
39+
public final Histogram columnsPerUnloggedBatch;
40+
public final Histogram columnsPerCounterBatch;
41+
3242
public BatchMetrics()
3343
{
44+
numLoggedBatches = Metrics.counter(factory.createMetricName("NumLoggedBatches"));
45+
numUnloggedBatches = Metrics.counter(factory.createMetricName("NumUnloggedBatches"));
46+
numCounterBatches = Metrics.counter(factory.createMetricName("NumCounterBatches"));
47+
3448
partitionsPerLoggedBatch = Metrics.histogram(factory.createMetricName("PartitionsPerLoggedBatch"), false);
3549
partitionsPerUnloggedBatch = Metrics.histogram(factory.createMetricName("PartitionsPerUnloggedBatch"), false);
3650
partitionsPerCounterBatch = Metrics.histogram(factory.createMetricName("PartitionsPerCounterBatch"), false);
51+
52+
columnsPerLoggedBatch = Metrics.histogram(factory.createMetricName("ColumnsPerLoggedBatch"), false);
53+
columnsPerUnloggedBatch = Metrics.histogram(factory.createMetricName("ColumnsPerUnloggedBatch"), false);
54+
columnsPerCounterBatch = Metrics.histogram(factory.createMetricName("ColumnsPerCounterBatch"), false);
55+
}
56+
57+
public void update(BatchStatement.Type batchType, int updatedPartitions, int updatedColumns)
58+
{
59+
switch (batchType)
60+
{
61+
case LOGGED:
62+
numLoggedBatches.inc();
63+
partitionsPerLoggedBatch.update(updatedPartitions);
64+
columnsPerLoggedBatch.update(updatedColumns);
65+
break;
66+
case COUNTER:
67+
numCounterBatches.inc();
68+
partitionsPerCounterBatch.update(updatedPartitions);
69+
columnsPerCounterBatch.update(updatedColumns);
70+
break;
71+
case UNLOGGED:
72+
numUnloggedBatches.inc();
73+
partitionsPerUnloggedBatch.update(updatedPartitions);
74+
columnsPerUnloggedBatch.update(updatedColumns);
75+
break;
76+
default:
77+
throw new IllegalStateException("Unexpected batch type: " + batchType);
78+
}
3779
}
3880
}

test/unit/org/apache/cassandra/metrics/BatchMetricsTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,15 +151,28 @@ private void assertMetrics(BatchStatement.Type batchTypeTested, int[] rounds, in
151151
long partitionsPerCounterBatchCountPre = metrics.partitionsPerCounterBatch.getCount();
152152
long expectedPartitionsPerCounterBatchCount = partitionsPerCounterBatchCountPre + (batchTypeTested == BatchStatement.Type.COUNTER ? 1 : 0);
153153

154+
long columnsPerLoggedBatchCountPre = metrics.columnsPerLoggedBatch.getCount();
155+
long expectedColumnsPerLoggedBatchCount = columnsPerLoggedBatchCountPre + (batchTypeTested == BatchStatement.Type.LOGGED ? 1 : 0);
156+
long columnsPerUnloggedBatchCountPre = metrics.columnsPerUnloggedBatch.getCount();
157+
long expectedColumnsPerUnloggedBatchCount = columnsPerUnloggedBatchCountPre + (batchTypeTested == BatchStatement.Type.UNLOGGED ? 1 : 0);
158+
long columnsPerCounterBatchCountPre = metrics.columnsPerCounterBatch.getCount();
159+
long expectedColumnsPerCounterBatchCount = columnsPerCounterBatchCountPre + (batchTypeTested == BatchStatement.Type.COUNTER ? 1 : 0);
160+
154161
executeLoggerBatch(batchTypeTested, distinctPartitions, rounds[ix]);
155162

156163
assertEquals(expectedPartitionsPerUnloggedBatchCount, metrics.partitionsPerUnloggedBatch.getCount());
157164
assertEquals(expectedPartitionsPerLoggedBatchCount, metrics.partitionsPerLoggedBatch.getCount());
158165
assertEquals(expectedPartitionsPerCounterBatchCount, metrics.partitionsPerCounterBatch.getCount());
166+
assertEquals(expectedColumnsPerUnloggedBatchCount, metrics.columnsPerUnloggedBatch.getCount());
167+
assertEquals(expectedColumnsPerLoggedBatchCount, metrics.columnsPerLoggedBatch.getCount());
168+
assertEquals(expectedColumnsPerCounterBatchCount, metrics.columnsPerCounterBatch.getCount());
159169

160170
EstimatedHistogramReservoirSnapshot partitionsPerLoggedBatchSnapshot = (EstimatedHistogramReservoirSnapshot) metrics.partitionsPerLoggedBatch.getSnapshot();
161171
EstimatedHistogramReservoirSnapshot partitionsPerUnloggedBatchSnapshot = (EstimatedHistogramReservoirSnapshot) metrics.partitionsPerUnloggedBatch.getSnapshot();
162172
EstimatedHistogramReservoirSnapshot partitionsPerCounterBatchSnapshot = (EstimatedHistogramReservoirSnapshot) metrics.partitionsPerCounterBatch.getSnapshot();
173+
EstimatedHistogramReservoirSnapshot columnsPerLoggedBatchSnapshot = (EstimatedHistogramReservoirSnapshot) metrics.columnsPerLoggedBatch.getSnapshot();
174+
EstimatedHistogramReservoirSnapshot columnsPerUnloggedBatchSnapshot = (EstimatedHistogramReservoirSnapshot) metrics.columnsPerUnloggedBatch.getSnapshot();
175+
EstimatedHistogramReservoirSnapshot columnsPerCounterBatchSnapshot = (EstimatedHistogramReservoirSnapshot) metrics.columnsPerCounterBatch.getSnapshot();
163176

164177
// BatchMetrics uses DecayingEstimatedHistogramReservoir which notes that the return of getMax()
165178
// may be more than the actual max value recorded in the reservoir with similar but reverse properties
@@ -174,10 +187,22 @@ private void assertMetrics(BatchStatement.Type batchTypeTested, int[] rounds, in
174187
Range expectedPartitionsPerCounterBatchMinMax = batchTypeTested == BatchStatement.Type.COUNTER ?
175188
determineExpectedMinMax(partitionsPerCounterBatchSnapshot, distinctPartitions) :
176189
new Range(0L, 0L);
190+
Range expectedColumnsPerLoggedBatchMinMax = batchTypeTested == BatchStatement.Type.LOGGED ?
191+
determineExpectedMinMax(columnsPerLoggedBatchSnapshot, distinctPartitions) :
192+
new Range(0L, 0L);
193+
Range expectedColumnsPerUnloggedBatchMinMax = batchTypeTested == BatchStatement.Type.UNLOGGED ?
194+
determineExpectedMinMax(columnsPerUnloggedBatchSnapshot, distinctPartitions) :
195+
new Range(0L, 0L);
196+
Range expectedColumnsPerCounterBatchMinMax = batchTypeTested == BatchStatement.Type.COUNTER ?
197+
determineExpectedMinMax(columnsPerCounterBatchSnapshot, distinctPartitions) :
198+
new Range(0L, 0L);
177199

178200
assertEquals(expectedPartitionsPerLoggedBatchMinMax, new Range(partitionsPerLoggedBatchSnapshot.getMin(), partitionsPerLoggedBatchSnapshot.getMax()));
179201
assertEquals(expectedPartitionsPerUnloggedBatchMinMax, new Range(partitionsPerUnloggedBatchSnapshot.getMin(), partitionsPerUnloggedBatchSnapshot.getMax()));
180202
assertEquals(expectedPartitionsPerCounterBatchMinMax, new Range(partitionsPerCounterBatchSnapshot.getMin(), partitionsPerCounterBatchSnapshot.getMax()));
203+
assertEquals(expectedColumnsPerLoggedBatchMinMax, new Range(columnsPerLoggedBatchSnapshot.getMin(), columnsPerLoggedBatchSnapshot.getMax()));
204+
assertEquals(expectedColumnsPerUnloggedBatchMinMax, new Range(columnsPerUnloggedBatchSnapshot.getMin(), columnsPerUnloggedBatchSnapshot.getMax()));
205+
assertEquals(expectedColumnsPerCounterBatchMinMax, new Range(columnsPerCounterBatchSnapshot.getMin(), columnsPerCounterBatchSnapshot.getMax()));
181206
}
182207
}
183208

@@ -186,6 +211,9 @@ private void clearHistogram()
186211
((ClearableHistogram) metrics.partitionsPerLoggedBatch).clear();
187212
((ClearableHistogram) metrics.partitionsPerUnloggedBatch).clear();
188213
((ClearableHistogram) metrics.partitionsPerCounterBatch).clear();
214+
((ClearableHistogram) metrics.columnsPerLoggedBatch).clear();
215+
((ClearableHistogram) metrics.columnsPerUnloggedBatch).clear();
216+
((ClearableHistogram) metrics.columnsPerCounterBatch).clear();
189217
}
190218

191219
private Range determineExpectedMinMax(EstimatedHistogramReservoirSnapshot snapshot, long value)

0 commit comments

Comments
 (0)