Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@
import org.apache.druid.query.filter.EqualityFilter;
import org.apache.druid.query.filter.NotDimFilter;
import org.apache.druid.rpc.UpdateResponse;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.metadata.DefaultIndexingStateFingerprintMapper;
import org.apache.druid.segment.metadata.IndexingStateCache;
import org.apache.druid.segment.metadata.IndexingStateFingerprintMapper;
Expand All @@ -64,8 +66,8 @@
import org.apache.druid.server.compaction.InlineReindexingRuleProvider;
import org.apache.druid.server.compaction.MostFragmentedIntervalFirstPolicy;
import org.apache.druid.server.compaction.ReindexingDeletionRule;
import org.apache.druid.server.compaction.ReindexingSegmentGranularityRule;
import org.apache.druid.server.compaction.ReindexingTuningConfigRule;
import org.apache.druid.server.compaction.ReindexingIndexSpecRule;
import org.apache.druid.server.compaction.ReindexingPartitioningRule;
import org.apache.druid.server.coordinator.ClusterCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
Expand Down Expand Up @@ -188,9 +190,10 @@ public void test_ingestDayGranularity_andCompactToMonthGranularity_andCompactToY
new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null)
)
.withTuningConfig(
createTuningConfigWithPartitionsSpec(
new DimensionRangePartitionsSpec(null, 5000, List.of("item"), false)
)
UserCompactionTaskQueryTuningConfig
.builder()
.partitionsSpec(new DimensionRangePartitionsSpec(null, 5000, List.of("item"), false))
.build()
)
.build();

Expand All @@ -211,9 +214,10 @@ public void test_ingestDayGranularity_andCompactToMonthGranularity_andCompactToY
new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null)
)
.withTuningConfig(
createTuningConfigWithPartitionsSpec(
new DimensionRangePartitionsSpec(null, 5000, List.of("item"), false)
)
UserCompactionTaskQueryTuningConfig
.builder()
.partitionsSpec(new DimensionRangePartitionsSpec(null, 5000, List.of("item"), false))
.build()
)
.build();

Expand Down Expand Up @@ -381,9 +385,10 @@ public void test_compaction_withPersistLastCompactionStateFalse_storesOnlyFinger
new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null)
)
.withTuningConfig(
createTuningConfigWithPartitionsSpec(
new DimensionRangePartitionsSpec(1000, null, List.of("item"), false)
)
UserCompactionTaskQueryTuningConfig
.builder()
.partitionsSpec(new DimensionRangePartitionsSpec(1000, null, List.of("item"), false))
.build()
)
.build();

Expand Down Expand Up @@ -434,24 +439,21 @@ public void test_cascadingCompactionTemplate_multiplePeriodsApplyDifferentCompac
);
Assertions.assertEquals(16, getNumSegmentsWith(Granularities.FIFTEEN_MINUTE));

