Skip to content

Add rerouting error feedback refreshing logic #358

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,7 @@ public void calculateContinuousFailure(String tableName, String errorMsg) throws
if (failures.incrementAndGet() > runtimeContinuousFailureCeiling) {
logger.warn("refresh table entry {} while execute failed times exceeded {}, msg: {}",
tableName, runtimeContinuousFailureCeiling, errorMsg);
refreshMeta(tableName);
tableRoute.refreshMeta(tableName);
failures.set(0);
} else {
logger.warn("error msg: {}, current continues failure count: {}", errorMsg, failures);
Expand Down Expand Up @@ -968,14 +968,6 @@ public TableEntry getOrRefreshTableEntry(final String tableName, boolean forceRe
if (!forceRefresh) {
return tableRoute.getTableEntry(tableName);
}
return refreshMeta(tableName);
}

/**
* refresh table meta information except location
* @param tableName table name
* */
private TableEntry refreshMeta(String tableName) throws Exception {
return tableRoute.refreshMeta(tableName);
}

Expand Down Expand Up @@ -2102,6 +2094,16 @@ public ObPayload executeWithRetry(ObTable obTable, ObPayload request, String tab
"Rerouting return IP is {}", moveResponse.getReplica().getServer().ipToString(), move .getReplica().getServer().ipToString());
throw new ObTableRoutingWrongException();
}
} else if (result != null && result.isRoutingWrong()) {
logger.debug("errors happened in server and retried successfully, server ip:port is {}:{}, tableName: {}, need_refresh_meta: {}",
obTable.getIp(), obTable.getPort(), tableName, result.isNeedRefreshMeta());
if (result.isNeedRefreshMeta()) {
tableRoute.refreshMeta(tableName);
}
if (request instanceof ObTableAbstractOperationRequest) {
long tabletId = ((ObTableAbstractOperationRequest) request).getPartitionId();
tableRoute.refreshPartitionLocation(tableName, tabletId, null);
}
}
return result;
}
Expand Down Expand Up @@ -2365,7 +2367,7 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
needRefreshTabletLocation = true;
if (ex instanceof ObTableNeedFetchMetaException) {
// Refresh table info
refreshMeta(request.getTableName());
tableRoute.refreshMeta(request.getTableName());
}
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,42 +119,39 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
}

