feat(issues690): concurrent read log cache #701

Merged: 4 commits, Nov 22, 2023
Changes from 2 commits
33 changes: 13 additions & 20 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -64,9 +64,7 @@ public class S3Storage implements Storage {
     private final long maxDeltaWALCacheSize;
     private final Config config;
     private final WriteAheadLog deltaWAL;
-    /**
-     * WAL log cache. Single thread mainReadExecutor will ensure the memory safety.
-     */
+    /** WAL log cache */
     private final LogCache deltaWALCache;
     /**
      * WAL out of order callback sequencer. Single thread mainWriteExecutor will ensure the memory safety.
@@ -78,8 +76,6 @@ public class S3Storage implements Storage {

     private final ExecutorService mainWriteExecutor = Threads.newFixedThreadPoolWithMonitor(1,
             "s3-storage-main-write", false, LOGGER);
-    private final ExecutorService mainReadExecutor = Threads.newFixedThreadPoolWithMonitor(1,
-            "s3-storage-main-read", false, LOGGER);
     private final ScheduledExecutorService backgroundExecutor = Threads.newSingleThreadScheduledExecutor(
             ThreadUtils.createThreadFactory("s3-storage-background", true), LOGGER);
     private final ExecutorService uploadWALExecutor = Threads.newFixedThreadPoolWithMonitor(
@@ -225,7 +221,6 @@ public void shutdown() {
         }
         deltaWAL.shutdownGracefully();
         backgroundExecutor.shutdown();
-        mainReadExecutor.shutdown();
         mainWriteExecutor.shutdown();
     }

@@ -317,7 +312,7 @@ private void tryDrainBackoffRecords() {
     public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, long endOffset, int maxBytes) {
         TimerUtil timerUtil = new TimerUtil();
         CompletableFuture<ReadDataBlock> cf = new CompletableFuture<>();
-        mainReadExecutor.execute(() -> FutureUtil.propagate(read0(streamId, startOffset, endOffset, maxBytes), cf));
+        FutureUtil.propagate(read0(streamId, startOffset, endOffset, maxBytes), cf);
         cf.whenComplete((nil, ex) -> OperationMetricsStats.getHistogram(S3Operation.READ_STORAGE).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)));
         return cf;
     }
@@ -346,7 +341,7 @@ private CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, long endOffset, int maxBytes) {
             }
             continuousCheck(rst);
             return new ReadDataBlock(rst, readDataBlock.getCacheAccessType());
-        }, mainReadExecutor).whenComplete((rst, ex) -> {
+        }).whenComplete((rst, ex) -> {
             if (ex != null) {
                 logCacheRecords.forEach(StreamRecordBatch::release);
             }
@@ -380,7 +375,7 @@ public synchronized CompletableFuture<Void> forceUpload(long streamId) {
             }
             FutureUtil.propagate(CompletableFuture.allOf(this.inflightWALUploadTasks.toArray(new CompletableFuture[0])), cf);
             mainWriteExecutor.execute(() -> callbackSequencer.tryFree(streamId));
-        }, mainReadExecutor);
+        });
         return cf;
     }

@@ -396,16 +391,14 @@ private void handleAppendCallback0(WalWriteRequest request) {
         List<WalWriteRequest> waitingAckRequests = callbackSequencer.after(request);
         long walConfirmOffset = callbackSequencer.getWALConfirmOffset();
         waitingAckRequests.forEach(r -> r.record.retain());
-        mainReadExecutor.execute(() -> {
-            for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
-                if (deltaWALCache.put(waitingAckRequest.record)) {
-                    // cache block is full, trigger WAL upload.
-                    deltaWALCache.setConfirmOffset(walConfirmOffset);
-                    LogCache.LogCacheBlock logCacheBlock = deltaWALCache.archiveCurrentBlock();
-                    uploadDeltaWAL(logCacheBlock);
-                }
-            }
-        });
+        for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
+            if (deltaWALCache.put(waitingAckRequest.record)) {
+                // cache block is full, trigger WAL upload.
+                deltaWALCache.setConfirmOffset(walConfirmOffset);
+                LogCache.LogCacheBlock logCacheBlock = deltaWALCache.archiveCurrentBlock();
+                uploadDeltaWAL(logCacheBlock);
+            }
+        }
         for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
             waitingAckRequest.cf.complete(null);
         }
@@ -495,7 +488,7 @@ private void commitDeltaWALUpload(DeltaWALUploadTaskContext context) {
     }

     private void freeCache(LogCache.LogCacheBlock cacheBlock) {
-        mainReadExecutor.execute(() -> deltaWALCache.markFree(cacheBlock));
+        deltaWALCache.markFree(cacheBlock);
     }

     /**
@@ -581,7 +574,7 @@ class LogCacheEvictOOMHandler implements DirectByteBufAlloc.OOMHandler {
         public int handle(int memoryRequired) {
             try {
                 CompletableFuture<Integer> cf = new CompletableFuture<>();
-                mainReadExecutor.submit(() -> FutureUtil.exec(() -> cf.complete(deltaWALCache.forceFree(memoryRequired)), cf, LOGGER, "handleOOM"));
+                FutureUtil.exec(() -> cf.complete(deltaWALCache.forceFree(memoryRequired)), cf, LOGGER, "handleOOM");
                 return cf.get();
             } catch (Throwable e) {
                 return 0;
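With mainReadExecutor removed, read() now calls read0() on the caller's thread and forwards its outcome into the returned future via FutureUtil.propagate. For readers unfamiliar with that helper, a minimal sketch of what a propagate-style method typically does (illustrative only, not the actual AutoMQ implementation):

```java
import java.util.concurrent.CompletableFuture;

// Sketch of a propagate-style helper (assumed behavior, not AutoMQ's code):
// forward the source future's value or exception into a pre-created target.
final class PropagateSketch {
    static <T> void propagate(CompletableFuture<T> source, CompletableFuture<T> target) {
        source.whenComplete((result, error) -> {
            if (error != null) {
                target.completeExceptionally(error);
            } else {
                target.complete(result);
            }
        });
    }
}
```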
142 changes: 95 additions & 47 deletions s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
@@ -25,14 +25,15 @@

 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;

 import static com.automq.stream.s3.cache.LogCache.StreamRange.NOOP_OFFSET;
@@ -51,6 +52,11 @@ public class LogCache {
     private final AtomicLong size = new AtomicLong();
     private final Consumer<LogCacheBlock> blockFreeListener;

+    // Read-write lock guarding <code>LogCache.blocks</code>.
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
+    private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
+
     public LogCache(long capacity, long cacheBlockMaxSize, int maxCacheBlockStreamCount, Consumer<LogCacheBlock> blockFreeListener) {
         this.capacity = capacity;
         this.cacheBlockMaxSize = cacheBlockMaxSize;
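The new fields are the standard ReentrantReadWriteLock idiom: lookups against blocks proceed concurrently under the shared read lock, while structural changes take the exclusive write lock. A self-contained illustration of the pattern (hypothetical GuardedList, not AutoMQ code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical example of the read-write lock idiom adopted by LogCache:
// many readers may hold the read lock at once; a writer excludes everyone.
class GuardedList<T> {
    private final List<T> items = new ArrayList<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    T first() {
        lock.readLock().lock();
        try {
            return items.isEmpty() ? null : items.get(0);
        } finally {
            lock.readLock().unlock();
        }
    }

    void add(T item) {
        lock.writeLock().lock();
        try {
            items.add(item);
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```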
@@ -103,8 +109,15 @@ public boolean put(StreamRecordBatch recordBatch) {
      */
     public List<StreamRecordBatch> get(long streamId, long startOffset, long endOffset, int maxBytes) {
         TimerUtil timerUtil = new TimerUtil();
-        List<StreamRecordBatch> records = get0(streamId, startOffset, endOffset, maxBytes);
-        records.forEach(StreamRecordBatch::retain);
+        List<StreamRecordBatch> records;
+        readLock.lock();
+        try {
+            records = get0(streamId, startOffset, endOffset, maxBytes);
+            records.forEach(StreamRecordBatch::retain);
+        } finally {
+            readLock.unlock();
+        }
+
         if (!records.isEmpty() && records.get(0).getBaseOffset() <= startOffset) {
             OperationMetricsStats.getCounter(S3Operation.READ_STORAGE_LOG_CACHE).inc();
         } else {
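Note that the read lock here mainly protects get0's traversal of blocks from concurrent structural changes (the add in archiveCurrentBlock, the removeIf in the free paths). Retaining the records before releasing the lock also narrows the window in which eviction could release a batch the reader is about to use; the free callbacks themselves deliberately run outside the write lock, as tryRealFree and forceFree show below.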
@@ -119,6 +132,7 @@ public List<StreamRecordBatch> get0(long streamId, long startOffset, long endOffset, int maxBytes) {
         long nextStartOffset = startOffset;
         int nextMaxBytes = maxBytes;
         boolean fulfill = false;
+        List<LogCacheBlock> blocks = this.blocks;
         for (LogCacheBlock archiveBlock : blocks) {
             List<StreamRecordBatch> records = archiveBlock.get(streamId, nextStartOffset, endOffset, nextMaxBytes);
             if (records.isEmpty()) {
@@ -158,11 +172,16 @@ public List<StreamRecordBatch> get0(long streamId, long startOffset, long endOffset, int maxBytes) {
     }

     public LogCacheBlock archiveCurrentBlock() {
-        LogCacheBlock block = activeBlock;
-        block.confirmOffset = confirmOffset;
-        activeBlock = new LogCacheBlock(cacheBlockMaxSize, maxCacheBlockStreamCount);
-        blocks.add(activeBlock);
-        return block;
+        writeLock.lock();
+        try {
+            LogCacheBlock block = activeBlock;
+            block.confirmOffset = confirmOffset;
+            activeBlock = new LogCacheBlock(cacheBlockMaxSize, maxCacheBlockStreamCount);
+            blocks.add(activeBlock);
+            return block;
+        } finally {
+            writeLock.unlock();
+        }
     }

     public Optional<LogCacheBlock> archiveCurrentBlockIfContains(long streamId) {
@@ -191,30 +210,48 @@ private void tryRealFree() {
         if (size.get() <= capacity * 0.9) {
             return;
         }
-        blocks.removeIf(b -> {
-            if (size.get() <= capacity * 0.9) {
-                return false;
-            }
-            if (b.free) {
-                size.addAndGet(-b.size);
-                blockFreeListener.accept(b);
-                b.free();
-            }
-            return b.free;
-        });
+        List<LogCacheBlock> removed = new ArrayList<>();
+        writeLock.lock();
+        try {
+            blocks.removeIf(b -> {
+                if (size.get() <= capacity * 0.9) {
+                    return false;
+                }
+                if (b.free) {
+                    size.addAndGet(-b.size);
+                    removed.add(b);
+                }
+                return b.free;
+            });
+        } finally {
+            writeLock.unlock();
+        }
+        removed.forEach(b -> {
+            blockFreeListener.accept(b);
+            b.free();
+        });
     }

     public int forceFree(int required) {
         AtomicInteger freedBytes = new AtomicInteger();
-        blocks.removeIf(block -> {
-            if (!block.free || freedBytes.get() >= required) {
-                return false;
-            }
-            size.addAndGet(-block.size);
-            freedBytes.addAndGet((int) block.size);
-            blockFreeListener.accept(block);
-            block.free();
-            return true;
-        });
+        List<LogCacheBlock> removed = new ArrayList<>();
+        writeLock.lock();
+        try {
+            blocks.removeIf(block -> {
+                if (!block.free || freedBytes.get() >= required) {
+                    return false;
+                }
+                size.addAndGet(-block.size);
+                freedBytes.addAndGet((int) block.size);
+                removed.add(block);
+                return true;
+            });
+        } finally {
+            writeLock.unlock();
+        }
+        removed.forEach(b -> {
+            blockFreeListener.accept(b);
+            b.free();
+        });
         return freedBytes.get();
     }
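Both free paths now share the same shape: mutate the guarded list inside the write lock, but collect the victims and invoke the listener only after unlocking, so potentially heavy or re-entrant callback work never runs inside the critical section. Distilled into a standalone sketch (hypothetical names, not AutoMQ code):

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Defer-callbacks-outside-the-lock pattern, as used by tryRealFree/forceFree.
class EvictionDemo {
    private final List<String> blocks = new LinkedList<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Consumer<String> freeListener;

    EvictionDemo(Consumer<String> freeListener) {
        this.freeListener = freeListener;
    }

    void evict(Predicate<String> victim) {
        List<String> removed = new ArrayList<>();
        lock.writeLock().lock();
        try {
            blocks.removeIf(b -> {
                if (!victim.test(b)) {
                    return false;
                }
                removed.add(b); // remember the block for post-unlock callbacks
                return true;
            });
        } finally {
            lock.writeLock().unlock();
        }
        removed.forEach(freeListener); // listener work happens outside the lock
    }
}
```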
@@ -232,10 +269,10 @@ public static class LogCacheBlock {
         private final long blockId;
         private final long maxSize;
         private final int maxStreamCount;
-        private final Map<Long, List<StreamRecordBatch>> map = new HashMap<>();
+        private final Map<Long, List<StreamRecordBatch>> map = new ConcurrentHashMap<>();
         private long size = 0;
         private long confirmOffset;
-        boolean free;
+        volatile boolean free;

         public LogCacheBlock(long maxSize, int maxStreamCount) {
             this.blockId = BLOCK_ID_ALLOC.getAndIncrement();
@@ -253,8 +290,15 @@ public long blockId() {
         }

         public boolean put(StreamRecordBatch recordBatch) {
-            List<StreamRecordBatch> streamCache = map.computeIfAbsent(recordBatch.getStreamId(), id -> new ArrayList<>());
-            streamCache.add(recordBatch);
+            map.compute(recordBatch.getStreamId(), (id, records) -> {
+                if (records == null) {
+                    records = new ArrayList<>();
+                }
+                synchronized (records) {
+                    records.add(recordBatch);
+                }
+                return records;
+            });
             int recordSize = recordBatch.size();
             size += recordSize;
             return size >= maxSize || map.size() >= maxStreamCount;
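Replacing computeIfAbsent plus an external add with a single compute makes the append atomic per key: ConcurrentHashMap runs the remapping function atomically for a given stream, so the list cannot be created twice, and the synchronized block pairs with the synchronized (streamRecords) sections in get and getStreamRange below, so the non-thread-safe ArrayList is never read and mutated at the same time. A standalone illustration (not AutoMQ code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustration of the per-key atomic append used by LogCacheBlock.put:
// compute(..) is atomic per key, and the synchronized block coordinates
// with readers that also synchronize on the same list instance.
class StreamAppendDemo {
    private final Map<Long, List<String>> map = new ConcurrentHashMap<>();

    void append(long streamId, String record) {
        map.compute(streamId, (id, records) -> {
            if (records == null) {
                records = new ArrayList<>();
            }
            synchronized (records) {
                records.add(record);
            }
            return records;
        });
    }
}
```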
@@ -268,31 +312,35 @@ public List<StreamRecordBatch> get(long streamId, long startOffset, long endOffset, int maxBytes) {
             if (streamRecords.get(0).getBaseOffset() > startOffset || streamRecords.get(streamRecords.size() - 1).getLastOffset() <= startOffset) {
                 return Collections.emptyList();
             }
-            StreamRecordBatchList records = new StreamRecordBatchList(streamRecords);
-            int startIndex = records.search(startOffset);
-            if (startIndex == -1) {
-                // mismatched
-                return Collections.emptyList();
-            }
-            int endIndex = -1;
-            int remainingBytesSize = maxBytes;
-            for (int i = startIndex; i < streamRecords.size(); i++) {
-                StreamRecordBatch record = streamRecords.get(i);
-                endIndex = i + 1;
-                remainingBytesSize -= Math.min(remainingBytesSize, record.size());
-                if (record.getLastOffset() >= endOffset || remainingBytesSize == 0) {
-                    break;
-                }
-            }
-            return streamRecords.subList(startIndex, endIndex);
+            synchronized (streamRecords) {
+                StreamRecordBatchList records = new StreamRecordBatchList(streamRecords);
+                int startIndex = records.search(startOffset);
+                if (startIndex == -1) {
+                    // mismatched
+                    return Collections.emptyList();
+                }
+                int endIndex = -1;
+                int remainingBytesSize = maxBytes;
+                for (int i = startIndex; i < streamRecords.size(); i++) {
+                    StreamRecordBatch record = streamRecords.get(i);
+                    endIndex = i + 1;
+                    remainingBytesSize -= Math.min(remainingBytesSize, record.size());
+                    if (record.getLastOffset() >= endOffset || remainingBytesSize == 0) {
+                        break;
+                    }
+                }
+                return new ArrayList<>(streamRecords.subList(startIndex, endIndex));
+            }
         }

         StreamRange getStreamRange(long streamId) {
             List<StreamRecordBatch> streamRecords = map.get(streamId);
             if (streamRecords == null || streamRecords.isEmpty()) {
                 return new StreamRange(NOOP_OFFSET, NOOP_OFFSET);
             } else {
-                return new StreamRange(streamRecords.get(0).getBaseOffset(), streamRecords.get(streamRecords.size() - 1).getLastOffset());
+                synchronized (streamRecords) {
+                    return new StreamRange(streamRecords.get(0).getBaseOffset(), streamRecords.get(streamRecords.size() - 1).getLastOffset());
+                }
             }
         }
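Two details in this last hunk do the heavy lifting: get now returns new ArrayList<>(streamRecords.subList(...)) instead of the live subList view, because a subList would observe (and throw ConcurrentModificationException on) later structural changes to the backing list, and both get and getStreamRange synchronize on the per-stream list, pairing with the synchronized block in put so index-based reads never race an append.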
StreamRecordBatchList.java

@@ -24,14 +24,16 @@
 public class StreamRecordBatchList extends AbstractOrderedCollection<Long> {

     private final List<ComparableStreamRecordBatch> records;
+    private final int size;

     public StreamRecordBatchList(List<StreamRecordBatch> records) {
         this.records = records.stream().map(ComparableStreamRecordBatch::new).toList();
+        this.size = records.size();
     }

     @Override
     int size() {
-        return records.size();
+        return size;
     }

     @Override
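Caching the count in a final size field gives the hot search path a stable, field-read-cheap size(); since the constructor copies the input into an immutable list via toList(), the snapshot cannot drift from the actual contents. Behavior is unchanged, so this reads as a small optimization that also avoids touching the list object from multiple threads during concurrent reads.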