Commit eaf816f

fix(s3stream): find in data block with real record size (#839)

Signed-off-by: Shichao Nie <[email protected]>
SCNieh authored Dec 16, 2023
1 parent 6a00e83 · commit eaf816f

Showing 5 changed files with 77 additions and 41 deletions.
@@ -222,7 +222,10 @@ public FindIndexResult find(long streamId, long startOffset, long endOffset, int
             // we consider first block as not matched because we do not know exactly how many bytes are within
             // the range in first block, as a result we may read one more block than expected.
             if (matched) {
-                nextMaxBytes -= Math.min(nextMaxBytes, blockSize);
+                int recordPayloadSize = blockSize
+                    - recordCount * StreamRecordBatchCodec.HEADER_SIZE // sum of encoded record header size
+                    - ObjectWriter.DataBlock.BLOCK_HEADER_SIZE; // block header size
+                nextMaxBytes -= Math.min(nextMaxBytes, recordPayloadSize);
             }
             if ((endOffset != NOOP_OFFSET && nextStartOffset >= endOffset) || nextMaxBytes == 0) {
                 isFulfilled = true;
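
The old line charged the full encoded blockSize against nextMaxBytes, although the budget is measured in record payload bytes, so a block looked larger than the payload it could deliver and find() could report fulfilled too early. A minimal standalone sketch of the corrected accounting (class and method names are illustrative; the two constants mirror StreamRecordBatchCodec.HEADER_SIZE and ObjectWriter.DataBlock.BLOCK_HEADER_SIZE from this commit):

    public class PayloadBudgetSketch {
        static final int RECORD_HEADER_SIZE = 33; // StreamRecordBatchCodec.HEADER_SIZE
        static final int BLOCK_HEADER_SIZE = 2;   // ObjectWriter.DataBlock.BLOCK_HEADER_SIZE

        // blockSize counts every encoded byte; the read budget counts payload
        // bytes, so both kinds of header are subtracted before charging the block.
        static int recordPayloadSize(int blockSize, int recordCount) {
            return blockSize - recordCount * RECORD_HEADER_SIZE - BLOCK_HEADER_SIZE;
        }

        public static void main(String[] args) {
            // 100 records of 10 payload bytes encode to 4302 bytes on disk,
            // but deliver only 1000 payload bytes against the budget.
            int blockSize = 100 * (10 + RECORD_HEADER_SIZE) + BLOCK_HEADER_SIZE;
            System.out.println(recordPayloadSize(blockSize, 100)); // 1000
        }
    }
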
@@ -250,6 +250,7 @@ public int size() {
     }

     class DataBlock {
+        public static final int BLOCK_HEADER_SIZE = 2;
         private final CompositeByteBuf encodedBuf;
         private final ObjectStreamRange streamRange;
         private final int recordCount;
@@ -258,7 +259,7 @@ class DataBlock {
         public DataBlock(long streamId, List<StreamRecordBatch> records) {
             this.recordCount = records.size();
             this.encodedBuf = DirectByteBufAlloc.compositeByteBuffer();
-            ByteBuf header = DirectByteBufAlloc.byteBuffer(2);
+            ByteBuf header = DirectByteBufAlloc.byteBuffer(BLOCK_HEADER_SIZE);
             header.writeByte(DATA_BLOCK_MAGIC);
             header.writeByte(DATA_BLOCK_DEFAULT_FLAG);
             encodedBuf.addComponent(true, header);
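
BLOCK_HEADER_SIZE names the two header bytes every data block starts with: one magic byte and one flags byte, exactly what the constructor writes. A tiny sketch of that layout with plain Netty buffers (the magic and flag values here are assumptions, not the real constants):

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;

    public class BlockHeaderSketch {
        static final byte DATA_BLOCK_MAGIC = 0x01;        // assumed value
        static final byte DATA_BLOCK_DEFAULT_FLAG = 0x00; // assumed value
        static final int BLOCK_HEADER_SIZE = 2;           // 1 magic byte + 1 flags byte

        public static void main(String[] args) {
            ByteBuf header = Unpooled.buffer(BLOCK_HEADER_SIZE);
            header.writeByte(DATA_BLOCK_MAGIC);
            header.writeByte(DATA_BLOCK_DEFAULT_FLAG);
            System.out.println(header.readableBytes()); // 2
        }
    }
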
@@ -22,15 +22,16 @@

 public class StreamRecordBatchCodec {
     public static final byte MAGIC_V0 = 0x22;
+    public static final int HEADER_SIZE =
+        1 // magic
+        + 8 // streamId
+        + 8 // epoch
+        + 8 // baseOffset
+        + 4 // lastOffsetDelta
+        + 4; // payload length

     public static ByteBuf encode(StreamRecordBatch streamRecord) {
-        int totalLength = 1 // magic
-            + 8 // streamId
-            + 8 // epoch
-            + 8 // baseOffset
-            + 4 // lastOffsetDelta
-            + 4 // payload length
-            + streamRecord.size(); // payload
+        int totalLength = HEADER_SIZE + streamRecord.size(); // payload

         ByteBuf buf = DirectByteBufAlloc.byteBuffer(totalLength);
         buf.writeByte(MAGIC_V0);
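
HEADER_SIZE adds up to 33 bytes, so an encoded record occupies its payload plus a fixed 33-byte header; a quick check of the arithmetic (illustrative class name):

    public class HeaderSizeCheck {
        public static void main(String[] args) {
            int headerSize = 1 // magic
                + 8  // streamId
                + 8  // epoch
                + 8  // baseOffset
                + 4  // lastOffsetDelta
                + 4; // payload length
            System.out.println(headerSize);      // 33
            System.out.println(headerSize + 10); // 43: a 10-byte payload on the wire
        }
    }
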
@@ -175,7 +175,21 @@ public CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, l
             LOGGER.debug("[S3BlockCache] read data cache miss, stream={}, {}-{}, total bytes: {} ", streamId, startOffset, endOffset, maxBytes);
         }
         return streamReader.syncReadAhead(streamId, startOffset, endOffset, maxBytes, agent, uuid)
-            .thenApply(rst -> new ReadDataBlock(rst, CacheAccessType.BLOCK_CACHE_MISS));
+            .thenCompose(rst -> {
+                if (!rst.isEmpty()) {
+                    int remainBytes = maxBytes - rst.stream().mapToInt(StreamRecordBatch::size).sum();
+                    long lastOffset = rst.get(rst.size() - 1).getLastOffset();
+                    if (remainBytes > 0 && lastOffset < endOffset) {
+                        // retry read
+                        return read0(streamId, lastOffset, endOffset, remainBytes, uuid, context).thenApply(rst2 -> {
+                            List<StreamRecordBatch> records = new ArrayList<>(rst);
+                            records.addAll(rst2.getRecords());
+                            return new ReadDataBlock(records, CacheAccessType.BLOCK_CACHE_MISS);
+                        });
+                    }
+                }
+                return CompletableFuture.completedFuture(new ReadDataBlock(rst, CacheAccessType.BLOCK_CACHE_MISS));
+            });
     }

     private void asyncReadAhead(long streamId, ReadAheadAgent agent, List<ReadAheadRecord> readAheadRecords) {
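
The new thenCompose branch compensates for a read-ahead pass returning fewer payload bytes than requested: when budget remains and the last returned offset is still short of endOffset, read0 recurses with the leftover budget and concatenates both results. A self-contained sketch of that pattern with a stubbed one-record-per-call source (all names are illustrative, not the real s3stream API):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    public class RetryReadSketch {
        record Rec(long lastOffset, int size) {}

        // Stub source: yields at most one 10-byte record per call.
        static CompletableFuture<List<Rec>> readOnce(long start, long end, int maxBytes) {
            if (start >= end || maxBytes <= 0) {
                return CompletableFuture.completedFuture(List.of());
            }
            return CompletableFuture.completedFuture(List.of(new Rec(start + 1, 10)));
        }

        static CompletableFuture<List<Rec>> read(long start, long end, int maxBytes) {
            return readOnce(start, end, maxBytes).thenCompose(rst -> {
                if (!rst.isEmpty()) {
                    int remain = maxBytes - rst.stream().mapToInt(Rec::size).sum();
                    long last = rst.get(rst.size() - 1).lastOffset();
                    if (remain > 0 && last < end) {
                        // retry with the leftover budget, starting after the last record
                        return read(last, end, remain).thenApply(more -> {
                            List<Rec> all = new ArrayList<>(rst);
                            all.addAll(more);
                            return all;
                        });
                    }
                }
                return CompletableFuture.completedFuture(rst);
            });
        }

        public static void main(String[] args) {
            System.out.println(read(0, 5, 100).join().size()); // 5 records, offsets 1..5
        }
    }
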
79 changes: 48 additions & 31 deletions s3stream/src/test/java/com/automq/stream/s3/ObjectReaderTest.java
@@ -38,71 +38,88 @@
 @Tag("S3Unit")
 public class ObjectReaderTest {

+    private int recordCntToBlockSize(int recordCnt, int bodySize) {
+        return (bodySize + StreamRecordBatchCodec.HEADER_SIZE) * recordCnt + ObjectWriter.DataBlock.BLOCK_HEADER_SIZE;
+    }
+
     @Test
     public void testIndexBlock() {
         // block0: s1 [0, 100)
-        // block1: s1 [100, 300)
-        // block2: s1 [300, 400)
+        // block1: s1 [100, 150)
+        // block2: s1 [150, 200)
         // block3: s2 [110, 200)
+        int bodySize = 10;
+        int recordCnt1 = 100;
+        int blockSize1 = recordCntToBlockSize(recordCnt1, bodySize);
+        int recordCnt2 = 50;
+        int blockSize2 = recordCntToBlockSize(recordCnt2, bodySize);
+        int recordCnt3 = 90;
+        int blockSize3 = recordCntToBlockSize(recordCnt3, bodySize);
+        long streamId1 = 1;
+        long streamId2 = 2;
         ByteBuf blocks = Unpooled.buffer(3 * ObjectReader.DataBlockIndex.BLOCK_INDEX_SIZE);
         blocks.writeLong(0);
-        blocks.writeInt(1024);
-        blocks.writeInt(100);
+        blocks.writeInt(blockSize1);
+        blocks.writeInt(recordCnt1);

-        blocks.writeLong(1024);
-        blocks.writeInt(512);
-        blocks.writeInt(100);
+        blocks.writeLong(blockSize1);
+        blocks.writeInt(blockSize2);
+        blocks.writeInt(recordCnt2);

-        blocks.writeLong(1536);
-        blocks.writeInt(512);
-        blocks.writeInt(100);
+        blocks.writeLong((long) blockSize1 + blockSize2);
+        blocks.writeInt(blockSize2);
+        blocks.writeInt(recordCnt2);

-        blocks.writeLong(2048);
-        blocks.writeInt(512);
-        blocks.writeInt(90);
+        blocks.writeLong((long) blockSize1 + blockSize2 + blockSize2);
+        blocks.writeInt(blockSize3);
+        blocks.writeInt(recordCnt3);


         ByteBuf streamRanges = Unpooled.buffer(3 * (8 + 8 + 4 + 4));
-        streamRanges.writeLong(1);
+        streamRanges.writeLong(streamId1);
         streamRanges.writeLong(0);
-        streamRanges.writeInt(100);
+        streamRanges.writeInt(recordCnt1);
         streamRanges.writeInt(0);

-        streamRanges.writeLong(1);
-        streamRanges.writeLong(100);
-        streamRanges.writeInt(200);
+        streamRanges.writeLong(streamId1);
+        streamRanges.writeLong(recordCnt1);
+        streamRanges.writeInt(recordCnt2);
         streamRanges.writeInt(1);

-        streamRanges.writeLong(1);
-        streamRanges.writeLong(300);
-        streamRanges.writeInt(400);
+        streamRanges.writeLong(streamId1);
+        streamRanges.writeLong(recordCnt1 + recordCnt2);
+        streamRanges.writeInt(recordCnt2);
         streamRanges.writeInt(2);

-        streamRanges.writeLong(2);
+        streamRanges.writeLong(streamId2);
         streamRanges.writeLong(110);
-        streamRanges.writeInt(90);
-        streamRanges.writeInt(2);
+        streamRanges.writeInt(recordCnt3);
+        streamRanges.writeInt(3);

         ObjectReader.IndexBlock indexBlock = new ObjectReader.IndexBlock(Mockito.mock(S3ObjectMetadata.class), blocks, streamRanges);

-        ObjectReader.FindIndexResult rst = indexBlock.find(1, 10, 300, 100000);
+        ObjectReader.FindIndexResult rst = indexBlock.find(1, 10, 150, 100000);
         assertTrue(rst.isFulfilled());
         List<StreamDataBlock> streamDataBlocks = rst.streamDataBlocks();
         assertEquals(2, streamDataBlocks.size());
         assertEquals(0, streamDataBlocks.get(0).getBlockId());
         assertEquals(0, streamDataBlocks.get(0).getBlockStartPosition());
-        assertEquals(1024, streamDataBlocks.get(0).getBlockEndPosition());
+        assertEquals(blockSize1, streamDataBlocks.get(0).getBlockEndPosition());
         assertEquals(1, streamDataBlocks.get(1).getBlockId());
-        assertEquals(1024, streamDataBlocks.get(1).getBlockStartPosition());
-        assertEquals(1536, streamDataBlocks.get(1).getBlockEndPosition());
+        assertEquals(blockSize1, streamDataBlocks.get(1).getBlockStartPosition());
+        assertEquals((long) blockSize1 + blockSize2, streamDataBlocks.get(1).getBlockEndPosition());

-        rst = indexBlock.find(1, 10, 400);
+        rst = indexBlock.find(1, 10, 200);
         assertTrue(rst.isFulfilled());
         assertEquals(3, rst.streamDataBlocks().size());

-        rst = indexBlock.find(1, 10, 400, 10);
+        rst = indexBlock.find(1L, 10, 10000, 80 * bodySize);
         assertTrue(rst.isFulfilled());
-        assertEquals(2, rst.streamDataBlocks().size());
+        assertEquals(3, rst.streamDataBlocks().size());
+
+        rst = indexBlock.find(1L, 10, 10000, 160 * bodySize);
+        assertFalse(rst.isFulfilled());
+        assertEquals(3, rst.streamDataBlocks().size());

         rst = indexBlock.find(1, 10, 800);
         assertFalse(rst.isFulfilled());
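
With HEADER_SIZE = 33 and BLOCK_HEADER_SIZE = 2 from the changes above, recordCntToBlockSize makes each test block's encoded size explicit; a worked check of the arithmetic (illustrative class name):

    public class TestBlockSizes {
        public static void main(String[] args) {
            int bodySize = 10, headerSize = 33, blockHeaderSize = 2;
            System.out.println((bodySize + headerSize) * 100 + blockHeaderSize); // blockSize1 = 4302
            System.out.println((bodySize + headerSize) * 50 + blockHeaderSize);  // blockSize2 = 2152
            System.out.println((bodySize + headerSize) * 90 + blockHeaderSize);  // blockSize3 = 3872
        }
    }

This is also why the 80 * bodySize budget is fulfilled after exactly three blocks: the first matched block is never charged, block1 consumes 500 of the 800 payload-byte budget, and block2 absorbs the remaining 300, driving nextMaxBytes to 0.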
