Skip to content

Commit

Permalink
feat(wal): reduce concurrent conflicts between block write operations…
Browse files Browse the repository at this point in the history
… and poll operations (AutoMQ#1550)
  • Loading branch information
CLFutureX committed Jul 12, 2024
1 parent 3cc61aa commit 8ffec25
Showing 1 changed file with 10 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ public class SlidingWindowService {
private volatile long lastWriteTimeNanos = 0;

/**
* The maximum offset currently written into writeBlocks.*
* The maximum alignment offset in {@link #writingBlocks}.*
*/
private long maxWriteBlockOffset = 0;
private long maxAlignWriteBlockOffset = 0;

public SlidingWindowService(WALChannel walChannel, int ioThreadNums, long upperLimit, long scaleUnit,
long blockSoftLimit, int writeRateLimit, WALHeaderFlusher flusher) {
Expand Down Expand Up @@ -321,14 +321,18 @@ private BlockBatch pollBlocksLocked() {
}
}

if (pendingBlocks.isEmpty()) {
return null;
}
Collection<Block> blocks = new LinkedList<>();
Block leastBlock = null;
while (!pendingBlocks.isEmpty()) {
blocks.add(pendingBlocks.poll());
leastBlock = pendingBlocks.poll();
blocks.add(leastBlock);
}

BlockBatch blockBatch = new BlockBatch(blocks);
writingBlocks.add(blockBatch.startOffset());
maxWriteBlockOffset = blockBatch.endOffset();
maxAlignWriteBlockOffset = nextBlockStartOffset(leastBlock);

return blockBatch;
}
Expand All @@ -352,7 +356,7 @@ private long wroteBlocksLocked(BlockBatch wroteBlocks) {
boolean removed = writingBlocks.remove(wroteBlocks.startOffset());
assert removed;
if (writingBlocks.isEmpty()) {
return this.maxWriteBlockOffset;
return this.maxAlignWriteBlockOffset;
}
return writingBlocks.peek();
}
Expand Down

0 comments on commit 8ffec25

Please sign in to comment.