Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import io.netty.channel.ChannelHandler;
import io.netty.channel.DefaultChannelId;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.vertx.core.impl.ConcurrentHashSet;
import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -154,6 +153,7 @@
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Commands.ChecksumType;
import org.apache.pulsar.common.protocol.FrameDecoderUtil;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.protocol.schema.EmptyVersion;
import org.apache.pulsar.common.topics.TopicList;
Expand Down Expand Up @@ -1212,14 +1212,9 @@ public void close() throws IOException {
private class ClientChannel implements Closeable {
private ClientChannelHelper clientChannelHelper = new ClientChannelHelper();
private ServerCnx serverCnx = new ServerCnx(pulsar);
private EmbeddedChannel channel = new EmbeddedChannel(DefaultChannelId.newInstance(),
new LengthFieldBasedFrameDecoder(
5 * 1024 * 1024,
0,
4,
0,
4),
serverCnx);
private EmbeddedChannel channel =
new EmbeddedChannel(DefaultChannelId.newInstance(), FrameDecoderUtil.createFrameDecoder(),
(ChannelHandler) serverCnx);
public ClientChannel() {
serverCnx.setAuthRole("");
}
Expand Down Expand Up @@ -2850,20 +2845,13 @@ public void testSendFailureOnEncryptionRequiredTopic() throws Exception {
}

protected void resetChannel() throws Exception {
int maxMessageSize = 5 * 1024 * 1024;
if (channel != null && channel.isActive()) {
serverCnx.close();
channel.close().get();
}
serverCnx = new ServerCnx(pulsar);
serverCnx.setAuthRole("");
channel = new EmbeddedChannel(new LengthFieldBasedFrameDecoder(
maxMessageSize,
0,
4,
0,
4),
(ChannelHandler) serverCnx);
channel = new EmbeddedChannel(FrameDecoderUtil.createFrameDecoder(), (ChannelHandler) serverCnx);
}

protected void setChannelConnected() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.common.collect.Queues;
import io.netty.buffer.ByteBuf;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import java.util.Queue;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandAck;
Expand Down Expand Up @@ -52,6 +51,7 @@
import org.apache.pulsar.common.api.proto.CommandSuccess;
import org.apache.pulsar.common.api.proto.CommandUnsubscribe;
import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
import org.apache.pulsar.common.protocol.FrameDecoderUtil;
import org.apache.pulsar.common.protocol.PulsarDecoder;

public class ClientChannelHelper {
Expand All @@ -61,7 +61,7 @@ public class ClientChannelHelper {

public ClientChannelHelper() {
int maxMessageSize = 5 * 1024 * 1024;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be removed.

channel = new EmbeddedChannel(new LengthFieldBasedFrameDecoder(maxMessageSize, 0, 4, 0, 4), decoder);
channel = new EmbeddedChannel(FrameDecoderUtil.createFrameDecoder(), decoder);
}

public Object getCommand(Object obj) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -61,7 +61,7 @@
@Test(groups = "broker-api")
public class PulsarMultiListenersWithInternalListenerNameTest extends MockedPulsarServiceBaseTest {
private final boolean withInternalListener;
private ExecutorService executorService;
private ScheduledExecutorService executorService;
private InetSocketAddress brokerAddress;
private InetSocketAddress brokerSslAddress;
private EventLoopGroup eventExecutors;
Expand All @@ -79,7 +79,7 @@ protected PulsarMultiListenersWithInternalListenerNameTest(boolean withInternalL
@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
this.executorService = Executors.newFixedThreadPool(1);
this.executorService = Executors.newSingleThreadScheduledExecutor();
this.eventExecutors = new NioEventLoopGroup();
this.isTcpLookup = true;
String host = InetAddress.getLocalHost().getHostAddress();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.PulsarClientException;
Expand All @@ -61,7 +60,7 @@ public class BinaryProtoLookupService implements LookupService {
private final PulsarClientImpl client;
private final ServiceNameResolver serviceNameResolver;
private final boolean useTls;
private final ExecutorService scheduleExecutor;
private final ScheduledExecutorService scheduleExecutor;
private final String listenerName;
private final int maxLookupRedirects;
private final ExecutorService lookupPinnedExecutor;
Expand All @@ -83,27 +82,29 @@ public class BinaryProtoLookupService implements LookupService {

/**
* @deprecated use {@link
* #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean, ExecutorService, ExecutorService)} instead.
* #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean, ScheduledExecutorService, ExecutorService)}
* instead.
*/
@Deprecated
public BinaryProtoLookupService(PulsarClientImpl client,
String serviceUrl,
boolean useTls,
ExecutorService scheduleExecutor)
ScheduledExecutorService scheduleExecutor)
throws PulsarClientException {
this(client, serviceUrl, null, useTls, scheduleExecutor);
}

/**
* @deprecated use {@link
* #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean, ExecutorService, ExecutorService)} instead.
* #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean, ScheduledExecutorService, ExecutorService)}
* instead.
*/
@Deprecated
public BinaryProtoLookupService(PulsarClientImpl client,
String serviceUrl,
String listenerName,
boolean useTls,
ExecutorService scheduleExecutor)
ScheduledExecutorService scheduleExecutor)
throws PulsarClientException {
this(client, serviceUrl, listenerName, useTls, scheduleExecutor, null);
}
Expand All @@ -112,7 +113,7 @@ public BinaryProtoLookupService(PulsarClientImpl client,
String serviceUrl,
String listenerName,
boolean useTls,
ExecutorService scheduleExecutor,
ScheduledExecutorService scheduleExecutor,
ExecutorService lookupPinnedExecutor)
throws PulsarClientException {
this.client = client;
Expand Down Expand Up @@ -409,15 +410,16 @@ public CompletableFuture<GetTopicsResult> getTopicsUnderNamespace(NamespaceName
try {
return topicsUnderNamespaceInProgress.computeIfAbsent(key, k -> {
CompletableFuture<GetTopicsResult> topicsFuture = new CompletableFuture<>();
AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs());
long opTimeoutMs = client.getConfiguration().getOperationTimeoutMs();
Backoff backoff = new BackoffBuilder()
.setInitialTime(100, TimeUnit.MILLISECONDS)
.setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
.setMandatoryStop(opTimeoutMs * 2, TimeUnit.MILLISECONDS)
.setMax(1, TimeUnit.MINUTES)
.create();

long startTimeNanos = System.nanoTime();
long retryUntilNanos = startTimeNanos + TimeUnit.MILLISECONDS.toNanos(opTimeoutMs);
newFutureCreated.setValue(topicsFuture);
getTopicsUnderNamespace(namespace, backoff, opTimeoutMs, topicsFuture, mode,
getTopicsUnderNamespace(namespace, backoff, startTimeNanos, retryUntilNanos, topicsFuture, mode,
topicsPattern, topicsHash);
return topicsFuture;
});
Expand All @@ -433,50 +435,56 @@ public CompletableFuture<GetTopicsResult> getTopicsUnderNamespace(NamespaceName
private void getTopicsUnderNamespace(
NamespaceName namespace,
Backoff backoff,
AtomicLong remainingTime,
long startTimeNanos,
long retryUntilNanos,
CompletableFuture<GetTopicsResult> getTopicsResultFuture,
Mode mode,
String topicsPattern,
String topicsHash) {
long startTime = System.nanoTime();

client.getCnxPool().getConnection(serviceNameResolver).thenAcceptAsync(clientCnx -> {
client.getCnxPool().getConnection(serviceNameResolver).thenComposeAsync(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newGetTopicsOfNamespaceRequest(
namespace.toString(), requestId, mode, topicsPattern, topicsHash);

clientCnx.newGetTopicsOfNamespace(request, requestId).whenComplete((r, t) -> {
if (t != null) {
histoListTopics.recordFailure(System.nanoTime() - startTime);
getTopicsResultFuture.completeExceptionally(t);
ByteBuf request = Commands.newGetTopicsOfNamespaceRequest(namespace.toString(), requestId, mode,
topicsPattern, topicsHash);
return clientCnx.newGetTopicsOfNamespace(request, requestId).whenComplete((r, t) -> {
client.getCnxPool().releaseConnection(clientCnx);
});
}, lookupPinnedExecutor).whenComplete((r, t) -> {
if (t != null) {
Throwable cause = FutureUtil.unwrapCompletionException(t);
if (cause instanceof PulsarClientException && !PulsarClientException.isRetriableError(cause)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it covered in the test here?

histoListTopics.recordFailure(System.nanoTime() - startTimeNanos);
getTopicsResultFuture.completeExceptionally(cause);
return;
}
long nowNanos = System.nanoTime();
if (nowNanos > retryUntilNanos) {
histoListTopics.recordFailure(System.nanoTime() - startTimeNanos);
log.warn("[namespace: {}] Error while getTopicsUnderNamespace -- No more retries left."
+ " Last error was {}", namespace, t.getMessage());
getTopicsResultFuture.completeExceptionally(new PulsarClientException.TimeoutException(
format("Could not get topics of namespace %s within configured timeout",
namespace.toString())));
} else {
histoListTopics.recordSuccess(System.nanoTime() - startTime);
if (log.isDebugEnabled()) {
log.debug("[namespace: {}] Success get topics list in request: {}",
namespace, requestId);
long nextDelay = backoff.next();
log.warn("[namespace: {}] Error while getTopicsUnderNamespace -- Will try again in"
+ " {} ms. Error was {}", namespace, nextDelay, t.getMessage());
if (!getTopicsResultFuture.isDone()) {
scheduleExecutor.schedule(() -> {
getTopicsUnderNamespace(namespace, backoff, startTimeNanos, retryUntilNanos,
getTopicsResultFuture, mode, topicsPattern, topicsHash);
}, nextDelay, TimeUnit.MILLISECONDS);
} else {
log.info("[namespace: {}] Ignoring retry in getTopicsUnderNamespace -- Future is already "
+ "completed", namespace);
}
getTopicsResultFuture.complete(r);
}
client.getCnxPool().releaseConnection(clientCnx);
});
}, lookupPinnedExecutor).exceptionally((e) -> {
long nextDelay = Math.min(backoff.next(), remainingTime.get());
if (nextDelay <= 0) {
getTopicsResultFuture.completeExceptionally(
new PulsarClientException.TimeoutException(
format("Could not get topics of namespace %s within configured timeout",
namespace.toString())));
return null;
} else {
histoListTopics.recordSuccess(System.nanoTime() - startTimeNanos);
if (log.isDebugEnabled()) {
log.debug("[namespace: {}] Success get topics list", namespace);
}
getTopicsResultFuture.complete(r);
}

((ScheduledExecutorService) scheduleExecutor).schedule(() -> {
log.warn("[namespace: {}] Could not get connection while getTopicsUnderNamespace -- Will try again in"
+ " {} ms", namespace, nextDelay);
remainingTime.addAndGet(-nextDelay);
getTopicsUnderNamespace(namespace, backoff, remainingTime, getTopicsResultFuture,
mode, topicsPattern, topicsHash);
}, nextDelay, TimeUnit.MILLISECONDS);
return null;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.unix.Errors.NativeIoException;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import io.netty.util.concurrent.Promise;
Expand All @@ -43,6 +42,7 @@
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -166,7 +166,7 @@ public class ClientCnx extends PulsarHandler {
@Getter(AccessLevel.PACKAGE)
private final Semaphore pendingLookupRequestSemaphore;
private final Semaphore maxLookupRequestSemaphore;
private final EventLoopGroup eventLoopGroup;
private final ScheduledExecutorService scheduledExecutor;

private static final AtomicIntegerFieldUpdater<ClientCnx> NUMBER_OF_REJECTED_REQUESTS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ClientCnx.class, "numberOfRejectRequests");
Expand Down Expand Up @@ -251,20 +251,20 @@ String getDescription() {
}

public ClientCnx(InstrumentProvider instrumentProvider,
ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
this(instrumentProvider, conf, eventLoopGroup, Commands.getCurrentProtocolVersion());
ClientConfigurationData conf, ScheduledExecutorService scheduledExecutor) {
this(instrumentProvider, conf, scheduledExecutor, Commands.getCurrentProtocolVersion());
}

public ClientCnx(InstrumentProvider instrumentProvider, ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
int protocolVersion) {
public ClientCnx(InstrumentProvider instrumentProvider, ClientConfigurationData conf,
ScheduledExecutorService scheduledExecutor, int protocolVersion) {
super(conf.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);
checkArgument(conf.getMaxLookupRequest() > conf.getConcurrentLookupRequest());
this.pendingLookupRequestSemaphore = new Semaphore(conf.getConcurrentLookupRequest(), false);
this.maxLookupRequestSemaphore =
new Semaphore(conf.getMaxLookupRequest() - conf.getConcurrentLookupRequest(), false);
this.waitingLookupRequests = Queues.newConcurrentLinkedQueue();
this.authentication = conf.getAuthentication();
this.eventLoopGroup = eventLoopGroup;
this.scheduledExecutor = scheduledExecutor;
this.maxNumberOfRejectedRequestPerConnection = conf.getMaxNumberOfRejectedRequestPerConnection();
this.operationTimeoutMs = conf.getOperationTimeoutMs();
this.state = State.None;
Expand All @@ -289,7 +289,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.localAddress = ctx.channel().localAddress();
this.remoteAddress = ctx.channel().remoteAddress();

this.timeoutTask = this.eventLoopGroup
this.timeoutTask = this.scheduledExecutor
.scheduleAtFixedRate(catchingAndLoggingThrowables(this::checkRequestTimeout), operationTimeoutMs,
operationTimeoutMs, TimeUnit.MILLISECONDS);

Expand Down Expand Up @@ -759,7 +759,7 @@ private CompletableFuture<LookupDataResult> getAndRemovePendingLookupRequest(lon
if (firstOneWaiting != null) {
maxLookupRequestSemaphore.release();
// schedule a new lookup in.
eventLoopGroup.execute(() -> {
scheduledExecutor.execute(() -> {
long newId = firstOneWaiting.getLeft();
TimedCompletableFuture<LookupDataResult> newFuture = firstOneWaiting.getRight().getRight();
addPendingLookupRequests(newId, newFuture);
Expand Down Expand Up @@ -1291,7 +1291,7 @@ private void incrementRejectsAndMaybeClose() {
long rejectedRequests = NUMBER_OF_REJECTED_REQUESTS_UPDATER.getAndIncrement(this);
if (rejectedRequests == 0) {
// schedule timer
eventLoopGroup.schedule(() -> NUMBER_OF_REJECTED_REQUESTS_UPDATER.set(ClientCnx.this, 0),
scheduledExecutor.schedule(() -> NUMBER_OF_REJECTED_REQUESTS_UPDATER.set(ClientCnx.this, 0),
rejectedRequestResetTimeSec, TimeUnit.SECONDS);
} else if (rejectedRequests >= maxNumberOfRejectedRequestPerConnection) {
log.error("{} Close connection because received {} rejected request in {} seconds ", ctx.channel(),
Expand Down
Loading
Loading