Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@

import org.apache.druid.catalog.guice.CatalogClientModule;
import org.apache.druid.catalog.guice.CatalogCoordinatorModule;
import org.apache.druid.client.BrokerServerView;
import org.apache.druid.client.selector.ServerSelector;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.CompactionEngine;
Expand All @@ -32,12 +36,12 @@
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.TaskBuilder;
import org.apache.druid.indexing.common.task.TuningConfigBuilder;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.compact.CascadingReindexingTemplate;
import org.apache.druid.indexing.compact.CompactionSupervisorSpec;
import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
import org.apache.druid.indexing.kafka.simulate.KafkaResource;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpecBuilder;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
Expand All @@ -49,12 +53,14 @@
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.filter.EqualityFilter;
import org.apache.druid.query.filter.NotDimFilter;
import org.apache.druid.rpc.UpdateResponse;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.metadata.DefaultIndexingStateFingerprintMapper;
import org.apache.druid.segment.metadata.IndexingStateCache;
import org.apache.druid.segment.metadata.IndexingStateFingerprintMapper;
Expand All @@ -76,6 +82,7 @@
import org.apache.druid.server.metrics.LatchableEmitter;
import org.apache.druid.server.metrics.StorageMonitor;
import org.apache.druid.testing.embedded.EmbeddedBroker;
import org.apache.druid.testing.embedded.EmbeddedClusterApis;
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
import org.apache.druid.testing.embedded.EmbeddedHistorical;
Expand All @@ -85,11 +92,12 @@
import org.apache.druid.testing.embedded.indexing.MoreResources;
import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
import org.apache.druid.testing.tools.EventSerializer;
import org.apache.druid.testing.tools.ITRetryUtil;
import org.apache.druid.testing.tools.JsonEventSerializer;
import org.apache.druid.testing.tools.StreamGenerator;
import org.apache.druid.testing.tools.WikipediaStreamEventStreamGenerator;
import org.apache.druid.timeline.DataSegment;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.druid.timeline.SegmentId;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
Expand All @@ -101,18 +109,19 @@
import org.junit.jupiter.params.provider.MethodSource;

import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Embedded test that runs compaction supervisors of various types.
*/
public class CompactionSupervisorTest extends EmbeddedClusterTestBase
{
private final KafkaResource kafkaServer = new KafkaResource();
private final EmbeddedBroker broker = new EmbeddedBroker();
private final EmbeddedIndexer indexer = new EmbeddedIndexer()
.setServerMemory(2_000_000_000L)
Expand All @@ -138,12 +147,7 @@ public EmbeddedDruidCluster createCluster()
"[\"org.apache.druid.query.policy.NoRestrictionPolicy\"]"
)
.addCommonProperty("druid.policy.enforcer.type", "restrictAllTables")
.addExtensions(
CatalogClientModule.class,
CatalogCoordinatorModule.class,
KafkaIndexTaskModule.class
)
.addResource(kafkaServer)
.addExtensions(CatalogClientModule.class, CatalogCoordinatorModule.class)
.addServer(coordinator)
.addServer(overlord)
.addServer(indexer)
Expand All @@ -156,7 +160,14 @@ public EmbeddedDruidCluster createCluster()
private void configureCompaction(CompactionEngine compactionEngine, @Nullable CompactionCandidateSearchPolicy policy)
{
final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig(1.0, 100, policy, true, compactionEngine, true))
o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig(
1.0,
100,
policy,
true,
compactionEngine,
true
))
);
Assertions.assertTrue(updateResponse.isSuccess());
}
Expand Down Expand Up @@ -236,24 +247,14 @@ public void test_minorCompactionWithMSQ(PartitionsSpec partitionsSpec) throws Ex
CompactionEngine.MSQ,
new MostFragmentedIntervalFirstPolicy(2, new HumanReadableBytes("1KiB"), null, 80, null)
);
KafkaSupervisorSpecBuilder kafkaSupervisorSpecBuilder = MoreResources.Supervisor.KAFKA_JSON
.get()
.withDataSchema(schema -> schema.withTimestamp(new TimestampSpec("timestamp", "iso", null))
.withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build()))
.withTuningConfig(tuningConfig -> tuningConfig.withMaxRowsPerSegment(1))
.withIoConfig(ioConfig -> ioConfig.withConsumerProperties(kafkaServer.consumerProperties()).withTaskCount(2));

// Set up first topic and supervisor
final String topic1 = IdUtils.getRandomId();
kafkaServer.createTopicWithPartitions(topic1, 1);
final KafkaSupervisorSpec supervisor1 = kafkaSupervisorSpecBuilder.withId(topic1).build(dataSource, topic1);
cluster.callApi().postSupervisor(supervisor1);

