feat(s3stream): optimize metrics performance
- optimize metrics performance by reusing attributes and reducing map-indexing cost by holding plain object references

Signed-off-by: Shichao Nie <[email protected]>
SCNieh committed Jan 9, 2024
1 parent d53d0f8 commit 96943a4
Showing 39 changed files with 1,026 additions and 374 deletions.
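
The pattern behind the diff below: previously each metric record went through S3StreamMetricsManager and located its histogram via an operation enum, paying a map lookup (and attribute resolution) on every call; now singleton stats holders such as StorageOperationStats, StreamOperationStats, and ByteBufStats expose pre-created metric objects as plain fields. A minimal, self-contained sketch of the two code paths — the stand-in classes are illustrative, not the actual s3stream implementation:

```java
// Illustrative sketch of the optimization; these stand-ins are NOT the
// actual s3stream classes.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

public final class MetricsPathSketch {
    /** Stand-in for a histogram whose attributes were resolved at creation time. */
    static final class HistogramStats {
        private final LongAdder count = new LongAdder();
        private final LongAdder sum = new LongAdder();

        void record(long valueNanos) { // hot path: two field updates, no lookups
            count.increment();
            sum.add(valueNanos);
        }
    }

    /** Before: every record resolves the histogram through a map keyed by operation. */
    static final Map<String, HistogramStats> BY_OPERATION = new ConcurrentHashMap<>();

    static void recordOperationLatency(String operation, long nanos) {
        // per-call cost: hashing, a map probe, and attribute resolution
        BY_OPERATION.computeIfAbsent(operation, k -> new HistogramStats()).record(nanos);
    }

    /** After: a singleton exposes pre-created stats as plain fields. */
    static final class StorageOperationStats {
        private static final StorageOperationStats INSTANCE = new StorageOperationStats();
        final HistogramStats appendStats = new HistogramStats();
        final HistogramStats readStats = new HistogramStats();

        static StorageOperationStats getInstance() {
            return INSTANCE;
        }
    }

    public static void main(String[] args) {
        recordOperationLatency("APPEND_STORAGE", 1_000L);                // old style
        StorageOperationStats.getInstance().appendStats.record(1_000L); // new style
    }
}
```

On the hot path, the new form reduces a record to a static field read plus the record itself.
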
2 changes: 1 addition & 1 deletion pom.xml
@@ -53,7 +53,7 @@
<guava.version>32.1.3-jre</guava.version>
<slf4j.version>2.0.9</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
-<s3stream.version>0.15.0-SNAPSHOT</s3stream.version>
+<s3stream.version>0.16.0-SNAPSHOT</s3stream.version>

<!-- Flat buffers related -->
<flatbuffers.version>23.5.26</flatbuffers.version>
2 changes: 1 addition & 1 deletion s3stream/pom.xml
@@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
-<version>0.15.0-SNAPSHOT</version>
+<version>0.16.0-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
@@ -18,7 +18,7 @@
package com.automq.stream.s3;

import com.automq.stream.s3.metrics.MetricsLevel;
-import com.automq.stream.s3.metrics.S3StreamMetricsManager;
+import com.automq.stream.s3.metrics.stats.ByteBufStats;
import com.automq.stream.utils.Threads;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
@@ -45,7 +45,7 @@ public static ByteBuf byteBuffer(int initCapacity) {
public static ByteBuf byteBuffer(int initCapacity, String name) {
try {
if (name != null) {
-S3StreamMetricsManager.recordAllocateByteBufSize(MetricsLevel.DEBUG, initCapacity, name);
+ByteBufStats.getInstance().allocateByteBufSizeStats(name).record(MetricsLevel.DEBUG, initCapacity);
}
return ALLOC.directBuffer(initCapacity);
} catch (OutOfMemoryError e) {
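
Where the metric is keyed by a dynamic value, as with the named ByteBuf allocation stats above, the lookup cannot disappear entirely, but it can shrink to a single cache hit whose per-name stats (and attributes) are built only once. A hedged sketch of one plausible shape for ByteBufStats.allocateByteBufSizeStats — field and type names beyond what the diff shows are assumptions:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Illustrative stand-in, not the real ByteBufStats: per-name stats (and any
// attributes they carry) are constructed once per distinct name, then reused.
final class ByteBufStatsSketch {
    private static final ByteBufStatsSketch INSTANCE = new ByteBufStatsSketch();
    private final Map<String, LongAdder> allocateSizeByName = new ConcurrentHashMap<>();

    static ByteBufStatsSketch getInstance() {
        return INSTANCE;
    }

    LongAdder allocateByteBufSizeStats(String name) {
        // construction cost is paid once per name; later calls are one cache hit
        return allocateSizeByName.computeIfAbsent(name, k -> new LongAdder());
    }
}
```

Call sites on a stable path can go further and keep the returned stats object in a field, which is the "plain object reference" the commit message describes.
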
25 changes: 12 additions & 13 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -28,8 +28,7 @@
import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
-import com.automq.stream.s3.metrics.operations.S3Operation;
-import com.automq.stream.s3.metrics.operations.S3Stage;
+import com.automq.stream.s3.metrics.stats.StorageOperationStats;
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.s3.objects.ObjectManager;
import com.automq.stream.s3.operator.S3Operator;
@@ -280,7 +279,7 @@ public CompletableFuture<Void> append(AppendContext context, StreamRecordBatch s
append0(context, writeRequest, false);
cf.whenComplete((nil, ex) -> {
streamRecord.release();
-S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE);
+StorageOperationStats.getInstance().appendStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
});
return cf;
}
@@ -300,7 +299,7 @@ public boolean append0(AppendContext context, WalWriteRequest request, boolean f
if (!fromBackoff) {
backoffRecords.offer(request);
}
-S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, 0L, S3Operation.APPEND_STORAGE_LOG_CACHE_FULL);
+StorageOperationStats.getInstance().appendLogCacheFullStats.record(MetricsLevel.INFO, 0L);
if (System.currentTimeMillis() - lastLogTimestamp > 1000L) {
LOGGER.warn("[BACKOFF] log cache size {} is larger than {}", deltaWALCache.size(), maxDeltaWALCacheSize);
lastLogTimestamp = System.currentTimeMillis();
@@ -376,7 +375,7 @@ public CompletableFuture<ReadDataBlock> read(FetchContext context,
TimerUtil timerUtil = new TimerUtil();
CompletableFuture<ReadDataBlock> cf = new CompletableFuture<>();
FutureUtil.propagate(read0(context, streamId, startOffset, endOffset, maxBytes), cf);
-cf.whenComplete((nil, ex) -> S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.READ_STORAGE));
+cf.whenComplete((nil, ex) -> StorageOperationStats.getInstance().readStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)));
return cf;
}

@@ -454,7 +453,7 @@ public CompletableFuture<Void> forceUpload(long streamId) {
CompletableFuture<Void> cf = new CompletableFuture<>();
// Wait for a while to group force upload tasks.
forceUploadTicker.tick().whenComplete((nil, ex) -> {
-S3StreamMetricsManager.recordStageLatency(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.FORCE_UPLOAD_WAL_AWAIT);
+StorageOperationStats.getInstance().forceUploadWALAwaitStats.record(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS));
uploadDeltaWAL(streamId, true);
// Wait for all tasks contains streamId complete.
List<CompletableFuture<Void>> tasksContainsStream = this.inflightWALUploadTasks.stream()
@@ -466,7 +465,7 @@ public CompletableFuture<Void> forceUpload(long streamId) {
callbackSequencer.tryFree(streamId);
}
});
-cf.whenComplete((nil, ex) -> S3StreamMetricsManager.recordStageLatency(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.FORCE_UPLOAD_WAL_COMPLETE));
+cf.whenComplete((nil, ex) -> StorageOperationStats.getInstance().forceUploadWALCompleteStats.record(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS)));
return cf;
}

@@ -498,7 +497,7 @@ private void handleAppendCallback0(WalWriteRequest request) {
for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
waitingAckRequest.cf.complete(null);
}
-S3StreamMetricsManager.recordOperationLatency(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE_APPEND_CALLBACK);
+StorageOperationStats.getInstance().appendCallbackStats.record(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS));
}

private Lock getStreamCallbackLock(long streamId) {
@@ -543,7 +542,7 @@ CompletableFuture<Void> uploadDeltaWAL(DeltaWALUploadTaskContext context) {
inflightWALUploadTasks.add(context);
backgroundExecutor.execute(() -> FutureUtil.exec(() -> uploadDeltaWAL0(context), cf, LOGGER, "uploadDeltaWAL"));
cf.whenComplete((nil, ex) -> {
-S3StreamMetricsManager.recordStageLatency(MetricsLevel.INFO, context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_COMPLETE);
+StorageOperationStats.getInstance().uploadWALCompleteStats.record(MetricsLevel.INFO, context.timer.elapsedAs(TimeUnit.NANOSECONDS));
inflightWALUploadTasks.remove(context);
if (ex != null) {
LOGGER.error("upload delta WAL fail", ex);
@@ -584,11 +583,11 @@ private void uploadDeltaWAL0(DeltaWALUploadTaskContext context) {

private void prepareDeltaWALUpload(DeltaWALUploadTaskContext context) {
context.task.prepare().thenAcceptAsync(nil -> {
-S3StreamMetricsManager.recordStageLatency(MetricsLevel.DEBUG, context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_PREPARE);
+StorageOperationStats.getInstance().uploadWALPrepareStats.record(MetricsLevel.DEBUG, context.timer.elapsedAs(TimeUnit.NANOSECONDS));
// 1. poll out current task and trigger upload.
DeltaWALUploadTaskContext peek = walPrepareQueue.poll();
-Objects.requireNonNull(peek).task.upload().thenAccept(nil2 -> S3StreamMetricsManager.recordStageLatency(
-MetricsLevel.DEBUG, context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_UPLOAD));
+Objects.requireNonNull(peek).task.upload().thenAccept(nil2 -> StorageOperationStats.getInstance()
+.uploadWALUploadStats.record(MetricsLevel.DEBUG, context.timer.elapsedAs(TimeUnit.NANOSECONDS)));
// 2. add task to commit queue.
boolean walObjectCommitQueueEmpty = walCommitQueue.isEmpty();
walCommitQueue.add(peek);
@@ -605,7 +604,7 @@ private void prepareDeltaWALUpload(DeltaWALUploadTaskContext context) {

private void commitDeltaWALUpload(DeltaWALUploadTaskContext context) {
context.task.commit().thenAcceptAsync(nil -> {
-S3StreamMetricsManager.recordStageLatency(MetricsLevel.DEBUG, context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_COMMIT);
+StorageOperationStats.getInstance().uploadWALCommitStats.record(MetricsLevel.DEBUG, context.timer.elapsedAs(TimeUnit.NANOSECONDS));
// 1. poll out current task
walCommitQueue.poll();
if (context.cache.confirmOffset() != 0) {
14 changes: 7 additions & 7 deletions s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
@@ -31,9 +31,8 @@
import com.automq.stream.s3.context.AppendContext;
import com.automq.stream.s3.context.FetchContext;
import com.automq.stream.s3.metrics.MetricsLevel;
-import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
-import com.automq.stream.s3.metrics.operations.S3Operation;
+import com.automq.stream.s3.metrics.stats.StreamOperationStats;
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
import com.automq.stream.s3.streams.StreamManager;
@@ -137,7 +136,6 @@ public long nextOffset() {
public CompletableFuture<AppendResult> append(AppendContext context, RecordBatch recordBatch) {
TimerUtil timerUtil = new TimerUtil();
readLock.lock();
-S3StreamMetricsManager.recordOperationLatency(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STREAM_WRITE_LOCK);
try {
CompletableFuture<AppendResult> cf = exec(() -> {
if (networkInboundLimiter != null) {
@@ -152,7 +150,7 @@ public CompletableFuture<AppendResult> append(AppendContext context, RecordBatch
}, LOGGER, "append");
pendingAppends.add(cf);
cf.whenComplete((nil, ex) -> {
-S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STREAM);
+StreamOperationStats.getInstance().appendStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
pendingAppends.remove(cf);
});
return cf;
@@ -194,12 +192,11 @@ public CompletableFuture<FetchResult> fetch(FetchContext context,
@SpanAttribute int maxBytes) {
TimerUtil timerUtil = new TimerUtil();
readLock.lock();
-S3StreamMetricsManager.recordOperationLatency(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FETCH_STREAM_READ_LOCK);
try {
CompletableFuture<FetchResult> cf = exec(() -> fetch0(context, startOffset, endOffset, maxBytes), LOGGER, "fetch");
pendingFetches.add(cf);
cf.whenComplete((rs, ex) -> {
-S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FETCH_STREAM);
+StreamOperationStats.getInstance().fetchStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
if (ex != null) {
Throwable cause = FutureUtil.cause(ex);
if (!(cause instanceof FastReadFailFastException)) {
@@ -263,7 +260,7 @@ public CompletableFuture<Void> trim(long newStartOffset) {
lastPendingTrim.whenComplete((nil, ex) -> propagate(trim0(newStartOffset), cf));
this.lastPendingTrim = cf;
cf.whenComplete((nil, ex) -> {
-S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.TRIM_STREAM);
+StreamOperationStats.getInstance().trimStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
});
return cf;
}, LOGGER, "trim");
@@ -294,6 +291,7 @@ private CompletableFuture<Void> trim0(long newStartOffset) {

@Override
public CompletableFuture<Void> close() {
+TimerUtil timerUtil = new TimerUtil();
writeLock.lock();
try {
status.markClosed();
@@ -312,8 +310,10 @@ public CompletableFuture<Void> close() {
closeCf.whenComplete((nil, ex) -> {
if (ex != null) {
LOGGER.error("{} close fail", logIdent, ex);
+StreamOperationStats.getInstance().closeStreamStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
} else {
LOGGER.info("{} closed", logIdent);
+StreamOperationStats.getInstance().closeStreamStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
}
});

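
The closeStreamStats(true/false) calls above illustrate a third variant of the pattern: stats keyed by a small fixed domain (success vs. failure, hit vs. miss) are pre-created per outcome, so selection is a branch rather than per-call attribute construction. A sketch with hypothetical names, again not the real implementation:

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical shape for closeStreamStats(boolean): both outcome variants are
// created up front, so selecting one is a branch, not a map lookup.
final class StreamCloseStatsSketch {
    private final LongAdder closeStreamSuccessStats = new LongAdder();
    private final LongAdder closeStreamFailStats = new LongAdder();

    LongAdder closeStreamStats(boolean isSuccess) {
        return isSuccess ? closeStreamSuccessStats : closeStreamFailStats;
    }
}
```
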
@@ -22,9 +22,8 @@
import com.automq.stream.api.Stream;
import com.automq.stream.api.StreamClient;
import com.automq.stream.s3.metrics.MetricsLevel;
-import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
-import com.automq.stream.s3.metrics.operations.S3Operation;
+import com.automq.stream.s3.metrics.stats.StreamOperationStats;
import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
import com.automq.stream.s3.objects.ObjectManager;
import com.automq.stream.s3.operator.S3Operator;
@@ -81,7 +80,7 @@ public S3StreamClient(StreamManager streamManager, Storage storage, ObjectManage
public CompletableFuture<Stream> createAndOpenStream(CreateStreamOptions options) {
TimerUtil timerUtil = new TimerUtil();
return FutureUtil.exec(() -> streamManager.createStream().thenCompose(streamId -> {
-S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.CREATE_STREAM);
+StreamOperationStats.getInstance().createStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
return openStream0(streamId, options.epoch());
}), LOGGER, "createAndOpenStream");
}
@@ -114,14 +113,14 @@ private CompletableFuture<Stream> openStream0(long streamId, long epoch) {
TimerUtil timerUtil = new TimerUtil();
return streamManager.openStream(streamId, epoch).
thenApply(metadata -> {
-S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.OPEN_STREAM);
StreamObjectCompactor.Builder builder = StreamObjectCompactor.builder().objectManager(objectManager).s3Operator(s3Operator)
.maxStreamObjectSize(config.streamObjectCompactionMaxSizeBytes());
S3Stream stream = new S3Stream(
metadata.streamId(), metadata.epoch(),
metadata.startOffset(), metadata.endOffset(),
storage, streamManager, openedStreams::remove, networkInboundBucket, networkOutboundBucket);
openedStreams.put(streamId, stream);
+StreamOperationStats.getInstance().openStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
return stream;
});
}
@@ -19,9 +19,8 @@

import com.automq.stream.s3.Config;
import com.automq.stream.s3.metrics.MetricsLevel;
-import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
-import com.automq.stream.s3.metrics.operations.S3Operation;
+import com.automq.stream.s3.metrics.stats.StorageOperationStats;
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.s3.objects.ObjectManager;
import com.automq.stream.s3.operator.S3Operator;
@@ -111,8 +110,8 @@ public CompletableFuture<ReadDataBlock> read(TraceContext traceContext,

long timeElapsed = timerUtil.elapsedAs(TimeUnit.NANOSECONDS);
boolean isCacheHit = ret.getCacheAccessType() == CacheAccessType.BLOCK_CACHE_HIT;
+StorageOperationStats.getInstance().readBlockCacheStats(isCacheHit).record(MetricsLevel.INFO, timeElapsed);
Span.fromContext(finalTraceContext.currentContext()).setAttribute("cache_hit", isCacheHit);
-S3StreamMetricsManager.recordReadCacheLatency(MetricsLevel.INFO, timeElapsed, S3Operation.READ_STORAGE_BLOCK_CACHE, isCacheHit);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[S3BlockCache] read data complete, cache hit: {}, stream={}, {}-{}, total bytes: {}",
ret.getCacheAccessType() == CacheAccessType.BLOCK_CACHE_HIT, streamId, startOffset, endOffset, totalReturnedSize);
@@ -20,7 +20,7 @@
import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
-import com.automq.stream.s3.metrics.operations.S3Operation;
+import com.automq.stream.s3.metrics.stats.StorageOperationStats;
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.s3.trace.context.TraceContext;
import com.automq.stream.utils.biniarysearch.StreamRecordBatchList;
@@ -99,7 +99,7 @@ public boolean put(StreamRecordBatch recordBatch) {
} finally {
readLock.unlock();
}
-S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE_LOG_CACHE);
+StorageOperationStats.getInstance().appendLogCacheStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
return full;
}

@@ -150,7 +150,7 @@ public List<StreamRecordBatch> get(TraceContext context,

long timeElapsed = timerUtil.elapsedAs(TimeUnit.NANOSECONDS);
boolean isCacheHit = !records.isEmpty() && records.get(0).getBaseOffset() <= startOffset;
-S3StreamMetricsManager.recordReadCacheLatency(MetricsLevel.INFO, timeElapsed, S3Operation.READ_STORAGE_LOG_CACHE, isCacheHit);
+StorageOperationStats.getInstance().readLogCacheStats(isCacheHit).record(MetricsLevel.INFO, timeElapsed);
return records;
}

@@ -18,8 +18,8 @@
package com.automq.stream.s3.cache;

import com.automq.stream.s3.metrics.MetricsLevel;
-import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
+import com.automq.stream.s3.metrics.stats.StorageOperationStats;
import com.automq.stream.utils.LogContext;
import com.google.common.base.Objects;
import java.util.ArrayList;
@@ -99,7 +99,7 @@ public void updateReadAheadResult(long readAheadEndOffset, int readAheadSize) {
lock.lock();
this.readAheadEndOffset = readAheadEndOffset;
this.lastReadAheadSize = readAheadSize;
-S3StreamMetricsManager.recordReadAheadSize(MetricsLevel.INFO, readAheadSize);
+StorageOperationStats.getInstance().readAheadSizeStats.record(MetricsLevel.INFO, readAheadSize);
if (logger.isDebugEnabled()) {
logger.debug("update read ahead offset {}, size: {}, lastReadOffset: {}", readAheadEndOffset, readAheadSize, lastReadOffset);
}