Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public class DecomposeRepeatWithPreAggregation extends DefaultPlanRewriter<Disti
public static final DecomposeRepeatWithPreAggregation INSTANCE = new DecomposeRepeatWithPreAggregation();
private static final Set<Class<? extends AggregateFunction>> SUPPORT_AGG_FUNCTIONS =
ImmutableSet.of(Sum.class, Sum0.class, Min.class, Max.class, AnyValue.class, Count.class);
private static final int DECOMPOSE_REPEAT_THRESHOLD = 3;

@Override
public Plan rewriteRoot(Plan plan, JobContext jobContext) {
Expand Down Expand Up @@ -404,7 +405,7 @@ private int canOptimize(LogicalAggregate<? extends Plan> aggregate, ConnectConte
// This is an empirical threshold: when there are too few grouping sets,
// the overhead of creating CTE and union may outweigh the benefits.
// The value 3 is chosen heuristically based on practical experience.
if (groupingSets.size() <= connectContext.getSessionVariable().decomposeRepeatThreshold) {
if (groupingSets.size() <= DECOMPOSE_REPEAT_THRESHOLD) {
return -1;
}
return findMaxGroupingSetIndex(groupingSets);
Expand Down Expand Up @@ -492,6 +493,7 @@ private LogicalCTEProducer<LogicalAggregate<Plan>> constructProducer(LogicalAggr
LogicalCTEProducer<LogicalAggregate<Plan>> producer =
new LogicalCTEProducer<>(ctx.statementContext.getNextCTEId(), preAggClone);
ctx.cteProducerList.add(producer);
producer.accept(new StatsDerive(false), new DeriveContext());
return producer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class AggregateUtils {
public static final double MID_CARDINALITY_THRESHOLD = 0.01;
public static final double HIGH_CARDINALITY_THRESHOLD = 0.1;
public static final int LOW_NDV_THRESHOLD = 1024;
public static final int NDV_INSTANCE_BALANCE_MULTIPLIER = 512;

public static AggregateFunction tryConvertToMultiDistinct(AggregateFunction function) {
if (function instanceof SupportMultiDistinct && function.isDistinct()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,6 @@ public class SessionVariable implements Serializable, Writable {

public static final String SKEW_REWRITE_AGG_BUCKET_NUM = "skew_rewrite_agg_bucket_num";
public static final String AGG_SHUFFLE_USE_PARENT_KEY = "agg_shuffle_use_parent_key";
public static final String DECOMPOSE_REPEAT_THRESHOLD = "decompose_repeat_threshold";
public static final String DECOMPOSE_REPEAT_SHUFFLE_INDEX_IN_MAX_GROUP
= "decompose_repeat_shuffle_index_in_max_group";

Expand Down Expand Up @@ -3437,8 +3436,6 @@ public boolean isEnableESParallelScroll() {
)
public boolean useV3StorageFormat = false;

@VariableMgr.VarAttr(name = DECOMPOSE_REPEAT_THRESHOLD)
public int decomposeRepeatThreshold = 3;
@VariableMgr.VarAttr(name = DECOMPOSE_REPEAT_SHUFFLE_INDEX_IN_MAX_GROUP)
public int decomposeRepeatShuffleIndexInMaxGroup = -1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1340,6 +1340,6 @@ public static boolean isBalanced(ColumnStatistic columnStatistic, double rowCoun
double balanceFactor = maxHotValueCntIncludeNull == 0
? Double.MAX_VALUE : rowsPerInstance / maxHotValueCntIncludeNull;
// The larger this factor is, the more balanced the data.
return balanceFactor > 2.0 && ndv > instanceNum * 3 && ndv > AggregateUtils.LOW_NDV_THRESHOLD;
return balanceFactor > 2.0 && ndv > instanceNum * AggregateUtils.NDV_INSTANCE_BALANCE_MULTIPLIER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ public void testChoosePreAggShuffleKeyPartitionExprs() throws Exception {
}

/** Helper: build Statistics with column ndv for given expressions. */
private static Statistics statsWithNdv(Map<Expression, Double> exprToNdv) {
private static Statistics statsWithNdv(Map<Expression, Double> exprToNdv, int rows) {
Map<Expression, ColumnStatistic> map = new HashMap<>();
for (Map.Entry<Expression, Double> e : exprToNdv.entrySet()) {
ColumnStatistic col = new ColumnStatisticBuilder(1)
Expand All @@ -586,7 +586,7 @@ private static Statistics statsWithNdv(Map<Expression, Double> exprToNdv) {
.build();
map.put(e.getKey(), col);
}
return new Statistics(100, map);
return new Statistics(rows, map);
}

@Test
Expand All @@ -609,10 +609,10 @@ public void testChooseByAppearanceThenNdv() throws Exception {
);

Map<Expression, Double> exprToNdv = new HashMap<>();
exprToNdv.put(a, 400.0);
exprToNdv.put(b, 6000.0);
exprToNdv.put(c, 2000.0);
Statistics stats = statsWithNdv(exprToNdv);
exprToNdv.put(a, 4000.0);
exprToNdv.put(b, 60000.0);
exprToNdv.put(c, 20000.0);
Statistics stats = statsWithNdv(exprToNdv, 60000);

@SuppressWarnings("unchecked")
Optional<Expression> chosen = (Optional<Expression>) method.invoke(
Expand All @@ -628,14 +628,14 @@ public void testChooseByAppearanceThenNdv() throws Exception {

@SuppressWarnings("unchecked")
Optional<Expression> chosen2 = (Optional<Expression>) method.invoke(
rule, groupingSets, -1, candidates, stats, 1000);
rule, groupingSets, -1, candidates, stats, 50);
Assertions.assertTrue(chosen2.isPresent());
Assertions.assertEquals(b, chosen2.get());

// inputStats null -> chooseByNdv returns empty for every group -> empty
@SuppressWarnings("unchecked")
Optional<Expression> emptyNullStats = (Optional<Expression>) method.invoke(
rule, groupingSets, -1, candidates, null, 1000);
rule, groupingSets, -1, candidates, null, 50);
Assertions.assertFalse(emptyNullStats.isPresent());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,23 @@
1 3 2 2 2 1
1 3 2 2 2 1

-- !nest_rewrite --
\N \N \N \N
1 \N \N \N
1 \N \N \N
1 \N \N \N
1 \N \N 10
1 2 \N \N
1 2 1 \N
1 2 1 1
1 2 3 \N
1 2 3 3
1 2 3 4
1 2 3 7
1 3 \N \N
1 3 2 \N
1 3 2 2

-- !upper_ref --
11 1 2 1
12 1 3 \N
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,18 @@
// under the License.

suite("decompose_repeat") {
// sql "set disable_nereids_rules='DECOMPOSE_REPEAT';"
sql "set disable_nereids_rules='DECOMPOSE_REPEAT';"
sql "drop table if exists t1;"
sql "create table t1(a int, b int, c int, d int) distributed by hash(a) properties('replication_num'='1');"
sql "insert into t1 values(1,2,3,4),(1,2,3,3),(1,2,1,1),(1,3,2,2);"
order_qt_sum "select a,b,c,sum(d) from t1 group by rollup(a,b,c);"
order_qt_agg_func_gby_key_same_col "select a,b,c,d,sum(d) from t1 group by rollup(a,b,c,d);"
order_qt_multi_agg_func "select a,b,c,sum(d),sum(c),max(a) from t1 group by rollup(a,b,c,d);"
// maybe this problem:DORIS-24075
// order_qt_nest_rewrite """
// select a,b,c,c1 from (
// select a,b,c,d,sum(d) c1 from t1 group by grouping sets((a,b,c),(a,b,c,d),(a),(a,b,c,c))
// ) t group by rollup(a,b,c,c1);
// """
order_qt_nest_rewrite """
select a,b,c,c1 from (
select a,b,c,d,sum(d) c1 from t1 group by grouping sets((a,b,c),(a,b,c,d),(a),(a,b,c,c))
) t group by rollup(a,b,c,c1);
"""
order_qt_upper_ref """
select c1+10,a,b,c from (select a,b,c,sum(d) c1 from t1 group by rollup(a,b,c)) t group by c1+10,a,b,c;
"""
Expand Down