Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,20 @@
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.emitter.kafka.KafkaEmitter;
import org.apache.druid.emitter.kafka.KafkaEmitterModule;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.indexing.compact.CompactionSupervisorSpec;
import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
import org.apache.druid.indexing.kafka.simulate.KafkaResource;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.rpc.UpdateResponse;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.compaction.CompactionCandidateSearchPolicy;
import org.apache.druid.server.compaction.MostFragmentedIntervalFirstPolicy;
import org.apache.druid.server.compaction.NewestSegmentFirstPolicy;
import org.apache.druid.server.coordinator.ClusterCompactionConfig;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
Expand All @@ -50,10 +55,14 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Embedded test to emit cluster metrics using a {@link KafkaEmitter} and then
Expand Down Expand Up @@ -96,7 +105,8 @@ public void stop()
}
};

indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s")
indexer.setServerMemory(1_000_000_000L)
.addProperty("druid.segment.handoff.pollDuration", "PT0.1s")
.addProperty("druid.worker.capacity", "10");
overlord.addProperty("druid.indexer.task.default.context", "{\"useConcurrentLocks\": true}")
.addProperty("druid.manager.segments.useIncrementalCache", "ifSynced")
Expand Down Expand Up @@ -128,6 +138,20 @@ public void stop()
return cluster;
}

/**
 * Supplies argument pairs for the parameterized compaction supervisor test:
 * each pair combines a {@link CompactionEngine} with a
 * {@link CompactionCandidateSearchPolicy} implementation, so the test runs once
 * per (engine, policy) combination via {@code @MethodSource}.
 *
 * @return stream of (engine, policy) test arguments
 */
public static Stream<Arguments> getCompactionSupervisorTestParams()
{
  return Stream.of(
      // Native engine exercised with the default-style newest-segment-first policy
      Arguments.of(
          CompactionEngine.NATIVE,
          new NewestSegmentFirstPolicy(null)
      ),
      // MSQ engine exercised with the fragmentation-based policy.
      // NOTE(review): constructor args (1, 1 byte, null, 80, null) look like
      // fragmentation thresholds/tuning knobs — confirm their meaning against
      // the MostFragmentedIntervalFirstPolicy constructor.
      Arguments.of(
          CompactionEngine.MSQ,
          new MostFragmentedIntervalFirstPolicy(1, HumanReadableBytes.valueOf(1), null, 80, null)
      )
  );
}

@Test
@Timeout(20)
public void test_ingest10kRows_ofSelfClusterMetrics_andVerifyValues()
Expand Down Expand Up @@ -176,9 +200,13 @@ public void test_ingest10kRows_ofSelfClusterMetrics_andVerifyValues()
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
}

@Test
@MethodSource("getCompactionSupervisorTestParams")
@ParameterizedTest(name = "engine={0}, policy={1}")
@Timeout(120)
public void test_ingestClusterMetrics_withConcurrentCompactionSupervisor_andSkipKillOfUnusedSegments()
public void test_ingestClusterMetrics_withConcurrentCompactionSupervisor_andSkipKillOfUnusedSegments(
CompactionEngine engine,
CompactionCandidateSearchPolicy policy
)
{
final int maxRowsPerSegment = 500;
final int compactedMaxRowsPerSegment = 5000;
Expand Down Expand Up @@ -213,7 +241,7 @@ public void test_ingestClusterMetrics_withConcurrentCompactionSupervisor_andSkip
);