final int totalRowCount = publish1kRecords(topic1, true) + publish1kRecords(topic1, false);
waitUntilPublishedRecordsAreIngested(totalRowCount);
ingest1kRecords();
ingest1kRecords();

// Before compaction
Assertions.assertEquals(4, getNumSegmentsWith(Granularities.HOUR));
overlord.latchableEmitter().waitForNextEvent(event -> event.hasMetricName("segment/metadataCache/sync/time"));
waitSegmentsAvailableInBroker();
Assertions.assertEquals(2, getNumSegmentsWith(Granularities.DAY));
Assertions.assertEquals(2000, getTotalRowCount());

// Create a compaction config with DAY granularity
InlineSchemaDataSourceCompactionConfig dayGranularityConfig =
Expand All @@ -276,31 +277,28 @@ public void test_minorCompactionWithMSQ(PartitionsSpec partitionsSpec) throws Ex
waitForAllCompactionTasksToFinish();

pauseCompaction(dayGranularityConfig);
Assertions.assertEquals(0, getNumSegmentsWith(Granularities.HOUR));

overlord.latchableEmitter().waitForNextEvent(event -> event.hasMetricName("segment/metadataCache/sync/time"));
waitSegmentsAvailableInBroker();
Assertions.assertEquals(1, getNumSegmentsWith(Granularities.DAY));
Assertions.assertEquals(2000, getTotalRowCount());

verifyCompactedSegmentsHaveFingerprints(dayGranularityConfig);

// published another 1k
final int appendedRowCount = publish1kRecords(topic1, true);
indexer.latchableEmitter().flush();
waitUntilPublishedRecordsAreIngested(appendedRowCount);
// ingest another 2k
ingest1kRecords();
ingest1kRecords();

// Tear down both topics and supervisors
kafkaServer.deleteTopic(topic1);
cluster.callApi().postSupervisor(supervisor1.createSuspendedSpec());
overlord.latchableEmitter().waitForNextEvent(event -> event.hasMetricName("segment/metadataCache/sync/time"));
waitSegmentsAvailableInBroker();
Assertions.assertEquals(3, getNumSegmentsWith(Granularities.DAY));
Assertions.assertEquals(4000, getTotalRowCount());

long totalUsed = overlord.latchableEmitter().getMetricValues(
"segment/metadataCache/used/count",
Map.of(DruidMetrics.DATASOURCE, dataSource)
).stream().reduce((first, second) -> second).orElse(0).longValue();

Assertions.assertEquals(0, getNumSegmentsWith(Granularities.HOUR));
// 1 compacted segment + 2 appended segments
Assertions.assertEquals(3, getNumSegmentsWith(Granularities.DAY));
Assertions.assertEquals(3000, getTotalRowCount());

runCompactionWithSpec(dayGranularityConfig);
waitForAllCompactionTasksToFinish();

Expand All @@ -310,45 +308,44 @@ public void test_minorCompactionWithMSQ(PartitionsSpec partitionsSpec) throws Ex
.hasDimension(DruidMetrics.DATASOURCE, dataSource)
.hasValueMatching(Matchers.greaterThan(totalUsed)));

// performed minor compaction: 1 previously compacted segment + 1 incrementally compacted segment
// performed minor compaction: 1 previously compacted segment + 1 recently compacted segment from minor compaction
Assertions.assertEquals(2, getNumSegmentsWith(Granularities.DAY));
Assertions.assertEquals(3000, getTotalRowCount());
Assertions.assertEquals(4000, getTotalRowCount());
}

/**
 * Blocks until the indexer's "ingest/events/processed" metric for this datasource
 * sums to at least {@code expectedRowCount}, then verifies that the total number
 * of processed events is exactly {@code expectedRowCount} (i.e. no extra rows).
 *
 * @param expectedRowCount number of rows expected to have been ingested
 */
protected void waitUntilPublishedRecordsAreIngested(int expectedRowCount)
{
  // Wait for the aggregate of processed-event metrics to reach the expected sum.
  indexer.latchableEmitter().waitForEventAggregate(
      event -> event.hasMetricName("ingest/events/processed")
                    .hasDimension(DruidMetrics.DATASOURCE, dataSource),
      agg -> agg.hasSumAtLeast(expectedRowCount)
  );

  // Re-read every emitted value and confirm the exact total, catching over-ingestion.
  int processedTotal = 0;
  final List<Number> metricValues = indexer
      .latchableEmitter()
      .getMetricValues("ingest/events/processed", Map.of(DruidMetrics.DATASOURCE, dataSource));
  for (Number value : metricValues) {
    processedTotal += value.intValue();
  }
  Assertions.assertEquals(expectedRowCount, processedTotal);
}

