diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockBatch.java b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockBatch.java index 97f167c76d..f31e99687f 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockBatch.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockBatch.java @@ -21,7 +21,6 @@ public class BlockBatch { private final Collection blocks; private final long startOffset; private final long endOffset; - private final long blockBatchSize; public BlockBatch(Collection blocks) { assert !blocks.isEmpty(); @@ -34,9 +33,6 @@ public BlockBatch(Collection blocks) { .map(b -> b.startOffset() + b.size()) .max(Long::compareTo) .orElseThrow(); - this.blockBatchSize = blocks.stream() - .mapToLong(Block::size) - .sum(); } public long startOffset() { @@ -51,10 +47,6 @@ public Collection blocks() { return Collections.unmodifiableCollection(blocks); } - public long blockBatchSize(){ - return blockBatchSize; - } - public Iterator> futures() { return new Iterator<>() { private final Iterator blockIterator = blocks.iterator(); diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/SlidingWindowService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/SlidingWindowService.java index e93370f21d..1a534a900c 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/SlidingWindowService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/SlidingWindowService.java @@ -23,10 +23,10 @@ import java.util.Collection; import java.util.LinkedList; import java.util.Queue; +import java.util.PriorityQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -71,7 +71,7 @@ public class SlidingWindowService { /** * Blocks that are being written. */ - private final Queue writingBlocks = new PriorityBlockingQueue<>(); + private final Queue writingBlocks = new PriorityQueue<>(); /** * Whether the service is initialized. * After the service is initialized, data in {@link #windowCoreData} is valid. @@ -106,6 +106,11 @@ public class SlidingWindowService { */ private volatile long lastWriteTimeNanos = 0; + /** + * The maximum offset currently written into writeBlocks.* + */ + private long maxWriteBlockOffset = 0; + public SlidingWindowService(WALChannel walChannel, int ioThreadNums, long upperLimit, long scaleUnit, long blockSoftLimit, int writeRateLimit, WALHeaderFlusher flusher) { this.walChannel = walChannel; @@ -323,6 +328,7 @@ private BlockBatch pollBlocksLocked() { BlockBatch blockBatch = new BlockBatch(blocks); writingBlocks.add(blockBatch.startOffset()); + maxWriteBlockOffset = blockBatch.endOffset(); return blockBatch; } @@ -331,10 +337,22 @@ private BlockBatch pollBlocksLocked() { * Finish the given block batch, and return the start offset of the first block which has not been flushed yet. */ private long wroteBlocks(BlockBatch wroteBlocks) { + this.pollBlockLock.lock(); + try { + return wroteBlocksLocked(wroteBlocks); + } finally { + this.pollBlockLock.unlock(); + } + } + /** + * Finish the given block batch, and return the start offset of the first block which has not been flushed yet. + * Note: this method is NOT thread safe, and it should be called with {@link #pollBlockLock} locked. + */ + private long wroteBlocksLocked(BlockBatch wroteBlocks) { boolean removed = writingBlocks.remove(wroteBlocks.startOffset()); assert removed; if (writingBlocks.isEmpty()) { - return wroteBlocks.startOffset() + WALUtil.alignLargeByBlockSize(wroteBlocks.blockBatchSize()); + return this.maxWriteBlockOffset; } return writingBlocks.peek(); }