Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.table.configuration.ArrowOptions;
import com.aliyun.odps.table.configuration.ArrowOptions.TimestampUnit;
import com.aliyun.odps.table.configuration.SplitOptions;
import com.aliyun.odps.table.optimizer.predicate.Predicate;
import com.aliyun.odps.table.read.TableBatchReadSession;
import com.aliyun.odps.table.read.TableReadSessionBuilder;
Expand Down Expand Up @@ -100,6 +101,8 @@ public class MaxComputeScanNode extends FileQueryScanNode {
private int readTimeout;
private int retryTimes;

private boolean onlyPartitionEqualityPredicate = false;

@Setter
private SelectedPartitions selectedPartitions = null;

Expand Down Expand Up @@ -177,6 +180,12 @@ private void createRequiredColumns() {
*/
/**
 * Creates a batch read session for the given partitions using the catalog's
 * default split options.
 *
 * @param requiredPartitionSpecs partitions the scan must cover
 * @return a configured {@link TableBatchReadSession}
 * @throws IOException if the session cannot be created
 */
TableBatchReadSession createTableBatchReadSession(List<PartitionSpec> requiredPartitionSpecs) throws IOException {
    // Delegate to the overload, supplying the catalog-level split options.
    SplitOptions defaultSplitOptions =
            ((MaxComputeExternalCatalog) table.getCatalog()).getSplitOption();
    return createTableBatchReadSession(requiredPartitionSpecs, defaultSplitOptions);
}

TableBatchReadSession createTableBatchReadSession(
List<PartitionSpec> requiredPartitionSpecs, SplitOptions splitOptions) throws IOException {
MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) table.getCatalog();

readTimeout = mcCatalog.getReadTimeout();
connectTimeout = mcCatalog.getConnectTimeout();
Expand All @@ -186,7 +195,7 @@ TableBatchReadSession createTableBatchReadSession(List<PartitionSpec> requiredPa

return scanBuilder.identifier(table.getTableIdentifier())
.withSettings(mcCatalog.getSettings())
.withSplitOptions(mcCatalog.getSplitOption())
.withSplitOptions(splitOptions)
.requiredPartitionColumns(requiredPartitionColumns)
.requiredDataColumns(orderedRequiredDataColumns)
.withFilterPredicate(filterPredicate)
Expand Down Expand Up @@ -315,6 +324,51 @@ protected void convertPredicate() {
}
this.filterPredicate = filterPredicate;
}

this.onlyPartitionEqualityPredicate = checkOnlyPartitionEqualityPredicate();
}

/**
 * Returns true when every conjunct constrains only partition columns with
 * equality semantics, i.e. each conjunct is either
 * {@code partitionCol = literal} or {@code partitionCol IN (literal, ...)}.
 * A query with no conjuncts trivially qualifies. Any other predicate shape
 * (non-EQ comparison, NOT IN, non-partition column, non-literal operand)
 * disqualifies the whole predicate set.
 */
private boolean checkOnlyPartitionEqualityPredicate() {
    // No filters at all: trivially only-partition-equality.
    if (conjuncts.isEmpty()) {
        return true;
    }
    Set<String> partitionColumnNames = table.getPartitionColumns().stream()
            .map(Column::getName)
            .collect(Collectors.toSet());
    for (Expr conjunct : conjuncts) {
        boolean accepted;
        if (conjunct instanceof BinaryPredicate) {
            accepted = isPartitionEquality((BinaryPredicate) conjunct, partitionColumnNames);
        } else if (conjunct instanceof InPredicate) {
            accepted = isPartitionInList((InPredicate) conjunct, partitionColumnNames);
        } else {
            accepted = false;
        }
        if (!accepted) {
            return false;
        }
    }
    return true;
}

// Accepts only the exact form: <partition SlotRef> = <LiteralExpr>.
private boolean isPartitionEquality(BinaryPredicate predicate, Set<String> partitionColumnNames) {
    if (predicate.getOp() != BinaryPredicate.Operator.EQ) {
        return false;
    }
    if (!(predicate.getChild(0) instanceof SlotRef) || !(predicate.getChild(1) instanceof LiteralExpr)) {
        return false;
    }
    return partitionColumnNames.contains(((SlotRef) predicate.getChild(0)).getColumnName());
}

// Accepts only the exact form: <partition SlotRef> IN (<LiteralExpr>, ...).
private boolean isPartitionInList(InPredicate predicate, Set<String> partitionColumnNames) {
    if (predicate.isNotIn() || !(predicate.getChild(0) instanceof SlotRef)) {
        return false;
    }
    if (!partitionColumnNames.contains(((SlotRef) predicate.getChild(0)).getColumnName())) {
        return false;
    }
    // Every IN-list element must be a literal.
    for (int i = 1; i < predicate.getChildren().size(); i++) {
        if (!(predicate.getChild(i) instanceof LiteralExpr)) {
            return false;
        }
    }
    return true;
}

private Predicate convertExprToOdpsPredicate(Expr expr) throws AnalysisException {
Expand Down Expand Up @@ -576,14 +630,23 @@ protected Map<String, String> getLocationProperties() throws UserException {

private List<Split> getSplitByTableSession(TableBatchReadSession tableBatchReadSession) throws IOException {
List<Split> result = new ArrayList<>();

long t0 = System.currentTimeMillis();
String scanSessionSerialize = serializeSession(tableBatchReadSession);
long t1 = System.currentTimeMillis();
LOG.info("MaxComputeScanNode getSplitByTableSession: serializeSession cost {} ms, "
+ "serialized size: {} bytes", t1 - t0, scanSessionSerialize.length());

InputSplitAssigner assigner = tableBatchReadSession.getInputSplitAssigner();
long t2 = System.currentTimeMillis();
LOG.info("MaxComputeScanNode getSplitByTableSession: getInputSplitAssigner cost {} ms", t2 - t1);

long modificationTime = table.getOdpsTable().getLastDataModifiedTime().getTime();

MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) table.getCatalog();

if (mcCatalog.getSplitStrategy().equals(MCProperties.SPLIT_BY_BYTE_SIZE_STRATEGY)) {

long t3 = System.currentTimeMillis();
for (com.aliyun.odps.table.read.split.InputSplit split : assigner.getAllSplits()) {
MaxComputeSplit maxComputeSplit =
new MaxComputeSplit(BYTE_SIZE_PATH,
Expand All @@ -599,7 +662,10 @@ private List<Split> getSplitByTableSession(TableBatchReadSession tableBatchReadS

result.add(maxComputeSplit);
}
LOG.info("MaxComputeScanNode getSplitByTableSession: byte_size getAllSplits+build cost {} ms, "
+ "splits size: {}", System.currentTimeMillis() - t3, result.size());
} else {
long t3 = System.currentTimeMillis();
long totalRowCount = assigner.getTotalRowCount();

long recordsPerSplit = mcCatalog.getSplitRowCount();
Expand All @@ -619,17 +685,27 @@ private List<Split> getSplitByTableSession(TableBatchReadSession tableBatchReadS

result.add(maxComputeSplit);
}
LOG.info("MaxComputeScanNode getSplitByTableSession: row_offset getSplitByRowOffset+build cost {} ms, "
+ "splits size: {}, totalRowCount: {}", System.currentTimeMillis() - t3, result.size(),
totalRowCount);
}

return result;
}

@Override
public List<Split> getSplits(int numBackends) throws UserException {
long startTime = System.currentTimeMillis();
List<Split> result = new ArrayList<>();
com.aliyun.odps.Table odpsTable = table.getOdpsTable();
long getOdpsTableTime = System.currentTimeMillis();
LOG.info("MaxComputeScanNode getSplits: getOdpsTable cost {} ms", getOdpsTableTime - startTime);

if (desc.getSlots().isEmpty() || odpsTable.getFileNum() <= 0) {
return result;
}
long getFileNumTime = System.currentTimeMillis();
LOG.info("MaxComputeScanNode getSplits: getFileNum cost {} ms", getFileNumTime - getOdpsTableTime);

createRequiredColumns();

Expand All @@ -649,11 +725,71 @@ public List<Split> getSplits(int numBackends) throws UserException {
}

try {
TableBatchReadSession tableBatchReadSession = createTableBatchReadSession(requiredPartitionSpecs);
result = getSplitByTableSession(tableBatchReadSession);
long beforeSession = System.currentTimeMillis();
if (sessionVariable.enableMcLimitSplitOptimization
&& onlyPartitionEqualityPredicate && hasLimit()) {
result = getSplitsWithLimitOptimization(requiredPartitionSpecs);
} else {
TableBatchReadSession tableBatchReadSession = createTableBatchReadSession(requiredPartitionSpecs);
long afterSession = System.currentTimeMillis();
LOG.info("MaxComputeScanNode getSplits: createTableBatchReadSession cost {} ms, "
+ "partitionSpecs size: {}", afterSession - beforeSession, requiredPartitionSpecs.size());

result = getSplitByTableSession(tableBatchReadSession);
long afterSplit = System.currentTimeMillis();
LOG.info("MaxComputeScanNode getSplits: getSplitByTableSession cost {} ms, "
+ "splits size: {}", afterSplit - afterSession, result.size());
}
} catch (IOException e) {
throw new RuntimeException(e);
}
LOG.info("MaxComputeScanNode getSplits: total cost {} ms", System.currentTimeMillis() - startTime);
return result;
}

/**
 * Fast path for LIMIT queries: instead of enumerating splits for the whole
 * table, build a single row-offset split covering at most LIMIT rows
 * starting at row 0. Callers gate this on the query having only partition
 * equality predicates plus a LIMIT, so dropping the remaining rows is safe.
 *
 * @param requiredPartitionSpecs partitions the scan must cover
 * @return at most one split reading {@code min(limit, totalRowCount)} rows;
 *         empty when the selected data has no rows
 * @throws IOException if the read session cannot be created
 */
private List<Split> getSplitsWithLimitOptimization(
        List<PartitionSpec> requiredPartitionSpecs) throws IOException {
    final long begin = System.currentTimeMillis();

    // Row-offset splits let us address an exact row range.
    // NOTE(review): cross-partition reading is disabled here — presumably to
    // match per-partition row accounting; confirm against the SDK semantics.
    SplitOptions rowOffsetOptions = SplitOptions.newBuilder()
            .SplitByRowOffset()
            .withCrossPartition(false)
            .build();

    TableBatchReadSession readSession =
            createTableBatchReadSession(requiredPartitionSpecs, rowOffsetOptions);
    long sessionReady = System.currentTimeMillis();
    LOG.info("MaxComputeScanNode getSplitsWithLimitOptimization: "
            + "createTableBatchReadSession cost {} ms", sessionReady - begin);

    String serializedSession = serializeSession(readSession);
    InputSplitAssigner splitAssigner = readSession.getInputSplitAssigner();
    long totalRowCount = splitAssigner.getTotalRowCount();

    LOG.info("MaxComputeScanNode getSplitsWithLimitOptimization: "
            + "totalRowCount={}, limit={}", totalRowCount, getLimit());

    List<Split> splits = new ArrayList<>();
    if (totalRowCount <= 0) {
        // Nothing to scan (empty table or empty selected partitions).
        return splits;
    }

    // Never request more rows than the data actually holds.
    long rowsToRead = Math.min(getLimit(), totalRowCount);
    long modificationTime = table.getOdpsTable().getLastDataModifiedTime().getTime();
    com.aliyun.odps.table.read.split.InputSplit rowRangeSplit =
            splitAssigner.getSplitByRowOffset(0, rowsToRead);

    MaxComputeSplit limitSplit = new MaxComputeSplit(
            ROW_OFFSET_PATH, 0, rowsToRead, totalRowCount,
            modificationTime, null, Collections.emptyList());
    limitSplit.scanSerialize = serializedSession;
    limitSplit.splitType = SplitType.ROW_OFFSET;
    limitSplit.sessionId = rowRangeSplit.getSessionId();
    splits.add(limitSplit);

    LOG.info("MaxComputeScanNode getSplitsWithLimitOptimization: "
            + "total cost {} ms, 1 split with {} rows",
            System.currentTimeMillis() - begin, rowsToRead);
    return splits;
}

Expand Down
13 changes: 13 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2787,13 +2787,26 @@ public boolean isEnableHboNonStrictMatchingMode() {
public static final String IGNORE_RUNTIME_FILTER_IDS = "ignore_runtime_filter_ids";

public static final String ENABLE_EXTERNAL_TABLE_BATCH_MODE = "enable_external_table_batch_mode";

public static final String ENABLE_MC_LIMIT_SPLIT_OPTIMIZATION = "enable_mc_limit_split_optimization";
@VariableMgr.VarAttr(
name = ENABLE_EXTERNAL_TABLE_BATCH_MODE,
fuzzy = true,
description = {"使能外表的 batch mode 功能", "Enable the batch mode function of the external table."},
needForward = true)
public boolean enableExternalTableBatchMode = true;

@VariableMgr.VarAttr(
name = ENABLE_MC_LIMIT_SPLIT_OPTIMIZATION,
fuzzy = true,
description = {"开启 MaxCompute 表 LIMIT 查询的 split 优化。当查询仅包含分区等值条件且带有 LIMIT 时,"
+ "使用 row_offset 策略减少 split 数量以加速查询。",
"Enable split optimization for LIMIT queries on MaxCompute tables. "
+ "When the query contains only partition equality predicates with LIMIT, "
+ "use row_offset strategy to reduce split count for faster query execution."},
needForward = true)
public boolean enableMcLimitSplitOptimization = false;

@VariableMgr.VarAttr(name = SKEW_REWRITE_AGG_BUCKET_NUM, needForward = true,
description = {"bucketNum 参数控制 count(distinct) 倾斜优化的数据分布。决定不同值在 worker 间的分配方式,"
+ "值越大越能处理极端倾斜但增加 shuffle 开销,值越小网络开销越低但可能无法完全解决倾斜。",
Expand Down
Loading