Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add shaded sofa dependency #209

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -361,8 +361,39 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<!-- 和 package 阶段绑定 -->
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>shade</shadedClassifierName>
<!-- Any name that makes sense -->
</configuration>
</execution>
</executions>
<configuration>
<relocations>
<relocation>
<pattern>com.alipay.sofa.common</pattern>
<shadedPattern>com.shaded.alipay.sofa.common</shadedPattern>
</relocation>
<relocation>
<pattern>com.alipay.remoting</pattern>
<shadedPattern>com.shaded.alipay.remoting</shadedPattern>
</relocation>
</relocations>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
</project>
270 changes: 189 additions & 81 deletions src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
ObRpcResultCode resultCode = new ObRpcResultCode();
resultCode.decode(buf);
// If response indicates the request is routed to wrong server, we should refresh the routing meta.
if (!conn.getObTable().getReRouting() &&response.getHeader().isRoutingWrong()) {
if (!conn.getObTable().isEnableRerouting() && response.getHeader().isRoutingWrong()) {
String errMessage = TraceUtil.formatTraceMessage(conn, request,
"routed to the wrong server: " + response.getMessage());
logger.warn(errMessage);
Expand All @@ -139,7 +139,8 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
throw new ObTableNeedFetchAllException(errMessage, resultCode.getRcode());
}
}
if (resultCode.getRcode() != 0 && response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
if (resultCode.getRcode() != 0
&& response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
String errMessage = TraceUtil.formatTraceMessage(conn, request,
"routed to the wrong server: " + response.getMessage());
logger.warn(errMessage);
Expand Down Expand Up @@ -193,6 +194,8 @@ private boolean needFetchAll(int errorCode, int pcode) {
|| errorCode == ResultCodes.OB_TABLE_NOT_EXIST.errorCode
|| errorCode == ResultCodes.OB_TABLET_NOT_EXIST.errorCode
|| errorCode == ResultCodes.OB_LS_NOT_EXIST.errorCode
|| errorCode == ResultCodes.OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST.errorCode
|| errorCode == ResultCodes.OB_SNAPSHOT_DISCARDED.errorCode
|| (pcode == Pcodes.OB_TABLE_API_LS_EXECUTE && errorCode == ResultCodes.OB_NOT_MASTER.errorCode);
}

Expand Down
399 changes: 283 additions & 116 deletions src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
import com.alipay.oceanbase.rpc.location.model.partition.ObPartitionLevel;
import com.alipay.oceanbase.rpc.protocol.payload.Constants;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;

import static com.google.common.base.Preconditions.checkArgument;

Expand Down Expand Up @@ -53,7 +54,9 @@ public class TableEntry {
// partition location
private TableEntryKey tableEntryKey = null;
private volatile ObPartitionEntry partitionEntry = null;


public ConcurrentHashMap<Long, Lock> refreshLockMap = new ConcurrentHashMap<>();

/*
* Is valid.
*/
Expand Down Expand Up @@ -218,8 +221,6 @@ public void prepare() throws IllegalArgumentException {
checkArgument(partitionInfo != null, "partition table partition info is not ready. key"
+ tableEntryKey);
partitionInfo.prepare();
checkArgument(partitionEntry != null,
"partition table partition entry is not ready. key" + tableEntryKey);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,23 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


public class ObPartitionEntry {
private Map<Long, ObPartitionLocation> partitionLocation = new HashMap<Long, ObPartitionLocation>();

// mapping from tablet id to ls id, and the part id to tablet id mapping is in ObPartitionInfo
private Map<Long, Long> tabletLsIdMap = new HashMap<>();

// tabelt id -> (PartitionLocation, LsId)
private ConcurrentHashMap<Long, ObPartitionLocationInfo> partitionInfos = new ConcurrentHashMap<>();


public ObPartitionLocationInfo getPartitionInfo(long tabletId) {
return partitionInfos.computeIfAbsent(tabletId, id -> new ObPartitionLocationInfo());
}

public Map<Long, ObPartitionLocation> getPartitionLocation() {
return partitionLocation;
}
Expand All @@ -39,6 +49,16 @@ public void setPartitionLocation(Map<Long, ObPartitionLocation> partitionLocatio
this.partitionLocation = partitionLocation;
}

public Map<Long, Long> getTabletLsIdMap() {
return tabletLsIdMap;
}

public void setTabletLsIdMap(Map<Long, Long> tabletLsIdMap) {
this.tabletLsIdMap = tabletLsIdMap;
}

public long getLsId(long tabletId) { return tabletLsIdMap.get(tabletId); }

/*
* Get partition location with part id.
*/
Expand Down Expand Up @@ -86,14 +106,4 @@ public void prepareForWeakRead(ObServerLdcLocation ldcLocation) {
public String toString() {
return "ObPartitionEntry{" + "partitionLocation=" + partitionLocation + '}';
}

public Map<Long, Long> getTabletLsIdMap() {
return tabletLsIdMap;
}

public void setTabletLsIdMap(Map<Long, Long> tabletLsIdMap) {
this.tabletLsIdMap = tabletLsIdMap;
}

public long getLsId(long tabletId) { return tabletLsIdMap.get(tabletId); }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*-
* #%L
* com.oceanbase:obkv-table-client
* %%
* Copyright (C) 2021 - 2024 OceanBase
* %%
* OBKV Table Client Framework is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* #L%
*/

package com.alipay.oceanbase.rpc.location.model.partition;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static com.alipay.oceanbase.rpc.protocol.payload.Constants.OB_INVALID_ID;

public class ObPartitionLocationInfo {
private ObPartitionLocation partitionLocation = null;
private Long tabletLsId = OB_INVALID_ID;
private Long lastUpdateTime = 0L;
public ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
public AtomicBoolean initialized = new AtomicBoolean(false);
public final CountDownLatch initializationLatch = new CountDownLatch(1);

public ObPartitionLocation getPartitionLocation() {
rwLock.readLock().lock();
try {
return partitionLocation;
} finally {
rwLock.readLock().unlock();
}
}

public void updateLocation(ObPartitionLocation newLocation) {
this.partitionLocation = newLocation;
this.lastUpdateTime = System.currentTimeMillis();
}

public Long getTabletLsId() {
return tabletLsId;
}

public void setTabletLsId(Long tabletLsId) {
this.tabletLsId = tabletLsId;
}

public Long getLastUpdateTime() {
rwLock.readLock().lock();
try {
return lastUpdateTime;
} finally {
rwLock.readLock().unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public enum Property {
NETTY_BLOCKING_WAIT_INTERVAL("bolt.netty.blocking.wait.interval", 1, "netty写缓存满后等待时间"),

// [ObTable][OTHERS]
SERVER_ENABLE_REROUTING("server.enable.rerouting", true, "开启server端的重定向回复功能"),
SERVER_ENABLE_REROUTING("server.enable.rerouting", false, "开启server端的重定向回复功能"),

/*
* other config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,9 @@ public enum ResultCodes {
OB_CLUSTER_NO_MATCH(-4666), //
OB_CHECK_ZONE_MERGE_ORDER(-4667), //
OB_ERR_ZONE_NOT_EMPTY(-4668), //
OB_USE_DUP_FOLLOW_AFTER_DML(-4686), OB_LS_NOT_EXIST(-4719), //
OB_USE_DUP_FOLLOW_AFTER_DML(-4686), //
OB_LS_NOT_EXIST(-4719), //
OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST(-4723), //
OB_TABLET_NOT_EXIST(-4725), //
OB_ERR_PARSER_INIT(-5000), //
OB_ERR_PARSE_SQL(-5001), //
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

package com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query;

import com.alipay.oceanbase.rpc.ObGlobal;
import com.alipay.oceanbase.rpc.ObTableClient;
import com.alipay.oceanbase.rpc.bolt.transport.ObTableConnection;
import com.alipay.oceanbase.rpc.exception.*;
import com.alipay.oceanbase.rpc.location.model.ObReadConsistency;
import com.alipay.oceanbase.rpc.location.model.ObServerRoute;
import com.alipay.oceanbase.rpc.location.model.TableEntry;
import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload;
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
Expand Down Expand Up @@ -51,7 +53,6 @@ public abstract class AbstractQueryStreamResult extends AbstractPayload implemen
protected volatile boolean closed = false;
protected volatile List<ObObj> row = null;
protected volatile int rowIndex = -1;
// 调整它的startKey
protected ObTableQuery tableQuery;
protected long operationTimeout = -1;
protected String tableName;
Expand Down Expand Up @@ -324,10 +325,7 @@ public boolean next() throws Exception {

} catch (Exception e) {
if (e instanceof ObTableNeedFetchAllException) {
// Adjust the start key and refresh the expectant
this.tableQuery.adjustStartKey(currentStartKey);
setExpectant(refreshPartition(tableQuery, tableName));

// Reset the iterator to start over
it = expectant.entrySet().iterator();
referPartition.clear(); // Clear the referPartition if needed
Expand Down Expand Up @@ -362,7 +360,7 @@ public boolean next() throws Exception {
}

protected Map<Long, ObPair<Long, ObTableParam>> buildPartitions(ObTableClient client, ObTableQuery tableQuery, String tableName) throws Exception {
Map<Long, ObPair<Long, ObTableParam>> partitionObTables = new HashMap<>();
Map<Long, ObPair<Long, ObTableParam>> partitionObTables = new LinkedHashMap<>();
String indexName = tableQuery.getIndexName();
String indexTableName = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

package com.alipay.oceanbase.rpc.stream;

import com.alipay.oceanbase.rpc.ObGlobal;
import com.alipay.oceanbase.rpc.ObTableClient;
import com.alipay.oceanbase.rpc.bolt.transport.ObTableConnection;
import com.alipay.oceanbase.rpc.exception.ObTableException;
import com.alipay.oceanbase.rpc.exception.ObTableNeedFetchAllException;
import com.alipay.oceanbase.rpc.exception.ObTableRetryExhaustedException;
import com.alipay.oceanbase.rpc.location.model.TableEntry;
import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
import com.alipay.oceanbase.rpc.protocol.payload.Constants;
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
Expand Down Expand Up @@ -84,8 +86,10 @@ public void init() throws Exception {
it = expectant.entrySet().iterator();
retryTimes++;
if (retryTimes > maxRetries) {
RUNTIME.error("Fail to get refresh table entry response after {}", retryTimes);
throw new ObTableRetryExhaustedException("Fail to get refresh table entry response after " + retryTimes);
RUNTIME.error("Fail to get refresh table entry response after {}",
retryTimes);
throw new ObTableRetryExhaustedException(
"Fail to get refresh table entry response after " + retryTimes);

}
} else {
Expand Down Expand Up @@ -199,11 +203,21 @@ public boolean next() throws Exception {
referToLastStreamResult(lastEntry.getValue());
} catch (Exception e) {
if (e instanceof ObTableNeedFetchAllException) {
this.asyncRequest.getObTableQueryRequest().getTableQuery()
.adjustStartKey(currentStartKey);
setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
.getTableQuery(), tableName));
setEnd(true);

TableEntry entry = client.getOrRefreshTableEntry(tableName, false, false, false);
// Calculate the next partition only when the range partition is affected by a split, based on the keys already scanned.
if (ObGlobal.obVsnMajor() >= 4
&& entry.isPartitionTable()
&& entry.getPartitionInfo().getFirstPartDesc().getPartFuncType().isRangePart()) {
this.asyncRequest.getObTableQueryRequest().getTableQuery()
.adjustStartKey(currentStartKey);
setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
.getTableQuery(), tableName));
setEnd(true);
} else {
setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
.getTableQuery(), tableName));
}
} else {
throw e;
}
Expand All @@ -230,15 +244,22 @@ public boolean next() throws Exception {
referToNewPartition(entry.getValue());
} catch (Exception e) {
if (e instanceof ObTableNeedFetchAllException) {
this.asyncRequest.getObTableQueryRequest().getTableQuery()
.adjustStartKey(currentStartKey);
setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
.getTableQuery(), tableName));
TableEntry tableEntry = client.getOrRefreshTableEntry(tableName, false, false, false);
if (ObGlobal.obVsnMajor() >= 4
&& tableEntry.isPartitionTable()
&& tableEntry.getPartitionInfo().getFirstPartDesc().getPartFuncType().isRangePart()) {
this.asyncRequest.getObTableQueryRequest().getTableQuery()
.adjustStartKey(currentStartKey);
setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
.getTableQuery(), tableName));
}
it = expectant.entrySet().iterator();
retryTimes++;
if (retryTimes > client.getTableEntryRefreshTryTimes()) {
RUNTIME.error("Fail to get refresh table entry response after {}", retryTimes);
throw new ObTableRetryExhaustedException("Fail to get refresh table entry response after " + retryTimes);
RUNTIME.error("Fail to get refresh table entry response after {}",
retryTimes);
throw new ObTableRetryExhaustedException(
"Fail to get refresh table entry response after " + retryTimes);
}
continue;
} else {
Expand Down
Loading
Loading