protected int publish1kRecords(String topic, boolean useTransactions)
protected void ingest1kRecords()
{
final EventSerializer serializer = new JsonEventSerializer(overlord.bindings().jsonMapper());
final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(serializer, 100, 100);
List<byte[]> records = streamGenerator.generateEvents(10);

ArrayList<ProducerRecord<byte[], byte[]>> producerRecords = new ArrayList<>();
for (byte[] record : records) {
producerRecords.add(new ProducerRecord<>(topic, record));
}

if (useTransactions) {
kafkaServer.produceRecordsToTopic(producerRecords);
} else {
kafkaServer.produceRecordsWithoutTransaction(producerRecords);
}
return producerRecords.size();
final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(serializer, 500, 100);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the large number of records crucial for this test?
If not, you could try using some of the templates from MoreResources such as MoreResources.Task.BASIC_INDEX, MoreResources.Task.INDEX_TASK_WITH_AGGREGATORS or MoreResources.MSQ.INSERT_TINY_WIKI_JSON.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a large dataset (wikipedia 1 day = 24k rows), you could also try the following (from IngestionSmokeTest.test_runIndexParallelTask_andCompactData())

final String taskId = IdUtils.getRandomId();
    final ParallelIndexSupervisorTask task = TaskBuilder
        .ofTypeIndexParallel()
        .timestampColumn("timestamp")
        .jsonInputFormat()
        .inputSource(Resources.HttpData.wikipedia1Day())
        .dimensions()
        .tuningConfig(t -> t.withMaxNumConcurrentSubTasks(1))
        .dataSource(dataSource)
        .withId(taskId);
    cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
    cluster.callApi().waitForTaskToSucceed(taskId, eventCollector.latchableEmitter());

List<byte[]> records = streamGenerator.generateEvents(2);

final InlineInputSource input = new InlineInputSource(
records.stream().map(b -> new String(b, StandardCharsets.UTF_8)).collect(Collectors.joining("\n")));
final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
input,
new JsonInputFormat(null, null, null, null, null),
true,
null
);
final ParallelIndexIngestionSpec indexIngestionSpec = new ParallelIndexIngestionSpec(
DataSchema.builder()
.withDataSource(dataSource)
.withTimestamp(new TimestampSpec("timestamp", "iso", null))
.withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build())
.build(),
ioConfig,
TuningConfigBuilder.forParallelIndexTask().build()
);
final String taskId = EmbeddedClusterApis.newTaskId(dataSource);
final ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
taskId,
null,
null,
indexIngestionSpec,
null
);
cluster.callApi().submitTask(task);
cluster.callApi().waitForTaskToSucceed(taskId, overlord);
}

@MethodSource("getEngine")
Expand Down Expand Up @@ -842,6 +839,32 @@ private int getNumSegmentsWith(Granularity granularity)
.count();
}

private void waitSegmentsAvailableInBroker()
{
Set<SegmentId> segments = overlord
.bindings()
.segmentsMetadataStorage()
.retrieveAllUsedSegments(dataSource, Segments.ONLY_VISIBLE)
.stream()
.map(DataSegment::getId)
.collect(Collectors.toSet());

ITRetryUtil.retryUntilEquals(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should wait for a Broker metric instead.
Does cluster.callApi().waitForSegmentsToBeAvailable() not work for this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of ITRetryUtil, try using cluster.callApi().waitForResult().

() ->
broker.bindings()
.getInstance(BrokerServerView.class)
.getTimeline(TableDataSource.create(dataSource))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of querying the timeline directly, please use SELECT id FROM sys.segments.

.get()
.iterateAllObjects()
.stream()
.map(ServerSelector::getSegment)
.map(DataSegment::getId)
.collect(Collectors.toSet()).containsAll(segments),
true,
"wait until segments are available in broker"
);
}

private void runIngestionAtGranularity(
String granularity,
String inlineDataCsv
Expand All @@ -859,7 +882,7 @@ private void runIngestionAtGranularity(
/**
 * Provides the partition specs used by the parameterized tests: one range-partitioned
 * spec (by "page") and one dynamic spec.
 *
 * <p>Scrape artifact fixed: the diff showed both the old (5000) and new (10_000)
 * range spec lines; only the new target-rows value is kept.</p>
 *
 * @return partition specs to parameterize compaction tests with
 */
public static List<PartitionsSpec> getPartitionsSpec()
{
  return List.of(
      new DimensionRangePartitionsSpec(null, 10_000, List.of("page"), false),
      new DynamicPartitionsSpec(null, null)
  );
}
Expand Down
Loading