Skip to content

Commit f02bcdf

Browse files
authored
[client] Support initial cluster with retries to avoid initial failed when bootstrap server only contains one available address (#2024)
1 parent 23042f3 commit f02bcdf

File tree

3 files changed

+263
-99
lines changed

3 files changed

+263
-99
lines changed

fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java

Lines changed: 81 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,10 @@
2626
import org.apache.fluss.config.ConfigOptions;
2727
import org.apache.fluss.config.Configuration;
2828
import org.apache.fluss.exception.FlussRuntimeException;
29+
import org.apache.fluss.exception.NetworkException;
2930
import org.apache.fluss.exception.PartitionNotExistException;
3031
import org.apache.fluss.exception.RetriableException;
32+
import org.apache.fluss.exception.StaleMetadataException;
3133
import org.apache.fluss.metadata.PhysicalTablePath;
3234
import org.apache.fluss.metadata.TableBucket;
3335
import org.apache.fluss.metadata.TableInfo;
@@ -38,7 +40,6 @@
3840
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
3941
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
4042
import org.apache.fluss.rpc.gateway.TabletServerGateway;
41-
import org.apache.fluss.utils.ExceptionUtils;
4243

4344
import org.slf4j.Logger;
4445
import org.slf4j.LoggerFactory;
@@ -56,12 +57,14 @@
5657
import java.util.stream.Collectors;
5758

5859
import static org.apache.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
60+
import static org.apache.fluss.utils.ExceptionUtils.stripExecutionException;
5961

6062
/** The updater to initialize and update client metadata. */
6163
public class MetadataUpdater {
6264
private static final Logger LOG = LoggerFactory.getLogger(MetadataUpdater.class);
6365

64-
private static final int MAX_RETRY_TIMES = 5;
66+
private static final int MAX_RETRY_TIMES = 3;
67+
private static final int RETRY_INTERVAL_MS = 100;
6568

6669
private final RpcClient rpcClient;
6770
protected volatile Cluster cluster;
@@ -270,7 +273,7 @@ public void updateMetadata(
270273
tablePartitionIds);
271274
}
272275
} catch (Exception e) {
273-
Throwable t = ExceptionUtils.stripExecutionException(e);
276+
Throwable t = stripExecutionException(e);
274277
if (t instanceof RetriableException || t instanceof TimeoutException) {
275278
LOG.warn("Failed to update metadata, but the exception is re-triable.", t);
276279
} else if (t instanceof PartitionNotExistException) {
@@ -292,10 +295,33 @@ private static Cluster initializeCluster(Configuration conf, RpcClient rpcClient
292295
Cluster cluster = null;
293296
Exception lastException = null;
294297
for (InetSocketAddress address : inetSocketAddresses) {
298+
ServerNode serverNode = null;
295299
try {
296-
cluster = tryToInitializeCluster(rpcClient, address);
297-
break;
300+
serverNode =
301+
new ServerNode(
302+
-1,
303+
address.getHostString(),
304+
address.getPort(),
305+
ServerType.COORDINATOR);
306+
ServerNode finalServerNode = serverNode;
307+
AdminReadOnlyGateway adminReadOnlyGateway =
308+
GatewayClientProxy.createGatewayProxy(
309+
() -> finalServerNode, rpcClient, AdminReadOnlyGateway.class);
310+
if (inetSocketAddresses.size() == 1) {
311+
// if there is only one bootstrap server, we can retry to connect to it.
312+
cluster =
313+
tryToInitializeClusterWithRetries(
314+
rpcClient, serverNode, adminReadOnlyGateway, MAX_RETRY_TIMES);
315+
} else {
316+
cluster = tryToInitializeCluster(adminReadOnlyGateway);
317+
break;
318+
}
298319
} catch (Exception e) {
320+
// We should dis-connected with the bootstrap server id to make sure the next
321+
// retry can rebuild the connection.
322+
if (serverNode != null) {
323+
rpcClient.disconnect(serverNode.uid());
324+
}
299325
LOG.error(
300326
"Failed to initialize fluss client connection to bootstrap server: {}",
301327
address,
@@ -306,24 +332,64 @@ private static Cluster initializeCluster(Configuration conf, RpcClient rpcClient
306332

307333
if (cluster == null && lastException != null) {
308334
String errorMsg =
309-
"Failed to initialize fluss client connection to server because no "
310-
+ "bootstrap server is validate. bootstrap servers: "
311-
+ inetSocketAddresses;
335+
"Failed to initialize fluss client connection to bootstrap servers: "
336+
+ inetSocketAddresses
337+
+ ". \nReason: "
338+
+ lastException.getMessage();
312339
LOG.error(errorMsg);
313340
throw new IllegalStateException(errorMsg, lastException);
314341
}
315342

316343
return cluster;
317344
}
318345

319-
private static Cluster tryToInitializeCluster(RpcClient rpcClient, InetSocketAddress address)
346+
@VisibleForTesting
347+
static @Nullable Cluster tryToInitializeClusterWithRetries(
348+
RpcClient rpcClient,
349+
ServerNode serverNode,
350+
AdminReadOnlyGateway gateway,
351+
int maxRetryTimes)
352+
throws Exception {
353+
int retryCount = 0;
354+
while (retryCount <= maxRetryTimes) {
355+
try {
356+
return tryToInitializeCluster(gateway);
357+
} catch (Exception e) {
358+
Throwable cause = stripExecutionException(e);
359+
// in case of bootstrap is recovering, we should retry to connect.
360+
if (!(cause instanceof StaleMetadataException || cause instanceof NetworkException)
361+
|| retryCount >= maxRetryTimes) {
362+
throw e;
363+
}
364+
365+
// We should dis-connected with the bootstrap server id to make sure the next
366+
// retry can rebuild the connection.
367+
rpcClient.disconnect(serverNode.uid());
368+
369+
long delayMs = (long) (RETRY_INTERVAL_MS * Math.pow(2, retryCount));
370+
LOG.warn(
371+
"Failed to connect to bootstrap server: {} (retry {}/{}). Retrying in {} ms.",
372+
serverNode,
373+
retryCount + 1,
374+
maxRetryTimes,
375+
delayMs,
376+
e);
377+
378+
try {
379+
Thread.sleep(delayMs);
380+
} catch (InterruptedException ex) {
381+
Thread.currentThread().interrupt();
382+
throw new RuntimeException("Interrupted during retry sleep", ex);
383+
}
384+
retryCount++;
385+
}
386+
}
387+
388+
return null;
389+
}
390+
391+
private static Cluster tryToInitializeCluster(AdminReadOnlyGateway adminReadOnlyGateway)
320392
throws Exception {
321-
ServerNode serverNode =
322-
new ServerNode(
323-
-1, address.getHostString(), address.getPort(), ServerType.COORDINATOR);
324-
AdminReadOnlyGateway adminReadOnlyGateway =
325-
GatewayClientProxy.createGatewayProxy(
326-
() -> serverNode, rpcClient, AdminReadOnlyGateway.class);
327393
return sendMetadataRequestAndRebuildCluster(adminReadOnlyGateway, Collections.emptySet());
328394
}
329395

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.metadata;
19+
20+
import org.apache.fluss.client.Connection;
21+
import org.apache.fluss.client.ConnectionFactory;
22+
import org.apache.fluss.client.admin.Admin;
23+
import org.apache.fluss.cluster.Cluster;
24+
import org.apache.fluss.cluster.ServerNode;
25+
import org.apache.fluss.config.Configuration;
26+
import org.apache.fluss.metadata.TablePath;
27+
import org.apache.fluss.rpc.RpcClient;
28+
import org.apache.fluss.server.testutils.FlussClusterExtension;
29+
30+
import org.junit.jupiter.api.Test;
31+
import org.junit.jupiter.api.extension.RegisterExtension;
32+
33+
import java.util.Collections;
34+
import java.util.List;
35+
36+
import static org.apache.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
37+
import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
38+
import static org.assertj.core.api.Assertions.assertThat;
39+
40+
/** IT test for update metadata of {@link MetadataUpdater}. */
41+
class MetadataUpdaterITCase {
42+
43+
@RegisterExtension
44+
public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
45+
FlussClusterExtension.builder().setNumOfTabletServers(2).build();
46+
47+
@Test
48+
void testRebuildClusterNTimes() throws Exception {
49+
Configuration clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
50+
Connection conn = ConnectionFactory.createConnection(clientConf);
51+
Admin admin = conn.getAdmin();
52+
TablePath tablePath = TablePath.of("fluss", "test");
53+
admin.createTable(tablePath, DATA1_TABLE_DESCRIPTOR, true).get();
54+
admin.close();
55+
conn.close();
56+
57+
RpcClient rpcClient = FLUSS_CLUSTER_EXTENSION.getRpcClient();
58+
MetadataUpdater metadataUpdater = new MetadataUpdater(clientConf, rpcClient);
59+
// update metadata
60+
metadataUpdater.updateMetadata(Collections.singleton(tablePath), null, null);
61+
Cluster cluster = metadataUpdater.getCluster();
62+
63+
// repeat 20K times to reproduce StackOverflowError if there is
64+
// any N levels UnmodifiableCollection
65+
for (int i = 0; i < 20000; i++) {
66+
cluster =
67+
sendMetadataRequestAndRebuildCluster(
68+
FLUSS_CLUSTER_EXTENSION.newCoordinatorClient(),
69+
true,
70+
cluster,
71+
null,
72+
null,
73+
null);
74+
}
75+
}
76+
77+
@Test
78+
void testUpdateWithEmptyMetadataResponse() throws Exception {
79+
RpcClient rpcClient = FLUSS_CLUSTER_EXTENSION.getRpcClient();
80+
MetadataUpdater metadataUpdater =
81+
new MetadataUpdater(FLUSS_CLUSTER_EXTENSION.getClientConfig(), rpcClient);
82+
83+
// update metadata
84+
metadataUpdater.updateMetadata(null, null, null);
85+
Cluster cluster = metadataUpdater.getCluster();
86+
87+
List<ServerNode> expectedServerNodes = FLUSS_CLUSTER_EXTENSION.getTabletServerNodes();
88+
assertThat(expectedServerNodes).hasSize(2);
89+
assertThat(cluster.getAliveTabletServerList()).isEqualTo(expectedServerNodes);
90+
91+
// then, stop coordinator server, can still update metadata
92+
FLUSS_CLUSTER_EXTENSION.stopCoordinatorServer();
93+
metadataUpdater.updateMetadata(null, null, null);
94+
assertThat(cluster.getAliveTabletServerList()).isEqualTo(expectedServerNodes);
95+
96+
// start a new tablet server, the tablet server will return empty metadata
97+
// response since no coordinator server to send newest metadata to the tablet server
98+
int newServerId = 2;
99+
FLUSS_CLUSTER_EXTENSION.startTabletServer(newServerId);
100+
101+
// we mock a new cluster with only server 1 so that it'll only send request
102+
// to server 1, which will return empty resonate
103+
Cluster newCluster =
104+
new Cluster(
105+
Collections.singletonMap(
106+
newServerId,
107+
FLUSS_CLUSTER_EXTENSION.getTabletServerNodes().get(newServerId)),
108+
null,
109+
Collections.emptyMap(),
110+
Collections.emptyMap(),
111+
Collections.emptyMap(),
112+
Collections.emptyMap());
113+
114+
metadataUpdater = new MetadataUpdater(rpcClient, newCluster);
115+
// shouldn't update metadata to empty since the empty metadata will be ignored
116+
metadataUpdater.updateMetadata(null, null, null);
117+
assertThat(metadataUpdater.getCluster().getAliveTabletServers())
118+
.isEqualTo(newCluster.getAliveTabletServers())
119+
.hasSize(1);
120+
121+
// recover the coordinator
122+
FLUSS_CLUSTER_EXTENSION.startCoordinatorServer();
123+
}
124+
}

0 commit comments

Comments
 (0)