Skip to content

[task] setColumnFamilyTimeRange #234

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: hbase_2_x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,13 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
return result;
}

/*
* RenewLease.
*/
public void renewLease() throws Exception {
throw new IllegalStateException("renew only support stream query");
}

/*
* Next.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.Map;

public enum ObQueryOperationType {
QUERY_START(0), QUERY_NEXT(1), QUERY_END(2);
QUERY_START(0), QUERY_NEXT(1), QUERY_END(2), QUERY_RENEW(3);

private int value;
private static Map<Integer, ObQueryOperationType> map = new HashMap<Integer, ObQueryOperationType>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,32 @@ protected Map<Long, ObPair<Long, ObTableParam>> refreshPartition(ObTableQuery ta
return buildPartitions(client, tableQuery, tableName);
}

// This function is designed for HBase-type requests.
// It is used to extend the session duration of a scan
@Override
public void renewLease() throws Exception {
if (!isEnd() && !expectant.isEmpty()) {
Iterator<Map.Entry<Long, ObPair<Long, ObTableParam>>> it = expectant.entrySet()
.iterator();
Map.Entry<Long, ObPair<Long, ObTableParam>> lastEntry = it.next();
ObPair<Long, ObTableParam> partIdWithObTable = lastEntry.getValue();
// try access new partition, async will not remove useless expectant
ObTableParam obTableParam = partIdWithObTable.getRight();
ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest();

// refresh request info
queryRequest.setPartitionId(obTableParam.getPartitionId());
queryRequest.setTableId(obTableParam.getTableId());

// refresh async query request
asyncRequest.setQueryType(ObQueryOperationType.QUERY_RENEW);
asyncRequest.setQuerySessionId(sessionId);
executeAsync(partIdWithObTable, asyncRequest);
} else {
throw new ObTableException("query end or expectant is null");
}
}

@Override
public boolean next() throws Exception {
checkStatus();
Expand Down
44 changes: 41 additions & 3 deletions src/main/java/com/alipay/oceanbase/rpc/table/ObHBaseParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@

package com.alipay.oceanbase.rpc.table;

import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
import com.alipay.oceanbase.rpc.util.ObBytesString;
import com.alipay.oceanbase.rpc.util.Serialization;
import io.netty.buffer.ByteBuf;

import java.util.ArrayList;
import java.util.List;

import static com.alipay.oceanbase.rpc.util.Serialization.encodeObUniVersionHeader;

public class ObHBaseParams extends ObKVParamsBase {
Expand All @@ -32,11 +37,17 @@ public class ObHBaseParams extends ObKVParamsBase {
private static final int FLAG_ALLOW_PARTIAL_RESULTS = 1 << 0;
private static final int FLAG_IS_CACHE_BLOCK = 1 << 1;
private static final int FLAG_CHECK_EXISTENCE_ONLY = 1 << 2;
List<ObPair<ObBytesString, ObPair<Long, Long>>> timeRangeMap = new ArrayList<>();


public ObHBaseParams() {
pType = paramType.HBase;
}

public void addFamilyTimeRange(ObBytesString family, long min, long max) {
timeRangeMap.add(new ObPair<>(family, new ObPair<>(min, max)));
}

public ObKVParamsBase.paramType getType() {
return pType;
}
Expand Down Expand Up @@ -111,6 +122,21 @@ public byte[] encode() {
System.arraycopy(booleansToByteArray(), 0, bytes, idx, 1);
idx += 1;

int len = Serialization.getNeedBytes(timeRangeMap.size());
System.arraycopy(Serialization.encodeVi64(timeRangeMap.size()), 0, bytes, idx, len);
idx += len;
for (ObPair<ObBytesString, ObPair<Long, Long>> timeRange : timeRangeMap) {
len = Serialization.getNeedBytes(timeRange.getLeft());
System.arraycopy(Serialization.encodeBytesString(timeRange.getLeft()), 0, bytes, idx, len);
idx += len;
len = Serialization.getNeedBytes(timeRange.getRight().getLeft());
System.arraycopy(Serialization.encodeVi64(timeRange.getRight().getLeft()), 0, bytes, idx, len);
idx += len;
len = Serialization.getNeedBytes(timeRange.getRight().getRight());
System.arraycopy(Serialization.encodeVi64(timeRange.getRight().getRight()), 0, bytes, idx, len);
idx += len;
}

return bytes;
}

Expand All @@ -125,19 +151,31 @@ public Object decode(ByteBuf buf) {
caching = Serialization.decodeVi32(buf);
callTimeout = Serialization.decodeVi32(buf);
byteArrayToBooleans(buf);
long size = Serialization.decodeVi64(buf);
this.timeRangeMap = new ArrayList<>((int) size);
for (int i = 0; i < size; i++) {
this.timeRangeMap.add(new ObPair<>(Serialization.decodeBytesString(buf), new ObPair<>(Serialization.decodeVi64(buf), Serialization.decodeVi64(buf))));
}
return this;
}

public long getPayloadContentSize() {
return 1 + Serialization.getNeedBytes(caching) + Serialization.getNeedBytes(callTimeout)
+ 1; // all boolean to one byte
long contentSize = 1 + Serialization.getNeedBytes(caching) + Serialization.getNeedBytes(callTimeout)
+ 1 // all boolean to one byte
+ Serialization.getNeedBytes(timeRangeMap.size());
for (ObPair<ObBytesString, ObPair<Long, Long>> timeRange : timeRangeMap) {
contentSize += Serialization.getNeedBytes(timeRange.getLeft());
contentSize += Serialization.getNeedBytes(timeRange.getRight().getLeft());
contentSize += Serialization.getNeedBytes(timeRange.getRight().getRight());
}
return contentSize;
}

public String toString() {
return "ObParams: {\n pType = " + pType + ", \n caching = " + caching
+ ", \n callTimeout = " + callTimeout + ", \n allowPartialResult = "
+ allowPartialResults + ", \n isCacheBlock = " + isCacheBlock
+ ", \n checkExistenceOnly = " + checkExistenceOnly + "\n}\n";
+ ", \n checkExistenceOnly = " + checkExistenceOnly + ", \n timeRangeMap = " + timeRangeMap + "\n}\n";
}

}
Loading