final ClusterCompactionConfig updatedCompactionConfig
= new ClusterCompactionConfig(1.0, 10, null, true, null, null);
= new ClusterCompactionConfig(1.0, 10, policy, true, engine, null);
final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
o -> o.updateClusterCompactionConfig(updatedCompactionConfig)
);
Expand All @@ -237,6 +265,7 @@ public void test_ingestClusterMetrics_withConcurrentCompactionSupervisor_andSkip
overlord.latchableEmitter().waitForEventAggregate(
event -> event.hasMetricName("task/run/time")
.hasDimension(DruidMetrics.TASK_TYPE, "compact")
.hasDimension(DruidMetrics.DATASOURCE, dataSource)
.hasDimension(DruidMetrics.TASK_STATUS, "SUCCESS"),
agg -> agg.hasCountAtLeast(10)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.server.compaction.CompactionCandidate;
import org.apache.druid.server.compaction.CompactionCandidateSearchPolicy;
import org.apache.druid.server.compaction.CompactionSlotManager;
import org.apache.druid.server.compaction.DataSourceCompactibleSegmentIterator;
import org.apache.druid.server.compaction.Eligibility;
import org.apache.druid.server.compaction.NewestSegmentFirstPolicy;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.duty.CompactSegments;
Expand Down Expand Up @@ -94,7 +94,7 @@ public List<CompactionJob> createCompactionJobs(
// Create a job for each CompactionCandidate
while (segmentIterator.hasNext()) {
final CompactionCandidate candidate = segmentIterator.next();
final CompactionCandidateSearchPolicy.Eligibility eligibility =
final Eligibility eligibility =
params.getClusterCompactionConfig()
.getCompactionPolicy()
.checkEligibilityForCompaction(candidate, params.getLatestTaskStatus(candidate));
Expand Down Expand Up @@ -126,7 +126,7 @@ public List<CompactionJob> createCompactionJobs(
);
ClientCompactionTaskQuery taskPayload = CompactSegments.createCompactionTask(
finalCandidate,
eligibility.getMode(),
eligibility,
finalConfig,
engine,
indexingStateFingerprint,
Expand All @@ -138,7 +138,8 @@ public List<CompactionJob> createCompactionJobs(
finalCandidate,
CompactionSlotManager.computeSlotsRequiredForTask(taskPayload),
indexingStateFingerprint,
compactionState
compactionState,
eligibility
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.indexing.template.BatchIndexingJob;
import org.apache.druid.query.http.ClientSqlQuery;
import org.apache.druid.server.compaction.CompactionCandidate;
import org.apache.druid.server.compaction.Eligibility;
import org.apache.druid.timeline.CompactionState;

/**
Expand All @@ -34,32 +34,20 @@ public class CompactionJob extends BatchIndexingJob
private final int maxRequiredTaskSlots;
private final String targetIndexingStateFingerprint;
private final CompactionState targetIndexingState;
private final Eligibility eligibility;

public CompactionJob(
ClientCompactionTaskQuery task,
CompactionCandidate candidate,
int maxRequiredTaskSlots,
String targetIndexingStateFingerprint,
CompactionState targetIndexingState
CompactionState targetIndexingState,
Eligibility eligibility
)
{
super(task, null);
this.candidate = candidate;
this.maxRequiredTaskSlots = maxRequiredTaskSlots;
this.targetIndexingStateFingerprint = targetIndexingStateFingerprint;
this.targetIndexingState = targetIndexingState;
}

public CompactionJob(
ClientSqlQuery msqQuery,
CompactionCandidate candidate,
int maxRequiredTaskSlots,
String targetIndexingStateFingerprint,
CompactionState targetIndexingState
)
{
super(null, msqQuery);
this.candidate = candidate;
this.eligibility = eligibility;
this.maxRequiredTaskSlots = maxRequiredTaskSlots;
this.targetIndexingStateFingerprint = targetIndexingStateFingerprint;
this.targetIndexingState = targetIndexingState;
Expand Down Expand Up @@ -90,6 +78,11 @@ public CompactionState getTargetIndexingState()
return targetIndexingState;
}

public Eligibility getEligibility()
{
return eligibility;
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Iterates over all eligible compaction jobs in order of their priority.
Expand Down Expand Up @@ -198,7 +197,7 @@ public void removeJobs(String dataSource)
final List<CompactionJob> jobsToRemove = queue
.stream()
.filter(job -> job.getDataSource().equals(dataSource))
.collect(Collectors.toList());
.toList();

queue.removeAll(jobsToRemove);
log.info("Removed [%d] jobs for datasource[%s] from queue.", jobsToRemove.size(), dataSource);
Expand All @@ -221,7 +220,10 @@ public void runReadyJobs()
while (!queue.isEmpty()) {
final CompactionJob job = queue.poll();
if (startJobIfPendingAndReady(job, pendingJobs, slotManager)) {
runStats.add(Stats.Compaction.SUBMITTED_TASKS, RowKey.of(Dimension.DATASOURCE, job.getDataSource()), 1);
final RowKey rowKey = RowKey
.with(Dimension.DATASOURCE, job.getDataSource())
.and(Dimension.DESCRIPTION, job.getEligibility().getMode().name());
runStats.add(Stats.Compaction.SUBMITTED_TASKS, rowKey, 1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,13 +193,14 @@ private void assertQueryToTask(ClientCompactionTaskQuery query, CompactionTask t
{
Assert.assertEquals(query.getId(), task.getId());
Assert.assertEquals(query.getDataSource(), task.getDataSource());
Assert.assertTrue(query.getIoConfig().getInputSpec() instanceof ClientCompactionIntervalSpec);
Assert.assertTrue(task.getIoConfig().getInputSpec() instanceof CompactionIntervalSpec);
Assert.assertEquals(
query.getIoConfig().getInputSpec().getInterval(),
((CompactionIntervalSpec) task.getIoConfig().getInputSpec()).getInterval()
);
Assert.assertEquals(
query.getIoConfig().getInputSpec().getSha256OfSortedSegmentIds(),
((ClientCompactionIntervalSpec) query.getIoConfig().getInputSpec()).getSha256OfSortedSegmentIds(),
((CompactionIntervalSpec) task.getIoConfig().getInputSpec()).getSha256OfSortedSegmentIds()
);
Assert.assertEquals(
Expand Down Expand Up @@ -301,7 +302,7 @@ private ClientCompactionTaskQuery createCompactionTaskQuery(String id, Compactio
id,
"datasource",
new ClientCompactionIOConfig(
new ClientCompactionIntervalSpec(Intervals.of("2019/2020"), null, "testSha256OfSortedSegmentIds"), true
new ClientCompactionIntervalSpec(Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds"), true
),
new ClientCompactionTaskQueryTuningConfig(
100,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ public class ClientCompactionIOConfig
{
private static final String TYPE = "compact";

private final ClientCompactionIntervalSpec inputSpec;
private final ClientCompactionInputSpec inputSpec;
private final boolean dropExisting;

@JsonCreator
public ClientCompactionIOConfig(
@JsonProperty("inputSpec") ClientCompactionIntervalSpec inputSpec,
@JsonProperty("inputSpec") ClientCompactionInputSpec inputSpec,
@JsonProperty("dropExisting") @Nullable Boolean dropExisting
)
{
Expand All @@ -55,7 +55,7 @@ public String getType()
}

@JsonProperty
public ClientCompactionIntervalSpec getInputSpec()
public ClientCompactionInputSpec getInputSpec()
{
return inputSpec;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.client.indexing;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.joda.time.Interval;

/**
 * Client side equivalent of {@code CompactionInputSpec}. Required since the
 * {@code CompactionInputSpec} resides in {@code indexing-service} module.
 * <p>
 * Serialized polymorphically: Jackson selects the concrete implementation from
 * the JSON {@code "type"} property, which must be one of the registered
 * subtype names below ({@code ClientCompactionIntervalSpec} or
 * {@code ClientMinorCompactionInputSpec}).
 */
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
    @JsonSubTypes.Type(name = ClientCompactionIntervalSpec.TYPE, value = ClientCompactionIntervalSpec.class),
    @JsonSubTypes.Type(name = ClientMinorCompactionInputSpec.TYPE, value = ClientMinorCompactionInputSpec.class)
})
public interface ClientCompactionInputSpec
{
  /**
   * @return non-null Interval that this input spec operates on.
   */
  Interval getInterval();
}
Loading
Loading