Skip to content

Commit 9be2d22

Browse files
committed
add ObParams
1 parent 72ceac7 commit 9be2d22

File tree

6 files changed

+844
-542
lines changed

6 files changed

+844
-542
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 533 additions & 533 deletions
Large diffs are not rendered by default.

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObTableQuery.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query;
1919

20+
import com.alipay.oceanbase.rpc.table.ObKVParams;
2021
import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload;
2122
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.aggregation.ObTableAggregationSingle;
2223
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.aggregation.ObTableAggregationType;
@@ -64,6 +65,8 @@ public class ObTableQuery extends AbstractPayload {
6465

6566
private List<ObTableAggregationSingle> aggregations = new LinkedList<>();
6667

68+
private ObKVParams obKVParams;
69+
6770
/*
6871
* Check filter.
6972
*/
@@ -173,6 +176,15 @@ public byte[] encode() {
173176
idx += len;
174177
}
175178

179+
if (isHbaseQuery) {
180+
len = (int) obKVParams.getPayloadSize();
181+
System.arraycopy(obKVParams.encode(), 0, bytes, idx, len);
182+
idx += len;
183+
} else {
184+
len = HTABLE_FILTER_DUMMY_BYTES.length;
185+
System.arraycopy(HTABLE_FILTER_DUMMY_BYTES, 0, bytes, idx, len);
186+
}
187+
176188
return bytes;
177189
}
178190

@@ -230,6 +242,10 @@ public Object decode(ByteBuf buf) {
230242
String agg_column = Serialization.decodeVString(buf);
231243
this.aggregations.add(new ObTableAggregationSingle(ObTableAggregationType.fromByte(agg_type), agg_column));
232244
}
245+
if (isHbaseQuery) {
246+
obKVParams = new ObKVParams();
247+
this.obKVParams.decode(buf);
248+
}
233249
return this;
234250
}
235251

