From 8ffec2551dab882f7055da4b7dcd44f2c5cf5d1f Mon Sep 17 00:00:00 2001 From: chenyong152 Date: Fri, 12 Jul 2024 16:25:36 +0800 Subject: [PATCH] feat(wal): reduce concurrent conflicts between block write operations and poll operations (#1550) --- .../s3/wal/impl/block/SlidingWindowService.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/SlidingWindowService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/SlidingWindowService.java index 10809f0601..876f104c76 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/SlidingWindowService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/SlidingWindowService.java @@ -107,9 +107,9 @@ public class SlidingWindowService { private volatile long lastWriteTimeNanos = 0; /** - * The maximum offset currently written into writeBlocks.* + * The maximum alignment offset in {@link #writingBlocks}.* */ - private long maxWriteBlockOffset = 0; + private long maxAlignWriteBlockOffset = 0; public SlidingWindowService(WALChannel walChannel, int ioThreadNums, long upperLimit, long scaleUnit, long blockSoftLimit, int writeRateLimit, WALHeaderFlusher flusher) { @@ -321,14 +321,18 @@ private BlockBatch pollBlocksLocked() { } } + if (pendingBlocks.isEmpty()) { + return null; + } Collection blocks = new LinkedList<>(); + Block leastBlock = null; while (!pendingBlocks.isEmpty()) { - blocks.add(pendingBlocks.poll()); + leastBlock = pendingBlocks.poll(); + blocks.add(leastBlock); } - BlockBatch blockBatch = new BlockBatch(blocks); writingBlocks.add(blockBatch.startOffset()); - maxWriteBlockOffset = blockBatch.endOffset(); + maxAlignWriteBlockOffset = nextBlockStartOffset(leastBlock); return blockBatch; } @@ -352,7 +356,7 @@ private long wroteBlocksLocked(BlockBatch wroteBlocks) { boolean removed = writingBlocks.remove(wroteBlocks.startOffset()); assert removed; if (writingBlocks.isEmpty()) { - return this.maxWriteBlockOffset; + return this.maxAlignWriteBlockOffset; } return writingBlocks.peek(); }