Skip to content

Commit

Permalink
[fix][client] Orphan producer when concurrently calling producer clos…
Browse files Browse the repository at this point in the history
…ing and reconnection (#23853)

(cherry picked from commit 56adefa)
  • Loading branch information
poorbarcode authored and lhotari committed Jan 17, 2025
1 parent 16da87f commit 54a7efd
Show file tree
Hide file tree
Showing 5 changed files with 197 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;

import java.time.Duration;
import lombok.Cleanup;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.pulsar.broker.service.Topic;
Expand Down Expand Up @@ -71,6 +72,46 @@ public Object[][] produceConf() {
};
}

/**
 * Supplies the single boolean argument of {@code testProducerCloseCallback2}.
 *
 * <p>{@code true}: the test closes the client channel (broken pipeline);
 * {@code false}: the test only re-enables auto-read on the channel.
 */
@DataProvider(name = "brokenPipeline")
public Object[][] brokenPipeline() {
    final Object[] closeChannel = {true};
    final Object[] resumeAutoRead = {false};
    return new Object[][]{closeChannel, resumeAutoRead};
}

/**
 * Verifies that a pending {@code sendAsync} future gets completed when the producer is closed
 * while reads on the client channel are suspended.
 *
 * <p>Auto-read is switched off on the client channel, then a message is sent asynchronously and
 * the producer is closed asynchronously. After a pause the channel is either closed
 * ({@code brokenPipeline == true}) or auto-read is switched back on. In both cases the send
 * future must be done within 10 seconds.
 *
 * @param brokenPipeline {@code true} to close the channel, {@code false} to resume auto-read
 * @throws Exception if client setup, producer creation or the wait is interrupted or fails
 */
@Test(dataProvider = "brokenPipeline")
public void testProducerCloseCallback2(boolean brokenPipeline) throws Exception {
    initClient();
    @Cleanup
    ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
            .topic("testProducerClose")
            .sendTimeout(5, TimeUnit.SECONDS)
            .maxPendingMessages(0)
            .enableBatching(false)
            .create();
    final TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage();
    final TypedMessageBuilder<byte[]> value = messageBuilder.value("test-msg".getBytes(StandardCharsets.UTF_8));

    // Stop reading from the channel before issuing the send and the close.
    producer.getClientCnx().channel().config().setAutoRead(false);
    final CompletableFuture<MessageId> completableFuture = value.sendAsync();
    producer.closeAsync();

    // Keep both requests outstanding for a while before unblocking the channel.
    Thread.sleep(3000);
    if (brokenPipeline) {
        producer.getClientCnx().channel().close();
    } else {
        producer.getClientCnx().channel().config().setAutoRead(true);
    }

    // Only completion is checked here; the future may be done normally or exceptionally.
    Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() ->
            Assert.assertTrue(completableFuture.isDone()));
}

