Skip to content

Commit 8bd412d

Browse files
suz-yangmedcll
andauthored
patch direct load bugfix (#402)
* modify direct-load MAX_QUERY_TIMEOUT to Integer.MAX_VALUE (#283) * direct load set connect timeout (#284) * fix obVersion in direct load (#320) * fix direct load trace id generator (#400) --------- Co-authored-by: medcll <[email protected]>
1 parent 837943d commit 8bd412d

File tree

4 files changed

+84
-50
lines changed

4 files changed

+84
-50
lines changed

src/main/java/com/alipay/oceanbase/rpc/direct_load/ObDirectLoadConnection.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ public class ObDirectLoadConnection {
5252
private long heartBeatTimeout = 0;
5353
private long heartBeatInterval = 0;
5454

55+
private long connectTimeout = 0;
56+
5557
private boolean isInited = false;
5658
private boolean isClosed = false;
5759

@@ -61,7 +63,7 @@ public class ObDirectLoadConnection {
6163

6264
ObDirectLoadConnection(ObDirectLoadConnectionFactory connectionFactory) {
6365
this.connectionFactory = connectionFactory;
64-
this.traceId = ObDirectLoadTraceId.generateTraceId();
66+
this.traceId = ObDirectLoadTraceIdGenerator.generate();
6567
this.logger = ObDirectLoadLogger.getLogger(this.traceId);
6668
}
6769

@@ -137,6 +139,8 @@ private void fillParams(Builder builder) throws ObDirectLoadException {
137139
heartBeatInterval = builder.heartBeatInterval;
138140

139141
writeConnectionNum = builder.writeConnectionNum;
142+
143+
connectTimeout = builder.connectTimeout;
140144
}
141145

142146
private void initCheck() throws ObDirectLoadException {
@@ -167,25 +171,30 @@ private void initCheck() throws ObDirectLoadException {
167171
"Param 'heartBeatInterval' must not be greater than or equal to Param 'heartBeatTimeout', heartBeatTimeout:"
168172
+ heartBeatTimeout + ", heartBeatInterval:" + heartBeatInterval);
169173
}
174+
ObDirectLoadUtil.checkPositive(connectTimeout, "connectTimeout", logger);
170175
}
171176

172177
private void initProtocol() throws ObDirectLoadException {
173178
// 构造一个连接, 获取版本号
179+
long obVersion = 0;
174180
ObTable table = null;
175181
synchronized (connectionFactory) { // 防止并发访问ObGlobal.OB_VERSION
176182
ObGlobal.OB_VERSION = 0;
177183
try {
178184
Properties properties = new Properties();
179185
properties.setProperty(Property.SERVER_CONNECTION_POOL_SIZE.getKey(),
180186
String.valueOf(1));
187+
properties.setProperty(Property.RPC_CONNECT_TIMEOUT.getKey(),
188+
String.valueOf(connectTimeout));
181189
table = new ObTable.Builder(ip, port)
182190
.setLoginInfo(tenantName, userName, password, databaseName)
183191
.setProperties(properties).build();
184192
} catch (Exception e) {
185193
throw new ObDirectLoadException(e);
186194
}
195+
obVersion = ObGlobal.OB_VERSION;
187196
}
188-
this.protocol = ObDirectLoadProtocolFactory.getProtocol(traceId, ObGlobal.OB_VERSION);
197+
this.protocol = ObDirectLoadProtocolFactory.getProtocol(traceId, obVersion);
189198
this.protocol.init();
190199
table.close();
191200
}
@@ -276,6 +285,8 @@ public static final class Builder {
276285

277286
private static final long MAX_HEART_BEAT_TIMEOUT = 1L * 365 * 24 * 3600 * 1000; // 1year
278287

288+
private long connectTimeout = 1000;
289+
279290
Builder(ObDirectLoadConnectionFactory connectionFactory) {
280291
this.connectionFactory = connectionFactory;
281292
}
@@ -306,12 +317,17 @@ public Builder setHeartBeatInfo(long heartBeatTimeout, long heartBeatInterval) {
306317
return this;
307318
}
308319

320+
public Builder setConnectTimeout(long connectTimeout) {
321+
this.connectTimeout = Math.min(connectTimeout, (long) Integer.MAX_VALUE);
322+
return this;
323+
}
324+
309325
public String toString() {
310326
return String
311327
.format(
312-
"{ip:\"%s\", port:%d, tenantName:\"%s\", userName:\"%s\", databaseName:\"%s\", writeConnectionNum:%d, heartBeatTimeout:%d, heartBeatInterval:%d}",
328+
"{ip:\"%s\", port:%d, tenantName:\"%s\", userName:\"%s\", databaseName:\"%s\", writeConnectionNum:%d, heartBeatTimeout:%d, heartBeatInterval:%d, connectTimeout:%d}",
313329
ip, port, tenantName, userName, databaseName, writeConnectionNum,
314-
heartBeatTimeout, heartBeatInterval);
330+
heartBeatTimeout, heartBeatInterval, connectTimeout);
315331
}
316332

317333
public ObDirectLoadConnection build() throws ObDirectLoadException {
@@ -370,6 +386,8 @@ private void initTables() throws ObDirectLoadException {
370386
Properties properties = new Properties();
371387
properties
372388
.setProperty(Property.SERVER_CONNECTION_POOL_SIZE.getKey(), String.valueOf(1));
389+
properties.setProperty(Property.RPC_CONNECT_TIMEOUT.getKey(),
390+
String.valueOf(connection.connectTimeout));
373391
properties.setProperty(Property.RPC_EXECUTE_TIMEOUT.getKey(),
374392
String.valueOf(timeoutMillis));
375393
properties.setProperty(Property.RPC_OPERATION_TIMEOUT.getKey(),

src/main/java/com/alipay/oceanbase/rpc/direct_load/ObDirectLoadStatement.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public class ObDirectLoadStatement {
5757

5858
ObDirectLoadStatement(ObDirectLoadConnection connection) {
5959
this.connection = connection;
60-
this.traceId = ObDirectLoadTraceId.generateTraceId();
60+
this.traceId = ObDirectLoadTraceIdGenerator.generate();
6161
this.logger = ObDirectLoadLogger.getLogger(this.traceId);
6262
}
6363

@@ -309,7 +309,7 @@ public static final class Builder {
309309
private long maxErrorRowCount = 0;
310310
private String loadMethod = "full";
311311

312-
private static final long MAX_QUERY_TIMEOUT = 1L * 365 * 24 * 3600 * 1000; // 1year
312+
private static final long MAX_QUERY_TIMEOUT = Integer.MAX_VALUE;
313313

314314
Builder(ObDirectLoadConnection connection) {
315315
this.connection = connection;

src/main/java/com/alipay/oceanbase/rpc/direct_load/ObDirectLoadTraceId.java

Lines changed: 0 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@
1717

1818
package com.alipay.oceanbase.rpc.direct_load;
1919

20-
import java.net.InetAddress;
21-
import java.util.concurrent.atomic.AtomicLong;
22-
2320
public class ObDirectLoadTraceId {
2421

2522
private final long uniqueId;
@@ -43,50 +40,9 @@ public long getSequence() {
4340
}
4441

4542
public static final ObDirectLoadTraceId DEFAULT_TRACE_ID;
46-
public static TraceIdGenerator traceIdGenerator;
4743

4844
static {
4945
DEFAULT_TRACE_ID = new ObDirectLoadTraceId(0, 0);
50-
traceIdGenerator = new TraceIdGenerator();
51-
}
52-
53-
public static ObDirectLoadTraceId generateTraceId() {
54-
return traceIdGenerator.generate();
5546
}
5647

57-
public static class TraceIdGenerator {
58-
59-
private final ObDirectLoadLogger logger = ObDirectLoadLogger.getLogger();
60-
61-
private final long uniqueId;
62-
private AtomicLong sequence;
63-
64-
public TraceIdGenerator() {
65-
long ip = 0;
66-
try {
67-
ip = ipToLong(InetAddress.getLocalHost().getHostAddress());
68-
} catch (Exception e) {
69-
logger.warn("get local host address failed", e);
70-
}
71-
long port = (long) (Math.random() % 65536) << 32;
72-
long isUserRequest = (1l << (32 + 16));
73-
long reserved = 0;
74-
uniqueId = ip | port | isUserRequest | reserved;
75-
sequence = new AtomicLong(0);
76-
}
77-
78-
private static long ipToLong(String strIp) {
79-
String[] ip = strIp.split("\\.");
80-
return (Long.parseLong(ip[0]) << 24) + (Long.parseLong(ip[1]) << 16)
81-
+ (Long.parseLong(ip[2]) << 8) + (Long.parseLong(ip[3]));
82-
}
83-
84-
public ObDirectLoadTraceId generate() {
85-
long newSequence = System.currentTimeMillis() * 1000 + sequence.incrementAndGet()
86-
% 1000;
87-
return new ObDirectLoadTraceId(uniqueId, newSequence);
88-
}
89-
90-
};
91-
9248
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*-
2+
* #%L
3+
* com.oceanbase:obkv-table-client
4+
* %%
5+
* Copyright (C) 2021 - 2025 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
18+
package com.alipay.oceanbase.rpc.direct_load;
19+
20+
import java.net.InetAddress;
21+
import java.util.concurrent.atomic.AtomicLong;
22+
23+
public class ObDirectLoadTraceIdGenerator {
24+
25+
private final ObDirectLoadLogger logger = ObDirectLoadLogger.getLogger();
26+
private static ObDirectLoadTraceIdGenerator instance = new ObDirectLoadTraceIdGenerator();
27+
28+
private final long uniqueId;
29+
private AtomicLong sequence;
30+
31+
public ObDirectLoadTraceIdGenerator() {
32+
long ip = 0;
33+
try {
34+
ip = ipToLong(InetAddress.getLocalHost().getHostAddress());
35+
} catch (Exception e) {
36+
logger.warn("get local host address failed", e);
37+
}
38+
long port = (long) (Math.random() % 65536) << 32;
39+
long isUserRequest = (1l << (32 + 16));
40+
long reserved = 0;
41+
uniqueId = ip | port | isUserRequest | reserved;
42+
sequence = new AtomicLong(0);
43+
}
44+
45+
private static long ipToLong(String strIp) {
46+
String[] ip = strIp.split("\\.");
47+
return (Long.parseLong(ip[0]) << 24) + (Long.parseLong(ip[1]) << 16)
48+
+ (Long.parseLong(ip[2]) << 8) + (Long.parseLong(ip[3]));
49+
}
50+
51+
public ObDirectLoadTraceId generateNextId() {
52+
long newSequence = System.currentTimeMillis() * 1000 + sequence.incrementAndGet() % 1000;
53+
return new ObDirectLoadTraceId(uniqueId, newSequence);
54+
}
55+
56+
public static ObDirectLoadTraceId generate() {
57+
return instance.generateNextId();
58+
}
59+
60+
}

0 commit comments

Comments
 (0)