Skip to content

Commit 8d83fd1

Browse files
committed
feat: add metrics for chunk transform/detransform latency
1 parent 7c2ebc9 commit 8d83fd1

File tree

4 files changed

+128
-9
lines changed

4 files changed

+128
-9
lines changed

core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import io.aiven.kafka.tieredstorage.metadata.SegmentCustomMetadataBuilder;
5656
import io.aiven.kafka.tieredstorage.metadata.SegmentCustomMetadataField;
5757
import io.aiven.kafka.tieredstorage.metadata.SegmentCustomMetadataSerde;
58+
import io.aiven.kafka.tieredstorage.metrics.MeteredInputStream;
5859
import io.aiven.kafka.tieredstorage.metrics.Metrics;
5960
import io.aiven.kafka.tieredstorage.security.AesEncryptionProvider;
6061
import io.aiven.kafka.tieredstorage.security.DataKeyAndAAD;
@@ -287,7 +288,15 @@ private void uploadSegmentLog(final RemoteLogSegmentMetadata remoteLogSegmentMet
287288
final SegmentCustomMetadataBuilder customMetadataBuilder)
288289
throws IOException, StorageBackendException {
289290
final String fileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.LOG);
290-
try (final var sis = transformFinisher.toInputStream()) {
291+
try (
292+
final var sis = new MeteredInputStream(
293+
transformFinisher.toInputStream(),
294+
time,
295+
millis -> metrics.recordChunkTransformTime(
296+
remoteLogSegmentMetadata.topicIdPartition().topicPartition(),
297+
millis
298+
))
299+
) {
291300
final var bytes = uploader.upload(sis, fileKey);
292301
metrics.recordObjectUpload(
293302
remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
@@ -318,7 +327,15 @@ private void uploadIndexFile(final RemoteLogSegmentMetadata remoteLogSegmentMeta
318327

319328
final var suffix = ObjectKey.Suffix.fromIndexType(indexType);
320329
final String key = objectKey.key(remoteLogSegmentMetadata, suffix);
321-
try (final var in = transformFinisher.toInputStream()) {
330+
try (
331+
final var in = new MeteredInputStream(
332+
transformFinisher.toInputStream(),
333+
time,
334+
millis -> metrics.recordChunkTransformTime(
335+
remoteLogSegmentMetadata.topicIdPartition().topicPartition(),
336+
millis
337+
))
338+
) {
322339
final var bytes = uploader.upload(in, key);
323340
metrics.recordObjectUpload(
324341
remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
@@ -382,8 +399,14 @@ public InputStream fetchLogSegment(final RemoteLogSegmentMetadata remoteLogSegme
382399
final var suffix = ObjectKey.Suffix.LOG;
383400
final var segmentKey = objectKey(remoteLogSegmentMetadata, suffix);
384401

385-
return new FetchChunkEnumeration(chunkManager, segmentKey, segmentManifest, range)
386-
.toInputStream();
402+
return new MeteredInputStream(
403+
new FetchChunkEnumeration(chunkManager, segmentKey, segmentManifest, range).toInputStream(),
404+
time,
405+
millis -> metrics.recordChunkDetransformTime(
406+
remoteLogSegmentMetadata.topicIdPartition().topicPartition(),
407+
millis
408+
)
409+
);
387410
} catch (final KeyNotFoundException | KeyNotFoundRuntimeException e) {
388411
throw new RemoteResourceNotFoundException(e);
389412
} catch (final Exception e) {

core/src/main/java/io/aiven/kafka/tieredstorage/metrics/Metrics.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import org.slf4j.Logger;
3636
import org.slf4j.LoggerFactory;
3737

38+
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.CHUNK_DETRANSFORM_TIME;
39+
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.CHUNK_TRANSFORM_TIME;
3840
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.OBJECT_UPLOAD;
3941
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.OBJECT_UPLOAD_BYTES;
4042
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.SEGMENT_COPY_TIME;
@@ -210,6 +212,46 @@ private void recordSegmentFetchRequestedBytes(final TopicPartition topicPartitio
210212
.record(bytes);
211213
}
212214

215+
public void recordChunkTransformTime(final TopicPartition topicPartition, final long millis) {
216+
new SensorProvider(metrics, sensorName(CHUNK_TRANSFORM_TIME))
217+
.with(metricsRegistry.chunkTransformTimeAvg, new Avg())
218+
.with(metricsRegistry.chunkTransformTimeMax, new Max())
219+
.get()
220+
.record(millis);
221+
new SensorProvider(metrics, sensorNameByTopic(topicPartition, CHUNK_TRANSFORM_TIME),
222+
() -> topicTags(topicPartition))
223+
.with(metricsRegistry.chunkTransformTimeAvgByTopic, new Avg())
224+
.with(metricsRegistry.chunkTransformTimeMaxByTopic, new Max())
225+
.get()
226+
.record(millis);
227+
new SensorProvider(metrics, sensorNameByTopicPartition(topicPartition, CHUNK_TRANSFORM_TIME),
228+
() -> topicPartitionTags(topicPartition), Sensor.RecordingLevel.DEBUG)
229+
.with(metricsRegistry.chunkTransformTimeAvgByTopicPartition, new Avg())
230+
.with(metricsRegistry.chunkTransformTimeMaxByTopicPartition, new Max())
231+
.get()
232+
.record(millis);
233+
}
234+
235+
public void recordChunkDetransformTime(final TopicPartition topicPartition, final long millis) {
236+
new SensorProvider(metrics, sensorName(CHUNK_DETRANSFORM_TIME))
237+
.with(metricsRegistry.chunkDetransformTimeAvg, new Avg())
238+
.with(metricsRegistry.chunkDetransformTimeMax, new Max())
239+
.get()
240+
.record(millis);
241+
new SensorProvider(metrics, sensorNameByTopic(topicPartition, CHUNK_DETRANSFORM_TIME),
242+
() -> topicTags(topicPartition))
243+
.with(metricsRegistry.chunkDetransformTimeAvgByTopic, new Avg())
244+
.with(metricsRegistry.chunkDetransformTimeMaxByTopic, new Max())
245+
.get()
246+
.record(millis);
247+
new SensorProvider(metrics, sensorNameByTopicPartition(topicPartition, CHUNK_DETRANSFORM_TIME),
248+
() -> topicPartitionTags(topicPartition), Sensor.RecordingLevel.DEBUG)
249+
.with(metricsRegistry.chunkDetransformTimeAvgByTopicPartition, new Avg())
250+
.with(metricsRegistry.chunkDetransformTimeMaxByTopicPartition, new Max())
251+
.get()
252+
.record(millis);
253+
}
254+
213255
public void recordObjectUpload(final TopicPartition topicPartition, final ObjectKey.Suffix suffix,
214256
final long bytes) {
215257
recordObjectUploadRequests(topicPartition, suffix);

core/src/main/java/io/aiven/kafka/tieredstorage/metrics/MetricsRegistry.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,37 @@ public class MetricsRegistry {
133133
final MetricNameTemplate segmentFetchRequestedBytesTotalByTopicPartition =
134134
new MetricNameTemplate(SEGMENT_FETCH_REQUESTED_BYTES_TOTAL, METRIC_GROUP, "", TOPIC_PARTITION_TAG_NAMES);
135135

136+
// Transform/Detransform metrics
137+
static final String CHUNK_TRANSFORM_TIME = "chunk-transform-time";
138+
static final String CHUNK_TRANSFORM_TIME_AVG = CHUNK_TRANSFORM_TIME + "-avg";
139+
final MetricNameTemplate chunkTransformTimeAvg = new MetricNameTemplate(CHUNK_TRANSFORM_TIME_AVG, METRIC_GROUP, "");
140+
final MetricNameTemplate chunkTransformTimeAvgByTopic =
141+
new MetricNameTemplate(CHUNK_TRANSFORM_TIME_AVG, METRIC_GROUP, "", TOPIC_TAG_NAMES);
142+
final MetricNameTemplate chunkTransformTimeAvgByTopicPartition =
143+
new MetricNameTemplate(CHUNK_TRANSFORM_TIME_AVG, METRIC_GROUP, "", TOPIC_PARTITION_TAG_NAMES);
144+
static final String CHUNK_TRANSFORM_TIME_MAX = CHUNK_TRANSFORM_TIME + "-max";
145+
final MetricNameTemplate chunkTransformTimeMax = new MetricNameTemplate(CHUNK_TRANSFORM_TIME_MAX, METRIC_GROUP, "");
146+
final MetricNameTemplate chunkTransformTimeMaxByTopic =
147+
new MetricNameTemplate(CHUNK_TRANSFORM_TIME_MAX, METRIC_GROUP, "", TOPIC_TAG_NAMES);
148+
final MetricNameTemplate chunkTransformTimeMaxByTopicPartition =
149+
new MetricNameTemplate(CHUNK_TRANSFORM_TIME_MAX, METRIC_GROUP, "", TOPIC_PARTITION_TAG_NAMES);
150+
151+
static final String CHUNK_DETRANSFORM_TIME = "chunk-detransform-time";
152+
static final String CHUNK_DETRANSFORM_TIME_AVG = CHUNK_DETRANSFORM_TIME + "-avg";
153+
final MetricNameTemplate chunkDetransformTimeAvg =
154+
new MetricNameTemplate(CHUNK_DETRANSFORM_TIME_AVG, METRIC_GROUP, "");
155+
final MetricNameTemplate chunkDetransformTimeAvgByTopic =
156+
new MetricNameTemplate(CHUNK_DETRANSFORM_TIME_AVG, METRIC_GROUP, "", TOPIC_TAG_NAMES);
157+
final MetricNameTemplate chunkDetransformTimeAvgByTopicPartition =
158+
new MetricNameTemplate(CHUNK_DETRANSFORM_TIME_AVG, METRIC_GROUP, "", TOPIC_PARTITION_TAG_NAMES);
159+
static final String CHUNK_DETRANSFORM_TIME_MAX = CHUNK_DETRANSFORM_TIME + "-max";
160+
final MetricNameTemplate chunkDetransformTimeMax =
161+
new MetricNameTemplate(CHUNK_DETRANSFORM_TIME_MAX, METRIC_GROUP, "");
162+
final MetricNameTemplate chunkDetransformTimeMaxByTopic =
163+
new MetricNameTemplate(CHUNK_DETRANSFORM_TIME_MAX, METRIC_GROUP, "", TOPIC_TAG_NAMES);
164+
final MetricNameTemplate chunkDetransformTimeMaxByTopicPartition =
165+
new MetricNameTemplate(CHUNK_DETRANSFORM_TIME_MAX, METRIC_GROUP, "", TOPIC_PARTITION_TAG_NAMES);
166+
136167
// Object upload metrics
137168
static final String OBJECT_UPLOAD = "object-upload";
138169
static final String OBJECT_UPLOAD_RATE = OBJECT_UPLOAD + "-rate";

core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerMetricsTest.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,16 +75,18 @@ class RemoteStorageManagerMetricsTest {
7575
1, 100, -1, -1, 1L,
7676
LOG_SEGMENT_BYTES, Collections.singletonMap(1, 100L));
7777

78+
Time time;
7879
RemoteStorageManager rsm;
7980
LogSegmentData logSegmentData;
8081

8182
private Map<String, Object> configs;
8283

8384
@BeforeEach
8485
void setup(@TempDir final Path tmpDir,
85-
@Mock final Time time) throws IOException {
86-
when(time.milliseconds()).thenReturn(0L);
87-
rsm = new RemoteStorageManager(time);
86+
@Mock final Time time) throws IOException {
87+
this.time = time;
88+
when(this.time.milliseconds()).thenReturn(0L);
89+
rsm = new RemoteStorageManager(this.time);
8890

8991
final Path target = tmpDir.resolve("target");
9092
Files.createDirectories(target);
@@ -117,6 +119,7 @@ void setup(@TempDir final Path tmpDir,
117119
void metricsShouldBeReported(final String tags) throws RemoteStorageException, JMException {
118120
rsm.configure(configs);
119121

122+
when(time.nanoseconds()).thenReturn(0L, 1000000L);
120123
rsm.copyLogSegmentData(REMOTE_LOG_SEGMENT_METADATA, logSegmentData);
121124
logSegmentData.leaderEpochIndex().flip(); // so leader epoch can be consumed again
122125
rsm.copyLogSegmentData(REMOTE_LOG_SEGMENT_METADATA, logSegmentData);
@@ -133,6 +136,11 @@ void metricsShouldBeReported(final String tags) throws RemoteStorageException, J
133136
assertThat(MBEAN_SERVER.getAttribute(metricName, "segment-copy-time-max"))
134137
.isEqualTo(0.0);
135138

139+
assertThat(MBEAN_SERVER.getAttribute(metricName, "chunk-transform-time-avg"))
140+
.isEqualTo(1.0);
141+
assertThat(MBEAN_SERVER.getAttribute(metricName, "chunk-transform-time-max"))
142+
.isEqualTo(1.0);
143+
136144
assertThat(MBEAN_SERVER.getAttribute(metricName, "object-upload-total"))
137145
.isEqualTo(18.0);
138146
assertThat(MBEAN_SERVER.getAttribute(metricName, "object-upload-rate"))
@@ -177,13 +185,28 @@ void metricsShouldBeReported(final String tags) throws RemoteStorageException, J
177185
final var segmentManifestCacheObjectName =
178186
new ObjectName("aiven.kafka.server.tieredstorage.cache:type=segment-manifest-cache");
179187

180-
rsm.fetchLogSegment(REMOTE_LOG_SEGMENT_METADATA, 0);
188+
when(time.nanoseconds()).thenReturn(0L, 1000000L);
189+
try (final var inputStream = rsm.fetchLogSegment(REMOTE_LOG_SEGMENT_METADATA, 0)) {
190+
inputStream.readAllBytes();
191+
} catch (final IOException e) {
192+
throw new RuntimeException(e);
193+
}
194+
195+
assertThat(MBEAN_SERVER.getAttribute(metricName, "chunk-detransform-time-avg"))
196+
.isEqualTo(1.0);
197+
assertThat(MBEAN_SERVER.getAttribute(metricName, "chunk-detransform-time-max"))
198+
.isEqualTo(1.0);
181199

182200
// check cache size increases after first miss
183201
assertThat(MBEAN_SERVER.getAttribute(segmentManifestCacheObjectName, "cache-size-total"))
184202
.isEqualTo(1.0);
185203

186-
rsm.fetchLogSegment(REMOTE_LOG_SEGMENT_METADATA, 0);
204+
when(time.nanoseconds()).thenReturn(0L, 1000000L);
205+
try (final var inputStream = rsm.fetchLogSegment(REMOTE_LOG_SEGMENT_METADATA, 0)) {
206+
inputStream.readAllBytes();
207+
} catch (final IOException e) {
208+
throw new RuntimeException(e);
209+
}
187210

188211
assertThat(MBEAN_SERVER.getAttribute(metricName, "segment-fetch-requested-bytes-rate"))
189212
.isEqualTo(20.0 / METRIC_TIME_WINDOW_SEC);

0 commit comments

Comments
 (0)