Skip to content

Commit 7e80e0a

Browse files
authored
feat: add max age (#65)
* feat: add max age * fix CI * fix format * update pr * add backoff * fix compile * fix CR
1 parent 3f5a19f commit 7e80e0a

File tree

6 files changed

+58
-55
lines changed

6 files changed

+58
-55
lines changed

.github/pull_request_template.md

Lines changed: 3 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,7 @@
1-
# Which issue does this PR close?
1+
## Rationale
22

3-
Closes #
43

5-
# Rationale for this change
6-
7-
<!---
8-
Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed.
9-
Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes.
10-
-->
4+
## Detailed Changes
115

12-
# What changes are included in this PR?
136

14-
<!---
15-
There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR to help reviewer understand the structure.
16-
-->
17-
18-
# Are there any user-facing changes?
19-
20-
<!---
21-
Please mention if:
22-
23-
- there are user-facing changes that needs to update the documentation or configuration.
24-
- this is a breaking change to public APIs
25-
-->
26-
27-
# How does this change test
28-
29-
<!--
30-
Please describe how you test this change (like by unit test case, integration test or some other ways) if this change has touched the code.
31-
-->
7+
## Test Plan

Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
2+
fmt:
3+
mvn formatter:format

ceresdb-grpc/src/main/java/io/ceresdb/rpc/GrpcClient.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import java.util.List;
77
import java.util.Map;
8+
import java.util.Random;
89
import java.util.concurrent.ArrayBlockingQueue;
910
import java.util.concurrent.BlockingQueue;
1011
import java.util.concurrent.CompletableFuture;
@@ -68,7 +69,6 @@
6869

6970
/**
7071
* Grpc client implementation.
71-
*
7272
*/
7373
public class GrpcClient implements RpcClient {
7474

@@ -596,11 +596,29 @@ private Channel getCheckedChannel(final Endpoint endpoint, final Consumer<Throwa
596596
}
597597

598598
private ManagedChannel getChannel(final Endpoint endpoint, final boolean createIfAbsent) {
599+
IdChannel ch = null;
599600
if (createIfAbsent) {
600-
return this.managedChannelPool.computeIfAbsent(endpoint, this::newChannel);
601+
ch = this.managedChannelPool.computeIfAbsent(endpoint, this::newChannel);
601602
} else {
602-
return this.managedChannelPool.get(endpoint);
603+
ch = this.managedChannelPool.get(endpoint);
603604
}
605+
606+
long maxAge = this.opts.getConnectionMaxAgeMs();
607+
if (maxAge != 0 && ch != null) {
608+
long createTime = ch.getCreateTime();
609+
long now = System.currentTimeMillis();
610+
Random rand = new Random();
611+
// Add backoff here to avoid multiple connections retry at same time.
612+
long backoff = rand.nextInt(20_000); // max backoff 20s
613+
if (now - createTime > maxAge + backoff) {
614+
ch.shutdown();
615+
IdChannel newChannel = this.newChannel(endpoint);
616+
this.managedChannelPool.put(endpoint, newChannel);
617+
return newChannel;
618+
}
619+
}
620+
621+
return ch;
604622
}
605623

606624
private IdChannel newChannel(final Endpoint endpoint) {

ceresdb-grpc/src/main/java/io/ceresdb/rpc/IdChannel.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@
1414

1515
/**
1616
* A managed channel that has a channel id.
17-
*
1817
*/
1918
public class IdChannel extends ManagedChannel {
2019

21-
private static final AtomicLong ID_ALLOC = new AtomicLong();
20+
private static final AtomicLong ID_ALLOC = new AtomicLong();
21+
private final long createTime = System.currentTimeMillis();
2222

2323
private final long channelId;
2424
private final ManagedChannel channel;
@@ -32,6 +32,10 @@ public IdChannel(ManagedChannel channel) {
3232
this.channel = channel;
3333
}
3434

35+
public long getCreateTime() {
36+
return createTime;
37+
}
38+
3539
public long getChannelId() {
3640
return channelId;
3741
}
@@ -98,9 +102,7 @@ public void enterIdle() {
98102

99103
@Override
100104
public String toString() {
101-
return "IdChannel{" + //
102-
"channelId=" + channelId + //
103-
", channel=" + channel + //
104-
'}';
105+
return "IdChannel{" + "createTime=" + createTime + ", channelId=" + channelId + '}';
105106
}
107+
106108
}

ceresdb-rpc/src/main/java/io/ceresdb/rpc/RpcOptions.java

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@ public class RpcOptions implements Copiable<RpcOptions> {
9191

9292
private boolean logOnLimitChange = true;
9393

94+
/**
95+
* Max time in milliseconds a connection can live, 0 means forever.
96+
*/
97+
private long connectionMaxAgeMs = 0;
98+
9499
public int getDefaultRpcTimeout() {
95100
return defaultRpcTimeout;
96101
}
@@ -99,6 +104,14 @@ public void setDefaultRpcTimeout(int defaultRpcTimeout) {
99104
this.defaultRpcTimeout = defaultRpcTimeout;
100105
}
101106

107+
public long getConnectionMaxAgeMs() {
108+
return connectionMaxAgeMs;
109+
}
110+
111+
public void setConnectionMaxAgeMs(long connectionMaxAgeMs) {
112+
this.connectionMaxAgeMs = connectionMaxAgeMs;
113+
}
114+
102115
public int getRpcThreadPoolSize() {
103116
return rpcThreadPoolSize;
104117
}
@@ -238,29 +251,20 @@ public RpcOptions copy() {
238251
opts.smoothing = this.smoothing;
239252
opts.blockOnLimit = this.blockOnLimit;
240253
opts.logOnLimitChange = this.logOnLimitChange;
254+
opts.connectionMaxAgeMs = this.connectionMaxAgeMs;
241255
return opts;
242256
}
243257

244258
@Override
245259
public String toString() {
246-
return "RpcOptions{" + //
247-
"defaultRpcTimeout=" + defaultRpcTimeout + //
248-
", rpcThreadPoolSize=" + rpcThreadPoolSize + //
249-
", rpcThreadPoolQueueSize=" + rpcThreadPoolQueueSize + //
250-
", maxInboundMessageSize=" + maxInboundMessageSize + //
251-
", flowControlWindow=" + flowControlWindow + //
252-
", idleTimeoutSeconds=" + idleTimeoutSeconds + //
253-
", keepAliveTimeSeconds=" + keepAliveTimeSeconds + //
254-
", keepAliveTimeoutSeconds=" + keepAliveTimeoutSeconds + //
255-
", keepAliveWithoutCalls=" + keepAliveWithoutCalls + //
256-
", limitKind=" + limitKind + //
257-
", initialLimit=" + initialLimit + //
258-
", maxLimit=" + maxLimit + //
259-
", longRttWindow=" + longRttWindow + //
260-
", smoothing=" + smoothing + //
261-
", blockOnLimit=" + blockOnLimit + //
262-
", logOnLimitChange=" + logOnLimitChange + //
263-
'}';
260+
return "RpcOptions{" + "defaultRpcTimeout=" + defaultRpcTimeout + ", rpcThreadPoolSize=" + rpcThreadPoolSize
261+
+ ", rpcThreadPoolQueueSize=" + rpcThreadPoolQueueSize + ", maxInboundMessageSize="
262+
+ maxInboundMessageSize + ", flowControlWindow=" + flowControlWindow + ", idleTimeoutSeconds="
263+
+ idleTimeoutSeconds + ", keepAliveTimeSeconds=" + keepAliveTimeSeconds + ", keepAliveTimeoutSeconds="
264+
+ keepAliveTimeoutSeconds + ", keepAliveWithoutCalls=" + keepAliveWithoutCalls + ", limitKind="
265+
+ limitKind + ", initialLimit=" + initialLimit + ", maxLimit=" + maxLimit + ", longRttWindow="
266+
+ longRttWindow + ", smoothing=" + smoothing + ", blockOnLimit=" + blockOnLimit + ", logOnLimitChange="
267+
+ logOnLimitChange + ", connectionMaxAge=" + connectionMaxAgeMs + '}';
264268
}
265269

266270
public static RpcOptions newDefault() {

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@
7777
<proto-internal.version>1.0.0</proto-internal.version>
7878
<protobuf.version>3.21.7</protobuf.version>
7979
<!-- according to https://maven.apache.org/maven-ci-friendly.html -->
80-
<revision>1.0.3</revision>
80+
<revision>1.0.4</revision>
8181
<slf4j.version>1.7.21</slf4j.version>
8282
</properties>
8383

0 commit comments

Comments
 (0)