Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions configuration/conf/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@
########################################################

################### 操作信息整体配置 #####################
# 各操作的比例,按照顺序为 写入:Q1:Q2:Q3:Q4:Q5:Q6:Q7:Q8:Q9:Q10:Q11, 请注意使用英文冒号。比例中的每一项是整数。
# 各操作的比例,按照顺序为 写入:Q1:Q2:Q3:Q4:Q5:Q6:Q7:Q8:Q9:Q10:Q11:Q12, 请注意使用英文冒号。比例中的每一项是整数。
# Qi表示的查询如下:
# Q1 精确点查询 select v1... from data where time = ? and device in ?
# Q2 范围查询(只限制起止时间)select v1... from data where time > ? and time < ? and device in ?
Expand All @@ -328,7 +328,8 @@
# Q9 倒序范围查询(只限制起止时间)select v1... from data where time > ? and time < ? and device in ? order by time desc
# Q10 倒序带值过滤的范围查询 select v1... from data where time > ? and time < ? and v1 > ? and device in ? order by time desc
# Q11 分组聚合查询,倒序;目前仅支持iotdb、tdengine-3.0、influxdb v1
# OPERATION_PROPORTION=1:0:0:0:0:0:0:0:0:0:0:0
# Q12 基于前缀路径的快速 last 查询(iotdb-2.0专用)
OPERATION_PROPORTION=1:0:0:0:0:0:0:0:0:0:0:1

