From 2dfb56ab197400882fb18fa5df04d99aab91feb1 Mon Sep 17 00:00:00 2001 From: Yuri Mizushima Date: Fri, 19 Jul 2024 12:37:41 +0900 Subject: [PATCH] [fix][broker] Introduce the last sent position to fix message ordering issues in Key_Shared (PIP-282) (#21953) --- .../mledger/impl/ManagedCursorImpl.java | 13 + .../mledger/impl/ManagedLedgerImpl.java | 2 +- .../pulsar/broker/service/Consumer.java | 10 +- ...tStickyKeyDispatcherMultipleConsumers.java | 195 +++++++-- .../persistent/PersistentSubscription.java | 19 +- .../pulsar/broker/admin/AdminApiTest.java | 196 ++++++++- ...ckyKeyDispatcherMultipleConsumersTest.java | 357 ++++++++++++++++ .../broker/stats/ConsumerStatsTest.java | 2 +- .../client/api/KeySharedSubscriptionTest.java | 399 +++++++++++++++++- .../common/policies/data/ConsumerStats.java | 4 +- .../policies/data/SubscriptionStats.java | 6 + .../data/stats/ConsumerStatsImpl.java | 10 +- .../data/stats/SubscriptionStatsImpl.java | 6 + 13 files changed, 1158 insertions(+), 61 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 4ef9678f3e1809..f99ee957e025af 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -3472,6 +3472,19 @@ public LongPairRangeSet getIndividuallyDeletedMessagesSet() { return individualDeletedMessages; } + public Position processIndividuallyDeletedMessagesAndGetMarkDeletedPosition( + LongPairRangeSet.RangeProcessor processor) { + final Position mdp; + lock.readLock().lock(); + try { + mdp = markDeletePosition; + individualDeletedMessages.forEach(processor); + } finally { + lock.readLock().unlock(); + } + return mdp; + } + public boolean isMessageDeleted(Position position) { lock.readLock().lock(); try { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index b7734906f7553e..209bf57b24f0fd 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3497,7 +3497,7 @@ private CompletableFuture completeLedgerInfoForOffloaded(long ledgerId, UU * the position range * @return the count of entries */ - long getNumberOfEntries(Range range) { + public long getNumberOfEntries(Range range) { Position fromPosition = range.lowerEndpoint(); boolean fromIncluded = range.lowerBoundType() == BoundType.CLOSED; Position toPosition = range.upperEndpoint(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 02e21c44c9179f..dca64395d8674e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -145,7 +145,7 @@ public class Consumer { private static final double avgPercent = 0.9; private boolean preciseDispatcherFlowControl; - private Position readPositionWhenJoining; + private Position lastSentPositionWhenJoining; private final String clientAddress; // IP address only, no port number included private final MessageId startMessageId; private final boolean isAcknowledgmentAtBatchIndexLevelEnabled; @@ -931,8 +931,8 @@ public ConsumerStatsImpl getStats() { stats.unackedMessages = unackedMessages; stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs; stats.avgMessagesPerEntry = getAvgMessagesPerEntry(); - if (readPositionWhenJoining != null) { - stats.readPositionWhenJoining = readPositionWhenJoining.toString(); + if (lastSentPositionWhenJoining != null) { + stats.lastSentPositionWhenJoining = lastSentPositionWhenJoining.toString(); } return stats; } @@ -1166,8 +1166,8 @@ public boolean isPreciseDispatcherFlowControl() { return preciseDispatcherFlowControl; } - public void setReadPositionWhenJoining(Position readPositionWhenJoining) { - this.readPositionWhenJoining = readPositionWhenJoining; + public void setLastSentPositionWhenJoining(Position lastSentPositionWhenJoining) { + this.lastSentPositionWhenJoining = lastSentPositionWhenJoining; } public int getMaxUnackedMessages() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 766f45ad9908c6..91cec1f8e90710 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -34,9 +34,12 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import javax.annotation.Nullable; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.commons.collections4.MapUtils; import org.apache.pulsar.broker.ServiceConfiguration; @@ -55,6 +58,8 @@ import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.api.proto.KeySharedMode; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet; +import org.apache.pulsar.common.util.collections.LongPairRangeSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,12 +78,22 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi */ private final LinkedHashMap recentlyJoinedConsumers; + /** + * The lastSentPosition and the individuallySentPositions are not thread safe. + */ + @Nullable + private Position lastSentPosition; + private final LongPairRangeSet individuallySentPositions; + private static final LongPairRangeSet.LongPairConsumer positionRangeConverter = PositionFactory::create; + PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, Subscription subscription, ServiceConfiguration conf, KeySharedMeta ksm) { super(topic, cursor, subscription, ksm.isAllowOutOfOrderDelivery()); this.allowOutOfOrderDelivery = ksm.isAllowOutOfOrderDelivery(); this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new LinkedHashMap<>(); + this.individuallySentPositions = + allowOutOfOrderDelivery ? null : new ConcurrentOpenLongPairRangeSet<>(4096, positionRangeConverter); this.keySharedMode = ksm.getKeySharedMode(); switch (this.keySharedMode) { case AUTO_SPLIT: @@ -124,15 +139,18 @@ public synchronized CompletableFuture addConsumer(Consumer consumer) { }) ).thenRun(() -> { synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) { - Position readPositionWhenJoining = cursor.getReadPosition(); - consumer.setReadPositionWhenJoining(readPositionWhenJoining); - // If this was the 1st consumer, or if all the messages are already acked, then we - // don't need to do anything special - if (!allowOutOfOrderDelivery - && recentlyJoinedConsumers != null - && consumerList.size() > 1 - && cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) { - recentlyJoinedConsumers.put(consumer, readPositionWhenJoining); + if (!allowOutOfOrderDelivery) { + final Position lastSentPositionWhenJoining = updateIfNeededAndGetLastSentPosition(); + if (lastSentPositionWhenJoining != null) { + consumer.setLastSentPositionWhenJoining(lastSentPositionWhenJoining); + // If this was the 1st consumer, or if all the messages are already acked, then we + // don't need to do anything special + if (recentlyJoinedConsumers != null + && consumerList.size() > 1 + && cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) { + recentlyJoinedConsumers.put(consumer, lastSentPositionWhenJoining); + } + } } } }); @@ -148,10 +166,16 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE // eventually causing all consumers to get stuck. selector.removeConsumer(consumer); super.removeConsumer(consumer); - if (recentlyJoinedConsumers != null) { + if (!allowOutOfOrderDelivery && recentlyJoinedConsumers != null) { recentlyJoinedConsumers.remove(consumer); if (consumerList.size() == 1) { recentlyJoinedConsumers.clear(); + } else if (consumerList.isEmpty()) { + // The subscription removes consumers if rewind or reset cursor operations are called. + // The dispatcher must clear lastSentPosition and individuallySentPositions because + // these operations trigger re-sending messages. + lastSentPosition = null; + individuallySentPositions.clear(); } if (removeConsumersFromRecentJoinedConsumers() || !redeliveryMessages.isEmpty()) { readMoreEntries(); @@ -193,9 +217,9 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis return false; } - // A corner case that we have to retry a readMoreEntries in order to preserver order delivery. - // This may happen when consumer closed. See issue #12885 for details. if (!allowOutOfOrderDelivery) { + // A corner case that we have to retry a readMoreEntries in order to preserver order delivery. + // This may happen when consumer closed. See issue #12885 for details. NavigableSet messagesToReplayNow = this.getMessagesToReplayNow(1); if (messagesToReplayNow != null && !messagesToReplayNow.isEmpty()) { Position replayPosition = messagesToReplayNow.first(); @@ -229,6 +253,24 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis } } } + + // Update if the markDeletePosition move forward + updateIfNeededAndGetLastSentPosition(); + + // Should not access to individualDeletedMessages from outside managed cursor + // because it doesn't guarantee thread safety. + if (lastSentPosition == null) { + if (cursor.getMarkDeletedPosition() != null) { + lastSentPosition = ((ManagedCursorImpl) cursor) + .processIndividuallyDeletedMessagesAndGetMarkDeletedPosition(range -> { + final Position lower = range.lowerEndpoint(); + final Position upper = range.upperEndpoint(); + individuallySentPositions.addOpenClosed(lower.getLedgerId(), lower.getEntryId(), + upper.getLedgerId(), upper.getEntryId()); + return true; + }); + } + } } final Map> groupedEntries = localGroupedEntries.get(); @@ -280,12 +322,24 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis } if (messagesForC > 0) { - // remove positions first from replay list first : sendMessages recycles entries - if (readType == ReadType.Replay) { - for (int i = 0; i < messagesForC; i++) { - Entry entry = entriesWithSameKey.get(i); + final ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) cursor.getManagedLedger()); + for (int i = 0; i < messagesForC; i++) { + final Entry entry = entriesWithSameKey.get(i); + // remove positions first from replay list first : sendMessages recycles entries + if (readType == ReadType.Replay) { redeliveryMessages.remove(entry.getLedgerId(), entry.getEntryId()); } + // Add positions to individuallySentPositions if necessary + if (!allowOutOfOrderDelivery) { + final Position position = entry.getPosition(); + // Store to individuallySentPositions even if lastSentPosition is null + if ((lastSentPosition == null || position.compareTo(lastSentPosition) > 0) + && !individuallySentPositions.contains(position.getLedgerId(), position.getEntryId())) { + final Position previousPosition = managedLedger.getPreviousPosition(position); + individuallySentPositions.addOpenClosed(previousPosition.getLedgerId(), + previousPosition.getEntryId(), position.getLedgerId(), position.getEntryId()); + } + } } SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal(); @@ -311,6 +365,61 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis } } + // Update the last sent position and remove ranges from individuallySentPositions if necessary + if (!allowOutOfOrderDelivery && lastSentPosition != null) { + final ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) cursor.getManagedLedger()); + com.google.common.collect.Range range = individuallySentPositions.firstRange(); + + // If the upper bound is before the last sent position, we need to move ahead as these + // individuallySentPositions are now irrelevant. + if (range != null && range.upperEndpoint().compareTo(lastSentPosition) <= 0) { + individuallySentPositions.removeAtMost(lastSentPosition.getLedgerId(), + lastSentPosition.getEntryId()); + range = individuallySentPositions.firstRange(); + } + + if (range != null) { + // If the lowerBound is ahead of the last sent position, + // verify if there are any entries in-between. + if (range.lowerEndpoint().compareTo(lastSentPosition) <= 0 || managedLedger + .getNumberOfEntries(com.google.common.collect.Range.openClosed(lastSentPosition, + range.lowerEndpoint())) <= 0) { + if (log.isDebugEnabled()) { + log.debug("[{}] Found a position range to last sent: {}", name, range); + } + Position newLastSentPosition = range.upperEndpoint(); + Position positionAfterNewLastSent = managedLedger + .getNextValidPosition(newLastSentPosition); + // sometime ranges are connected but belongs to different ledgers + // so, they are placed sequentially + // eg: (2:10..3:15] can be returned as (2:10..2:15],[3:0..3:15]. + // So, try to iterate over connected range and found the last non-connected range + // which gives new last sent position. + final Position lastConfirmedEntrySnapshot = managedLedger.getLastConfirmedEntry(); + if (lastConfirmedEntrySnapshot != null) { + while (positionAfterNewLastSent.compareTo(lastConfirmedEntrySnapshot) <= 0) { + if (individuallySentPositions.contains(positionAfterNewLastSent.getLedgerId(), + positionAfterNewLastSent.getEntryId())) { + range = individuallySentPositions.rangeContaining( + positionAfterNewLastSent.getLedgerId(), positionAfterNewLastSent.getEntryId()); + newLastSentPosition = range.upperEndpoint(); + positionAfterNewLastSent = managedLedger.getNextValidPosition(newLastSentPosition); + // check if next valid position is also deleted and part of the deleted-range + continue; + } + break; + } + } + + if (lastSentPosition.compareTo(newLastSentPosition) < 0) { + lastSentPosition = newLastSentPosition; + } + individuallySentPositions.removeAtMost(lastSentPosition.getLedgerId(), + lastSentPosition.getEntryId()); + } + } + } + // acquire message-dispatch permits for already delivered messages acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent); @@ -351,10 +460,10 @@ private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List= 0) { + if ((entries.get(i)).compareTo(maxLastSentPosition) > 0) { // We have already crossed the divider line. All messages in the list are now // newer than what we can currently dispatch to this consumer return i; @@ -416,11 +525,9 @@ private boolean removeConsumersFromRecentJoinedConsumers() { boolean hasConsumerRemovedFromTheRecentJoinedConsumers = false; Position mdp = cursor.getMarkDeletedPosition(); if (mdp != null) { - Position nextPositionOfTheMarkDeletePosition = - ((ManagedLedgerImpl) cursor.getManagedLedger()).getNextValidPosition(mdp); while (itr.hasNext()) { Map.Entry entry = itr.next(); - if (entry.getValue().compareTo(nextPositionOfTheMarkDeletePosition) <= 0) { + if (entry.getValue().compareTo(mdp) <= 0) { itr.remove(); hasConsumerRemovedFromTheRecentJoinedConsumers = true; } else { @@ -431,6 +538,18 @@ private boolean removeConsumersFromRecentJoinedConsumers() { return hasConsumerRemovedFromTheRecentJoinedConsumers; } + @Nullable + private synchronized Position updateIfNeededAndGetLastSentPosition() { + if (lastSentPosition == null) { + return null; + } + final Position mdp = cursor.getMarkDeletedPosition(); + if (mdp != null && mdp.compareTo(lastSentPosition) > 0) { + lastSentPosition = mdp; + } + return lastSentPosition; + } + @Override protected synchronized NavigableSet getMessagesToReplayNow(int maxMessagesToRead) { if (isDispatcherStuckOnReplays) { @@ -551,6 +670,30 @@ public LinkedHashMap getRecentlyJoinedConsumers() { return recentlyJoinedConsumers; } + public synchronized String getLastSentPosition() { + if (lastSentPosition == null) { + return null; + } + return lastSentPosition.toString(); + } + + @VisibleForTesting + public Position getLastSentPositionField() { + return lastSentPosition; + } + + public synchronized String getIndividuallySentPositions() { + if (individuallySentPositions == null) { + return null; + } + return individuallySentPositions.toString(); + } + + @VisibleForTesting + public LongPairRangeSet getIndividuallySentPositionsField() { + return individuallySentPositions; + } + public Map> getConsumerKeyHashRanges() { return selector.getConsumerKeyHashRanges(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index a1d51668ca808f..77aa5f82c39142 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -1305,9 +1305,26 @@ public SubscriptionStatsImpl getStats(GetStatsOptions getStatsOptions) { .getRecentlyJoinedConsumers(); if (recentlyJoinedConsumers != null && recentlyJoinedConsumers.size() > 0) { recentlyJoinedConsumers.forEach((k, v) -> { - subStats.consumersAfterMarkDeletePosition.put(k.consumerName(), v.toString()); + // The dispatcher allows same name consumers + final StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append("consumerName=").append(k.consumerName()) + .append(", consumerId=").append(k.consumerId()); + if (k.cnx() != null) { + stringBuilder.append(", address=").append(k.cnx().clientAddress()); + } + subStats.consumersAfterMarkDeletePosition.put(stringBuilder.toString(), v.toString()); }); } + final String lastSentPosition = ((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher) + .getLastSentPosition(); + if (lastSentPosition != null) { + subStats.lastSentPosition = lastSentPosition; + } + final String individuallySentPositions = ((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher) + .getIndividuallySentPositions(); + if (individuallySentPositions != null) { + subStats.individuallySentPositions = individuallySentPositions; + } } subStats.nonContiguousDeletedMessagesRanges = cursor.getTotalNonContiguousDeletedMessagesRange(); subStats.nonContiguousDeletedMessagesRangesSerializedSize = diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 1c83941d6e7211..5432b8a430d631 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.admin; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -56,6 +58,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import javax.ws.rs.client.InvocationCallback; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.Response.Status; @@ -65,6 +68,7 @@ import lombok.Value; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedLedgerInfo; +import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarServerException; @@ -75,6 +79,8 @@ import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; import org.apache.pulsar.broker.namespace.NamespaceEphemeralData; import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.broker.service.StickyKeyConsumerSelector; +import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers; import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.broker.testcontext.SpyConfig; import org.apache.pulsar.client.admin.GetStatsOptions; @@ -139,7 +145,10 @@ import org.apache.pulsar.common.policies.data.TopicHashPositions; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.util.Codec; +import org.apache.pulsar.common.util.Murmur3_32Hash; import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet; +import org.apache.pulsar.common.util.collections.LongPairRangeSet; import org.apache.pulsar.compaction.Compactor; import org.apache.pulsar.compaction.PulsarCompactionServiceFactory; import org.awaitility.Awaitility; @@ -3449,8 +3458,8 @@ public void testGetTtlDurationDefaultInSeconds() throws Exception { } @Test - public void testGetReadPositionWhenJoining() throws Exception { - final String topic = "persistent://prop-xyz/ns1/testGetReadPositionWhenJoining-" + UUID.randomUUID().toString(); + public void testGetLastSentPositionWhenJoining() throws Exception { + final String topic = "persistent://prop-xyz/ns1/testGetLastSentPositionWhenJoining-" + UUID.randomUUID().toString(); final String subName = "my-sub"; @Cleanup Producer producer = pulsarClient.newProducer() @@ -3458,34 +3467,189 @@ public void testGetReadPositionWhenJoining() throws Exception { .enableBatching(false) .create(); + @Cleanup + final Consumer consumer1 = pulsarClient.newConsumer() + .topic(topic) + .subscriptionType(SubscriptionType.Key_Shared) + .subscriptionName(subName) + .subscribe(); + final int messages = 10; MessageIdImpl messageId = null; for (int i = 0; i < messages; i++) { messageId = (MessageIdImpl) producer.send(("Hello Pulsar - " + i).getBytes()); + consumer1.receive(); } - List> consumers = new ArrayList<>(); - for (int i = 0; i < 2; i++) { - Consumer consumer = pulsarClient.newConsumer() - .topic(topic) - .subscriptionType(SubscriptionType.Key_Shared) - .subscriptionName(subName) - .subscribe(); - consumers.add(consumer); - } + @Cleanup + final Consumer consumer2 = pulsarClient.newConsumer() + .topic(topic) + .subscriptionType(SubscriptionType.Key_Shared) + .subscriptionName(subName) + .subscribe(); TopicStats stats = admin.topics().getStats(topic); Assert.assertEquals(stats.getSubscriptions().size(), 1); SubscriptionStats subStats = stats.getSubscriptions().get(subName); Assert.assertNotNull(subStats); Assert.assertEquals(subStats.getConsumers().size(), 2); - ConsumerStats consumerStats = subStats.getConsumers().get(0); - Assert.assertEquals(consumerStats.getReadPositionWhenJoining(), - PositionFactory.create(messageId.getLedgerId(), messageId.getEntryId() + 1).toString()); + ConsumerStats consumerStats = subStats.getConsumers().stream() + .filter(s -> s.getConsumerName().equals(consumer2.getConsumerName())).findFirst().get(); + Assert.assertEquals(consumerStats.getLastSentPositionWhenJoining(), + PositionFactory.create(messageId.getLedgerId(), messageId.getEntryId()).toString()); + } + + @Test + public void testGetLastSentPosition() throws Exception { + final String topic = "persistent://prop-xyz/ns1/testGetLastSentPosition-" + UUID.randomUUID().toString(); + final String subName = "my-sub"; + @Cleanup + final Producer producer = pulsarClient.newProducer() + .topic(topic) + .enableBatching(false) + .create(); + final AtomicInteger counter = new AtomicInteger(); + @Cleanup + final Consumer consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionType(SubscriptionType.Key_Shared) + .subscriptionName(subName) + .messageListener((c, msg) -> { + try { + c.acknowledge(msg); + counter.getAndIncrement(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }) + .subscribe(); + + TopicStats stats = admin.topics().getStats(topic); + Assert.assertEquals(stats.getSubscriptions().size(), 1); + SubscriptionStats subStats = stats.getSubscriptions().get(subName); + Assert.assertNotNull(subStats); + Assert.assertNull(subStats.getLastSentPosition()); - for (Consumer consumer : consumers) { - consumer.close(); + final int messages = 10; + MessageIdImpl messageId = null; + for (int i = 0; i < messages; i++) { + messageId = (MessageIdImpl) producer.send(("Hello Pulsar - " + i).getBytes()); } + + Awaitility.await().untilAsserted(() -> assertEquals(counter.get(), messages)); + + stats = admin.topics().getStats(topic); + Assert.assertEquals(stats.getSubscriptions().size(), 1); + subStats = stats.getSubscriptions().get(subName); + Assert.assertNotNull(subStats); + Assert.assertEquals(subStats.getLastSentPosition(), PositionFactory.create(messageId.getLedgerId(), messageId.getEntryId()).toString()); + } + + @Test + public void testGetIndividuallySentPositions() throws Exception { + // The producer sends messages with two types of keys. + // The dispatcher sends keyA messages to consumer1. + // Consumer1 will not receive any messages. Its receiver queue size is 1. + // Consumer2 will receive and ack any messages immediately. + + final String topic = "persistent://prop-xyz/ns1/testGetIndividuallySentPositions-" + UUID.randomUUID().toString(); + final String subName = "my-sub"; + @Cleanup + final Producer producer = pulsarClient.newProducer() + .topic(topic) + .enableBatching(false) + .create(); + + final String consumer1Name = "c1"; + final String consumer2Name = "c2"; + + @Cleanup + final Consumer consumer1 = pulsarClient.newConsumer() + .topic(topic) + .consumerName(consumer1Name) + .receiverQueueSize(1) + .subscriptionType(SubscriptionType.Key_Shared) + .subscriptionName(subName) + .subscribe(); + + final PersistentStickyKeyDispatcherMultipleConsumers dispatcher = + (PersistentStickyKeyDispatcherMultipleConsumers) pulsar.getBrokerService().getTopicIfExists(topic).get().get().getSubscription(subName).getDispatcher(); + final String keyA = "key-a"; + final String keyB = "key-b"; + final int hashA = Murmur3_32Hash.getInstance().makeHash(keyA.getBytes()); + + final Field selectorField = PersistentStickyKeyDispatcherMultipleConsumers.class.getDeclaredField("selector"); + selectorField.setAccessible(true); + final StickyKeyConsumerSelector selector = spy((StickyKeyConsumerSelector) selectorField.get(dispatcher)); + selectorField.set(dispatcher, selector); + + // the selector returns consumer1 if keyA + doAnswer((invocationOnMock -> { + final int hash = invocationOnMock.getArgument(0); + + final String consumerName = hash == hashA ? consumer1Name : consumer2Name; + return dispatcher.getConsumers().stream().filter(consumer -> consumer.consumerName().equals(consumerName)).findFirst().get(); + })).when(selector).select(anyInt()); + + final AtomicInteger consumer2AckCounter = new AtomicInteger(); + @Cleanup + final Consumer consumer2 = pulsarClient.newConsumer() + .topic(topic) + .consumerName(consumer2Name) + .subscriptionType(SubscriptionType.Key_Shared) + .subscriptionName(subName) + .messageListener((c, msg) -> { + try { + c.acknowledge(msg); + consumer2AckCounter.getAndIncrement(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }) + .subscribe(); + + final LongPairRangeSet.LongPairConsumer positionRangeConverter = PositionFactory::create; + final LongPairRangeSet expectedIndividuallySentPositions = new ConcurrentOpenLongPairRangeSet<>(4096, positionRangeConverter); + + TopicStats stats = admin.topics().getStats(topic); + Assert.assertEquals(stats.getSubscriptions().size(), 1); + SubscriptionStats subStats = stats.getSubscriptions().get(subName); + Assert.assertNotNull(subStats); + Assert.assertEquals(subStats.getIndividuallySentPositions(), expectedIndividuallySentPositions.toString()); + + final Function sendFn = (key) -> { + try { + return (MessageIdImpl) producer.newMessage().key(key).value(("msg").getBytes()).send(); + } catch (PulsarClientException e) { + throw new RuntimeException(e); + } + }; + final List messageIdList = new ArrayList<>(); + + // the dispatcher can send keyA message, but then consumer1's receiver queue will be full + messageIdList.add(sendFn.apply(keyA)); + + // the dispatcher can send messages other than keyA + messageIdList.add(sendFn.apply(keyA)); + messageIdList.add(sendFn.apply(keyB)); + messageIdList.add(sendFn.apply(keyA)); + messageIdList.add(sendFn.apply(keyB)); + messageIdList.add(sendFn.apply(keyB)); + + assertEquals(messageIdList.size(), 6); + Awaitility.await().untilAsserted(() -> assertEquals(consumer2AckCounter.get(), 3)); + + // set expected value + expectedIndividuallySentPositions.addOpenClosed(messageIdList.get(1).getLedgerId(), messageIdList.get(1).getEntryId(), + messageIdList.get(2).getLedgerId(), messageIdList.get(2).getEntryId()); + expectedIndividuallySentPositions.addOpenClosed(messageIdList.get(3).getLedgerId(), messageIdList.get(3).getEntryId(), + messageIdList.get(5).getLedgerId(), messageIdList.get(5).getEntryId()); + + stats = admin.topics().getStats(topic); + Assert.assertEquals(stats.getSubscriptions().size(), 1); + subStats = stats.getSubscriptions().get(subName); + Assert.assertNotNull(subStats); + Assert.assertEquals(subStats.getIndividuallySentPositions(), expectedIndividuallySentPositions.toString()); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java index a70b3ce7a42f64..1a205d0f686d50 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java @@ -35,14 +35,19 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import com.google.common.collect.BoundType; +import com.google.common.collect.Range; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoopGroup; import java.lang.reflect.Field; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Queue; @@ -50,12 +55,14 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.EntryImpl; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -72,11 +79,14 @@ import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.Markers; +import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet; +import org.apache.pulsar.common.util.collections.LongPairRangeSet; import org.awaitility.Awaitility; import org.mockito.ArgumentCaptor; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Test(groups = "broker") @@ -84,6 +94,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest { private PulsarService pulsarMock; private BrokerService brokerMock; + private ManagedLedgerImpl ledgerMock; private ManagedCursorImpl cursorMock; private Consumer consumerMock; private PersistentTopic topicMock; @@ -135,9 +146,44 @@ public void setup() throws Exception { doReturn(topicName).when(topicMock).getName(); doReturn(topicPolicies).when(topicMock).getHierarchyTopicPolicies(); + ledgerMock = mock(ManagedLedgerImpl.class); + doAnswer((invocationOnMock -> { + final Position position = invocationOnMock.getArgument(0); + if (position.getEntryId() > 0) { + return PositionFactory.create(position.getLedgerId(), position.getEntryId() - 1); + } else { + fail("Undefined behavior on mock"); + return PositionFactory.EARLIEST; + } + })).when(ledgerMock).getPreviousPosition(any(Position.class)); + doAnswer((invocationOnMock -> { + final Position position = invocationOnMock.getArgument(0); + return PositionFactory.create(position.getLedgerId(), position.getEntryId() < 0 ? 0 : position.getEntryId() + 1); + })).when(ledgerMock).getNextValidPosition(any(Position.class)); + doAnswer((invocationOnMock -> { + final Range range = invocationOnMock.getArgument(0); + Position fromPosition = range.lowerEndpoint(); + boolean fromIncluded = range.lowerBoundType() == BoundType.CLOSED; + Position toPosition = range.upperEndpoint(); + boolean toIncluded = range.upperBoundType() == BoundType.CLOSED; + + long count = 0; + + if (fromPosition.getLedgerId() == toPosition.getLedgerId()) { + // If the 2 positions are in the same ledger + count = toPosition.getEntryId() - fromPosition.getEntryId() - 1; + count += fromIncluded ? 1 : 0; + count += toIncluded ? 1 : 0; + } else { + fail("Undefined behavior on mock"); + } + return count; + })).when(ledgerMock).getNumberOfEntries(any()); + cursorMock = mock(ManagedCursorImpl.class); doReturn(null).when(cursorMock).getLastIndividualDeletedRange(); doReturn(subscriptionName).when(cursorMock).getName(); + doReturn(ledgerMock).when(cursorMock).getManagedLedger(); consumerMock = mock(Consumer.class); channelMock = mock(ChannelPromise.class); @@ -465,6 +511,317 @@ public void testMessageRedelivery() throws Exception { allEntries.forEach(entry -> entry.release()); } + + + @DataProvider(name = "initializeLastSentPosition") + private Object[][] initialLastSentPositionProvider() { + return new Object[][] { { false }, { true } }; + } + + @Test(dataProvider = "initializeLastSentPosition") + public void testLastSentPositionAndIndividuallySentPositions(final boolean initializeLastSentPosition) throws Exception { + final Position initialLastSentPosition = PositionFactory.create(1, 10); + final LongPairRangeSet expectedIndividuallySentPositions + = new ConcurrentOpenLongPairRangeSet<>(4096, PositionFactory::create); + + final Field lastSentPositionField = PersistentStickyKeyDispatcherMultipleConsumers.class + .getDeclaredField("lastSentPosition"); + lastSentPositionField.setAccessible(true); + final LongPairRangeSet individuallySentPositions = persistentDispatcher.getIndividuallySentPositionsField(); + final Supplier clearPosition = () -> { + try { + lastSentPositionField.set(persistentDispatcher, initializeLastSentPosition ? initialLastSentPosition : null); + individuallySentPositions.clear(); + expectedIndividuallySentPositions.clear(); + } catch (Throwable e) { + return e; + } + return null; + }; + if (!initializeLastSentPosition) { + doReturn(initialLastSentPosition).when(cursorMock).getMarkDeletedPosition(); + doAnswer(invocationOnMock -> { + // skip copy operation + return initialLastSentPosition; + }).when(cursorMock).processIndividuallyDeletedMessagesAndGetMarkDeletedPosition(any()); + } + + // Assume the range sequence is [1:0, 1:19], [2:0, 2:19], ..., [10:0, 10:19] + doAnswer((invocationOnMock -> { + final Position position = invocationOnMock.getArgument(0); + if (position.getEntryId() > 0) { + return PositionFactory.create(position.getLedgerId(), position.getEntryId() - 1); + } else if (position.getLedgerId() > 0) { + return PositionFactory.create(position.getLedgerId() - 1, 19); + } else { + throw new NullPointerException(); + } + })).when(ledgerMock).getPreviousPosition(any(Position.class)); + doAnswer((invocationOnMock -> { + final Position position = invocationOnMock.getArgument(0); + if (position.getEntryId() < 19) { + return PositionFactory.create(position.getLedgerId(), position.getEntryId() + 1); + } else { + return PositionFactory.create(position.getLedgerId() + 1, 0); + } + })).when(ledgerMock).getNextValidPosition(any(Position.class)); + doReturn(PositionFactory.create(10, 19)).when(ledgerMock).getLastConfirmedEntry(); + doAnswer((invocationOnMock -> { + final Range range = invocationOnMock.getArgument(0); + Position fromPosition = range.lowerEndpoint(); + boolean fromIncluded = range.lowerBoundType() == BoundType.CLOSED; + Position toPosition = range.upperEndpoint(); + boolean toIncluded = range.upperBoundType() == BoundType.CLOSED; + + if (fromPosition.getLedgerId() == toPosition.getLedgerId()) { + // If the 2 positions are in the same ledger + long count = toPosition.getEntryId() - fromPosition.getEntryId() - 1; + count += fromIncluded ? 1 : 0; + count += toIncluded ? 1 : 0; + return count; + } else { + long count = 0; + // If the from & to are pointing to different ledgers, then we need to : + // 1. Add the entries in the ledger pointed by toPosition + count += toPosition.getEntryId(); + count += toIncluded ? 1 : 0; + + // 2. Add the entries in the ledger pointed by fromPosition + count += 20 - (fromPosition.getEntryId() + 1); + count += fromIncluded ? 1 : 0; + + // 3. Add the whole ledgers entries in between + for (long i = fromPosition.getLedgerId() + 1; i < toPosition.getLedgerId(); i++) { + count += 20; + } + + return count; + } + })).when(ledgerMock).getNumberOfEntries(any()); + assertEquals(ledgerMock.getNextValidPosition(PositionFactory.create(1, 0)), PositionFactory.create(1, 1)); + assertEquals(ledgerMock.getNextValidPosition(PositionFactory.create(1, 19)), PositionFactory.create(2, 0)); + assertEquals(ledgerMock.getPreviousPosition(PositionFactory.create(2, 0)), PositionFactory.create(1, 19)); + assertThrows(NullPointerException.class, () -> ledgerMock.getPreviousPosition(PositionFactory.create(0, 0))); + assertEquals(ledgerMock.getNumberOfEntries(Range.openClosed( + PositionFactory.create(1, 0), PositionFactory.create(1, 0))), 0); + assertEquals(ledgerMock.getNumberOfEntries(Range.openClosed( + PositionFactory.create(1, -1), PositionFactory.create(1, 9))), 10); + assertEquals(ledgerMock.getNumberOfEntries(Range.openClosed( + PositionFactory.create(1, 19), PositionFactory.create(2, -1))), 0); + assertEquals(ledgerMock.getNumberOfEntries(Range.openClosed( + PositionFactory.create(1, 19), PositionFactory.create(2, 9))), 10); + assertEquals(ledgerMock.getNumberOfEntries(Range.openClosed( + PositionFactory.create(1, -1), PositionFactory.create(3, 19))), 60); + + // Add a consumer + final Consumer consumer1 = mock(Consumer.class); + doReturn("consumer1").when(consumer1).consumerName(); + when(consumer1.getAvailablePermits()).thenReturn(1000); + doReturn(true).when(consumer1).isWritable(); + doReturn(channelMock).when(consumer1).sendMessages(anyList(), any(EntryBatchSizes.class), + any(EntryBatchIndexesAcks.class), anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class)); + persistentDispatcher.addConsumer(consumer1); + + /* + On single ledger + */ + + // Expected individuallySentPositions (isp): [(1:-1, 1:8]] (init) -> [(1:-1, 1:9]] (update) -> [] (remove) + // Expected lastSentPosition (lsp): 1:10 (init) -> 1:10 (remove) + // upper bound and the new entry are less than initial last sent position + assertNull(clearPosition.get()); + individuallySentPositions.addOpenClosed(1, -1, 1, 8); + persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal, + Arrays.asList(EntryImpl.create(1, 9, createMessage("test", 1))), true); + assertTrue(individuallySentPositions.isEmpty()); + assertEquals(persistentDispatcher.getLastSentPosition(), initialLastSentPosition.toString()); + + // isp: [(1:-1, 1:9]] -> [(1:-1, 1:10]] -> [] + // lsp: 1:10 -> 1:10 + // upper bound is less than initial last sent position + // upper bound and the new entry are less than or equal to initial last sent position + assertNull(clearPosition.get()); + individuallySentPositions.addOpenClosed(1, -1, 1, 9); + persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal, + Arrays.asList(EntryImpl.create(1, 10, createMessage("test", 1))), true); + assertTrue(individuallySentPositions.isEmpty()); + assertEquals(persistentDispatcher.getLastSentPosition(), initialLastSentPosition.toString()); + + // isp: [(1:-1, 1:2], (1:3, 1:4], (1:5, 1:6]] -> [(1:-1, 1:2], (1:3, 1:4], (1:5, 1:6], (1:9, 1:10]] -> [] + // lsp: 1:10 -> 1:10 + // upper bound and the new entry are less than or equal to initial last sent position + // individually sent positions has multiple ranges + assertNull(clearPosition.get()); + individuallySentPositions.addOpenClosed(1, -1, 1, 2); + individuallySentPositions.addOpenClosed(1, 3, 1, 4); + individuallySentPositions.addOpenClosed(1, 5, 1, 6); + persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal, + Arrays.asList(EntryImpl.create(1, 10, createMessage("test", 1))), true); + assertTrue(individuallySentPositions.isEmpty()); + assertEquals(persistentDispatcher.getLastSentPosition(), initialLastSentPosition.toString()); + + // isp: [(1:-1, 1:10]] -> [(1:-1, 1:11]] -> [] + // lsp: 1:10 -> 1:11 + // upper bound is less than or equal to initial last sent position + // the new entry is next position of initial last sent position + assertNull(clearPosition.get()); + individuallySentPositions.addOpenClosed(1, -1, 1, 10); + persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal, + Arrays.asList(EntryImpl.create(1, 11, createMessage("test", 1))), true); + assertTrue(individuallySentPositions.isEmpty()); + assertEquals(persistentDispatcher.getLastSentPosition(), PositionFactory.create(1, 11).toString()); + + // isp: [(1:-1, 1:9]] -> [(1:-1, 1:9], (1:10, 1:11]] -> [] + // lsp: 1:10 -> 1:11 + // upper bound is less than initial last sent position + // the new entry is next position of initial last sent position + assertNull(clearPosition.get()); + individuallySentPositions.addOpenClosed(1, -1, 1, 9); + persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal, + Arrays.asList(EntryImpl.create(1, 11, createMessage("test", 1))), true); + assertTrue(individuallySentPositions.isEmpty()); + assertEquals(persistentDispatcher.getLastSentPosition(), PositionFactory.create(1, 11).toString()); + + // isp: [(1:11, 1:15]] -> [(1:10, 1:15]] -> [] + // lsp: 1:10 -> 1:15 + // upper bound is greater than initial last sent position + // the range doesn't contain next position of initial last sent position + // the new entry is next position of initial last sent position + assertNull(clearPosition.get()); + individuallySentPositions.addOpenClosed(1, 11, 1, 15); + persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal, + Arrays.asList(EntryImpl.create(1, 11, createMessage("test", 1))), true); + assertTrue(individuallySentPositions.isEmpty()); + assertEquals(persistentDispatcher.getLastSentPosition(), PositionFactory.create(1, 15).toString()); + + // isp: [(1:11, 1:15]] -> [(1:10, 1:16]] -> [] + // lsp: 1:10 -> 1:16 + // upper bound is greater than initial last sent position + // the range doesn't contain next position of initial last sent position + // the new entries contain next position of initial last sent position + // first of the new entries is less than initial last sent position + assertNull(clearPosition.get()); + individuallySentPositions.addOpenClosed(1, 11, 1, 15); + persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal, + Arrays.asList(EntryImpl.create(1, 9, createMessage("test", 1)), + EntryImpl.create(1, 11, createMessage("test", 2)), + EntryImpl.create(1, 16, createMessage("test", 3))), true); + assertTrue(individuallySentPositions.isEmpty()); + assertEquals(persistentDispatcher.getLastSentPosition(), PositionFactory.create(1, 16).toString()); + + // isp: [(1:11, 1:15]] -> [(1:11, 1:15]] -> [(1:11, 1:15]] + // lsp: 1:10 -> 1:10 + // upper bound is greater than initial last sent position + // the range doesn't contain next position of initial last sent position + // the new entry isn't next position of initial last sent position + // the range contains the new entry + assertNull(clearPosition.get()); + individuallySentPositions.addOpenClosed(1, 11, 1, 15); + expectedIndividuallySentPositions.addOpenClosed(1, 11, 1, 15); + persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal, + Arrays.asList(EntryImpl.create(1, 15, createMessage("test", 1))), true); + assertEquals(individuallySentPositions.toString(), expectedIndividuallySentPositions.toString()); + assertEquals(persistentDispatcher.getLastSentPosition(), initialLastSentPosition.toString()); + + // isp: [(1:11, 1:15]] -> [(1:11, 1:16]] -> [(1:11, 1:16]] + // lsp: 1:10 -> 1:10 + // upper bound is greater than initial last sent position + // the range doesn't contain next position of initial last sent position + // the new entry isn't next position of initial last sent position + // the range doesn't contain the new entry + // the new entry is next position of upper bound + assertNull(clearPosition.get()); + individuallySentPositions.addOpenClosed(1, 11, 1, 15); + expectedIndividuallySentPositions.addOpenClosed(1, 11, 1, 16); + persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal, + Arrays.asList(EntryImpl.create(1, 16, createMessage("test", 1))), true); + assertEquals(individuallySentPositions.toString(), expectedIndividuallySentPositions.toString()); + assertEquals(persistentDispatcher.getLastSentPosition(), initialLastSentPosition.toString()); + + // isp: [(1:11, 1:15]] -> [(1:11, 1:15], (1:16, 1:17]] -> [(1:11, 1:15], (1:16, 1:17]] + // lsp: 1:10 -> 1:10 + // upper bound is greater than initial last sent position + // the range doesn't contain next position of initial last sent position + // the new entry isn't next position of initial last sent position + // the range doesn't contain the new entry + // the new entry isn't next position of upper bound + // the new entry is same ledger + assertNull(clearPosition.get()); + individuallySentPositions.addOpenClosed(1, 11, 1, 15); + expectedIndividuallySentPositions.addOpenClosed(1, 11, 1, 15); + expectedIndividuallySentPositions.addOpenClosed(1, 16, 1, 17); + persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal, + Arrays.asList(EntryImpl.create(1, 17, createMessage("test", 1))), true); + assertEquals(individuallySentPositions.toString(), expectedIndividuallySentPositions.toString()); + assertEquals(persistentDispatcher.getLastSentPosition(), initialLastSentPosition.toString()); + + /* + On multiple contiguous ledgers + */ + + // isp: [(1:11, 1:18]] -> [(1:11, 1:18], (2:-1, 2:0]] -> [(1:11, 1:18], (2:-1, 2:0]] + // lsp: 1:10 -> 1:10 + // upper bound is greater than initial last sent position + // the range doesn't contain next position of initial last sent position + // the new entry isn't next position of initial last sent position + // the range doesn't contain the new entry + // the new entry isn't next position of upper bound + // the new entry isn't same ledger + assertNull(clearPosition.get()); + individuallySentPositions.addOpenClosed(1, 11, 1, 18); + expectedIndividuallySentPositions.addOpenClosed(1, 11, 1, 18); + expectedIndividuallySentPositions.addOpenClosed(2, -1, 2, 0); + persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal, + Arrays.asList(EntryImpl.create(2, 0, createMessage("test", 1))), true); + assertEquals(individuallySentPositions.toString(), expectedIndividuallySentPositions.toString()); + assertEquals(persistentDispatcher.getLastSentPosition(), initialLastSentPosition.toString()); + + // isp: [(1:11, 1:19], (2:-1, 2:0]] -> [(1:10, 1:19], (2:-1, 2:0]] -> [] + // lsp: 1:10 -> 2:0 + // upper bound is greater than initial last sent position + // the range doesn't contain next position of initial last sent position + // the new entry is next position of initial last sent position + // the new entry isn't same ledger + assertNull(clearPosition.get()); + individuallySentPositions.addOpenClosed(1, 11, 1, 19); + individuallySentPositions.addOpenClosed(2, -1, 2, 0); + persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal, + Arrays.asList(EntryImpl.create(1, 11, createMessage("test", 1))), true); + assertTrue(individuallySentPositions.isEmpty()); + assertEquals(persistentDispatcher.getLastSentPosition(), PositionFactory.create(2, 0).toString()); + + // isp: [(1:11, 1:19], (2:-1, 2:19], (3:-1, 3:0]] -> [(1:10, 1:19], (2:-1, 2:19], (3:-1, 3:0]] -> [] + // lsp: 1:10 -> 3:0 + // upper bound is greater than initial last sent position + // the range doesn't contain next position of initial last sent position + // the new entry is next position of initial last sent position + // the new entry isn't same ledger + assertNull(clearPosition.get()); + individuallySentPositions.addOpenClosed(1, 11, 1, 19); + individuallySentPositions.addOpenClosed(2, -1, 2, 19); + individuallySentPositions.addOpenClosed(3, -1, 3, 0); + persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal, + Arrays.asList(EntryImpl.create(1, 11, createMessage("test", 1))), true); + assertTrue(individuallySentPositions.isEmpty()); + assertEquals(persistentDispatcher.getLastSentPosition(), PositionFactory.create(3, 0).toString()); + + // isp: [(1:11, 1:19], (2:-1, 2:0]] -> [(1:11, 1:19], (2:-1, 2:1]] -> [(1:11, 1:19], (2:-1, 2:1]] + // lsp: 1:10 -> 1:10 + // upper bound is greater than initial last sent position + // the range doesn't contain next position of initial last sent position + // the new entry isn't next position of initial last sent position + // the new entry isn't same ledger + assertNull(clearPosition.get()); + individuallySentPositions.addOpenClosed(1, 11, 1, 19); + individuallySentPositions.addOpenClosed(2, -1, 2, 0); + expectedIndividuallySentPositions.addOpenClosed(1, 11, 1, 19); + expectedIndividuallySentPositions.addOpenClosed(2, -1, 2, 1); + persistentDispatcher.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType.Normal, + Arrays.asList(EntryImpl.create(2, 1, createMessage("test", 1))), true); + assertEquals(individuallySentPositions.toString(), expectedIndividuallySentPositions.toString()); + assertEquals(persistentDispatcher.getLastSentPosition(), initialLastSentPosition.toString()); + } + private ByteBuf createMessage(String message, int sequenceId) { return createMessage(message, sequenceId, "testKey"); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java index 5b2998216e8e1c..14403765105b9a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java @@ -233,7 +233,7 @@ public void testConsumerStatsOutput() throws Exception { "unackedMessages", "avgMessagesPerEntry", "blockedConsumerOnUnackedMsgs", - "readPositionWhenJoining", + "lastSentPositionWhenJoining", "lastAckedTime", "lastAckedTimestamp", "lastConsumedTime", diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java index 92c51da64d39d3..e8fd5378316730 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java @@ -18,6 +18,9 @@ */ package org.apache.pulsar.client.api; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; @@ -26,6 +29,7 @@ import static org.testng.Assert.fail; import com.google.common.collect.Lists; import java.io.IOException; +import java.lang.reflect.Field; import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; @@ -33,6 +37,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -49,6 +54,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import java.util.stream.Collectors; import lombok.Cleanup; import org.apache.bookkeeper.mledger.Position; @@ -56,17 +62,24 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.StickyKeyConsumerSelector; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentStickyKeyDispatcherMultipleConsumers; +import org.apache.pulsar.broker.service.persistent.MessageRedeliveryController; +import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.api.proto.KeySharedMode; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.util.Murmur3_32Hash; +import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet; +import org.apache.pulsar.common.util.collections.LongPairRangeSet; import org.awaitility.Awaitility; import org.mockito.Mockito; import org.slf4j.Logger; @@ -1096,13 +1109,21 @@ public void testAllowOutOfOrderDeliveryChangedAfterAllConsumerDisconnected() thr final String topicName = "persistent://public/default/change-allow-ooo-delivery-" + UUID.randomUUID(); final String subName = "my-sub"; - Consumer consumer = pulsarClient.newConsumer() + final Consumer consumer1 = pulsarClient.newConsumer() .topic(topicName) .subscriptionName(subName) .subscriptionType(SubscriptionType.Key_Shared) .keySharedPolicy(KeySharedPolicy.autoSplitHashRange().setAllowOutOfOrderDelivery(true)) .subscribe(); + @Cleanup + final Producer producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .create(); + producer.send("message".getBytes()); + Awaitility.await().untilAsserted(() -> assertNotNull(consumer1.receive(100, TimeUnit.MILLISECONDS))); + CompletableFuture> future = pulsar.getBrokerService().getTopicIfExists(topicName); assertTrue(future.isDone()); assertTrue(future.get().isPresent()); @@ -1110,14 +1131,18 @@ public void testAllowOutOfOrderDeliveryChangedAfterAllConsumerDisconnected() thr PersistentStickyKeyDispatcherMultipleConsumers dispatcher = (PersistentStickyKeyDispatcherMultipleConsumers) topic.getSubscription(subName).getDispatcher(); assertTrue(dispatcher.isAllowOutOfOrderDelivery()); - consumer.close(); + assertNull(dispatcher.getLastSentPositionField()); + assertNull(dispatcher.getIndividuallySentPositionsField()); + consumer1.close(); - consumer = pulsarClient.newConsumer() + final Consumer consumer2 = pulsarClient.newConsumer() .topic(topicName) .subscriptionName(subName) .subscriptionType(SubscriptionType.Key_Shared) .keySharedPolicy(KeySharedPolicy.autoSplitHashRange().setAllowOutOfOrderDelivery(false)) .subscribe(); + producer.send("message".getBytes()); + Awaitility.await().untilAsserted(() -> assertNotNull(consumer2.receive(100, TimeUnit.MILLISECONDS))); future = pulsar.getBrokerService().getTopicIfExists(topicName); assertTrue(future.isDone()); @@ -1125,7 +1150,9 @@ public void testAllowOutOfOrderDeliveryChangedAfterAllConsumerDisconnected() thr topic = future.get().get(); dispatcher = (PersistentStickyKeyDispatcherMultipleConsumers) topic.getSubscription(subName).getDispatcher(); assertFalse(dispatcher.isAllowOutOfOrderDelivery()); - consumer.close(); + assertNotNull(dispatcher.getLastSentPositionField()); + assertNotNull(dispatcher.getIndividuallySentPositionsField()); + consumer2.close(); } @Test(timeOut = 30_000) @@ -1199,6 +1226,370 @@ public void testCheckConsumersWithSameName() throws Exception { l.await(); } + @DataProvider(name = "preSend") + private Object[][] preSendProvider() { + return new Object[][] { { false }, { true } }; + } + + @Test(timeOut = 30_000, dataProvider = "preSend") + public void testCheckBetweenSkippingAndRecentlyJoinedConsumers(boolean preSend) throws Exception { + conf.setSubscriptionKeySharedUseConsistentHashing(true); + + final String topicName = "persistent://public/default/recently-joined-consumers-" + UUID.randomUUID(); + final String subName = "my-sub"; + + @Cleanup + final Producer p = pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .create(); + if (preSend) { + // verify that the test succeeds even if the topic has a message + p.send("msg"); + } + + final Supplier> cb = () -> pulsarClient.newConsumer(Schema.STRING) + .topic(topicName) + .subscriptionInitialPosition(SubscriptionInitialPosition.Latest) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Key_Shared) + .keySharedPolicy(KeySharedPolicy.autoSplitHashRange() + .setAllowOutOfOrderDelivery(false)); + + // create 2 consumers + final String c1ConsumerName = "c1"; + @Cleanup + final Consumer c1 = cb.get().consumerName(c1ConsumerName).receiverQueueSize(1).subscribe(); + @Cleanup + final Consumer c2 = cb.get().consumerName("c2").receiverQueueSize(1000).subscribe(); + + final PersistentStickyKeyDispatcherMultipleConsumers dispatcher = + (PersistentStickyKeyDispatcherMultipleConsumers) pulsar.getBrokerService().getTopicIfExists(topicName).get().get().getSubscription(subName).getDispatcher(); + final Field recentlyJoinedConsumersField = PersistentStickyKeyDispatcherMultipleConsumers.class.getDeclaredField("recentlyJoinedConsumers"); + recentlyJoinedConsumersField.setAccessible(true); + final LinkedHashMap recentlyJoinedConsumers = (LinkedHashMap) recentlyJoinedConsumersField.get(dispatcher); + final String keyA = "key-a"; + final int hashA = Murmur3_32Hash.getInstance().makeHash(keyA.getBytes()); + final Map hashConsumerMap = new HashMap<>(); + hashConsumerMap.put(hashA, c1.getConsumerName()); + + // enforce the selector will return c1 if keyA + final Field selectorField = PersistentStickyKeyDispatcherMultipleConsumers.class.getDeclaredField("selector"); + selectorField.setAccessible(true); + final StickyKeyConsumerSelector selector = spy((StickyKeyConsumerSelector) selectorField.get(dispatcher)); + selectorField.set(dispatcher, selector); + doAnswer((invocationOnMock -> { + final int hash = invocationOnMock.getArgument(0); + final String consumerName = hashConsumerMap.getOrDefault(hash, c2.getConsumerName()); + return dispatcher.getConsumers().stream().filter(consumer -> consumer.consumerName().equals(consumerName)).findFirst().get(); + })).when(selector).select(anyInt()); + + // send and receive + Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getConsumers().stream().filter(c -> c.getConsumerName().equals(c1ConsumerName)).findFirst().get().getAvailablePermits(), 1)); + final MessageIdImpl msg0Id = (MessageIdImpl) p.newMessage().key(keyA).value("msg-0").send(); + Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getConsumers().stream().filter(c -> c.getConsumerName().equals(c1ConsumerName)).findFirst().get().getAvailablePermits(), 0)); + + final MessageIdImpl msg1Id = (MessageIdImpl) p.newMessage().key(keyA).value("msg-1").send(); + Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getMsgBacklog(), 2)); + + final Field redeliveryMessagesField = PersistentDispatcherMultipleConsumers.class + .getDeclaredField("redeliveryMessages"); + redeliveryMessagesField.setAccessible(true); + final MessageRedeliveryController redeliveryMessages = (MessageRedeliveryController) redeliveryMessagesField.get(dispatcher); + + final Set replayMsgSet = redeliveryMessages.getMessagesToReplayNow(3); + assertEquals(replayMsgSet.size(), 1); + final Position replayMsg = replayMsgSet.stream().findAny().get(); + assertEquals(replayMsg, PositionFactory.create(msg1Id.getLedgerId(), msg1Id.getEntryId())); + + // add c3 + final String c3ConsumerName = "c3"; + hashConsumerMap.put(hashA, c3ConsumerName); + @Cleanup + final Consumer c3 = cb.get().consumerName(c3ConsumerName).subscribe(); + final List> c3Msgs = new ArrayList<>(); + final org.apache.pulsar.broker.service.Consumer c3Broker = dispatcher.getConsumers().stream().filter(consumer -> consumer.consumerName().equals(c3ConsumerName)).findFirst().get(); + assertEquals(recentlyJoinedConsumers.get(c3Broker), PositionFactory.create(msg0Id.getLedgerId(), msg0Id.getEntryId())); + + // None of messages are sent to c3. + Message c3Msg = c3.receive(100, TimeUnit.MILLISECONDS); + assertNull(c3Msg); + + // Disconnect c1 + c1.close(); + + c3Msg = c3.receive(100, TimeUnit.MILLISECONDS); + assertNotNull(c3Msg); + c3Msgs.add(c3Msg); + // The mark delete position will move forward. Then remove c3 from recentlyJoinedConsumers. + c3.acknowledge(c3Msg); + Awaitility.await().untilAsserted(() -> assertNull(recentlyJoinedConsumers.get(c3Broker))); + c3Msg = c3.receive(100, TimeUnit.MILLISECONDS); + assertNotNull(c3Msg); + c3Msgs.add(c3Msg); + c3.acknowledge(c3Msg); + + // check ordering + assertTrue(c3Msgs.get(0).getMessageId().compareTo(c3Msgs.get(1).getMessageId()) < 0); + } + + @Test(timeOut = 30_000) + public void testLastSentPositionWhenRecreatingDispatcher() throws Exception { + // The lastSentPosition and individuallySentPositions should be initialized + // by the markDeletedPosition and individuallyDeletedMessages. + final String topicName = "persistent://public/default/rewind-" + UUID.randomUUID(); + final String subName = "my-sub"; + + final int numMessages = 9; + final List keys = Arrays.asList("key-a", "key-b", "key-c"); + final AtomicInteger receiveCounter = new AtomicInteger(); + final AtomicInteger ackCounter = new AtomicInteger(); + + @Cleanup + final Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topicName) + .enableBatching(false) + .create(); + + final Supplier> cb = () -> pulsarClient.newConsumer(Schema.INT32) + .topic(topicName) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Key_Shared) + .keySharedPolicy(KeySharedPolicy.autoSplitHashRange() + .setAllowOutOfOrderDelivery(false)); + + @Cleanup + final Consumer c1 = cb.get().messageListener((c, msg) -> { + if (keys.get(0).equals(msg.getKey())) { + try { + c.acknowledge(msg); + ackCounter.getAndIncrement(); + } catch (PulsarClientException e) { + fail(e.getMessage()); + } + } + receiveCounter.getAndIncrement(); + }).subscribe(); + + PersistentStickyKeyDispatcherMultipleConsumers dispatcher = + (PersistentStickyKeyDispatcherMultipleConsumers) pulsar.getBrokerService().getTopicIfExists(topicName).get().get().getSubscription(subName).getDispatcher(); + LongPairRangeSet individuallySentPositionsField = dispatcher.getIndividuallySentPositionsField(); + final ManagedCursorImpl cursor = (ManagedCursorImpl) ((PersistentSubscription) pulsar.getBrokerService().getTopicIfExists(topicName).get().get().getSubscription(subName)).getCursor(); + final ManagedLedgerImpl ledger = (ManagedLedgerImpl) cursor.getManagedLedger(); + + MessageIdImpl msgId = null; + for (int i = 0; i < numMessages; i++) { + msgId = (MessageIdImpl) producer.newMessage().key(keys.get(i % keys.size())).value(i).send(); + } + + // wait for consumption + Awaitility.await().untilAsserted(() -> assertEquals(receiveCounter.get(), numMessages)); + assertEquals(ackCounter.get(), numMessages / keys.size()); + assertEquals(dispatcher.getLastSentPositionField(), PositionFactory.create(msgId.getLedgerId(), msgId.getEntryId())); + assertTrue(individuallySentPositionsField.isEmpty()); + receiveCounter.set(0); + ackCounter.set(0); + + // create expected values + final Position expectedLastSentPosition = ledger.getNextValidPosition(cursor.getMarkDeletedPosition()); + final ConcurrentOpenLongPairRangeSet + expectedIndividuallySentPositions = new ConcurrentOpenLongPairRangeSet<>(4096, PositionFactory::create); + cursor.getIndividuallyDeletedMessagesSet().forEach(range -> { + final Position lower = range.lowerEndpoint(); + final Position upper = range.upperEndpoint(); + expectedIndividuallySentPositions.addOpenClosed(lower.getLedgerId(), lower.getEntryId(), upper.getLedgerId(), upper.getEntryId()); + return true; + }); + + // modify subscription type to close current dispatcher + admin.topics().createSubscription(topicName, "sub-alt", MessageId.earliest); + c1.close(); + @Cleanup + final Consumer c2 = pulsarClient.newConsumer(Schema.INT32) + .topic(topicName) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Exclusive) + .subscribe(); + c2.close(); + assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getType(), SubscriptionType.Exclusive.toString()); + + @Cleanup + final Consumer c3 = cb.get().receiverQueueSize(0).subscribe(); + dispatcher = (PersistentStickyKeyDispatcherMultipleConsumers) pulsar.getBrokerService().getTopicIfExists(topicName).get().get().getSubscription(subName).getDispatcher(); + individuallySentPositionsField = dispatcher.getIndividuallySentPositionsField(); + + assertNull(dispatcher.getLastSentPositionField()); + assertTrue(individuallySentPositionsField.isEmpty()); + + assertNotNull(c3.receive()); + + // validate the individuallySentPosition is initialized by the individuallyDeletedMessages + // if it is not initialized expectedly, it has sent-hole of key-c messages because key-c messages are not scheduled to be dispatched to some consumer(already acked). + assertEquals(dispatcher.getLastSentPositionField(), expectedLastSentPosition); + assertEquals(individuallySentPositionsField.toString(), expectedIndividuallySentPositions.toString()); + } + + @Test(timeOut = 30_000) + public void testLastSentPositionWhenResettingCursor() throws Exception { + // The lastSentPosition and individuallySentPositions should be cleared if reset-cursor operation is executed. + final String nsName = "public/default"; + final String topicName = "persistent://" + nsName + "/reset-cursor-" + UUID.randomUUID(); + final String subName = "my-sub"; + + final int numMessages = 10; + final List keys = Arrays.asList("key-a", "key-b"); + final AtomicInteger ackCounter = new AtomicInteger(); + + @Cleanup + final Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topicName) + .enableBatching(false) + .create(); + + final Supplier> cb = () -> pulsarClient.newConsumer(Schema.INT32) + .topic(topicName) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Key_Shared) + .receiverQueueSize(0) + .keySharedPolicy(KeySharedPolicy.autoSplitHashRange() + .setAllowOutOfOrderDelivery(false)); + + @Cleanup + final Consumer c1 = cb.get().consumerName("c1").subscribe(); + @Cleanup + final Consumer c2 = cb.get().consumerName("c2").subscribe(); + + // set retention policy + admin.namespaces().setRetention(nsName, new RetentionPolicies(1, 1024 * 1024)); + + // enforce the selector will return c1 if keys.get(0) + final PersistentStickyKeyDispatcherMultipleConsumers dispatcher = + (PersistentStickyKeyDispatcherMultipleConsumers) pulsar.getBrokerService().getTopicIfExists(topicName).get().get().getSubscription(subName).getDispatcher(); + final int hashA = Murmur3_32Hash.getInstance().makeHash(keys.get(0).getBytes()); + final Map hashConsumerMap = new HashMap<>(); + hashConsumerMap.put(hashA, c1.getConsumerName()); + final Field selectorField = PersistentStickyKeyDispatcherMultipleConsumers.class.getDeclaredField("selector"); + selectorField.setAccessible(true); + final StickyKeyConsumerSelector selector = spy((StickyKeyConsumerSelector) selectorField.get(dispatcher)); + selectorField.set(dispatcher, selector); + doAnswer((invocationOnMock -> { + final int hash = invocationOnMock.getArgument(0); + final String consumerName = hashConsumerMap.getOrDefault(hash, c2.getConsumerName()); + return dispatcher.getConsumers().stream().filter(consumer -> consumer.consumerName().equals(consumerName)).findFirst().get(); + })).when(selector).select(anyInt()); + + for (int i = 0; i < numMessages; i++) { + producer.newMessage().key(keys.get(i % keys.size())).value(i).send(); + } + + // consume some messages + for (int i = 0; i < numMessages / keys.size(); i++) { + final Message msg = c2.receive(); + if (msg != null) { + c2.acknowledge(msg); + ackCounter.getAndIncrement(); + } + } + assertEquals(ackCounter.get(), numMessages / keys.size()); + + // store current lastSentPosition for comparison + final LongPairRangeSet individuallySentPositionsField = dispatcher.getIndividuallySentPositionsField(); + assertNotNull(dispatcher.getLastSentPositionField()); + assertFalse(individuallySentPositionsField.isEmpty()); + + // reset cursor and receive a message + admin.topics().resetCursor(topicName, subName, MessageId.earliest, true); + + // validate the lastSentPosition and individuallySentPositions are cleared after resetting cursor + assertNull(dispatcher.getLastSentPositionField()); + assertTrue(individuallySentPositionsField.isEmpty()); + } + + @Test(timeOut = 30_000) + public void testLastSentPositionWhenSkipping() throws Exception { + // The lastSentPosition and individuallySentPositions should be updated if skip operation is executed. + // There are updated to follow the new markDeletedPosition. + final String topicName = "persistent://public/default/skip-" + UUID.randomUUID(); + final String subName = "my-sub"; + + final int numMessages = 10; + final List keys = Arrays.asList("key-a", "key-b"); + final int numSkip = 2; + final AtomicInteger ackCounter = new AtomicInteger(); + + @Cleanup + final Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topicName) + .enableBatching(false) + .create(); + + final Supplier> cb = () -> pulsarClient.newConsumer(Schema.INT32) + .topic(topicName) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Key_Shared) + .keySharedPolicy(KeySharedPolicy.autoSplitHashRange() + .setAllowOutOfOrderDelivery(false)) + .receiverQueueSize(0); + + @Cleanup + final Consumer c1 = cb.get().consumerName("c1").subscribe(); + @Cleanup + final Consumer c2 = cb.get().consumerName("c2").subscribe(); + + // enforce the selector will return c1 if keys.get(0) + final PersistentStickyKeyDispatcherMultipleConsumers dispatcher = + (PersistentStickyKeyDispatcherMultipleConsumers) pulsar.getBrokerService().getTopicIfExists(topicName).get().get().getSubscription(subName).getDispatcher(); + final int hashA = Murmur3_32Hash.getInstance().makeHash(keys.get(0).getBytes()); + final Map hashConsumerMap = new HashMap<>(); + hashConsumerMap.put(hashA, c1.getConsumerName()); + final Field selectorField = PersistentStickyKeyDispatcherMultipleConsumers.class.getDeclaredField("selector"); + selectorField.setAccessible(true); + final StickyKeyConsumerSelector selector = spy((StickyKeyConsumerSelector) selectorField.get(dispatcher)); + selectorField.set(dispatcher, selector); + doAnswer((invocationOnMock -> { + final int hash = invocationOnMock.getArgument(0); + final String consumerName = hashConsumerMap.getOrDefault(hash, c2.getConsumerName()); + return dispatcher.getConsumers().stream().filter(consumer -> consumer.consumerName().equals(consumerName)).findFirst().get(); + })).when(selector).select(anyInt()); + + final List positionList = new ArrayList<>(); + for (int i = 0; i < numMessages; i++) { + final MessageIdImpl msgId = (MessageIdImpl) producer.newMessage().key(keys.get(i % keys.size())).value(i).send(); + positionList.add(PositionFactory.create(msgId.getLedgerId(), msgId.getEntryId())); + } + + // consume some messages + for (int i = 0; i < numSkip; i++) { + final Message msg = c2.receive(); + if (msg != null) { + c2.acknowledge(msg); + ackCounter.getAndIncrement(); + } + } + assertEquals(ackCounter.get(), numSkip); + final ManagedCursorImpl managedCursor = ((ManagedCursorImpl) ((PersistentSubscription) pulsar.getBrokerService().getTopicIfExists(topicName).get().get().getSubscription(subName)).getCursor()); + Awaitility.await().untilAsserted(() -> assertEquals(managedCursor.getIndividuallyDeletedMessagesSet().size(), 2)); + + // store current lastSentPosition for comparison + final Position lastSentPositionBeforeSkip = dispatcher.getLastSentPositionField(); + final LongPairRangeSet individuallySentPositionsField = dispatcher.getIndividuallySentPositionsField(); + assertNotNull(lastSentPositionBeforeSkip); + assertFalse(individuallySentPositionsField.isEmpty()); + + // skip messages and receive a message + admin.topics().skipMessages(topicName, subName, numSkip); + final MessageIdImpl msgIdAfterSkip = (MessageIdImpl) c1.receive().getMessageId(); + final Position positionAfterSkip = PositionFactory.create(msgIdAfterSkip.getLedgerId(), + msgIdAfterSkip.getEntryId()); + assertEquals(positionAfterSkip, positionList.get(4)); + + // validate the lastSentPosition is updated to the new markDeletedPosition + // validate the individuallySentPositions is updated expectedly (removeAtMost the new markDeletedPosition) + final Position lastSentPosition = dispatcher.getLastSentPositionField(); + assertNotNull(lastSentPosition); + assertTrue(lastSentPosition.compareTo(lastSentPositionBeforeSkip) > 0); + assertEquals(lastSentPosition, positionList.get(4)); + assertTrue(individuallySentPositionsField.isEmpty()); + } private KeySharedMode getKeySharedModeOfSubscription(Topic topic, String subscription) { if (TopicName.get(topic.getName()).getDomain().equals(TopicDomain.persistent)) { diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java index d2d3600df96ed5..5f2cf7b209ee94 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java @@ -72,8 +72,8 @@ public interface ConsumerStats { /** Flag to verify if consumer is blocked due to reaching threshold of unacked messages. */ boolean isBlockedConsumerOnUnackedMsgs(); - /** The read position of the cursor when the consumer joining. */ - String getReadPositionWhenJoining(); + /** The last sent position of the cursor when the consumer joining. */ + String getLastSentPositionWhenJoining(); /** Address of this consumer. */ String getAddress(); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java index d4850adaa6f221..cabef1ca9602db 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java @@ -118,6 +118,12 @@ public interface SubscriptionStats { /** This is for Key_Shared subscription to get the recentJoinedConsumers in the Key_Shared subscription. */ Map getConsumersAfterMarkDeletePosition(); + /** The last sent position of the cursor. This is for Key_Shared subscription. */ + String getLastSentPosition(); + + /** Set of individually sent ranges. This is for Key_Shared subscription. */ + String getIndividuallySentPositions(); + /** SubscriptionProperties (key/value strings) associated with this subscribe. */ Map getSubscriptionProperties(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java index de36b330b7f1ab..b4c5d21e6926ef 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java @@ -77,8 +77,8 @@ public class ConsumerStatsImpl implements ConsumerStats { /** Flag to verify if consumer is blocked due to reaching threshold of unacked messages. */ public boolean blockedConsumerOnUnackedMsgs; - /** The read position of the cursor when the consumer joining. */ - public String readPositionWhenJoining; + /** The last sent position of the cursor when the consumer joining. */ + public String lastSentPositionWhenJoining; /** Address of this consumer. */ private String address; @@ -113,7 +113,7 @@ public ConsumerStatsImpl add(ConsumerStatsImpl stats) { this.availablePermits += stats.availablePermits; this.unackedMessages += stats.unackedMessages; this.blockedConsumerOnUnackedMsgs = stats.blockedConsumerOnUnackedMsgs; - this.readPositionWhenJoining = stats.readPositionWhenJoining; + this.lastSentPositionWhenJoining = stats.lastSentPositionWhenJoining; return this; } @@ -141,8 +141,8 @@ public void setClientVersion(String clientVersion) { this.clientVersion = clientVersion; } - public String getReadPositionWhenJoining() { - return readPositionWhenJoining; + public String getLastSentPositionWhenJoining() { + return lastSentPositionWhenJoining; } public String getLastAckedTime() { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java index a8ea0060629a07..ab4d07c7ae486d 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java @@ -126,6 +126,12 @@ public class SubscriptionStatsImpl implements SubscriptionStats { /** This is for Key_Shared subscription to get the recentJoinedConsumers in the Key_Shared subscription. */ public Map consumersAfterMarkDeletePosition; + /** The last sent position of the cursor. This is for Key_Shared subscription. */ + public String lastSentPosition; + + /** Set of individually sent ranges. This is for Key_Shared subscription. */ + public String individuallySentPositions; + /** The number of non-contiguous deleted messages ranges. */ public int nonContiguousDeletedMessagesRanges;