@Test(timeOut = 10_000)
public void testProducerCloseCallback() throws Exception {
initClient();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Tests the interaction between a producer reconnection and a concurrent
 * {@code closeAsync()} call on the same producer.
 */
@Slf4j
@Test(groups = "broker-api")
public class ProducerReconnectionTest extends ProducerConsumerBase {

@BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

/**
 * Closes a producer while its connection handler is paused inside
 * {@code isValidStateForReconnection()}, then checks that the producer stays in
 * {@code Closing}/{@code Closed} state and that its pending queue ends up empty.
 *
 * <p>Coordination uses two latches: {@code reconnectingSignal} is counted down by the
 * connection-handler thread once it has evaluated the state, and {@code closedSignal} is counted
 * down by the test thread once the producer has been closed, which lets the paused thread go on.
 */
@Test
public void testConcurrencyReconnectAndClose() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
admin.topics().createNonPartitionedTopic(topicName);
PulsarClientImpl client = (PulsarClientImpl) pulsarClient;

// Build the producer by hand so that its ConnectionHandler can be replaced with a spy.
ProducerBuilderImpl<byte[]> producerBuilder = (ProducerBuilderImpl<byte[]>) client.newProducer()
.blockIfQueueFull(false).maxPendingMessages(1).producerName("p1")
.enableBatching(true).topic(topicName);
CompletableFuture<Producer<byte[]>> producerFuture = new CompletableFuture<>();
// Stays false while the producer is first created, so the spy only pauses later calls.
AtomicBoolean reconnectionStartTrigger = new AtomicBoolean();
CountDownLatch reconnectingSignal = new CountDownLatch(1);
CountDownLatch closedSignal = new CountDownLatch(1);
ProducerImpl<byte[]> producer = new ProducerImpl<>(client, topicName, producerBuilder.getConf(), producerFuture,
-1, Schema.BYTES, null, Optional.empty()) {
@Override
ConnectionHandler initConnectionHandler() {
ConnectionHandler connectionHandler = super.initConnectionHandler();
ConnectionHandler spyConnectionHandler = spy(connectionHandler);
// The real result is computed first and returned unchanged; the spy only inserts a pause
// after the state check once the trigger has been set.
// NOTE(review): presumably this method is invoked on the reconnection path after the
// connection is closed - confirm against ConnectionHandler.
doAnswer(invocation -> {
boolean result = (boolean) invocation.callRealMethod();
if (reconnectionStartTrigger.get()) {
log.info("[testConcurrencyReconnectAndClose] verified state for reconnection");
reconnectingSignal.countDown();
// Block until the test thread has closed the producer.
closedSignal.await();
log.info("[testConcurrencyReconnectAndClose] reconnected");
}
return result;
}).when(spyConnectionHandler).isValidStateForReconnection();
return spyConnectionHandler;
}
};
log.info("[testConcurrencyReconnectAndClose] producer created");
producerFuture.get(5, TimeUnit.SECONDS);

// Trigger a reconnection by closing the broker-side connection of the topic's producer.
log.info("[testConcurrencyReconnectAndClose] trigger a reconnection");
ServerCnx serverCnx = (ServerCnx) pulsar.getBrokerService().getTopic(topicName, false).join()
.get().getProducers().values().iterator().next().getCnx();
reconnectionStartTrigger.set(true);
serverCnx.ctx().close();
// Queue one message and wait until it shows up in the pending queue.
producer.sendAsync("1".getBytes(StandardCharsets.UTF_8));
Awaitility.await().untilAsserted(() -> {
assertNotEquals(producer.getPendingQueueSize(), 0);
});

// Close the producer while the connection-handler thread is paused inside the spy.
reconnectingSignal.await();
log.info("[testConcurrencyReconnectAndClose] producer close");
producer.closeAsync();
Awaitility.await().untilAsserted(() -> {
HandlerState.State state1 = producer.getState();
assertTrue(state1 == HandlerState.State.Closed || state1 == HandlerState.State.Closing);
});
// Release the thread that is blocked in "closedSignal.await()" inside the spy.
closedSignal.countDown();

// Give the released thread time to finish whatever it does after the pause.
Thread.sleep(3000);

// The producer must still be closing/closed and must not hold pending messages.
HandlerState.State state2 = producer.getState();
log.info("producer state: {}", state2);
assertTrue(state2 == HandlerState.State.Closed || state2 == HandlerState.State.Closing);
assertEquals(producer.getPendingQueueSize(), 0);

// Cleanup: close the producer and delete the topic.
// NOTE(review): presumably the non-forced topic delete would fail if the broker still held an
// orphan producer, which would make this the orphan check - confirm.
producer.close();
admin.topics().delete(topicName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;

import com.google.common.annotations.VisibleForTesting;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Optional;
Expand Down Expand Up @@ -192,13 +193,12 @@ public void connectionClosed(ClientCnx cnx, Optional<Long> initialConnectionDela
duringConnect.set(false);
state.client.getCnxPool().releaseConnection(cnx);
if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) {
if (!isValidStateForReconnection()) {
if (!state.changeToConnecting()) {
log.info("[{}] [{}] Ignoring reconnection request (state: {})",
state.topic, state.getHandlerName(), state.getState());
return;
}
long delayMs = initialConnectionDelayMs.orElse(backoff.next());
state.setState(State.Connecting);
log.info("[{}] [{}] Closed connection {} -- Will try again in {} s, hostUrl: {}",
state.topic, state.getHandlerName(), cnx.channel(), delayMs / 1000.0, hostUrl.orElse(null));
state.client.timer().newTimeout(timeout -> {
Expand Down Expand Up @@ -232,7 +232,8 @@ protected long switchClientCnx(ClientCnx clientCnx) {
return EPOCH_UPDATER.incrementAndGet(this);
}

private boolean isValidStateForReconnection() {
@VisibleForTesting
public boolean isValidStateForReconnection() {
State state = this.state.getState();
switch (state) {
case Uninitialized:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,16 +303,20 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
producersClosedCounter = ip.newCounter("pulsar.client.producer.closed", Unit.Sessions,
"The number of producer sessions closed", topic, Attributes.empty());

this.connectionHandler = new ConnectionHandler(this,
this.connectionHandler = initConnectionHandler();
setChunkMaxMessageSize();
grabCnx();
producersOpenedCounter.increment();
}

ConnectionHandler initConnectionHandler() {
return new ConnectionHandler(this,
new BackoffBuilder()
.setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
.setMandatoryStop(Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS)
.create(),
this);
setChunkMaxMessageSize();
grabCnx();
producersOpenedCounter.increment();
this);
}

private void setChunkMaxMessageSize() {
Expand Down Expand Up @@ -1151,7 +1155,7 @@ public CompletableFuture<Void> handleOnce() {


@Override
public CompletableFuture<Void> closeAsync() {
public synchronized CompletableFuture<Void> closeAsync() {
final State currentState = getAndUpdateState(state -> {
if (state == State.Closed) {
return state;
Expand Down Expand Up @@ -1179,11 +1183,11 @@ public CompletableFuture<Void> closeAsync() {
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
cnx.sendRequestWithId(cmd, requestId).handle((v, exception) -> {
cnx.removeProducer(producerId);
closeAndClearPendingMessages();
if (exception == null || !cnx.ctx().channel().isActive()) {
// Either we've received the success response for the close producer command from the broker, or the
// connection did break in the meantime. In any case, the producer is gone.
log.info("[{}] [{}] Closed Producer", topic, producerName);
closeAndClearPendingMessages();
closeFuture.complete(null);
} else {
closeFuture.completeExceptionally(exception);
Expand Down Expand Up @@ -1795,6 +1799,12 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
// Because the state could have been updated while retrieving the connection, we set it back to connecting,
// as long as the change from current state to connecting is a valid state change.
if (!changeToConnecting()) {
if (getState() == State.Closing || getState() == State.Closed) {
// Producer was closed while reconnecting, fail the pending messages with a
// "producer has been closed" error instead of leaving them in the pending queue
failPendingMessages(cnx,
new PulsarClientException.ProducerFencedException("producer has been closed"));
}
return CompletableFuture.completedFuture(null);
}
// We set the cnx reference before registering the producer on the cnx, so if the cnx breaks before creating
Expand Down Expand Up @@ -1855,6 +1865,8 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
// Producer was closed while reconnecting, close the connection to make sure the broker
// drops the producer on its side
cnx.removeProducer(producerId);
failPendingMessages(cnx,
new PulsarClientException.ProducerFencedException("producer has been closed"));
cnx.channel().close();
future.complete(null);
return;
Expand Down Expand Up @@ -2025,7 +2037,7 @@ private void closeProducerTasks() {

private void resendMessages(ClientCnx cnx, long expectedEpoch) {
cnx.ctx().channel().eventLoop().execute(() -> {
synchronized (this) {
synchronized (ProducerImpl.this) {
if (getState() == State.Closing || getState() == State.Closed) {
// Producer was closed while reconnecting, close the connection to make sure the broker
// drops the producer on its side
Expand Down Expand Up @@ -2181,7 +2193,7 @@ public void run(Timeout timeout) throws Exception {
* This fails and clears the pending messages with the given exception. This method should be called from within the
* ProducerImpl object mutex.
*/
private void failPendingMessages(ClientCnx cnx, PulsarClientException ex) {
private synchronized void failPendingMessages(ClientCnx cnx, PulsarClientException ex) {
if (cnx == null) {
final AtomicInteger releaseCount = new AtomicInteger();
final boolean batchMessagingEnabled = isBatchMessagingEnabled();
Expand Down Expand Up @@ -2333,7 +2345,7 @@ private void batchMessageAndSend(boolean shouldScheduleNextBatchFlush) {
}
}

protected void processOpSendMsg(OpSendMsg op) {
protected synchronized void processOpSendMsg(OpSendMsg op) {
if (op == null) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,8 @@
import static org.testng.Assert.*;

import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import io.netty.util.HashedWheelTimer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.mockito.Mockito;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -74,35 +66,4 @@ public void testPopulateMessageSchema() {
assertTrue(producer.populateMessageSchema(msg, null));
verify(msg).setSchemaState(MessageImpl.SchemaState.Ready);
}

/**
 * Checks that {@code closeAsync()} calls {@code closeAndClearPendingMessages()} when the
 * request sent through the connection completes exceptionally.
 */
@Test
public void testClearPendingMessageWhenCloseAsync() {
    // Collaborators handed out by the mocked client.
    ClientConfigurationData clientConfig = new ClientConfigurationData();
    clientConfig.setStatsIntervalSeconds(-1);

    ConnectionPool cnxPool = mock(ConnectionPool.class);
    Mockito.doReturn(1).when(cnxPool).genRandomKeyToSelectCon();

    HashedWheelTimer wheelTimer = mock(HashedWheelTimer.class);
    Mockito.doReturn(null).when(wheelTimer).newTimeout(Mockito.any(), Mockito.anyLong(), Mockito.any());

    // Mocked client wired to the collaborators above.
    PulsarClientImpl mockedClient = mock(PulsarClientImpl.class);
    Mockito.doReturn(1L).when(mockedClient).newProducerId();
    Mockito.doReturn(clientConfig).when(mockedClient).getConfiguration();
    Mockito.doReturn(new InstrumentProvider(null)).when(mockedClient).instrumentProvider();
    Mockito.doReturn(cnxPool).when(mockedClient).getCnxPool();
    Mockito.doReturn(wheelTimer).when(mockedClient).timer();

    // Producer under test, wrapped in a spy so that its calls can be verified.
    ProducerConfigurationData producerConfig = new ProducerConfigurationData();
    producerConfig.setSendTimeoutMs(-1);
    ProducerImpl<?> spiedProducer = Mockito.spy(
            new ProducerImpl<>(mockedClient, "topicName", producerConfig, null, 0, null, null,
                    Optional.empty()));

    // Connection whose request future is already failed.
    CompletableFuture<ProducerResponse> failedResponse = new CompletableFuture<>();
    failedResponse.completeExceptionally(new PulsarClientException("error"));
    ClientCnx failingCnx = mock(ClientCnx.class);
    when(failingCnx.sendRequestWithId(Mockito.any(), Mockito.anyLong())).thenReturn(failedResponse);
    Mockito.doReturn(failingCnx).when(spiedProducer).cnx();

    // Close and verify that the pending messages were cleared.
    spiedProducer.closeAsync();
    verify(spiedProducer).closeAndClearPendingMessages();
}

}

0 comments on commit 54a7efd

Please sign in to comment.