@@ -258,6 +274,7 @@ public long getPayloadContentSize() {
258274

259275
if (isHbaseQuery) {
260276
contentSize += hTableFilter.getPayloadSize();
277+
contentSize += obKVParams.getPayloadSize();
261278
} else {
262279
contentSize += HTABLE_FILTER_DUMMY_BYTES.length;
263280
}
@@ -466,4 +483,13 @@ public void setScanRangeColumns(String... scanRangeColumns) {
466483
public void setScanRangeColumns(List<String> scanRangeColumns) {
467484
this.scanRangeColumns = scanRangeColumns;
468485
}
486+
487+
public void setObKVParams(ObKVParams obKVParams) {
488+
this.isHbaseQuery = true;
489+
this.obKVParams = obKVParams;
490+
}
491+
492+
public ObKVParams getObKVParams() {
493+
return obKVParams;
494+
}
469495
}

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838

3939
public class ObTableClientQueryAsyncStreamResult extends AbstractQueryStreamResult {
4040
private static final Logger logger = LoggerFactory
41-
.getLogger(ObTableClientQueryStreamResult.class);
41+
.getLogger(ObTableClientQueryStreamResult.class);
4242
protected ObTableClient client;
4343
private boolean isEnd = true;
4444
private long sessionId = Constants.OB_INVALID_ID;
@@ -65,7 +65,7 @@ public void init() throws Exception {
6565
// send to first tablet
6666
if (!expectant.isEmpty()) {
6767
Iterator<Map.Entry<Long, ObPair<Long, ObTableParam>>> it = expectant.entrySet()
68-
.iterator();
68+
.iterator();
6969
Map.Entry<Long, ObPair<Long, ObTableParam>> firstEntry = it.next();
7070
referToNewPartition(firstEntry.getValue());
7171
if (isEnd())
@@ -81,7 +81,7 @@ protected void cacheResultRows(ObTableQueryAsyncResult tableQueryResult) {
8181
}
8282

8383
protected ObTableQueryAsyncResult referToNewPartition(ObPair<Long, ObTableParam> partIdWithObTable)
84-
throws Exception {
84+
throws Exception {
8585
ObTableParam obTableParam = partIdWithObTable.getRight();
8686
ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest();
8787

@@ -106,7 +106,7 @@ protected ObTableQueryAsyncResult referToNewPartition(ObPair<Long, ObTableParam>
106106
}
107107

108108
protected ObTableQueryAsyncResult referToLastStreamResult(ObPair<Long, ObTableParam> partIdWithObTable)
109-
throws Exception {
109+
throws Exception {
110110
ObTableParam obTableParam = partIdWithObTable.getRight();
111111
ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest();
112112

@@ -125,7 +125,7 @@ protected ObTableQueryAsyncResult referToLastStreamResult(ObPair<Long, ObTablePa
125125
}
126126

127127
protected void closeLastStreamResult(ObPair<Long, ObTableParam> partIdWithObTable)
128-
throws Exception {
128+
throws Exception {
129129
ObTableParam obTableParam = partIdWithObTable.getRight();
130130
ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest();
131131

@@ -159,7 +159,7 @@ public boolean next() throws Exception {
159159
// secondly, refer to the last stream result
160160
if (!isEnd() && !expectant.isEmpty()) {
161161
Iterator<Map.Entry<Long, ObPair<Long, ObTableParam>>> it = expectant.entrySet()
162-
.iterator();
162+
.iterator();
163163
Map.Entry<Long, ObPair<Long, ObTableParam>> lastEntry = it.next();
164164
// try access new partition, async will not remove useless expectant
165165
referToLastStreamResult(lastEntry.getValue());
@@ -177,7 +177,7 @@ public boolean next() throws Exception {
177177
// lastly, refer to the new partition
178178
boolean hasNext = false;
179179
Iterator<Map.Entry<Long, ObPair<Long, ObTableParam>>> it = expectant.entrySet()
180-
.iterator();
180+
.iterator();
181181
while (it.hasNext()) {
182182
Map.Entry<Long, ObPair<Long, ObTableParam>> entry = it.next();
183183
// try access new partition, async will not remove useless expectant
@@ -220,7 +220,7 @@ protected ObTableQueryAsyncResult executeAsync(ObPair<Long, ObTableParam> partId
220220

221221
// execute request
222222
ObTableQueryAsyncResult result = (ObTableQueryAsyncResult) commonExecute(this.client,
223-
logger, partIdWithObTable, streamRequest, connectionRef);
223+
logger, partIdWithObTable, streamRequest, connectionRef);
224224

225225
// cache result
226226
cacheResultRows(result);
@@ -247,7 +247,7 @@ public void close() throws Exception {
247247
// send end packet to last tablet
248248
if (!isEnd() && !expectant.isEmpty()) {
249249
Iterator<Map.Entry<Long, ObPair<Long, ObTableParam>>> it = expectant.entrySet()
250-
.iterator();
250+
.iterator();
251251
// get the last tablet
252252
Map.Entry<Long, ObPair<Long, ObTableParam>> lastEntry = it.next();
253253
// try access new partition, async will not remove useless expectant
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*-
2+
* #%L
3+
* com.oceanbase:obkv-table-client
4+
* %%
5+
* Copyright (C) 2021 - 2024 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
18+
package com.alipay.oceanbase.rpc.table;
19+
20+
import com.alipay.oceanbase.rpc.util.Serialization;
21+
import io.netty.buffer.ByteBuf;
22+
23+
import static com.alipay.oceanbase.rpc.util.Serialization.encodeObUniVersionHeader;
24+
25+
public class ObHBaseParams extends ObKVParamsBase {
26+
int caching = -1; // 限制scan返回的行的数量
27+
int callTimeout = -1; // scannerLeasePeriodTimeout,代表客户端scan的单个rpc超时时间以及服务端的scan的超时时间的一部分
28+
boolean allowPartialResults = true; // 是否允许行内部分返回
29+
boolean isCacheBlock = false; // 是否启用缓存
30+
boolean checkExistenceOnly = false; // 查看是否存在不返回数据
31+
32+
public ObHBaseParams() {
33+
pType = paramType.HBase;
34+
}
35+
36+
public ObKVParamsBase.paramType getType() {
37+
return pType;
38+
}
39+
40+
public void setCaching(int caching) {
41+
this.caching = caching;
42+
}
43+
44+
public void setCallTimeout(int callTimeout) {
45+
this.callTimeout = callTimeout;
46+
}
47+
48+
public void setAllowPartialResults(boolean allowPartialResults) {
49+
this.allowPartialResults = allowPartialResults;
50+
}
51+
52+
public void setCacheBlock(boolean isCacheBlock) {
53+
this.isCacheBlock = isCacheBlock;
54+
}
55+
56+
public void setCheckExistenceOnly(boolean checkExistenceOnly) {
57+
this.checkExistenceOnly = checkExistenceOnly;
58+
}
59+
60+
private int getContentSize() {
61+
return 4 + Serialization.getNeedBytes(caching) + Serialization.getNeedBytes(callTimeout)
62+
+ 1;
63+
}
64+
65+
public int getCaching() {
66+
return caching;
67+
}
68+
69+
public int getCallTimeout() {
70+
return callTimeout;
71+
}
72+
73+
public boolean getAllowPartialResults() {
74+
return allowPartialResults;
75+
}
76+
77+
public boolean getCacheBlock() {
78+
return isCacheBlock;
79+
}
80+
81+
public boolean isCheckExistenceOnly() {
82+
return checkExistenceOnly;
83+
}
84+
85+
// encode all boolean type to one byte
86+
public byte[] booleansToByteArray() {
87+
byte[] bytes = new byte[1]; // 1 byte for 4 booleans
88+
89+
if (allowPartialResults)
90+
bytes[0] |= 0x01; // 00000010
91+
if (isCacheBlock)
92+
bytes[0] |= 0x02; // 00000100
93+
if (checkExistenceOnly)
94+
bytes[0] |= 0x04; // 00001000
95+
96+
return bytes;
97+
}
98+
99+
public byte[] encode() {
100+
byte[] bytes = new byte[(int) getPayloadContentSize()];
101+
int idx = 0;
102+
103+
byte[] b = new byte[] { (byte) pType.ordinal() };
104+
System.arraycopy(b, 0, bytes, idx, 1);
105+
idx += 1;
106+
System.arraycopy(Serialization.encodeVi32(caching), 0, bytes, idx,
107+
Serialization.getNeedBytes(caching));
108+
idx += Serialization.getNeedBytes(caching);
109+
System.arraycopy(Serialization.encodeVi32(callTimeout), 0, bytes, idx,
110+
Serialization.getNeedBytes(callTimeout));
111+
idx += Serialization.getNeedBytes(callTimeout);
112+
System.arraycopy(booleansToByteArray(), 0, bytes, idx, 1);
113+
idx += 1;
114+
115+
return bytes;
116+
}
117+
118+
public void byteArrayToBooleans(ByteBuf bytes) {
119+
byte b = bytes.readByte();
120+
allowPartialResults = (b & 0x01) != 0;
121+
isCacheBlock = (b & 0x02) != 0;
122+
checkExistenceOnly = (b & 0x04) != 0;
123+
}
124+
125+
public Object decode(ByteBuf buf) {
126+
caching = Serialization.decodeVi32(buf);
127+
callTimeout = Serialization.decodeVi32(buf);
128+
byteArrayToBooleans(buf);
129+
return this;
130+
}
131+
132+
public long getPayloadContentSize() {
133+
return 1 + Serialization.getNeedBytes(caching) + Serialization.getNeedBytes(callTimeout)
134+
+ 1; // all boolean to one byte
135+
}
136+
137+
public String toString() {
138+
return "ObParams: {\n pType = " + pType + ", \n caching = " + caching
139+
+ ", \n callTimeout = " + callTimeout + ", \n allowPartialResult = "
140+
+ allowPartialResults + ", \n isCacheBlock = " + isCacheBlock
141+
+ ", \n checkExistenceOnly = " + checkExistenceOnly + "\n}\n";
142+
}
143+
144+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*-
2+
* #%L
3+
* com.oceanbase:obkv-table-client
4+
* %%
5+
* Copyright (C) 2021 - 2024 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
18+
package com.alipay.oceanbase.rpc.table;
19+
20+
import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload;
21+
import io.netty.buffer.ByteBuf;
22+
23+
import static com.alipay.oceanbase.rpc.util.Serialization.encodeObUniVersionHeader;
24+
import static com.alipay.oceanbase.rpc.util.Serialization.getObUniVersionHeaderLength;
25+
26+
public class ObKVParams extends AbstractPayload {
27+
28+
public ObKVParamsBase obKVParamsBase;
29+
30+
public ObKVParamsBase getObParams(ObKVParamsBase.paramType pType) {
31+
switch (pType) {
32+
case HBase:
33+
return new ObHBaseParams();
34+
case Redis:
35+
default:
36+
throw new RuntimeException("Currently does not support other types except HBase");
37+
}
38+
}
39+
40+
public void setObParamsBase(ObKVParamsBase obKVParamsBase) {
41+
this.obKVParamsBase = obKVParamsBase;
42+
}
43+
44+
public ObKVParamsBase getObParamsBase() {
45+
return obKVParamsBase;
46+
}
47+
48+
@Override
49+
public byte[] encode() {
50+
byte[] bytes = new byte[(int) getPayloadSize()];
51+
int idx = 0;
52+
53+
// 0. encode header
54+
int headerLen = (int) getObUniVersionHeaderLength(getVersion(), getPayloadContentSize());
55+
System.arraycopy(encodeObUniVersionHeader(getVersion(), getPayloadContentSize()), 0, bytes,
56+
idx, headerLen);
57+
idx += headerLen;
58+
59+
int len = (int) obKVParamsBase.getPayloadContentSize();
60+
System.arraycopy(obKVParamsBase.encode(), 0, bytes, idx, len);
61+
62+
return bytes;
63+
}
64+
65+
public Object decode(ByteBuf buf) {
66+
super.decode(buf);
67+
byte b = buf.readByte();
68+
ObKVParamsBase.paramType pType = ObKVParamsBase.paramType.values()[b];
69+
obKVParamsBase = getObParams(pType);
70+
obKVParamsBase.decode(buf);
71+
return this;
72+
}
73+
74+
@Override
75+
public long getPayloadContentSize() {
76+
return obKVParamsBase.getPayloadContentSize();
77+
}
78+
}

0 commit comments

Comments
 (0)