Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Introduce the last sent position to fix message ordering issues in Key_Shared (PIP-282) #21953

Merged
merged 2 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3446,6 +3446,19 @@ public LongPairRangeSet<Position> getIndividuallyDeletedMessagesSet() {
return individualDeletedMessages;
}

public Position processIndividuallyDeletedMessagesAndGetMarkDeletedPosition(
LongPairRangeSet.RangeProcessor<Position> processor) {
final Position mdp;
lock.readLock().lock();
try {
mdp = markDeletePosition;
individualDeletedMessages.forEach(processor);
} finally {
lock.readLock().unlock();
}
return mdp;
}

public boolean isMessageDeleted(Position position) {
return position.compareTo(markDeletePosition) <= 0
|| individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3491,7 +3491,7 @@ private CompletableFuture<Void> completeLedgerInfoForOffloaded(long ledgerId, UU
* the position range
* @return the count of entries
*/
long getNumberOfEntries(Range<Position> range) {
public long getNumberOfEntries(Range<Position> range) {
Position fromPosition = range.lowerEndpoint();
boolean fromIncluded = range.lowerBoundType() == BoundType.CLOSED;
Position toPosition = range.upperEndpoint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public class Consumer {

private static final double avgPercent = 0.9;
private boolean preciseDispatcherFlowControl;
private Position readPositionWhenJoining;
private Position lastSentPositionWhenJoining;
private final String clientAddress; // IP address only, no port number included
private final MessageId startMessageId;
private final boolean isAcknowledgmentAtBatchIndexLevelEnabled;
Expand Down Expand Up @@ -931,8 +931,8 @@ public ConsumerStatsImpl getStats() {
stats.unackedMessages = unackedMessages;
stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs;
stats.avgMessagesPerEntry = getAvgMessagesPerEntry();
if (readPositionWhenJoining != null) {
stats.readPositionWhenJoining = readPositionWhenJoining.toString();
if (lastSentPositionWhenJoining != null) {
stats.lastSentPositionWhenJoining = lastSentPositionWhenJoining.toString();
}
return stats;
}
Expand Down Expand Up @@ -1166,8 +1166,8 @@ public boolean isPreciseDispatcherFlowControl() {
return preciseDispatcherFlowControl;
}

public void setReadPositionWhenJoining(Position readPositionWhenJoining) {
this.readPositionWhenJoining = readPositionWhenJoining;
public void setLastSentPositionWhenJoining(Position lastSentPositionWhenJoining) {
this.lastSentPositionWhenJoining = lastSentPositionWhenJoining;
}

public int getMaxUnackedMessages() {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -1303,9 +1303,26 @@ public SubscriptionStatsImpl getStats(GetStatsOptions getStatsOptions) {
.getRecentlyJoinedConsumers();
if (recentlyJoinedConsumers != null && recentlyJoinedConsumers.size() > 0) {
recentlyJoinedConsumers.forEach((k, v) -> {
subStats.consumersAfterMarkDeletePosition.put(k.consumerName(), v.toString());
// The dispatcher allows same name consumers
final StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("consumerName=").append(k.consumerName())
.append(", consumerId=").append(k.consumerId());
if (k.cnx() != null) {
stringBuilder.append(", address=").append(k.cnx().clientAddress());
}
subStats.consumersAfterMarkDeletePosition.put(stringBuilder.toString(), v.toString());
});
}
final String lastSentPosition = ((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher)
.getLastSentPosition();
if (lastSentPosition != null) {
subStats.lastSentPosition = lastSentPosition;
}
final String individuallySentPositions = ((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher)
.getIndividuallySentPositions();
if (individuallySentPositions != null) {
subStats.individuallySentPositions = individuallySentPositions;
}
}
subStats.nonContiguousDeletedMessagesRanges = cursor.getTotalNonContiguousDeletedMessagesRange();
subStats.nonContiguousDeletedMessagesRangesSerializedSize =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.admin;

import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -56,6 +58,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response.Status;
Expand All @@ -65,6 +68,7 @@
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarServerException;
Expand All @@ -75,6 +79,8 @@
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.broker.testcontext.SpyConfig;
import org.apache.pulsar.client.admin.GetStatsOptions;
Expand Down Expand Up @@ -139,7 +145,10 @@
import org.apache.pulsar.common.policies.data.TopicHashPositions;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -3449,43 +3458,198 @@ public void testGetTtlDurationDefaultInSeconds() throws Exception {
}

@Test
public void testGetReadPositionWhenJoining() throws Exception {
final String topic = "persistent://prop-xyz/ns1/testGetReadPositionWhenJoining-" + UUID.randomUUID().toString();
public void testGetLastSentPositionWhenJoining() throws Exception {
final String topic = "persistent://prop-xyz/ns1/testGetLastSentPositionWhenJoining-" + UUID.randomUUID().toString();
final String subName = "my-sub";
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
.create();

@Cleanup
final Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionName(subName)
.subscribe();

final int messages = 10;
MessageIdImpl messageId = null;
for (int i = 0; i < messages; i++) {
messageId = (MessageIdImpl) producer.send(("Hello Pulsar - " + i).getBytes());
consumer1.receive();
}

List<Consumer<byte[]>> consumers = new ArrayList<>();
for (int i = 0; i < 2; i++) {
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionName(subName)
.subscribe();
consumers.add(consumer);
}
@Cleanup
final Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionName(subName)
.subscribe();

TopicStats stats = admin.topics().getStats(topic);
Assert.assertEquals(stats.getSubscriptions().size(), 1);
SubscriptionStats subStats = stats.getSubscriptions().get(subName);
Assert.assertNotNull(subStats);
Assert.assertEquals(subStats.getConsumers().size(), 2);
ConsumerStats consumerStats = subStats.getConsumers().get(0);
Assert.assertEquals(consumerStats.getReadPositionWhenJoining(),
PositionFactory.create(messageId.getLedgerId(), messageId.getEntryId() + 1).toString());
ConsumerStats consumerStats = subStats.getConsumers().stream()
.filter(s -> s.getConsumerName().equals(consumer2.getConsumerName())).findFirst().get();
Assert.assertEquals(consumerStats.getLastSentPositionWhenJoining(),
PositionFactory.create(messageId.getLedgerId(), messageId.getEntryId()).toString());
}

@Test
public void testGetLastSentPosition() throws Exception {
final String topic = "persistent://prop-xyz/ns1/testGetLastSentPosition-" + UUID.randomUUID().toString();
final String subName = "my-sub";
@Cleanup
final Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
.create();
final AtomicInteger counter = new AtomicInteger();
@Cleanup
final Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionName(subName)
.messageListener((c, msg) -> {
try {
c.acknowledge(msg);
counter.getAndIncrement();
} catch (Exception e) {
throw new RuntimeException(e);
}
})
.subscribe();

TopicStats stats = admin.topics().getStats(topic);
Assert.assertEquals(stats.getSubscriptions().size(), 1);
SubscriptionStats subStats = stats.getSubscriptions().get(subName);
Assert.assertNotNull(subStats);
Assert.assertNull(subStats.getLastSentPosition());

for (Consumer<byte[]> consumer : consumers) {
consumer.close();
final int messages = 10;
MessageIdImpl messageId = null;
for (int i = 0; i < messages; i++) {
messageId = (MessageIdImpl) producer.send(("Hello Pulsar - " + i).getBytes());
}

Awaitility.await().untilAsserted(() -> assertEquals(counter.get(), messages));

stats = admin.topics().getStats(topic);
Assert.assertEquals(stats.getSubscriptions().size(), 1);
subStats = stats.getSubscriptions().get(subName);
Assert.assertNotNull(subStats);
Assert.assertEquals(subStats.getLastSentPosition(), PositionFactory.create(messageId.getLedgerId(), messageId.getEntryId()).toString());
}

@Test
public void testGetIndividuallySentPositions() throws Exception {
// The producer sends messages with two types of keys.
// The dispatcher sends keyA messages to consumer1.
// Consumer1 will not receive any messages. Its receiver queue size is 1.
// Consumer2 will receive and ack any messages immediately.

final String topic = "persistent://prop-xyz/ns1/testGetIndividuallySentPositions-" + UUID.randomUUID().toString();
final String subName = "my-sub";
@Cleanup
final Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
.create();

final String consumer1Name = "c1";
final String consumer2Name = "c2";

@Cleanup
final Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic(topic)
.consumerName(consumer1Name)
.receiverQueueSize(1)
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionName(subName)
.subscribe();

final PersistentStickyKeyDispatcherMultipleConsumers dispatcher =
(PersistentStickyKeyDispatcherMultipleConsumers) pulsar.getBrokerService().getTopicIfExists(topic).get().get().getSubscription(subName).getDispatcher();
final String keyA = "key-a";
final String keyB = "key-b";
final int hashA = Murmur3_32Hash.getInstance().makeHash(keyA.getBytes());

final Field selectorField = PersistentStickyKeyDispatcherMultipleConsumers.class.getDeclaredField("selector");
selectorField.setAccessible(true);
final StickyKeyConsumerSelector selector = spy((StickyKeyConsumerSelector) selectorField.get(dispatcher));
selectorField.set(dispatcher, selector);

// the selector returns consumer1 if keyA
doAnswer((invocationOnMock -> {
final int hash = invocationOnMock.getArgument(0);

final String consumerName = hash == hashA ? consumer1Name : consumer2Name;
return dispatcher.getConsumers().stream().filter(consumer -> consumer.consumerName().equals(consumerName)).findFirst().get();
})).when(selector).select(anyInt());

final AtomicInteger consumer2AckCounter = new AtomicInteger();
@Cleanup
final Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
.topic(topic)
.consumerName(consumer2Name)
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionName(subName)
.messageListener((c, msg) -> {
try {
c.acknowledge(msg);
consumer2AckCounter.getAndIncrement();
} catch (Exception e) {
throw new RuntimeException(e);
}
})
.subscribe();

final LongPairRangeSet.LongPairConsumer<Position> positionRangeConverter = PositionFactory::create;
final LongPairRangeSet<Position> expectedIndividuallySentPositions = new ConcurrentOpenLongPairRangeSet<>(4096, positionRangeConverter);

TopicStats stats = admin.topics().getStats(topic);
Assert.assertEquals(stats.getSubscriptions().size(), 1);
SubscriptionStats subStats = stats.getSubscriptions().get(subName);
Assert.assertNotNull(subStats);
Assert.assertEquals(subStats.getIndividuallySentPositions(), expectedIndividuallySentPositions.toString());

final Function<String, MessageIdImpl> sendFn = (key) -> {
try {
return (MessageIdImpl) producer.newMessage().key(key).value(("msg").getBytes()).send();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
};
final List<MessageIdImpl> messageIdList = new ArrayList<>();

// the dispatcher can send keyA message, but then consumer1's receiver queue will be full
messageIdList.add(sendFn.apply(keyA));

// the dispatcher can send messages other than keyA
messageIdList.add(sendFn.apply(keyA));
messageIdList.add(sendFn.apply(keyB));
messageIdList.add(sendFn.apply(keyA));
messageIdList.add(sendFn.apply(keyB));
messageIdList.add(sendFn.apply(keyB));

assertEquals(messageIdList.size(), 6);
Awaitility.await().untilAsserted(() -> assertEquals(consumer2AckCounter.get(), 3));

// set expected value
expectedIndividuallySentPositions.addOpenClosed(messageIdList.get(1).getLedgerId(), messageIdList.get(1).getEntryId(),
messageIdList.get(2).getLedgerId(), messageIdList.get(2).getEntryId());
expectedIndividuallySentPositions.addOpenClosed(messageIdList.get(3).getLedgerId(), messageIdList.get(3).getEntryId(),
messageIdList.get(5).getLedgerId(), messageIdList.get(5).getEntryId());

stats = admin.topics().getStats(topic);
Assert.assertEquals(stats.getSubscriptions().size(), 1);
subStats = stats.getSubscriptions().get(subName);
Assert.assertNotNull(subStats);
Assert.assertEquals(subStats.getIndividuallySentPositions(), expectedIndividuallySentPositions.toString());
}

@Test
Expand Down
Loading
Loading