feat(kafka_issues642): stream object compact group small data blocks
Signed-off-by: Robin Han <[email protected]>
superhx committed Jan 9, 2024
1 parent 2c72855 commit 0eb5d36
Showing 9 changed files with 175 additions and 92 deletions.
10 changes: 5 additions & 5 deletions s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
@@ -68,9 +68,9 @@ public CompletableFuture<FindIndexResult> find(long streamId, long startOffset,
return basicObjectInfoCf.thenApply(basicObjectInfo -> basicObjectInfo.indexBlock().find(streamId, startOffset, endOffset, maxBytes));
}

public CompletableFuture<DataBlock> read(DataBlockIndex block) {
public CompletableFuture<DataBlockGroup> read(DataBlockIndex block) {
CompletableFuture<ByteBuf> rangeReadCf = s3Operator.rangeRead(objectKey, block.startPosition(), block.endPosition(), ThrottleStrategy.THROTTLE_1);
return rangeReadCf.thenApply(DataBlock::new);
return rangeReadCf.thenApply(DataBlockGroup::new);
}

void asyncGetBasicObjectInfo() {
@@ -234,7 +234,7 @@ public FindIndexResult find(long streamId, long startOffset, long endOffset, int
if (matched) {
int recordPayloadSize = index.size()
- index.recordCount() * StreamRecordBatchCodec.HEADER_SIZE // sum of encoded record header size
- ObjectWriter.DataBlock.BLOCK_HEADER_SIZE; // block header size
- ObjectWriter.SingleDataBlockGroup.BLOCK_HEADER_SIZE; // block header size
nextMaxBytes -= Math.min(nextMaxBytes, recordPayloadSize);
}
if ((endOffset != NOOP_OFFSET && nextStartOffset >= endOffset) || nextMaxBytes == 0) {
@@ -275,11 +275,11 @@ public IndexBlockParseException(long indexBlockPosition) {

}

public static class DataBlock implements AutoCloseable {
public static class DataBlockGroup implements AutoCloseable {
private final ByteBuf buf;
private final int recordCount;

public DataBlock(ByteBuf buf) {
public DataBlockGroup(ByteBuf buf) {
this.buf = buf.duplicate();
this.recordCount = check(buf);
}
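For orientation, a minimal usage sketch of the renamed ObjectReader.DataBlockGroup follows (imports omitted; the types and method signatures are the ones visible in this diff and its tests, while the metadata, operator, and offset variables are illustrative placeholders):

```java
// Sketch only: read one indexed block (now possibly a group of merged blocks) and iterate its records.
try (ObjectReader reader = new ObjectReader(s3ObjectMetadata, s3Operator)) {
    DataBlockIndex blockIndex = reader.find(streamId, startOffset, endOffset).get()
            .streamDataBlocks().get(0).dataBlockIndex();
    try (ObjectReader.DataBlockGroup group = reader.read(blockIndex).get()) {
        try (CloseableIterator<StreamRecordBatch> it = group.iterator()) {
            while (it.hasNext()) {
                StreamRecordBatch record = it.next();
                // ... consume the record ...
                record.release(); // release once consumed, as the tests below do
            }
        }
    }
}
```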
24 changes: 11 additions & 13 deletions s3stream/src/main/java/com/automq/stream/s3/ObjectWriter.java
@@ -30,8 +30,6 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;

// TODO: memory optimization

/**
* Write stream records to a single object.
*/
@@ -63,8 +61,8 @@ class DefaultObjectWriter implements ObjectWriter {

private final int blockSizeThreshold;
private final int partSizeThreshold;
private final List<DataBlock> waitingUploadBlocks;
private final List<DataBlock> completedBlocks;
private final List<SingleDataBlockGroup> waitingUploadBlocks;
private final List<SingleDataBlockGroup> completedBlocks;
private final Writer writer;
private final long objectId;
private int waitingUploadBlocksSize;
@@ -93,7 +91,7 @@ public DefaultObjectWriter(long objectId, S3Operator s3Operator, int blockSizeTh
public void write(long streamId, List<StreamRecordBatch> records) {
List<List<StreamRecordBatch>> blocks = groupByBlock(records);
blocks.forEach(blockRecords -> {
DataBlock block = new DataBlock(streamId, blockRecords);
SingleDataBlockGroup block = new SingleDataBlockGroup(streamId, blockRecords);
waitingUploadBlocks.add(block);
waitingUploadBlocksSize += block.size();
});
@@ -122,10 +120,10 @@ private List<List<StreamRecordBatch>> groupByBlock(List<StreamRecordBatch> recor

private synchronized void tryUploadPart() {
for (; ; ) {
List<DataBlock> uploadBlocks = new ArrayList<>(waitingUploadBlocks.size());
List<SingleDataBlockGroup> uploadBlocks = new ArrayList<>(waitingUploadBlocks.size());
boolean partFull = false;
int size = 0;
for (DataBlock block : waitingUploadBlocks) {
for (SingleDataBlockGroup block : waitingUploadBlocks) {
uploadBlocks.add(block);
size += block.size();
if (size >= partSizeThreshold) {
@@ -135,7 +133,7 @@ private synchronized void tryUploadPart() {
}
if (partFull) {
CompositeByteBuf partBuf = DirectByteBufAlloc.compositeByteBuffer();
for (DataBlock block : uploadBlocks) {
for (SingleDataBlockGroup block : uploadBlocks) {
waitingUploadBlocksSize -= block.size();
partBuf.addComponent(true, block.buffer());
}
@@ -150,7 +148,7 @@

public CompletableFuture<Void> close() {
CompositeByteBuf buf = DirectByteBufAlloc.compositeByteBuffer();
for (DataBlock block : waitingUploadBlocks) {
for (SingleDataBlockGroup block : waitingUploadBlocks) {
buf.addComponent(true, block.buffer());
completedBlocks.add(block);
}
@@ -167,7 +165,7 @@ public CompletableFuture<Void> close() {
public List<ObjectStreamRange> getStreamRanges() {
List<ObjectStreamRange> streamRanges = new LinkedList<>();
ObjectStreamRange lastStreamRange = null;
for (DataBlock block : completedBlocks) {
for (SingleDataBlockGroup block : completedBlocks) {
ObjectStreamRange streamRange = block.getStreamRange();
if (lastStreamRange == null || lastStreamRange.getStreamId() != streamRange.getStreamId()) {
if (lastStreamRange != null) {
@@ -202,7 +200,7 @@ public IndexBlock() {
long nextPosition = 0;
int indexBlockSize = DataBlockIndex.BLOCK_INDEX_SIZE * completedBlocks.size();
buf = DirectByteBufAlloc.byteBuffer(indexBlockSize, "write_index_block");
for (DataBlock block : completedBlocks) {
for (SingleDataBlockGroup block : completedBlocks) {
ObjectStreamRange streamRange = block.getStreamRange();
new DataBlockIndex(streamRange.getStreamId(), streamRange.getStartOffset(), (int) (streamRange.getEndOffset() - streamRange.getStartOffset()),
block.recordCount(), nextPosition, block.size()).encode(buf);
@@ -225,14 +223,14 @@ public int size() {
}
}

class DataBlock {
class SingleDataBlockGroup {
public static final int BLOCK_HEADER_SIZE = 1 /* magic */ + 1/* flag */ + 4 /* record count*/ + 4 /* data length */;
private final CompositeByteBuf encodedBuf;
private final ObjectStreamRange streamRange;
private final int recordCount;
private final int size;

public DataBlock(long streamId, List<StreamRecordBatch> records) {
public SingleDataBlockGroup(long streamId, List<StreamRecordBatch> records) {
this.recordCount = records.size();
this.encodedBuf = DirectByteBufAlloc.compositeByteBuffer();
ByteBuf header = DirectByteBufAlloc.byteBuffer(BLOCK_HEADER_SIZE);
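The BLOCK_HEADER_SIZE constant above implies a 10-byte block header: 1-byte magic, 1-byte flag, 4-byte record count, and 4-byte data length. A sketch of that layout follows; the magic and flag values, the payload-length variable, and the Netty allocator used here are assumptions rather than the actual internals of ObjectWriter:

```java
// Illustrative only: how a 10-byte SingleDataBlockGroup header could be laid out.
ByteBuf header = Unpooled.buffer(BLOCK_HEADER_SIZE); // 1 + 1 + 4 + 4 = 10 bytes
header.writeByte(BLOCK_MAGIC);     // 1 byte  - block magic (assumed constant, not shown in this diff)
header.writeByte(0);               // 1 byte  - flag, e.g. format/compression bits (assumed)
header.writeInt(records.size());   // 4 bytes - record count in this block
header.writeInt(payloadLength);    // 4 bytes - encoded length of the record payload (assumed name)
```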
@@ -46,20 +46,23 @@ public class StreamObjectCompactor {
*/
private static final int MAX_OBJECT_GROUP_COUNT = Math.min(5000, Writer.MAX_PART_COUNT / 2);
private static final Logger LOGGER = LoggerFactory.getLogger(StreamObjectCompactor.class);
public static final int DEFAULT_DATA_BLOCK_GROUP_SIZE_THRESHOLD = 1024 * 1024; // 1MiB
private final Logger s3ObjectLogger;
private final long maxStreamObjectSize;
private final S3Stream stream;
private final ObjectManager objectManager;
private final S3Operator s3Operator;
private final int dataBlockGroupSizeThreshold;

private StreamObjectCompactor(ObjectManager objectManager, S3Operator s3Operator, S3Stream stream,
long maxStreamObjectSize) {
long maxStreamObjectSize, int dataBlockGroupSizeThreshold) {
this.objectManager = objectManager;
this.s3Operator = s3Operator;
this.stream = stream;
this.maxStreamObjectSize = Math.min(maxStreamObjectSize, Writer.MAX_OBJECT_SIZE);
String logIdent = "[StreamObjectsCompactionTask streamId=" + stream.streamId() + "] ";
this.s3ObjectLogger = S3ObjectLogger.logger(logIdent);
this.dataBlockGroupSizeThreshold = dataBlockGroupSizeThreshold;
}

public void compact() {
@@ -75,7 +78,13 @@ void compact0() throws ExecutionException, InterruptedException {
long streamId = stream.streamId();
long startOffset = stream.startOffset();
for (List<S3ObjectMetadata> objectGroup : objectGroups) {
Optional<CompactStreamObjectRequest> requestOpt = new StreamObjectGroupCompactor(streamId, startOffset, objectGroup, objectManager, s3Operator).compact();
// the object group is a single object and no data blocks need to be removed, so skip it.
if (objectGroup.size() == 1 && objectGroup.get(0).startOffset() >= startOffset) {
continue;
}
long objectId = objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(60)).get();
Optional<CompactStreamObjectRequest> requestOpt = new StreamObjectGroupCompactor(streamId, startOffset,
objectGroup, objectId, dataBlockGroupSizeThreshold, s3Operator).compact();
if (requestOpt.isPresent()) {
CompactStreamObjectRequest request = requestOpt.get();
objectManager.compactStreamObject(request).get();
@@ -95,31 +104,34 @@ static class StreamObjectGroupCompactor {
private final List<S3ObjectMetadata> objectGroup;
private final long streamId;
private final long startOffset;
private final ObjectManager objectManager;
// the new object that the object group is compacted into
private final long objectId;
private final S3Operator s3Operator;
private final int dataBlockGroupSizeThreshold;

public StreamObjectGroupCompactor(long streamId, long startOffset, List<S3ObjectMetadata> objectGroup,
ObjectManager objectManager, S3Operator s3Operator) {
long objectId, int dataBlockGroupSizeThreshold, S3Operator s3Operator) {
this.streamId = streamId;
this.startOffset = startOffset;
this.objectGroup = objectGroup;
this.objectManager = objectManager;
this.objectId = objectId;
this.dataBlockGroupSizeThreshold = dataBlockGroupSizeThreshold;
this.s3Operator = s3Operator;
}

public Optional<CompactStreamObjectRequest> compact() throws ExecutionException, InterruptedException {
// the object group is single object and there is no data block need to be removed.
if (objectGroup.size() == 1 && objectGroup.get(0).startOffset() >= startOffset) {
return Optional.empty();
}
long objectId = objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(60)).get();
long nextBlockPosition = 0;
long objectSize = 0;
long compactedStartOffset = objectGroup.get(0).startOffset();
long compactedEndOffset = objectGroup.get(objectGroup.size() - 1).endOffset();
List<Long> compactedObjectIds = new LinkedList<>();
CompositeByteBuf indexes = DirectByteBufAlloc.compositeByteBuffer();
Writer writer = s3Operator.writer(ObjectUtils.genKey(0, objectId), ThrottleStrategy.THROTTLE_2);
long groupStartOffset = -1L;
long groupStartPosition = -1L;
int groupSize = 0;
int groupRecordCount = 0;
DataBlockIndex lastIndex = null;
for (S3ObjectMetadata object : objectGroup) {
try (ObjectReader reader = new ObjectReader(object, s3Operator)) {
ObjectReader.BasicObjectInfo basicObjectInfo = reader.basicObjectInfo().get();
@@ -133,16 +145,37 @@ public Optional<CompactStreamObjectRequest> compact() throws ExecutionException,
compactedStartOffset = dataBlock.endOffset();
continue;
}
new DataBlockIndex(streamId, dataBlock.startOffset(), dataBlock.endOffsetDelta(),
dataBlock.recordCount(), nextBlockPosition, dataBlock.size()).encode(subIndexes);
if (groupSize == 0 // the first data block
|| (long) groupSize + dataBlock.size() > dataBlockGroupSizeThreshold
|| (long) groupRecordCount + dataBlock.recordCount() > Integer.MAX_VALUE
|| dataBlock.endOffset() - groupStartOffset > Integer.MAX_VALUE) {
if (groupSize != 0) {
new DataBlockIndex(streamId, groupStartOffset, (int) (lastIndex.endOffset() - groupStartOffset),
groupRecordCount, groupStartPosition, groupSize).encode(subIndexes);
}
groupStartOffset = dataBlock.startOffset();
groupStartPosition = nextBlockPosition;
groupSize = 0;
groupRecordCount = 0;
}
groupSize += dataBlock.size();
groupRecordCount += dataBlock.recordCount();
nextBlockPosition += dataBlock.size();
lastIndex = dataBlock;
}
writer.copyWrite(ObjectUtils.genKey(0, object.objectId()), validDataBlockStartPosition, basicObjectInfo.dataBlockSize());
objectSize += basicObjectInfo.dataBlockSize() - validDataBlockStartPosition;
indexes.addComponent(true, subIndexes);
compactedObjectIds.add(object.objectId());
}
}
if (lastIndex != null) {
ByteBuf subIndexes = DirectByteBufAlloc.byteBuffer(DataBlockIndex.BLOCK_INDEX_SIZE);
new DataBlockIndex(streamId, groupStartOffset, (int) (lastIndex.endOffset() - groupStartOffset),
groupRecordCount, groupStartPosition, groupSize).encode(subIndexes);
indexes.addComponent(true, subIndexes);
}

CompositeByteBuf indexBlockAndFooter = DirectByteBufAlloc.compositeByteBuffer();
indexBlockAndFooter.addComponent(true, indexes);
indexBlockAndFooter.addComponent(true, new ObjectWriter.Footer(nextBlockPosition, indexBlockAndFooter.readableBytes()).buffer());
@@ -205,6 +238,7 @@ public static class Builder {
private S3Operator s3Operator;
private S3Stream stream;
private long maxStreamObjectSize;
private int dataBlockGroupSizeThreshold = DEFAULT_DATA_BLOCK_GROUP_SIZE_THRESHOLD;

public Builder objectManager(ObjectManager objectManager) {
this.objectManager = objectManager;
@@ -234,8 +268,13 @@ public Builder maxStreamObjectSize(long maxStreamObjectSize) {
return this;
}

public Builder dataBlockGroupSizeThreshold(int dataBlockGroupSizeThreshold) {
this.dataBlockGroupSizeThreshold = dataBlockGroupSizeThreshold;
return this;
}

public StreamObjectCompactor build() {
return new StreamObjectCompactor(objectManager, s3Operator, stream, maxStreamObjectSize);
return new StreamObjectCompactor(objectManager, s3Operator, stream, maxStreamObjectSize, dataBlockGroupSizeThreshold);
}
}
}
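The heart of this commit is the grouping loop added to StreamObjectGroupCompactor.compact(): consecutive data blocks are accumulated into one group, and a single DataBlockIndex is emitted per group. A new group is started for the first block, when adding a block would push the group past dataBlockGroupSizeThreshold (1 MiB by default), or when the group's record count or offset delta would overflow an int. Below is a simplified, self-contained sketch of that rule, using stand-in types instead of the real DataBlockIndex and ObjectReader classes:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the grouping rule in this commit; Block and GroupIndex are
// stand-ins for the real data block metadata and DataBlockIndex.
class DataBlockGroupingSketch {
    static final int DEFAULT_DATA_BLOCK_GROUP_SIZE_THRESHOLD = 1024 * 1024; // 1MiB, mirrors the new default

    record Block(long startOffset, long endOffset, int recordCount, int size) {}
    record GroupIndex(long startOffset, int offsetDelta, int recordCount, long startPosition, int size) {}

    static List<GroupIndex> group(List<Block> blocks, int threshold) {
        List<GroupIndex> indexes = new ArrayList<>();
        long groupStartOffset = -1, groupStartPosition = -1, nextPosition = 0;
        int groupSize = 0, groupRecordCount = 0;
        Block last = null;
        for (Block block : blocks) {
            boolean startNewGroup = groupSize == 0 // the first data block
                    || (long) groupSize + block.size() > threshold
                    || (long) groupRecordCount + block.recordCount() > Integer.MAX_VALUE
                    || block.endOffset() - groupStartOffset > Integer.MAX_VALUE;
            if (startNewGroup) {
                if (groupSize != 0) { // flush the previous group as one index entry
                    indexes.add(new GroupIndex(groupStartOffset, (int) (last.endOffset() - groupStartOffset),
                            groupRecordCount, groupStartPosition, groupSize));
                }
                groupStartOffset = block.startOffset();
                groupStartPosition = nextPosition;
                groupSize = 0;
                groupRecordCount = 0;
            }
            groupSize += block.size();
            groupRecordCount += block.recordCount();
            nextPosition += block.size();
            last = block;
        }
        if (last != null) { // flush the trailing group, mirroring the post-loop encode in the diff
            indexes.add(new GroupIndex(groupStartOffset, (int) (last.endOffset() - groupStartOffset),
                    groupRecordCount, groupStartPosition, groupSize));
        }
        return indexes;
    }
}
```

Callers that need a different grouping granularity can set it through the new Builder#dataBlockGroupSizeThreshold(int) shown above; otherwise the default stays at DEFAULT_DATA_BLOCK_GROUP_SIZE_THRESHOLD.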
@@ -40,10 +40,10 @@ public void registerListener(BiConsumer<DataBlockRecords, Throwable> listener) {
listeners.add(listener);
}

public void complete(ObjectReader.DataBlock dataBlock, Throwable ex) {
public void complete(ObjectReader.DataBlockGroup dataBlockGroup, Throwable ex) {
if (ex == null) {
records = new ArrayList<>(dataBlock.recordCount());
try (CloseableIterator<StreamRecordBatch> it = dataBlock.iterator()) {
records = new ArrayList<>(dataBlockGroup.recordCount());
try (CloseableIterator<StreamRecordBatch> it = dataBlockGroup.iterator()) {
while (it.hasNext()) {
records.add(it.next());
}
@@ -119,8 +119,8 @@ public void testUpload() throws Exception {
ObjectReader objectReader = new ObjectReader(s3ObjectMetadata, s3Operator);
DataBlockIndex blockIndex = objectReader.find(234, 20, 24).get()
.streamDataBlocks().get(0).dataBlockIndex();
ObjectReader.DataBlock dataBlock = objectReader.read(blockIndex).get();
try (CloseableIterator<StreamRecordBatch> it = dataBlock.iterator()) {
ObjectReader.DataBlockGroup dataBlockGroup = objectReader.read(blockIndex).get();
try (CloseableIterator<StreamRecordBatch> it = dataBlockGroup.iterator()) {
StreamRecordBatch record = it.next();
assertEquals(20, record.getBaseOffset());
record = it.next();
@@ -134,8 +134,8 @@ record = it.next();
ObjectReader objectReader = new ObjectReader(streamObjectMetadata, s3Operator);
DataBlockIndex blockIndex = objectReader.find(233, 10, 16).get()
.streamDataBlocks().get(0).dataBlockIndex();
ObjectReader.DataBlock dataBlock = objectReader.read(blockIndex).get();
try (CloseableIterator<StreamRecordBatch> it = dataBlock.iterator()) {
ObjectReader.DataBlockGroup dataBlockGroup = objectReader.read(blockIndex).get();
try (CloseableIterator<StreamRecordBatch> it = dataBlockGroup.iterator()) {
StreamRecordBatch r1 = it.next();
assertEquals(10, r1.getBaseOffset());
r1.release();
@@ -40,7 +40,7 @@
public class ObjectReaderTest {

private int recordCntToBlockSize(int recordCnt, int bodySize) {
return (bodySize + StreamRecordBatchCodec.HEADER_SIZE) * recordCnt + ObjectWriter.DataBlock.BLOCK_HEADER_SIZE;
return (bodySize + StreamRecordBatchCodec.HEADER_SIZE) * recordCnt + ObjectWriter.SingleDataBlockGroup.BLOCK_HEADER_SIZE;
}

@Test
@@ -114,11 +114,11 @@ public void testGetBasicObjectInfo() throws ExecutionException, InterruptedExcep
public void testReadBlockGroup() throws ExecutionException, InterruptedException {
S3Operator s3Operator = new MemoryS3Operator();
ByteBuf buf = DirectByteBufAlloc.byteBuffer(0);
buf.writeBytes(new ObjectWriter.DataBlock(233L, List.of(
buf.writeBytes(new ObjectWriter.SingleDataBlockGroup(233L, List.of(
new StreamRecordBatch(233L, 0, 10, 1, TestUtils.random(100)),
new StreamRecordBatch(233L, 0, 11, 2, TestUtils.random(100))
)).buffer());
buf.writeBytes(new ObjectWriter.DataBlock(233L, List.of(
buf.writeBytes(new ObjectWriter.SingleDataBlockGroup(233L, List.of(
new StreamRecordBatch(233L, 0, 13, 1, TestUtils.random(100))
)).buffer());
int indexPosition = buf.readableBytes();
@@ -131,9 +131,9 @@ public void testReadBlockGroup() throws ExecutionException, InterruptedException
try (ObjectReader reader = new ObjectReader(new S3ObjectMetadata(1L, objectSize, S3ObjectType.STREAM), s3Operator)) {
ObjectReader.FindIndexResult rst = reader.find(233L, 10L, 14L, 1024).get();
assertEquals(1, rst.streamDataBlocks().size());
try (ObjectReader.DataBlock dataBlock = reader.read(rst.streamDataBlocks().get(0).dataBlockIndex()).get()) {
assertEquals(3, dataBlock.recordCount());
Iterator<StreamRecordBatch> it = dataBlock.iterator();
try (ObjectReader.DataBlockGroup dataBlockGroup = reader.read(rst.streamDataBlocks().get(0).dataBlockIndex()).get()) {
assertEquals(3, dataBlockGroup.recordCount());
Iterator<StreamRecordBatch> it = dataBlockGroup.iterator();
assertEquals(10, it.next().getBaseOffset());
assertEquals(11, it.next().getBaseOffset());
assertEquals(13, it.next().getBaseOffset());