Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feat] Support full text index #249

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -240,4 +240,10 @@ public void setEntityType(ObTableEntityType entityType) {
super.setEntityType(entityType);
tableClientQuery.setEntityType(entityType);
}

@Override
public TableQuery setSearchText(String searchText) {
tableClientQuery.setSearchText(searchText);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,38 @@
import java.util.Map;

public enum ObIndexType {
IndexTypeIsNot(0), IndexTypeNormalLocal(1), IndexTypeUniqueLocal(2), IndexTypeNormalGlobal(3), IndexTypeUniqueGlobal(
4), IndexTypePrimary(
5), IndexTypeDomainCtxcat(
6), IndexTypeNormalGlobalLocalStorage(
7), IndexTypeUniqueGlobalLocalStorage(
8), IndexTypeSpatialLocal(
10), IndexTypeSpatialGlobal(
11), IndexTypeSpatialGlobalLocalStorage(
12), IndexTypeMax(
13);
IndexTypeIsNot(0),
IndexTypeNormalLocal(1),
IndexTypeUniqueLocal(2),
IndexTypeNormalGlobal(3),
IndexTypeUniqueGlobal(4),
IndexTypePrimary(5),
IndexTypeDomainCtxcat(6),
IndexTypeNormalGlobalLocalStorage(7),
IndexTypeUniqueGlobalLocalStorage(8),
IndexTypeSpatialLocal(10),
IndexTypeSpatialGlobal(11),
IndexTypeSpatialGlobalLocalStorage(12),
IndexTypeRowkeyDocIdLocal(13),
IndexTypeDocIdRowkeyLocal(14),
IndexTypeFtsIndexLocal(15),
IndexTypeFtsDocWordLocal(16),
/*
IndexTypeDocIdRowkeyGlobal(17),
IndexTypeFtsIndexGlobal(18),
IndexTypeFtsDocWordGlobal(19),
IndexTypeDocIdRowkeyGlobalLocalStorage(20),
IndexTypeFtsIndexGlobalLocalStorage(21),
IndexTypeFtsDocWordGlobalLocalStorage(22),
IndexTypeNormalMultivalueLocal(23),
IndexTypeUniqueMultivalueLocal(24),
IndexTypeVecRowkeyVidLocal(25),
IndexTypeVecVidRowkeyLocal(26),
IndexTypeVecDeltaBufferLocal(27),
IndexTypeVecIndexIdLocal(28),
IndexTypeVecIndexSnapshotDataLocal(29),
*/
IndexTypeMax(30);

private int value;
private static Map<Integer, ObIndexType> map = new HashMap<Integer, ObIndexType>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
package com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query;

import com.alipay.oceanbase.rpc.exception.FeatureNotSupportedException;
import com.alipay.oceanbase.rpc.table.ObFTSParams;
import com.alipay.oceanbase.rpc.table.ObHBaseParams;
import com.alipay.oceanbase.rpc.table.ObKVParams;
import com.alipay.oceanbase.rpc.table. ObKVParams;
import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.aggregation.ObTableAggregationSingle;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.aggregation.ObTableAggregationType;
import com.alipay.oceanbase.rpc.table.ObKVParamsBase;
import com.alipay.oceanbase.rpc.util.Serialization;
import io.netty.buffer.ByteBuf;

Expand Down Expand Up @@ -67,11 +69,12 @@ public class ObTableQuery extends AbstractPayload {

private static final byte[] HTABLE_DUMMY_BYTES = new byte[] { 0x01, 0x00 };
private boolean isHbaseQuery = false;
private boolean isFTSQuery = false;
private List<String> scanRangeColumns = new LinkedList<String>();

private List<ObTableAggregationSingle> aggregations = new LinkedList<>();

private ObKVParams obKVParams;
private ObKVParams obKVParams = null;

public void adjustStartKey(List<ObObj> key) throws IllegalArgumentException {
List<ObNewRange> keyRanges = getKeyRanges();
Expand Down Expand Up @@ -223,7 +226,7 @@ public byte[] encode() {
idx += len;
}

if (isHbaseQuery && obKVParams != null) {
if (obKVParams != null) { // hbaseQuery or FTSQuery will use obKVParams
len = (int) obKVParams.getPayloadSize();
System.arraycopy(obKVParams.encode(), 0, bytes, idx, len);
idx += len;
Expand Down Expand Up @@ -290,7 +293,11 @@ public Object decode(ByteBuf buf) {
String agg_column = Serialization.decodeVString(buf);
this.aggregations.add(new ObTableAggregationSingle(ObTableAggregationType.fromByte(agg_type), agg_column));
}
if (isHbaseQuery) {

buf.markReaderIndex();
if (buf.readByte() > 0) {
// read pType if is exists
buf.resetReaderIndex();
obKVParams = new ObKVParams();
this.obKVParams.decode(buf);
}
Expand Down Expand Up @@ -325,7 +332,7 @@ public long getPayloadContentSize() {
} else {
contentSize += HTABLE_DUMMY_BYTES.length;
}
if (isHbaseQuery && obKVParams != null) {
if (obKVParams != null) {
contentSize += obKVParams.getPayloadSize();
} else {
contentSize += HTABLE_DUMMY_BYTES.length;
Expand Down Expand Up @@ -545,7 +552,22 @@ public void setObKVParams(ObKVParams obKVParams) {
this.obKVParams = obKVParams;
}

public void setSearchText(String searchText) {
if (this.isHbaseQuery) {
throw new FeatureNotSupportedException("Hbase query not support full text search currently");
}
if (this.obKVParams == null) {
obKVParams = new ObKVParams();
}
ObFTSParams ftsParams = (ObFTSParams)obKVParams.getObParams(ObKVParamsBase.paramType.FTS);
ftsParams.setSearchText(searchText);
this.obKVParams.setObParamsBase(ftsParams);
this.isFTSQuery = true;
}

public ObKVParams getObKVParams() {
return obKVParams;
}

public boolean isFTSQuery() { return isFTSQuery; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,9 @@ protected ObTableQueryAsyncResult referToNewPartition(ObPair<Long, ObTableParam>
queryRequest.setPartitionId(obTableParam.getPartitionId());
queryRequest.setTableId(obTableParam.getTableId());
if (operationTimeout > 0) {
queryRequest.setTimeout(operationTimeout);
asyncRequest.setTimeout(operationTimeout);
} else {
queryRequest.setTimeout(obTableParam.getObTable().getObTableOperationTimeout());
asyncRequest.setTimeout(obTableParam.getObTable().getObTableOperationTimeout());
}

// refresh async query request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObNewRange;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObScanOrder;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQuery;
import com.alipay.oceanbase.rpc.table.api.Table;
import com.alipay.oceanbase.rpc.table.api.TableQuery;

import java.util.Arrays;
Expand Down Expand Up @@ -184,6 +185,12 @@ public TableQuery setMaxResultSize(long maxResultSize) {
return this;
}

@Override
public TableQuery setSearchText(String searchText) {
this.tableQuery.setSearchText(searchText);
return this;
}

public String getIndexTableName() {
return indexTableName;
}
Expand Down
64 changes: 64 additions & 0 deletions src/main/java/com/alipay/oceanbase/rpc/table/ObFTSParams.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*-
* #%L
* com.oceanbase:obkv-table-client
* %%
* Copyright (C) 2021 - 2024 OceanBase
* %%
* OBKV Table Client Framework is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* #L%
*/

package com.alipay.oceanbase.rpc.table;

import com.alipay.oceanbase.rpc.util.Serialization;
import io.netty.buffer.ByteBuf;

public class ObFTSParams extends ObKVParamsBase {
String searchText = null;
public ObFTSParams() {
pType = paramType.FTS;
}

public paramType getType() {
return pType;
}

public void setSearchText(String searchText) {
this.searchText = searchText;
}

public String getSearchText() { return this.searchText; }

public byte[] encode() {
byte[] bytes = new byte[(int) getPayloadContentSize()];
int idx = 0;
byte[] b = new byte[] { (byte)pType.ordinal() };
System.arraycopy(b, 0, bytes, idx, 1);
idx += 1;
int len = Serialization.getNeedBytes(searchText);
System.arraycopy(Serialization.encodeVString(searchText), 0, bytes, idx, len);
return bytes;
}

public Object decode(ByteBuf buf) {
// pType is read by ObKVParams
this.searchText = Serialization.decodeVString(buf);
return this;
}

public long getPayloadContentSize() {
return 1 /* pType*/ + Serialization.getNeedBytes(searchText);
}

public String toString() {
return "ObFtsParams: {\n pType = " + pType + ", \n searchText = " + searchText
+ "\n}\n";
}
}
3 changes: 3 additions & 0 deletions src/main/java/com/alipay/oceanbase/rpc/table/ObKVParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ public ObKVParamsBase getObParams(ObKVParamsBase.paramType pType) {
case HBase:
return new ObHBaseParams();
case Redis:
throw new RuntimeException("Currently does not support redis type");
case FTS:
return new ObFTSParams();
default:
throw new RuntimeException("Currently does not support other types except HBase");
}
Expand Down
19 changes: 15 additions & 4 deletions src/main/java/com/alipay/oceanbase/rpc/table/ObKVParamsBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,31 @@

package com.alipay.oceanbase.rpc.table;

import com.alipay.oceanbase.rpc.direct_load.protocol.payload.ObTableLoadClientStatus;
import io.netty.buffer.ByteBuf;

import java.util.HashMap;
import java.util.Map;

public abstract class ObKVParamsBase {
public enum paramType {
HBase((byte) 0), Redis((byte) 1);
HBase((byte) 0), Redis((byte) 1), FTS((byte) 2);
private final byte value;
private static final Map<Integer, paramType> map = new HashMap<Integer, paramType>();

static {
for (paramType type : paramType.values()) {
map.put(type.ordinal(), type);
}
}

public static paramType valueOf(int value) { return map.get(value); }

paramType(byte value) {
this.value = value;
}

public byte getValue() {
return value;
}
public byte getValue() { return value; }
}

public int byteSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,15 @@ private AbstractQueryStreamResult commonExecute(InitQueryResultCallback<Abstract
// fill a whole range if no range is added explicitly.
if (tableQuery.getKeyRanges().isEmpty()) {
tableQuery.addKeyRange(ObNewRange.getWholeRange());
} else if (tableQuery.isFTSQuery()) {
// Currently, fulltext query only support scan all partitions
tableQuery.getKeyRanges().clear();
tableQuery.addKeyRange(ObNewRange.getWholeRange());
if (tableQuery.getIndexName() == null || tableQuery.getIndexName().isEmpty()
|| tableQuery.getIndexName().equalsIgnoreCase("primary")) {
throw new IllegalArgumentException(
"use fulltext search but specified index name is not fulltext index");
}
}

// init partitionObTables
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,6 @@ public interface TableQuery {
TableQuery setScanRangeColumns(String... columns);

void clear();

TableQuery setSearchText(String searchText);
}
Loading
Loading