diff --git a/cli/src/main/java/com/automq/rocketmq/cli/ConsumeMessage.java b/cli/src/main/java/com/automq/rocketmq/cli/ConsumeMessage.java index f4067ccc6..c299732b5 100644 --- a/cli/src/main/java/com/automq/rocketmq/cli/ConsumeMessage.java +++ b/cli/src/main/java/com/automq/rocketmq/cli/ConsumeMessage.java @@ -21,6 +21,7 @@ import apache.rocketmq.controller.v1.CreateGroupReply; import apache.rocketmq.controller.v1.CreateGroupRequest; import apache.rocketmq.controller.v1.GroupType; +import apache.rocketmq.controller.v1.SubscriptionMode; import com.automq.rocketmq.cli.tools.CliUtils; import com.automq.rocketmq.common.PrefixThreadFactory; import com.automq.rocketmq.common.exception.ControllerException; @@ -175,6 +176,7 @@ private void prepareConsumerGroup(String consumerGroup) throws IOException { .setName(groupName) .setMaxDeliveryAttempt(16) .setGroupType(groupType) + .setSubMode(SubscriptionMode.SUB_MODE_POP) .build(); CompletableFuture groupCf = client.createGroup(mqAdmin.endpoint, request); diff --git a/store/src/test/java/com/automq/rocketmq/store/util/SerializeUtilTest.java b/store/src/test/java/com/automq/rocketmq/store/util/SerializeUtilTest.java index 8bce78dee..2c7910c39 100644 --- a/store/src/test/java/com/automq/rocketmq/store/util/SerializeUtilTest.java +++ b/store/src/test/java/com/automq/rocketmq/store/util/SerializeUtilTest.java @@ -20,14 +20,18 @@ import com.automq.rocketmq.store.exception.StoreException; import com.automq.rocketmq.store.model.generated.CheckPoint; import com.automq.rocketmq.store.model.generated.ReceiptHandle; +import com.automq.rocketmq.store.model.operation.AckOperation; import com.automq.rocketmq.store.model.operation.AckOperation.AckOperationType; import com.automq.rocketmq.store.model.operation.ChangeInvisibleDurationOperation; +import com.automq.rocketmq.store.model.operation.Operation; import com.automq.rocketmq.store.model.operation.OperationSnapshot; +import com.automq.rocketmq.store.model.operation.PopOperation; import com.automq.rocketmq.store.model.operation.PopOperation.PopOperationType; import com.automq.rocketmq.store.model.operation.ResetConsumeOffsetOperation; import java.nio.ByteBuffer; import java.util.List; import java.util.concurrent.ConcurrentSkipListMap; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.roaringbitmap.RoaringBitmap; import org.roaringbitmap.buffer.ImmutableRoaringBitmap; @@ -211,4 +215,71 @@ void encodeOperationSnapshot() { assertEquals(retryAckBitmap, decodedRetryAckBitmap); assertEquals(operationSnapshot.getConsumerGroupMetadataList().get(0).getConsumeTimes(), decodedOperationSnapshot.getConsumerGroupMetadataList().get(0).getConsumeTimes()); } + + @Test + public void testAckOperationCodec() throws StoreException { + AckOperation op = new AckOperation(1, 2, 3, 4, null, + 5, 6, 7, AckOperationType.ACK_NORMAL); + byte[] data = SerializeUtil.encodeAckOperation(op); + Operation ops = SerializeUtil.decodeOperation(ByteBuffer.wrap(data), null, 3, 4); + assertBasicOps(ops); + Assertions.assertTrue(ops instanceof AckOperation); + AckOperation ackOps = (AckOperation) ops; + Assertions.assertEquals(ackOps.consumerGroupId(), 5); + Assertions.assertEquals(ackOps.operationId(), 6); + Assertions.assertEquals(ackOps.ackOperationType(), AckOperationType.ACK_NORMAL); + } + + private void assertBasicOps(Operation ops) { + Assertions.assertEquals(ops.topicId(), 1); + Assertions.assertEquals(ops.queueId(), 2); + Assertions.assertEquals(ops.operationStreamId(), 3); + Assertions.assertEquals(ops.snapshotStreamId(), 4); + Assertions.assertEquals(ops.operationTimestamp(), 7); + } + + @Test + public void testChangeInvisibleTimeOperationCodec() throws StoreException { + ChangeInvisibleDurationOperation op = new ChangeInvisibleDurationOperation(1, 2, 3, + 4, null, 5, 6, 8, 7); + byte[] data = SerializeUtil.encodeChangeInvisibleDurationOperation(op); + Operation ops = SerializeUtil.decodeOperation(ByteBuffer.wrap(data), null, 3, 4); + assertBasicOps(ops); + Assertions.assertTrue(ops instanceof ChangeInvisibleDurationOperation); + ChangeInvisibleDurationOperation cOps = (ChangeInvisibleDurationOperation) ops; + Assertions.assertEquals(cOps.consumerGroupId(), 5); + Assertions.assertEquals(cOps.operationId(), 6); + Assertions.assertEquals(cOps.invisibleDuration(), 8); + } + + @Test + public void testPopOperationCodec() throws StoreException { + PopOperation op = new PopOperation(1, 2, 3, 4, null, 5, 6, + 8, 9, 7, true, PopOperationType.POP_ORDER); + byte[] data = SerializeUtil.encodePopOperation(op); + Operation ops = SerializeUtil.decodeOperation(ByteBuffer.wrap(data), null, 3, 4); + assertBasicOps(ops); + Assertions.assertTrue(ops instanceof PopOperation); + PopOperation pOps = (PopOperation) ops; + Assertions.assertEquals(5, pOps.consumerGroupId()); + Assertions.assertEquals(6, pOps.offset()); + Assertions.assertEquals(8, pOps.count()); + Assertions.assertEquals(9, pOps.invisibleDuration()); + Assertions.assertTrue(pOps.isEndMark()); + Assertions.assertEquals(PopOperationType.POP_ORDER, pOps.popOperationType()); + } + + @Test + public void testResetConsumeOffsetOperationCodec() throws StoreException { + ResetConsumeOffsetOperation op = new ResetConsumeOffsetOperation(1, 2, 3, + 4, null, 5, 6, 7); + byte[] data = SerializeUtil.encodeResetConsumeOffsetOperation(op); + Operation ops = SerializeUtil.decodeOperation(ByteBuffer.wrap(data), null, 3, 4); + assertBasicOps(ops); + + Assertions.assertTrue(ops instanceof ResetConsumeOffsetOperation); + ResetConsumeOffsetOperation rOps = (ResetConsumeOffsetOperation) ops; + Assertions.assertEquals(5, rOps.consumerGroupId()); + Assertions.assertEquals(6, rOps.offset()); + } } \ No newline at end of file