Commit 7535e76

refactor(StreamRecordBatch): replace constructor with static factory method for improved clarity (#3109)

Signed-off-by: Robin Han <[email protected]>

1 parent 60b1ec6 · commit 7535e76

32 files changed: +317 −239 lines
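
The pattern repeated across the diffs below: construction via new StreamRecordBatch(...) followed by a separate encoded() call collapses into StreamRecordBatch.of(...), and the StreamRecordBatchCodec.duplicateDecode / sliceRetainDecode pair collapses into StreamRecordBatch.parse(buf, copy). A minimal sketch of the call-site change, assuming the s3stream module on the classpath; the signatures are inferred from the call sites in this commit, and the "before" method uses the pre-commit constructor, shown for contrast only:

    import com.automq.stream.s3.model.StreamRecordBatch;
    import io.netty.buffer.ByteBuf;

    class FactoryMethodSketch {
        // Pre-commit pattern: construct, then remember to call encoded().
        static StreamRecordBatch before(long streamId, long epoch, long baseOffset, int count, ByteBuf payload) {
            StreamRecordBatch record = new StreamRecordBatch(streamId, epoch, baseOffset, count, payload);
            record.encoded(); // extra step every call site had to remember
            return record;
        }

        // Post-commit pattern: of(...) presumably encodes during construction.
        static StreamRecordBatch after(long streamId, long epoch, long baseOffset, int count, ByteBuf payload) {
            return StreamRecordBatch.of(streamId, epoch, baseOffset, count, payload);
        }

        // Decoding: parse(buf, copy) replaces duplicateDecode (copy == true)
        // and sliceRetainDecode (copy == false).
        static StreamRecordBatch decode(ByteBuf encoded, boolean copy) {
            return StreamRecordBatch.parse(encoded, copy);
        }
    }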

core/src/main/java/kafka/automq/partition/snapshot/ConfirmWalDataDelta.java

Lines changed: 1 addition & 3 deletions

@@ -22,7 +22,6 @@
 import org.apache.kafka.common.message.AutomqGetPartitionSnapshotResponseData;
 
 import com.automq.stream.s3.ConfirmWAL;
-import com.automq.stream.s3.StreamRecordBatchCodec;
 import com.automq.stream.s3.model.StreamRecordBatch;
 import com.automq.stream.s3.wal.RecordOffset;
 
@@ -184,8 +183,7 @@ public static List<StreamRecordBatch> decodeDeltaRecords(byte[] data) {
         List<StreamRecordBatch> records = new ArrayList<>();
         ByteBuf buf = Unpooled.wrappedBuffer(data);
         while (buf.readableBytes() > 0) {
-            StreamRecordBatch record = StreamRecordBatchCodec.sliceRetainDecode(buf);
-            record.encoded();
+            StreamRecordBatch record = StreamRecordBatch.parse(buf, false);
             records.add(record);
         }
         return records;

core/src/main/java/kafka/automq/zerozone/DefaultLinkRecordDecoder.java

Lines changed: 2 additions & 6 deletions

@@ -31,7 +31,6 @@
 import java.util.concurrent.CompletableFuture;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 
 public class DefaultLinkRecordDecoder implements com.automq.stream.api.LinkRecordDecoder {
     private static final Logger LOGGER = LoggerFactory.getLogger(DefaultLinkRecordDecoder.class);
@@ -61,11 +60,8 @@ public CompletableFuture<StreamRecordBatch> decode(StreamRecordBatch src) {
             recordBatch.setLastOffset(linkRecord.lastOffset());
             recordBatch.setMaxTimestamp(linkRecord.timestampType(), linkRecord.maxTimestamp());
             recordBatch.setPartitionLeaderEpoch(linkRecord.partitionLeaderEpoch());
-            StreamRecordBatch streamRecordBatch = new StreamRecordBatch(src.getStreamId(), src.getEpoch(), src.getBaseOffset(),
-                -src.getCount(), Unpooled.wrappedBuffer(records.buffer()));
-            // The buf will be release after the finally block, so we need copy the data by #encoded.
-            streamRecordBatch.encoded(SnapshotReadCache.ENCODE_ALLOC);
-            return streamRecordBatch;
+            return StreamRecordBatch.of(src.getStreamId(), src.getEpoch(), src.getBaseOffset(),
+                -src.getCount(), records.buffer(), SnapshotReadCache.ENCODE_ALLOC);
         } finally {
             buf.release();
         }
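
The removed comment explains the constraint here: records.buffer() is backed by buf, which the caller releases in the finally block, so the payload must be copied before the record escapes. The new of(...) overload that takes an allocator appears to fold that copy into construction. A hedged sketch of the calling pattern; decodeAndDetach and its literals are illustrative, not AutoMQ code, and ByteBufSeqAlloc comes from the import removed in the codec diff below:

    import com.automq.stream.ByteBufSeqAlloc;
    import com.automq.stream.s3.model.StreamRecordBatch;
    import io.netty.buffer.ByteBuf;

    class AllocOverloadSketch {
        // Assumption: of(..., payload, alloc) copies the payload into
        // allocator-owned memory, doing the job of the removed
        // streamRecordBatch.encoded(SnapshotReadCache.ENCODE_ALLOC) call.
        static StreamRecordBatch decodeAndDetach(ByteBuf buf, ByteBufSeqAlloc alloc) {
            try {
                return StreamRecordBatch.of(1L, 0L, 0L, 1, buf, alloc);
            } finally {
                buf.release(); // the record must stay valid past this point
            }
        }
    }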

core/src/main/java/kafka/automq/zerozone/ObjectRouterChannel.java

Lines changed: 2 additions & 3 deletions

@@ -83,11 +83,10 @@ public CompletableFuture<AppendResult> append(int targetNodeId, short orderHint,
     }
 
     CompletableFuture<AppendResult> append0(int targetNodeId, short orderHint, ByteBuf data) {
-        StreamRecordBatch record = new StreamRecordBatch(targetNodeId, 0, mockOffset.incrementAndGet(), 1, data);
-        record.encoded();
-        record.retain();
+        StreamRecordBatch record = StreamRecordBatch.of(targetNodeId, 0, mockOffset.incrementAndGet(), 1, data);
        for (; ; ) {
            try {
+                record.retain();
                 return wal.append(TraceContext.DEFAULT, record).thenApply(walRst -> {
                     readLock.lock();
                     try {
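
Note the second change in this hunk: record.retain() moves from before the retry loop to inside the try. If wal.append consumes one reference per invocation, a single up-front retain only covers the first attempt; each retry needs its own. A plain-Netty illustration of that invariant (no AutoMQ types; sink stands in for an append call that releases the reference it is handed):

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;

    class RetainPerAttemptExample {
        // A sink that always releases the reference it was handed,
        // and signals "retry" by throwing.
        static void sink(ByteBuf buf) {
            try {
                if (buf.readableBytes() == 0) {
                    throw new IllegalStateException("retry");
                }
            } finally {
                buf.release();
            }
        }

        static void appendWithRetry(ByteBuf data) {
            for (; ; ) {
                try {
                    data.retain(); // one reference per attempt (the commit's fix)
                    sink(data);
                    return;
                } catch (IllegalStateException e) {
                    // sink released its reference; take a fresh one next round
                }
            }
        }

        public static void main(String[] args) {
            ByteBuf data = Unpooled.wrappedBuffer(new byte[]{1});
            appendWithRetry(data);
            data.release(); // drop the caller's original reference
            // refCnt is now 0; every attempt was balanced
        }
    }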

core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java

Lines changed: 1 addition & 1 deletion

@@ -457,7 +457,7 @@ public RecordBatch nextBatch() throws IOException {
                 ElasticStreamSlice slice = elasticLogFileRecords.streamSlice;
                 byte[] bytes = new byte[streamRecord.rawPayload().remaining()];
                 streamRecord.rawPayload().get(bytes);
-                LOGGER.error("next batch parse error, stream={} baseOffset={} payload={}", slice.stream().streamId(), slice.sliceRange().start() + streamRecord.baseOffset(), bytes);
+                LOGGER.error("next batch parse error, stream={} baseOffset={} payload={}", slice.stream().streamId(), slice.sliceRange().start() + streamRecord.baseOffset(), bytes, e);
                 throw new RuntimeException(e);
             }
         });
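
This one-argument fix relies on a standard SLF4J rule: when the final argument is a Throwable and every {} placeholder is already bound, SLF4J logs it as the exception, stack trace included. Without it, the trace of e never reaches the log:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class Slf4jThrowableDemo {
        private static final Logger LOGGER = LoggerFactory.getLogger(Slf4jThrowableDemo.class);

        static void demo(Exception e) {
            // Three placeholders, three arguments: the stack trace of e is never logged.
            LOGGER.error("next batch parse error, stream={} baseOffset={} payload={}", 1L, 0L, "...");
            // Appending e as a trailing fourth argument logs the message
            // plus the full stack trace.
            LOGGER.error("next batch parse error, stream={} baseOffset={} payload={}", 1L, 0L, "...", e);
        }
    }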

core/src/test/java/kafka/automq/partition/snapshot/ConfirmWalDataDeltaTest.java

Lines changed: 1 addition & 2 deletions

@@ -135,9 +135,8 @@ public void testOnAppend_bufferExceed() {
     }
 
     int onAppend(long recordBaseOffset) {
-        StreamRecordBatch record = new StreamRecordBatch(1, 2, recordBaseOffset, 1, Unpooled.wrappedBuffer(new byte[1024]));
+        StreamRecordBatch record = StreamRecordBatch.of(1, 2, recordBaseOffset, 1, Unpooled.wrappedBuffer(new byte[1024]));
         nextWalOffset = walOffset + record.encoded().readableBytes();
-        record.encoded();
         delta.onAppend(
             record,
             DefaultRecordOffset.of(1, walOffset, record.encoded().readableBytes()),

metadata/src/test/java/org/apache/kafka/controller/stream/StreamControlManagerTest.java

Lines changed: 1 addition & 1 deletion

@@ -1537,7 +1537,7 @@ private Optional<ObjectReader> mockObjectReader(
         objectWriter.write(
             range.streamId(),
             List.of(
-                new StreamRecordBatch(range.streamId(), 0, range.startOffset(), (int) (range.endOffset() - range.startOffset()), Unpooled.buffer(1))
+                StreamRecordBatch.of(range.streamId(), 0, range.startOffset(), (int) (range.endOffset() - range.startOffset()), Unpooled.buffer(1))
             )
         );

s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java

Lines changed: 1 addition & 1 deletion

@@ -603,7 +603,7 @@ public StreamRecordBatch next() {
                 buf.skipBytes(4);
             }
             currentBlockRecordCount.decrementAndGet();
-            return copy ? StreamRecordBatchCodec.duplicateDecode(buf) : StreamRecordBatchCodec.sliceRetainDecode(buf);
+            return StreamRecordBatch.parse(buf, copy);
         }
 
         @Override
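
parse(buf, copy) merges the two codec entry points that this commit deletes (see the StreamRecordBatchCodec diff below). A hypothetical reconstruction of its behavior, stitched together from the removed duplicateDecode and decode(buf, retain) bodies; the real implementation lives in StreamRecordBatch and may differ:

    import com.automq.stream.s3.StreamRecordBatchCodec;
    import com.automq.stream.s3.model.StreamRecordBatch;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;

    class ParseSketch {
        static StreamRecordBatch parse(ByteBuf buf, boolean copy) {
            byte magic = buf.readByte();
            if (magic != StreamRecordBatchCodec.MAGIC_V0) {
                throw new RuntimeException("Invalid magic byte " + magic);
            }
            long streamId = buf.readLong();
            long epoch = buf.readLong();
            long baseOffset = buf.readLong();
            int count = buf.readInt();
            int payloadLength = buf.readInt();
            ByteBuf payload;
            if (copy) {
                // Deep copy, like the removed duplicateDecode (which used the
                // pooled ByteBufAlloc; Unpooled is a stand-in here).
                payload = Unpooled.buffer(payloadLength);
                buf.readBytes(payload);
            } else {
                // Share the input buffer with an extra reference, like the
                // removed sliceRetainDecode.
                payload = buf.retainedSlice(buf.readerIndex(), payloadLength);
                buf.skipBytes(payloadLength);
            }
            return StreamRecordBatch.of(streamId, epoch, baseOffset, count, payload);
        }
    }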

s3stream/src/main/java/com/automq/stream/s3/S3Storage.java

Lines changed: 2 additions & 6 deletions

@@ -546,8 +546,6 @@ public void shutdown() {
     public CompletableFuture<Void> append(AppendContext context, StreamRecordBatch streamRecord) {
         final long startTime = System.nanoTime();
         CompletableFuture<Void> cf = new CompletableFuture<>();
-        // encoded before append to free heap ByteBuf.
-        streamRecord.encoded();
         WalWriteRequest writeRequest = new WalWriteRequest(streamRecord, null, cf, context);
         append0(context, writeRequest, false);
         return cf.whenComplete((nil, ex) -> {
@@ -588,7 +586,7 @@ public boolean append0(AppendContext context, WalWriteRequest request, boolean f
             appendCf = deltaWAL.append(new TraceContext(context), streamRecord);
         } else {
             StreamRecordBatch record = request.record;
-            StreamRecordBatch linkStreamRecord = toLinkRecord(record, context.linkRecord().retain());
+            StreamRecordBatch linkStreamRecord = toLinkRecord(record, context.linkRecord().retainedSlice());
             appendCf = deltaWAL.append(new TraceContext(context), linkStreamRecord);
         }
 
@@ -1064,9 +1062,7 @@ public void close() {
     }
 
     static StreamRecordBatch toLinkRecord(StreamRecordBatch origin, ByteBuf link) {
-        StreamRecordBatch record = new StreamRecordBatch(origin.getStreamId(), origin.getEpoch(), origin.getBaseOffset(), -origin.getCount(), link);
-        record.encoded();
-        return record;
+        return StreamRecordBatch.of(origin.getStreamId(), origin.getEpoch(), origin.getBaseOffset(), -origin.getCount(), link);
     }
 
     public static class DeltaWALUploadTaskContext {
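
The retain() to retainedSlice() switch in append0 is more than cosmetic: retain() returns the same ByteBuf, so the link record and the original holder would share one set of reader/writer indexes, while retainedSlice() hands out an independently indexed view of the same memory. A plain-Netty demonstration:

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;

    class RetainVsRetainedSlice {
        public static void main(String[] args) {
            ByteBuf link = Unpooled.wrappedBuffer(new byte[]{1, 2, 3, 4});

            ByteBuf sameBuf = link.retain();        // same object: indexes are shared
            sameBuf.readByte();                     // moves link's readerIndex too
            System.out.println(link.readerIndex()); // 1 -- the holder sees the side effect

            ByteBuf slice = link.retainedSlice();   // independent indexes, shared memory
            slice.readByte();                       // does not disturb link
            System.out.println(link.readerIndex()); // still 1

            slice.release();
            link.release(); // balances retain()
            link.release(); // balances the original reference
        }
    }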

s3stream/src/main/java/com/automq/stream/s3/S3Stream.java

Lines changed: 5 additions & 3 deletions

@@ -216,7 +216,7 @@ private CompletableFuture<AppendResult> append0(AppendContext context, RecordBat
             return FutureUtil.failedFuture(new StreamClientException(ErrorCode.STREAM_ALREADY_CLOSED, logIdent + "stream is not writable"));
         }
         long offset = nextOffset.getAndAdd(recordBatch.count());
-        StreamRecordBatch streamRecordBatch = new StreamRecordBatch(streamId, epoch, offset, recordBatch.count(), Unpooled.wrappedBuffer(recordBatch.rawPayload()));
+        StreamRecordBatch streamRecordBatch = StreamRecordBatch.of(streamId, epoch, offset, recordBatch.count(), Unpooled.wrappedBuffer(recordBatch.rawPayload()));
         CompletableFuture<AppendResult> cf = storage.append(context, streamRecordBatch).thenApply(nil -> {
             updateConfirmOffset(offset + recordBatch.count());
             return new DefaultAppendResult(offset);
@@ -523,6 +523,8 @@ public DefaultFetchResult(List<StreamRecordBatch> streamRecords, CacheAccessType
 
     private static RecordBatch convert(StreamRecordBatch streamRecordBatch, boolean pooledBuf) {
         ByteBuffer buf;
+        // We shouldn't access the StreamRecordBatch after release it.
+        int count = streamRecordBatch.getCount();
         if (pooledBuf) {
             buf = streamRecordBatch.getPayload().nioBuffer();
         } else {
@@ -533,12 +535,12 @@ private static RecordBatch convert(StreamRecordBatch streamRecordBatch, boolean
         return new RecordBatch() {
             @Override
             public int count() {
-                return streamRecordBatch.getCount();
+                return count;
             }
 
             @Override
             public long baseTimestamp() {
-                return streamRecordBatch.getEpoch();
+                return 0;
             }
 
             @Override
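
The convert change guards against use-after-release: the anonymous RecordBatch can outlive the ref-counted StreamRecordBatch, so count is copied into a local while the batch is still alive (and baseTimestamp stops reaching into it at all). The same capture-then-release discipline in a generic, runnable form; counter is illustrative, not AutoMQ code:

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import java.util.function.IntSupplier;

    class CaptureBeforeReleaseExample {
        // Copy the primitive out while the ref-counted holder is alive,
        // instead of re-reading it from inside the returned object after
        // the holder may have been released.
        static IntSupplier counter(ByteBuf holder) {
            int count = holder.readableBytes(); // capture now, while refCnt > 0
            return () -> count;                 // safe even after holder.release()
        }

        public static void main(String[] args) {
            ByteBuf holder = Unpooled.wrappedBuffer(new byte[]{1, 2, 3});
            IntSupplier count = counter(holder);
            holder.release();
            System.out.println(count.getAsInt()); // 3, no use-after-release
        }
    }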

s3stream/src/main/java/com/automq/stream/s3/StreamRecordBatchCodec.java

Lines changed: 7 additions & 65 deletions

@@ -19,11 +19,6 @@
 
 package com.automq.stream.s3;
 
-import com.automq.stream.ByteBufSeqAlloc;
-import com.automq.stream.s3.model.StreamRecordBatch;
-
-import io.netty.buffer.ByteBuf;
-
 public class StreamRecordBatchCodec {
     public static final byte MAGIC_V0 = 0x22;
     public static final int HEADER_SIZE =
@@ -33,64 +28,11 @@ public class StreamRecordBatchCodec {
         + 8 // baseOffset
         + 4 // lastOffsetDelta
         + 4; // payload length
-
-    public static ByteBuf encode(StreamRecordBatch streamRecord, ByteBufSeqAlloc alloc) {
-        int totalLength = HEADER_SIZE + streamRecord.size(); // payload
-        // use sequential allocator to avoid memory fragmentation
-        ByteBuf buf = alloc.byteBuffer(totalLength);
-        buf.writeByte(MAGIC_V0);
-        buf.writeLong(streamRecord.getStreamId());
-        buf.writeLong(streamRecord.getEpoch());
-        buf.writeLong(streamRecord.getBaseOffset());
-        buf.writeInt(streamRecord.getCount());
-        buf.writeInt(streamRecord.size());
-        buf.writeBytes(streamRecord.getPayload().duplicate());
-        return buf;
-    }
-
-    /**
-     * Decode a stream record batch from a byte buffer and move the reader index.
-     * The returned stream record batch does NOT share the payload buffer with the input buffer.
-     */
-    public static StreamRecordBatch duplicateDecode(ByteBuf buf) {
-        byte magic = buf.readByte(); // magic
-        if (magic != MAGIC_V0) {
-            throw new RuntimeException("Invalid magic byte " + magic);
-        }
-        long streamId = buf.readLong();
-        long epoch = buf.readLong();
-        long baseOffset = buf.readLong();
-        int lastOffsetDelta = buf.readInt();
-        int payloadLength = buf.readInt();
-        ByteBuf payload = ByteBufAlloc.byteBuffer(payloadLength, ByteBufAlloc.DECODE_RECORD);
-        buf.readBytes(payload);
-        return new StreamRecordBatch(streamId, epoch, baseOffset, lastOffsetDelta, payload);
-    }
-
-    /**
-     * Decode a stream record batch from a byte buffer and move the reader index.
-     * The returned stream record batch shares the payload buffer with the input buffer.
-     */
-    public static StreamRecordBatch decode(ByteBuf buf, boolean retain) {
-        byte magic = buf.readByte(); // magic
-        if (magic != MAGIC_V0) {
-            throw new RuntimeException("Invalid magic byte " + magic);
-        }
-        long streamId = buf.readLong();
-        long epoch = buf.readLong();
-        long baseOffset = buf.readLong();
-        int lastOffsetDelta = buf.readInt();
-        int payloadLength = buf.readInt();
-        ByteBuf payload = retain ? buf.retainedSlice(buf.readerIndex(), payloadLength) : buf.slice(buf.readerIndex(), payloadLength);
-        buf.skipBytes(payloadLength);
-        return new StreamRecordBatch(streamId, epoch, baseOffset, lastOffsetDelta, payload);
-    }
-
-    public static StreamRecordBatch decode(ByteBuf buf) {
-        return decode(buf, false);
-    }
-
-    public static StreamRecordBatch sliceRetainDecode(ByteBuf buf) {
-        return decode(buf, true);
-    }
+    public static final int MAGIC_POS = 0;
+    public static final int STREAM_ID_POS = 1;
+    public static final int EPOCH_POS = STREAM_ID_POS + 8;
+    public static final int BASE_OFFSET_POS = EPOCH_POS + 8;
+    public static final int LAST_OFFSET_DELTA_POS = BASE_OFFSET_POS + 8;
+    public static final int PAYLOAD_LENGTH_POS = LAST_OFFSET_DELTA_POS + 4;
+    public static final int PAYLOAD_POS = PAYLOAD_LENGTH_POS + 4;
 }
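
After this commit, StreamRecordBatchCodec only publishes the wire layout; the encode/decode logic moves behind StreamRecordBatch.of/parse. The new *_POS constants allow reading header fields at absolute offsets without decoding the whole record. One plausible use, not shown in this commit; both methods are hypothetical:

    import com.automq.stream.s3.StreamRecordBatchCodec;
    import io.netty.buffer.ByteBuf;

    class HeaderPeekExample {
        // Absolute-index reads of an encoded record's header, leaving the
        // buffer's reader index untouched and the payload undecoded.
        static long peekStreamId(ByteBuf encoded) {
            return encoded.getLong(encoded.readerIndex() + StreamRecordBatchCodec.STREAM_ID_POS);
        }

        static int peekPayloadLength(ByteBuf encoded) {
            return encoded.getInt(encoded.readerIndex() + StreamRecordBatchCodec.PAYLOAD_LENGTH_POS);
        }
    }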
