Skip to content

Commit

Permalink
fix: add unit test for SerializeUtil codec
Browse files Browse the repository at this point in the history
Signed-off-by: Li Zhanhui <[email protected]>
  • Loading branch information
lizhanhui committed Nov 9, 2023
1 parent 36e4ce2 commit 8ccd508
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 0 deletions.
2 changes: 2 additions & 0 deletions cli/src/main/java/com/automq/rocketmq/cli/ConsumeMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import apache.rocketmq.controller.v1.CreateGroupReply;
import apache.rocketmq.controller.v1.CreateGroupRequest;
import apache.rocketmq.controller.v1.GroupType;
import apache.rocketmq.controller.v1.SubscriptionMode;
import com.automq.rocketmq.cli.tools.CliUtils;
import com.automq.rocketmq.common.PrefixThreadFactory;
import com.automq.rocketmq.common.exception.ControllerException;
Expand Down Expand Up @@ -175,6 +176,7 @@ private void prepareConsumerGroup(String consumerGroup) throws IOException {
.setName(groupName)
.setMaxDeliveryAttempt(16)
.setGroupType(groupType)
.setSubMode(SubscriptionMode.SUB_MODE_POP)
.build();

CompletableFuture<CreateGroupReply> groupCf = client.createGroup(mqAdmin.endpoint, request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@
import com.automq.rocketmq.store.exception.StoreException;
import com.automq.rocketmq.store.model.generated.CheckPoint;
import com.automq.rocketmq.store.model.generated.ReceiptHandle;
import com.automq.rocketmq.store.model.operation.AckOperation;
import com.automq.rocketmq.store.model.operation.AckOperation.AckOperationType;
import com.automq.rocketmq.store.model.operation.ChangeInvisibleDurationOperation;
import com.automq.rocketmq.store.model.operation.Operation;
import com.automq.rocketmq.store.model.operation.OperationSnapshot;
import com.automq.rocketmq.store.model.operation.PopOperation;
import com.automq.rocketmq.store.model.operation.PopOperation.PopOperationType;
import com.automq.rocketmq.store.model.operation.ResetConsumeOffsetOperation;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.roaringbitmap.RoaringBitmap;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
Expand Down Expand Up @@ -211,4 +215,71 @@ void encodeOperationSnapshot() {
assertEquals(retryAckBitmap, decodedRetryAckBitmap);
assertEquals(operationSnapshot.getConsumerGroupMetadataList().get(0).getConsumeTimes(), decodedOperationSnapshot.getConsumerGroupMetadataList().get(0).getConsumeTimes());
}

@Test
public void testAckOperationCodec() throws StoreException {
AckOperation op = new AckOperation(1, 2, 3, 4, null,
5, 6, 7, AckOperationType.ACK_NORMAL);
byte[] data = SerializeUtil.encodeAckOperation(op);
Operation ops = SerializeUtil.decodeOperation(ByteBuffer.wrap(data), null, 3, 4);
assertBasicOps(ops);
Assertions.assertTrue(ops instanceof AckOperation);
AckOperation ackOps = (AckOperation) ops;
Assertions.assertEquals(ackOps.consumerGroupId(), 5);
Assertions.assertEquals(ackOps.operationId(), 6);
Assertions.assertEquals(ackOps.ackOperationType(), AckOperationType.ACK_NORMAL);
}

private void assertBasicOps(Operation ops) {
Assertions.assertEquals(ops.topicId(), 1);
Assertions.assertEquals(ops.queueId(), 2);
Assertions.assertEquals(ops.operationStreamId(), 3);
Assertions.assertEquals(ops.snapshotStreamId(), 4);
Assertions.assertEquals(ops.operationTimestamp(), 7);
}

@Test
public void testChangeInvisibleTimeOperationCodec() throws StoreException {
ChangeInvisibleDurationOperation op = new ChangeInvisibleDurationOperation(1, 2, 3,
4, null, 5, 6, 8, 7);
byte[] data = SerializeUtil.encodeChangeInvisibleDurationOperation(op);
Operation ops = SerializeUtil.decodeOperation(ByteBuffer.wrap(data), null, 3, 4);
assertBasicOps(ops);
Assertions.assertTrue(ops instanceof ChangeInvisibleDurationOperation);
ChangeInvisibleDurationOperation cOps = (ChangeInvisibleDurationOperation) ops;
Assertions.assertEquals(cOps.consumerGroupId(), 5);
Assertions.assertEquals(cOps.operationId(), 6);
Assertions.assertEquals(cOps.invisibleDuration(), 8);
}

@Test
public void testPopOperationCodec() throws StoreException {
PopOperation op = new PopOperation(1, 2, 3, 4, null, 5, 6,
8, 9, 7, true, PopOperationType.POP_ORDER);
byte[] data = SerializeUtil.encodePopOperation(op);
Operation ops = SerializeUtil.decodeOperation(ByteBuffer.wrap(data), null, 3, 4);
assertBasicOps(ops);
Assertions.assertTrue(ops instanceof PopOperation);
PopOperation pOps = (PopOperation) ops;
Assertions.assertEquals(5, pOps.consumerGroupId());
Assertions.assertEquals(6, pOps.offset());
Assertions.assertEquals(8, pOps.count());
Assertions.assertEquals(9, pOps.invisibleDuration());
Assertions.assertTrue(pOps.isEndMark());
Assertions.assertEquals(PopOperationType.POP_ORDER, pOps.popOperationType());
}

@Test
public void testResetConsumeOffsetOperationCodec() throws StoreException {
ResetConsumeOffsetOperation op = new ResetConsumeOffsetOperation(1, 2, 3,
4, null, 5, 6, 7);
byte[] data = SerializeUtil.encodeResetConsumeOffsetOperation(op);
Operation ops = SerializeUtil.decodeOperation(ByteBuffer.wrap(data), null, 3, 4);
assertBasicOps(ops);

Assertions.assertTrue(ops instanceof ResetConsumeOffsetOperation);
ResetConsumeOffsetOperation rOps = (ResetConsumeOffsetOperation) ops;
Assertions.assertEquals(5, rOps.consumerGroupId());
Assertions.assertEquals(6, rOps.offset());
}
}

0 comments on commit 8ccd508

Please sign in to comment.