ReindexingSegmentGranularityRule hourRule = new ReindexingSegmentGranularityRule(
ReindexingPartitioningRule hourRule = new ReindexingPartitioningRule(
"hourRule",
"Compact to HOUR granularity for data older than 1 days",
Period.days(1),
Granularities.HOUR
Granularities.HOUR,
new DimensionRangePartitionsSpec(1000, null, List.of("item"), false),
null
);
ReindexingSegmentGranularityRule dayRule = new ReindexingSegmentGranularityRule(
ReindexingPartitioningRule dayRule = new ReindexingPartitioningRule(
"dayRule",
"Compact to DAY granularity for data older than 7 days",
Period.days(7),
Granularities.DAY
);

ReindexingTuningConfigRule tuningConfigRule = new ReindexingTuningConfigRule(
"tuningConfigRule",
"Use dimension range partitioning with max 1000 rows per segment",
Period.days(1),
createTuningConfigWithPartitionsSpec(new DimensionRangePartitionsSpec(1000, null, List.of("item"), false))
Granularities.DAY,
new DimensionRangePartitionsSpec(1000, null, List.of("item"), false),
null
);

ReindexingDeletionRule deletionRule = new ReindexingDeletionRule(
Expand All @@ -462,11 +464,18 @@ public void test_cascadingCompactionTemplate_multiplePeriodsApplyDifferentCompac
null
);

ReindexingIndexSpecRule indexSpecRule = new ReindexingIndexSpecRule(
"indexSpecRule",
null,
Period.days(7),
new IndexSpec.Builder().withDimensionCompression(CompressionStrategy.ZSTD).build()
);

InlineReindexingRuleProvider ruleProvider = InlineReindexingRuleProvider
.builder()
.segmentGranularityRules(List.of(hourRule, dayRule))
.tuningConfigRules(List.of(tuningConfigRule))
.partitioningRules(List.of(hourRule, dayRule))
.deletionRules(List.of(deletionRule))
.indexSpecRules(List.of(indexSpecRule))
.build();

CascadingReindexingTemplate cascadingReindexingTemplate = new CascadingReindexingTemplate(
Expand All @@ -477,7 +486,10 @@ public void test_cascadingCompactionTemplate_multiplePeriodsApplyDifferentCompac
null,
null,
null,
Granularities.HOUR
Granularities.HOUR,
new DynamicPartitionsSpec(null, null),
null,
null
);
runCompactionWithSpec(cascadingReindexingTemplate);
waitForAllCompactionTasksToFinish();
Expand Down Expand Up @@ -537,25 +549,20 @@ public void test_cascadingReindexing_withVirtualColumnOnNestedData_filtersCorrec
virtualColumns
);

ReindexingTuningConfigRule tuningConfigRule = new ReindexingTuningConfigRule(
"tuningConfigRule",
null,
Period.days(7),
createTuningConfigWithPartitionsSpec(new DynamicPartitionsSpec(null, null))
);

CascadingReindexingTemplate cascadingTemplate = new CascadingReindexingTemplate(
dataSource,
null,
null,
InlineReindexingRuleProvider.builder()
.deletionRules(List.of(deletionRule))
.tuningConfigRules(List.of(tuningConfigRule))
.build(),
null,
null,
null,
Granularities.DAY
Granularities.DAY,
new DynamicPartitionsSpec(null, null),
null,
null
);

runCompactionWithSpec(cascadingTemplate);
Expand Down Expand Up @@ -634,17 +641,22 @@ public void test_compactionWithTransformFilteringAllRows_createsTombstones(
);
}

// Add partitioning spec based on test parameter
if ("range".equals(partitionType)) {
PartitionsSpec partitionsSpec = new DimensionRangePartitionsSpec(null, 5000, List.of("item"), false);
builder.withTuningConfig(UserCompactionTaskQueryTuningConfig.builder().partitionsSpec(partitionsSpec).build());
builder.withTuningConfig(
UserCompactionTaskQueryTuningConfig
.builder()
.partitionsSpec(new DimensionRangePartitionsSpec(null, 5000, List.of("item"), false))
.maxNumConcurrentSubTasks(2)
.build()
);
} else {
// Hash partitioning
PartitionsSpec partitionsSpec = new HashedPartitionsSpec(null, null, null);
builder.withTuningConfig(UserCompactionTaskQueryTuningConfig.builder()
.partitionsSpec(partitionsSpec)
.maxNumConcurrentSubTasks(2)
.build());
builder.withTuningConfig(
UserCompactionTaskQueryTuningConfig
.builder()
.partitionsSpec(new HashedPartitionsSpec(null, null, null))
.maxNumConcurrentSubTasks(2)
.build()
);
}

InlineSchemaDataSourceCompactionConfig compactionConfig = builder.build();
Expand Down Expand Up @@ -700,7 +712,12 @@ public void test_compaction_legacy_string_discovery_sparse_column(
.withGranularitySpec(
new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null)
)
.withTuningConfig(createTuningConfigWithPartitionsSpec(new DynamicPartitionsSpec(null, null)))
.withTuningConfig(
UserCompactionTaskQueryTuningConfig
.builder()
.partitionsSpec(new DynamicPartitionsSpec(null, null))
.build()
)
.build();

runCompactionWithSpec(config);
Expand Down Expand Up @@ -911,29 +928,4 @@ private void verifyEventCountOlderThan(Period period, String dimension, String v
)
);
}

private UserCompactionTaskQueryTuningConfig createTuningConfigWithPartitionsSpec(PartitionsSpec partitionsSpec)
{
return new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
partitionsSpec,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
}
}
Loading
Loading