@@ -22,7 +22,6 @@
import org.apache.kafka.common.message.AutomqGetPartitionSnapshotResponseData;

import com.automq.stream.s3.ConfirmWAL;
- import com.automq.stream.s3.StreamRecordBatchCodec;
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.s3.wal.RecordOffset;

@@ -184,8 +183,7 @@ public static List<StreamRecordBatch> decodeDeltaRecords(byte[] data) {
List<StreamRecordBatch> records = new ArrayList<>();
ByteBuf buf = Unpooled.wrappedBuffer(data);
while (buf.readableBytes() > 0) {
- StreamRecordBatch record = StreamRecordBatchCodec.sliceRetainDecode(buf);
- record.encoded();
+ StreamRecordBatch record = StreamRecordBatch.parse(buf, false);
records.add(record);
}
return records;
@@ -31,7 +31,6 @@
import java.util.concurrent.CompletableFuture;

import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;

public class DefaultLinkRecordDecoder implements com.automq.stream.api.LinkRecordDecoder {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultLinkRecordDecoder.class);
@@ -61,11 +60,8 @@ public CompletableFuture<StreamRecordBatch> decode(StreamRecordBatch src) {
recordBatch.setLastOffset(linkRecord.lastOffset());
recordBatch.setMaxTimestamp(linkRecord.timestampType(), linkRecord.maxTimestamp());
recordBatch.setPartitionLeaderEpoch(linkRecord.partitionLeaderEpoch());
- StreamRecordBatch streamRecordBatch = new StreamRecordBatch(src.getStreamId(), src.getEpoch(), src.getBaseOffset(),
-     -src.getCount(), Unpooled.wrappedBuffer(records.buffer()));
- // The buf will be release after the finally block, so we need copy the data by #encoded.
- streamRecordBatch.encoded(SnapshotReadCache.ENCODE_ALLOC);
- return streamRecordBatch;
+ return StreamRecordBatch.of(src.getStreamId(), src.getEpoch(), src.getBaseOffset(),
+     -src.getCount(), records.buffer(), SnapshotReadCache.ENCODE_ALLOC);
} finally {
buf.release();
}
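Note on the change above: the deleted comment explains why the old code had to call `encoded(SnapshotReadCache.ENCODE_ALLOC)` before returning — the payload only borrowed memory from `buf`, which is released in the `finally` block, so the data had to be copied out first. The new `StreamRecordBatch.of(..., SnapshotReadCache.ENCODE_ALLOC)` overload presumably performs that copy at construction time, which is why dropping the explicit `encoded()` call stays safe. A minimal sketch of the underlying ByteBuf lifecycle in plain Netty (the `copyOf` helper is made up for illustration, not AutoMQ API):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public class CopyBeforeReleaseDemo {
    // Hypothetical helper: copy the readable bytes of a borrowed buffer into freshly
    // allocated memory, so the copy stays valid after the source buffer is released.
    static ByteBuf copyOf(ByteBuf borrowed) {
        return PooledByteBufAllocator.DEFAULT
            .buffer(borrowed.readableBytes())
            .writeBytes(borrowed.duplicate());
    }

    public static void main(String[] args) {
        ByteBuf source = PooledByteBufAllocator.DEFAULT.buffer(16).writeBytes(new byte[] {1, 2, 3, 4});
        ByteBuf view = source.retainedSlice(); // shares source's memory
        ByteBuf copy = copyOf(view);           // owns its own memory

        view.release();
        source.release();                      // after this, only `copy` is safe to read
        System.out.println(copy.readableBytes()); // 4
        copy.release();
    }
}
```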
@@ -83,11 +83,10 @@ public CompletableFuture<AppendResult> append(int targetNodeId, short orderHint,
}

CompletableFuture<AppendResult> append0(int targetNodeId, short orderHint, ByteBuf data) {
- StreamRecordBatch record = new StreamRecordBatch(targetNodeId, 0, mockOffset.incrementAndGet(), 1, data);
- record.encoded();
- record.retain();
+ StreamRecordBatch record = StreamRecordBatch.of(targetNodeId, 0, mockOffset.incrementAndGet(), 1, data);
for (; ; ) {
try {
+ record.retain();
return wal.append(TraceContext.DEFAULT, record).thenApply(walRst -> {
readLock.lock();
try {
@@ -457,7 +457,7 @@ public RecordBatch nextBatch() throws IOException {
ElasticStreamSlice slice = elasticLogFileRecords.streamSlice;
byte[] bytes = new byte[streamRecord.rawPayload().remaining()];
streamRecord.rawPayload().get(bytes);
LOGGER.error("next batch parse error, stream={} baseOffset={} payload={}", slice.stream().streamId(), slice.sliceRange().start() + streamRecord.baseOffset(), bytes);
LOGGER.error("next batch parse error, stream={} baseOffset={} payload={}", slice.stream().streamId(), slice.sliceRange().start() + streamRecord.baseOffset(), bytes, e);
throw new RuntimeException(e);
}
});
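The logging fix above leans on standard SLF4J behavior: when the last argument has no matching `{}` placeholder and is a Throwable, it is logged as the exception (with stack trace) instead of being formatted into the message, so the parse failure is no longer swallowed. A small standalone illustration (class name and values are made up):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TrailingThrowableDemo {
    private static final Logger LOGGER = LoggerFactory.getLogger(TrailingThrowableDemo.class);

    public static void main(String[] args) {
        Exception e = new IllegalStateException("corrupt batch");
        // Three {} placeholders consume the first three arguments; the trailing
        // Throwable is treated as the exception and its stack trace is emitted.
        LOGGER.error("next batch parse error, stream={} baseOffset={} payload={}", 1L, 42L, "0x00", e);
    }
}
```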
@@ -135,9 +135,8 @@ public void testOnAppend_bufferExceed() {
}

int onAppend(long recordBaseOffset) {
- StreamRecordBatch record = new StreamRecordBatch(1, 2, recordBaseOffset, 1, Unpooled.wrappedBuffer(new byte[1024]));
+ StreamRecordBatch record = StreamRecordBatch.of(1, 2, recordBaseOffset, 1, Unpooled.wrappedBuffer(new byte[1024]));
nextWalOffset = walOffset + record.encoded().readableBytes();
- record.encoded();
delta.onAppend(
record,
DefaultRecordOffset.of(1, walOffset, record.encoded().readableBytes()),
@@ -1537,7 +1537,7 @@ private Optional<ObjectReader> mockObjectReader(
objectWriter.write(
range.streamId(),
List.of(
- new StreamRecordBatch(range.streamId(), 0, range.startOffset(), (int) (range.endOffset() - range.startOffset()), Unpooled.buffer(1))
+ StreamRecordBatch.of(range.streamId(), 0, range.startOffset(), (int) (range.endOffset() - range.startOffset()), Unpooled.buffer(1))
)
)
);
@@ -603,7 +603,7 @@ public StreamRecordBatch next() {
buf.skipBytes(4);
}
currentBlockRecordCount.decrementAndGet();
- return copy ? StreamRecordBatchCodec.duplicateDecode(buf) : StreamRecordBatchCodec.sliceRetainDecode(buf);
+ return StreamRecordBatch.parse(buf, copy);
}

@Override
8 changes: 2 additions & 6 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -554,8 +554,6 @@ public void shutdown() {
public CompletableFuture<Void> append(AppendContext context, StreamRecordBatch streamRecord) {
final long startTime = System.nanoTime();
CompletableFuture<Void> cf = new CompletableFuture<>();
- // encoded before append to free heap ByteBuf.
- streamRecord.encoded();
WalWriteRequest writeRequest = new WalWriteRequest(streamRecord, null, cf, context);
append0(context, writeRequest, false);
return cf.whenComplete((nil, ex) -> {
@@ -596,7 +594,7 @@ public boolean append0(AppendContext context, WalWriteRequest request, boolean f
appendCf = deltaWAL.append(new TraceContext(context), streamRecord);
} else {
StreamRecordBatch record = request.record;
- StreamRecordBatch linkStreamRecord = toLinkRecord(record, context.linkRecord().retain());
+ StreamRecordBatch linkStreamRecord = toLinkRecord(record, context.linkRecord().retainedSlice());
appendCf = deltaWAL.append(new TraceContext(context), linkStreamRecord);
}

@@ -1072,9 +1070,7 @@ public void close() {
}

static StreamRecordBatch toLinkRecord(StreamRecordBatch origin, ByteBuf link) {
- StreamRecordBatch record = new StreamRecordBatch(origin.getStreamId(), origin.getEpoch(), origin.getBaseOffset(), -origin.getCount(), link);
- record.encoded();
- return record;
+ return StreamRecordBatch.of(origin.getStreamId(), origin.getEpoch(), origin.getBaseOffset(), -origin.getCount(), link);
}

public static class DeltaWALUploadTaskContext {
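On the `retain()` → `retainedSlice()` change in `append0`: both keep the link buffer alive across the WAL append, but `retain()` hands out the caller's own buffer with shared reader/writer indices, while `retainedSlice()` hands out an independent view of the readable bytes. The motivation is inferred, not stated in the diff; the snippet shows the difference in plain Netty:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class RetainVsRetainedSliceDemo {
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer().writeBytes(new byte[] {1, 2, 3, 4});

        ByteBuf same = buf.retain();          // same object, reference count bumped
        System.out.println(same == buf);      // true: reading `same` also advances `buf`

        ByteBuf slice = buf.retainedSlice();  // independent indices over the readable bytes
        buf.skipBytes(2);                     // advancing buf ...
        System.out.println(slice.readableBytes()); // ... leaves the slice untouched: 4

        slice.release();                      // releases the slice's hold on buf
        buf.release();                        // matches the retain() above
        buf.release();                        // releases the original reference
    }
}
```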
8 changes: 5 additions & 3 deletions s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
@@ -215,7 +215,7 @@ private CompletableFuture<AppendResult> append0(AppendContext context, RecordBat
return FutureUtil.failedFuture(new StreamClientException(ErrorCode.STREAM_ALREADY_CLOSED, logIdent + " stream is not writable"));
}
long offset = nextOffset.getAndAdd(recordBatch.count());
- StreamRecordBatch streamRecordBatch = new StreamRecordBatch(streamId, epoch, offset, recordBatch.count(), Unpooled.wrappedBuffer(recordBatch.rawPayload()));
+ StreamRecordBatch streamRecordBatch = StreamRecordBatch.of(streamId, epoch, offset, recordBatch.count(), Unpooled.wrappedBuffer(recordBatch.rawPayload()));
CompletableFuture<AppendResult> cf = storage.append(context, streamRecordBatch).thenApply(nil -> {
updateConfirmOffset(offset + recordBatch.count());
return new DefaultAppendResult(offset);
@@ -522,6 +522,8 @@ public DefaultFetchResult(List<StreamRecordBatch> streamRecords, CacheAccessType

private static RecordBatch convert(StreamRecordBatch streamRecordBatch, boolean pooledBuf) {
ByteBuffer buf;
+ // We shouldn't access the StreamRecordBatch after releasing it.
+ int count = streamRecordBatch.getCount();
if (pooledBuf) {
buf = streamRecordBatch.getPayload().nioBuffer();
} else {
@@ -532,12 +534,12 @@ private static RecordBatch convert(StreamRecordBatch streamRecordBatch, boolean
return new RecordBatch() {
@Override
public int count() {
- return streamRecordBatch.getCount();
+ return count;
}

@Override
public long baseTimestamp() {
- return streamRecordBatch.getEpoch();
+ return 0;
}

@Override
@@ -19,11 +19,6 @@

package com.automq.stream.s3;

- import com.automq.stream.ByteBufSeqAlloc;
- import com.automq.stream.s3.model.StreamRecordBatch;
-
- import io.netty.buffer.ByteBuf;
-
public class StreamRecordBatchCodec {
public static final byte MAGIC_V0 = 0x22;
public static final int HEADER_SIZE =
@@ -33,64 +28,11 @@ public class StreamRecordBatchCodec {
+ 8 // baseOffset
+ 4 // lastOffsetDelta
+ 4; // payload length

- public static ByteBuf encode(StreamRecordBatch streamRecord, ByteBufSeqAlloc alloc) {
- int totalLength = HEADER_SIZE + streamRecord.size(); // payload
- // use sequential allocator to avoid memory fragmentation
- ByteBuf buf = alloc.byteBuffer(totalLength);
- buf.writeByte(MAGIC_V0);
- buf.writeLong(streamRecord.getStreamId());
- buf.writeLong(streamRecord.getEpoch());
- buf.writeLong(streamRecord.getBaseOffset());
- buf.writeInt(streamRecord.getCount());
- buf.writeInt(streamRecord.size());
- buf.writeBytes(streamRecord.getPayload().duplicate());
- return buf;
- }
-
- /**
- * Decode a stream record batch from a byte buffer and move the reader index.
- * The returned stream record batch does NOT share the payload buffer with the input buffer.
- */
- public static StreamRecordBatch duplicateDecode(ByteBuf buf) {
- byte magic = buf.readByte(); // magic
- if (magic != MAGIC_V0) {
- throw new RuntimeException("Invalid magic byte " + magic);
- }
- long streamId = buf.readLong();
- long epoch = buf.readLong();
- long baseOffset = buf.readLong();
- int lastOffsetDelta = buf.readInt();
- int payloadLength = buf.readInt();
- ByteBuf payload = ByteBufAlloc.byteBuffer(payloadLength, ByteBufAlloc.DECODE_RECORD);
- buf.readBytes(payload);
- return new StreamRecordBatch(streamId, epoch, baseOffset, lastOffsetDelta, payload);
- }
-
- /**
- * Decode a stream record batch from a byte buffer and move the reader index.
- * The returned stream record batch shares the payload buffer with the input buffer.
- */
- public static StreamRecordBatch decode(ByteBuf buf, boolean retain) {
- byte magic = buf.readByte(); // magic
- if (magic != MAGIC_V0) {
- throw new RuntimeException("Invalid magic byte " + magic);
- }
- long streamId = buf.readLong();
- long epoch = buf.readLong();
- long baseOffset = buf.readLong();
- int lastOffsetDelta = buf.readInt();
- int payloadLength = buf.readInt();
- ByteBuf payload = retain ? buf.retainedSlice(buf.readerIndex(), payloadLength) : buf.slice(buf.readerIndex(), payloadLength);
- buf.skipBytes(payloadLength);
- return new StreamRecordBatch(streamId, epoch, baseOffset, lastOffsetDelta, payload);
- }
-
- public static StreamRecordBatch decode(ByteBuf buf) {
- return decode(buf, false);
- }
-
- public static StreamRecordBatch sliceRetainDecode(ByteBuf buf) {
- return decode(buf, true);
- }
+ public static final int MAGIC_POS = 0;
+ public static final int STREAM_ID_POS = 1;
+ public static final int EPOCH_POS = STREAM_ID_POS + 8;
+ public static final int BASE_OFFSET_POS = EPOCH_POS + 8;
+ public static final int LAST_OFFSET_DELTA_POS = BASE_OFFSET_POS + 8;
+ public static final int PAYLOAD_LENGTH_POS = LAST_OFFSET_DELTA_POS + 4;
+ public static final int PAYLOAD_POS = PAYLOAD_LENGTH_POS + 4;
}
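With this change the codec is reduced to the on-wire layout constants; encoding and decoding presumably live on `StreamRecordBatch` itself (`of` and `parse`). Based on the removed `decode(ByteBuf, boolean)` above and the call sites in this PR (`parse(buf, false)`, `parse(buf, copy)`, `parse(record.encoded(), true, ENCODE_ALLOC)`), a decoder equivalent in spirit would look roughly like this sketch — a reconstruction for orientation, not the actual `StreamRecordBatch.parse` implementation:

```java
import com.automq.stream.s3.StreamRecordBatchCodec;
import com.automq.stream.s3.model.StreamRecordBatch;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class ParseSketch {
    // Roughly what StreamRecordBatch.parse(buf, copy) is expected to do, reconstructed
    // from the removed StreamRecordBatchCodec.decode(ByteBuf, boolean).
    static StreamRecordBatch parse(ByteBuf buf, boolean copy) {
        byte magic = buf.readByte();
        if (magic != StreamRecordBatchCodec.MAGIC_V0) {
            throw new RuntimeException("Invalid magic byte " + magic);
        }
        long streamId = buf.readLong();
        long epoch = buf.readLong();
        long baseOffset = buf.readLong();
        int lastOffsetDelta = buf.readInt();
        int payloadLength = buf.readInt();
        // copy == true  -> the payload owns its memory and stays valid after buf is released
        // copy == false -> the payload is a retained slice sharing buf's memory (zero copy)
        ByteBuf payload = copy
            ? Unpooled.copiedBuffer(buf.slice(buf.readerIndex(), payloadLength))
            : buf.retainedSlice(buf.readerIndex(), payloadLength);
        buf.skipBytes(payloadLength);
        return StreamRecordBatch.of(streamId, epoch, baseOffset, lastOffsetDelta, payload);
    }
}
```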
@@ -32,6 +32,7 @@
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+ import java.util.ListIterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
@@ -286,7 +287,16 @@ public void run() {
return;
}
records.addAll(cfList.stream().map(CompletableFuture::join).toList());
- records.forEach(r -> r.encoded(ENCODE_ALLOC));
+ ListIterator<StreamRecordBatch> it = records.listIterator();
+ while (it.hasNext()) {
+ StreamRecordBatch record = it.next();
+ try {
+ // Copy the record to the SeqAlloc to reduce fragmentation.
+ it.set(StreamRecordBatch.parse(record.encoded(), true, ENCODE_ALLOC));
+ } finally {
+ record.release();
+ }
+ }
loadCf.complete(null);
});
}).whenComplete((rst, ex) -> {