# 最长等待写时间,单位毫秒,即如果整个写操作在指定时间内没有返回,则终止此操作
# WRITE_OPERATION_TIMEOUT_MS=120000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ protected void doTest() {
case GROUP_BY_QUERY_ORDER_BY_TIME_DESC:
dbWrapper.groupByQueryOrderByDesc(queryWorkLoad.getGroupByQuery());
break;
case FAST_LAST_PREFIX_QUERY:
// 示例:自动生成一个前缀路径列表,可根据实际需求调整
java.util.List<String> prefixPaths = new java.util.ArrayList<>();
prefixPaths.add("root");
dbWrapper.fastLastPrefixQuery(prefixPaths);
break;
default:
LOGGER.error("Unsupported operation sensorType {}", operation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public enum Operation {
RANGE_QUERY_ORDER_BY_TIME_DESC("RANGE_QUERY_DESC"),
VALUE_RANGE_QUERY_ORDER_BY_TIME_DESC("VALUE_RANGE_QUERY_DESC"),
GROUP_BY_QUERY_ORDER_BY_TIME_DESC("GROUP_BY_DESC"),
FAST_LAST_PREFIX_QUERY("FAST_LAST_PREFIX_QUERY"), // Q12: 基于前缀路径的快速 last 查询
VERIFICATION_QUERY("VERIFICATION_QUERY"),
DEVICE_QUERY("DEVICE_QUERY");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,30 @@
import java.util.stream.Collectors;

public class DBWrapper implements IDatabase {
/** Q12: Fast last query for prefix paths (IoTDB 2.0+) */
@Override
public Status fastLastPrefixQuery(List<String> prefixPaths) {
Status status = null;
Operation operation = Operation.FAST_LAST_PREFIX_QUERY;
String device =
(prefixPaths != null && !prefixPaths.isEmpty()) ? prefixPaths.get(0) : "No Prefix";
try {
List<Status> statuses = new ArrayList<>();
for (IDatabase database : databases) {
long start = System.nanoTime();
status = database.fastLastPrefixQuery(prefixPaths);
long end = System.nanoTime();
status.setTimeCost(end - start);
statuses.add(status);
}
for (Status sta : statuses) {
handleQueryOperation(sta, operation, device);
}
} catch (Exception e) {
handleUnexpectedQueryException(operation, e, device);
}
return status;
}

private static final Logger LOGGER = LoggerFactory.getLogger(DBWrapper.class);
private static Config config = ConfigDescriptor.getInstance().getConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@
import java.util.List;

public interface IDatabase {
/**
* Q12: Fast last query for prefix paths (IoTDB 2.0+)
*
* @param prefixPaths 前缀路径列表
* @return status which contains successfully executed flag, error message and so on.
*/
Status fastLastPrefixQuery(List<String> prefixPaths);

/**
* Initialize any state for this DB. Called once per DB instance; there is one DB instance per
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
import java.util.List;

public class FakeDB implements IDatabase {
@Override
public Status fastLastPrefixQuery(List<String> prefixPaths) {
return new Status(true, null, null);
}

@Override
public void init() throws TsdbException {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
import java.util.Set;

public class SelfCheck implements IDatabase {
@Override
public Status fastLastPrefixQuery(List<String> prefixPaths) {
return check(new Status(true, null, null));
}

private static final Config config = ConfigDescriptor.getInstance().getConfig();
private static final Logger logger = LoggerFactory.getLogger(SelfCheck.class);
private long loop = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,30 @@ public class TreeSessionManager extends SessionManager {
private static final Logger LOGGER = LoggerFactory.getLogger(TableSessionManager.class);
private final Session session;

/**
* 新增:基于前缀路径的快速 last 查询(第11种查询方式)
*
* @param prefixPath 例如 Arrays.asList("root", "sg1")
* @return SessionDataSet 查询结果
*/
public SessionDataSet executeFastLastDataQueryForOnePrefixPath(List<String> prefixPath)
throws IoTDBConnectionException, StatementExecutionException {
// 兼容未实现的 session 方法,优先用反射调用
try {
java.lang.reflect.Method method =
session.getClass().getMethod("executeFastLastDataQueryForOnePrefixPath", List.class);
Object result = method.invoke(session, prefixPath);
return (SessionDataSet) result;
} catch (NoSuchMethodException nsme) {
LOGGER.error("Session未实现executeFastLastDataQueryForOnePrefixPath方法", nsme);
throw new StatementExecutionException(
"Session未实现executeFastLastDataQueryForOnePrefixPath方法", nsme);
} catch (Exception e) {
LOGGER.error("executeFastLastDataQueryForOnePrefixPath failed", e);
throw new StatementExecutionException(e);
}
}

public TreeSessionManager(DBConfig dbConfig) {
super(dbConfig);
List<String> hostUrls = new ArrayList<>(dbConfig.getHOST().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,42 @@ public Status groupByQueryOrderByDesc(GroupByQuery groupByQuery) {
return executeQueryAndGetStatus(sql, Operation.GROUP_BY_QUERY_ORDER_BY_TIME_DESC);
}

/**
* Q12: 基于前缀路径的快速 last 查询(iotdb-2.0专用)
*
* @param prefixPaths 前缀路径列表
*/
public Status fastLastPrefixQuery(List<String> prefixPaths) {
// 仅支持 SessionStrategy,反射获取 sessionManager 字段
TreeSessionManager treeSessionManager = null;
if (dmlStrategy instanceof SessionStrategy) {
try {
java.lang.reflect.Field field = dmlStrategy.getClass().getDeclaredField("sessionManager");
field.setAccessible(true);
Object inner = field.get(dmlStrategy);
if (inner instanceof TreeSessionManager) {
treeSessionManager = (TreeSessionManager) inner;
}
} catch (Exception e) {
LOGGER.error("Q12 反射获取 sessionManager 失败", e);
}
}
if (treeSessionManager == null) {
return new Status(
false, 0, new Exception("Q12 仅支持 TreeSessionManager"), "Q12 仅支持 TreeSessionManager");
}
try {
Object result = treeSessionManager.executeFastLastDataQueryForOnePrefixPath(prefixPaths);
if (!config.isIS_QUIET_MODE()) {
LOGGER.info("Q12 fastLastPrefixQuery, prefixPaths: {}", prefixPaths);
}
return new Status(true, 0, "Q12 fastLastPrefixQuery executed", null);
} catch (Exception e) {
LOGGER.error("Q12 fastLastPrefixQuery error", e);
return new Status(false, 0, e, "Q12 fastLastPrefixQuery error");
}
}

/**
* Generate simple query header.
*
Expand Down