// decode ResultCode for response packet
boolean isRoutingWrong = false;
boolean isNeedRefreshMeta = false;
ObRpcResultCode resultCode = new ObRpcResultCode();
resultCode.decode(buf);
// If response indicates the request is routed to wrong server, we should refresh the routing meta.
if (!conn.getObTable().isEnableRerouting() && response.getHeader().isRoutingWrong()) {
String errMessage = TraceUtil.formatTraceMessage(conn, request,
"routed to the wrong server: [error code:" + resultCode.getRcode() + "]"
+ resultCode.getErrMsg());
logger.debug(errMessage);
if (needFetchMeta(resultCode.getRcode(), resultCode.getPcode())) {
throw new ObTableNeedFetchMetaException(errMessage, resultCode.getRcode());
} else if (needFetchPartitionLocation(resultCode.getRcode())) {
throw new ObTableRoutingWrongException(errMessage, resultCode.getRcode());
} else {
// Encountered an unexpected RoutingWrong error code,
// possibly due to the client error code version being behind the observer's version.
// Attempting a full refresh here
// and delegating to the upper-level call to determine whether to throw the exception to the user based on the retry result.
logger.warn("get unexpected error code: {}", errMessage);
throw new ObTableNeedFetchMetaException(errMessage, resultCode.getRcode());
}
}
if (resultCode.getRcode() != 0
&& response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
String errMessage = TraceUtil.formatTraceMessage(conn, request,
"meet exception: [error code:" + resultCode.getRcode() + "]"
+ resultCode.getErrMsg());
logger.debug(errMessage);
if (needFetchMeta(resultCode.getRcode(), resultCode.getPcode())) {
throw new ObTableNeedFetchMetaException(errMessage, resultCode.getRcode());
} else if (needFetchPartitionLocation(resultCode.getRcode())) {
throw new ObTableRoutingWrongException(errMessage, resultCode.getRcode());
} else {
logger.warn(errMessage);
ExceptionUtil.throwObTableException(conn.getObTable().getIp(), conn
.getObTable().getPort(), response.getHeader().getTraceId1(), response
.getHeader().getTraceId0(), resultCode.getRcode(), resultCode.getErrMsg());
logger.debug("require_rerouting_: {}, need_refresh_kv_meta_: {}"
, response.getHeader().isRoutingWrong(), response.getHeader().isNeedRefreshKvMeta());
if (response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
if (resultCode.getRcode() != 0) {
String errMessage = TraceUtil.formatTraceMessage(conn, request,
"meet exception: [error code:" + resultCode.getRcode() + "]"
+ resultCode.getErrMsg());
logger.debug(errMessage);
if (needFetchMeta(resultCode.getRcode())) {
throw new ObTableNeedFetchMetaException(errMessage, resultCode.getRcode());
} else if (needFetchPartitionLocation(resultCode.getRcode())) {
throw new ObTableRoutingWrongException(errMessage, resultCode.getRcode());
} else {
logger.warn(errMessage);
ExceptionUtil.throwObTableException(conn.getObTable().getIp(), conn
.getObTable().getPort(), response.getHeader().getTraceId1(), response
.getHeader().getTraceId0(), resultCode.getRcode(), resultCode.getErrMsg());
}
} else if (resultCode.getRcode() == 0 && response.getHeader().isRoutingWrong()) {
// if distributed capability is supported and enabled
// just need to refresh table entry, no need to retry
String errMessage = TraceUtil.formatTraceMessage(conn, request,
"meet exception and retry successfully in server: [require_rerouting :" + response.getHeader().isRoutingWrong()
+ ", need_refresh_kv_meta :" + response.getHeader().isNeedRefreshKvMeta() + "]");
logger.debug(errMessage);
isRoutingWrong = true;
if (response.getHeader().isNeedRefreshKvMeta()) {
isNeedRefreshMeta = true;
}
}
}

Expand All @@ -165,6 +162,8 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
.getHeader());
payload.setSequence(response.getHeader().getTraceId1());
payload.setUniqueId(response.getHeader().getTraceId0());
payload.setIsRoutingWrong(isRoutingWrong);
payload.setIsNeedRefreshMeta(isNeedRefreshMeta);
} else {
String errMessage = TraceUtil.formatTraceMessage(conn, response,
"receive unexpected command code: " + response.getCmdCode().value());
Expand Down Expand Up @@ -192,7 +191,7 @@ protected InvokeFuture createInvokeFuture(Connection conn, RemotingCommand reque
}

// schema changed
private boolean needFetchMeta(int errorCode, int pcode) {
private boolean needFetchMeta(int errorCode) {
return errorCode == ResultCodes.OB_SCHEMA_ERROR.errorCode
|| errorCode == ResultCodes.OB_TABLE_NOT_EXIST.errorCode
|| errorCode == ResultCodes.OB_TABLET_NOT_EXIST.errorCode
Expand All @@ -202,8 +201,7 @@ private boolean needFetchMeta(int errorCode, int pcode) {
|| errorCode == ResultCodes.OB_SCHEMA_EAGAIN.errorCode
|| errorCode == ResultCodes.OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH.errorCode
|| errorCode == ResultCodes.OB_GTS_NOT_READY.errorCode
|| errorCode == ResultCodes.OB_ERR_OPERATION_ON_RECYCLE_OBJECT.errorCode
|| (pcode == Pcodes.OB_TABLE_API_LS_EXECUTE && errorCode == ResultCodes.OB_NOT_MASTER.errorCode);
|| errorCode == ResultCodes.OB_ERR_OPERATION_ON_RECYCLE_OBJECT.errorCode;
}

private boolean needFetchPartitionLocation(int errorCode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ private void validCachedObTableStatus(String tableName, TableEntry tableEntry, l
ReplicaLocation replica = getPartitionLocation(obPartitionLocationInfo, route);
ObServerAddr addr = replica.getAddr();
ObTable obTable = tableRoster.getTable(addr);
if (obTable != null) {
if (obTable != null && !obTable.isValid()) {
obTable.setValid();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public enum Property {

RPC_LOGIN_TRY_TIMES("rpc.login.try.times", 3, "请求RPC登录的尝试次数"),

RPC_OPERATION_TIMEOUT("rpc.operation.timeout", 2000L, "OB内部执行RPC请求的超时时间"),
RPC_OPERATION_TIMEOUT("rpc.operation.timeout", 10000L, "OB内部执行RPC请求的超时时间"),

// [ObTable][CONNECTION_POOL]
SERVER_CONNECTION_POOL_SIZE("server.connection.pool.size", 1, "单个SERVER的连接数"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,14 @@ public void setRoutingWrong() {
flag |= REQUIRE_REROUTING_FLAG;
}

/*
* need to refresh kv table meta
* */
public boolean isNeedRefreshKvMeta() {
return (flag & IS_KV_REQUEST_FALG) != 0;
}


/*
* Set stream next.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public abstract class AbstractPayload implements ObPayload {
private long uniqueId;
private long sequence;
private Integer channelId = null;
private boolean isRoutingWrong = false; // flag means tableEntry location or meta need to be refreshed
private boolean isNeedRefreshMeta = false; // flag means tableEntry meta need to be refreshed
protected long tenantId = 1;
private long version = 1;
protected long timeout = RPC_OPERATION_TIMEOUT.getDefaultLong();
Expand All @@ -66,6 +68,38 @@ public long getTimeout() {
return timeout;
}

/*
* Get isRoutingWrong
* */
@Override
public boolean isRoutingWrong() {
return this.isRoutingWrong;
}

/*
* Set isRoutingWrong
* */
@Override
public void setIsRoutingWrong(boolean isRoutingWrong) {
this.isRoutingWrong = isRoutingWrong;
}

/*
* Get isNeedRefreshMeta
* */
@Override
public boolean isNeedRefreshMeta() {
return this.isNeedRefreshMeta;
}

/*
* Set isNeedRefreshMeta
* */
@Override
public void setIsNeedRefreshMeta(boolean isNeedRefreshMeta) {
this.isNeedRefreshMeta = isNeedRefreshMeta;
}

/*
* Get version.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ public interface ObPayload extends ObUnisVersion {
*/
long getTimeout();

boolean isRoutingWrong();

void setIsRoutingWrong(boolean isRoutingWrong);

boolean isNeedRefreshMeta();

void setIsNeedRefreshMeta(boolean isNeedRefreshMeta);

/*
* set sequence
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,14 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
.getServer().ipToString());
throw new ObTableRoutingWrongException();
}
} else if (result != null && result.isRoutingWrong()) {
logger.debug("errors happened in server and retried successfully, server ip:port is {}:{}, tableName: {}, need_refresh_meta: {}",
subObTable.getIp(), subObTable.getPort(), indexTableName, result.isNeedRefreshMeta());
TableEntry tableEntry = result.isNeedRefreshMeta() ?
client.getOrRefreshTableEntry(indexTableName, true) :
client.getOrRefreshTableEntry(indexTableName, false);
long tabletId = client.getTabletIdByPartId(tableEntry, partIdWithIndex.getLeft());
client.refreshTableLocationByTabletId(indexTableName, tabletId);
}
}
client.resetExecuteContinuousFailureCount(indexTableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,14 @@ public void partitionExecute(ObTableOperationResult[] results,
.ipToString());
throw new ObTableRoutingWrongException();
}
} else if (result != null && result.isRoutingWrong()) {
logger.debug("errors happened in server and retried successfully, server ip:port is {}:{}, tableName: {}, need_refresh_meta: {}",
subObTable.getIp(), subObTable.getPort(), tableName, result.isNeedRefreshMeta());
TableEntry entry = result.isNeedRefreshMeta() ?
obTableClient.getOrRefreshTableEntry(tableName, true) :
obTableClient.getOrRefreshTableEntry(tableName, false);
long tabletId = obTableClient.getTabletIdByPartId(entry, originPartId);
obTableClient.refreshTableLocationByTabletId(tableName, tabletId);
}
subObTableBatchOperationResult = (ObTableBatchOperationResult) result;
obTableClient.resetExecuteContinuousFailureCount(tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,15 @@ public void partitionExecute(ObTableSingleOpResult[] results,
"Rerouting return IP is {}", moveResponse.getReplica().getServer().ipToString(), move.getReplica().getServer().ipToString());
throw new ObTableRoutingWrongException();
}
} else if (result != null && result.isRoutingWrong()) {
// retry successfully in server and need to refresh client cache
logger.debug("errors happened in server and retried successfully, server ip:port is {}:{}, tableName: {}, need_refresh_meta: {}",
subObTable.getIp(), subObTable.getPort(), realTableName, result.isNeedRefreshMeta());
if (result.isNeedRefreshMeta()) {
obTableClient.getOrRefreshTableEntry(realTableName, true);
}
// TODO: 如果是不需要全部刷新地址的错误,全部刷新地址会降低效率。如何确定出错的 tablet_id 并刷新?
obTableClient.refreshTabletLocationBatch(realTableName);
}
subLSOpResult = (ObTableLSOpResult) result;
obTableClient.resetExecuteContinuousFailureCount(realTableName);
Expand Down