Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordination.ChangeRequestHistory;
import org.apache.druid.server.coordination.ChangeRequestsSnapshot;
import org.apache.druid.server.http.ServletResourceUtils;
import org.apache.druid.server.http.security.StateResourceFilter;

import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
Expand Down Expand Up @@ -129,32 +128,11 @@ public Void getWorkerState(
final AsyncContext asyncContext = req.startAsync();

asyncContext.addListener(
new AsyncListener()
{
@Override
public void onComplete(AsyncEvent event)
{
}

@Override
public void onTimeout(AsyncEvent event)
{

// HTTP 204 NO_CONTENT is sent to the client.
future.cancel(true);
event.getAsyncContext().complete();
}

@Override
public void onError(AsyncEvent event)
{
}

@Override
public void onStartAsync(AsyncEvent event)
{
}
}
ServletResourceUtils.createAsyncTimeoutListener(event -> {
// HTTP 204 NO_CONTENT is sent to the client.
future.cancel(true);
event.getAsyncContext().complete();
})
);

Futures.addCallback(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,12 @@ protected Pair<ServiceClient, Closeable> makeNewClient(final WorkerId workerId)
/**
* Service client that adds the {@link DartWorkerResource#HEADER_CONTROLLER_HOST} header.
*/
private static class ControllerDecoratedClient implements ServiceClient
public static class ControllerDecoratedClient implements ServiceClient
{
private final ServiceClient delegate;
private final String controllerHost;

ControllerDecoratedClient(final ServiceClient delegate, final String controllerHost)
public ControllerDecoratedClient(final ServiceClient delegate, final String controllerHost)
{
this.delegate = delegate;
this.controllerHost = controllerHost;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public WorkerContext build(
jsonMapper,
policyEnforcer,
injector,
new DartWorkerClientImpl(queryId, serviceClientFactory, smileMapper, null),
createWorkerClient(queryId),
processingConfig,
segmentWrangler,
groupingEngine,
Expand All @@ -135,4 +135,9 @@ public WorkerContext build(
emitter
);
}

protected DartWorkerClient createWorkerClient(String queryId)
{
return new DartWorkerClientImpl(queryId, serviceClientFactory, smileMapper, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
Expand Down Expand Up @@ -72,6 +73,11 @@ public class DartWorkerRunner
@GuardedBy("this")
private final Set<String> activeControllerHosts = new HashSet<>();

/**
* Used to track the time since this runner has been idle.
*/
private final Stopwatch sinceLastWorkerFinished = Stopwatch.createUnstarted();

/**
* Query ID -> Worker instance.
*/
Expand Down Expand Up @@ -147,6 +153,7 @@ public Worker startWorker(
holder,
e -> log.warn(e, "Failed to close worker[%s]", holder.worker.id())
);
resetWorkerFinishTime();
this.notifyAll();
}
},
Expand Down Expand Up @@ -196,7 +203,9 @@ public GetWorkersResponse getWorkersResponse()
{
final List<DartWorkerInfo> infos = new ArrayList<>();

final long idleDurationMillis;
synchronized (this) {
idleDurationMillis = workerMap.isEmpty() ? sinceLastWorkerFinished.millisElapsed() : 0L;
for (final Map.Entry<String, WorkerHolder> entry : workerMap.entrySet()) {
final String queryId = entry.getKey();
final WorkerHolder workerHolder = entry.getValue();
Expand All @@ -211,7 +220,22 @@ public GetWorkersResponse getWorkersResponse()
}
}

return new GetWorkersResponse(infos);
return new GetWorkersResponse(infos, idleDurationMillis);
}

/**
* Sets the finish time of the last worker to the current time. Used to track
* the duration for which this runner has been idle.
*/
public void resetWorkerFinishTime()
{
synchronized (this) {
if (sinceLastWorkerFinished.isRunning()) {
sinceLastWorkerFinished.restart();
} else {
sinceLastWorkerFinished.start();
}
}
}

@LifecycleStart
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,15 @@
public class GetWorkersResponse
{
private final List<DartWorkerInfo> workers;
private final long idleDurationMillis;

public GetWorkersResponse(@JsonProperty("workers") final List<DartWorkerInfo> workers)
public GetWorkersResponse(
@JsonProperty("workers") final List<DartWorkerInfo> workers,
@JsonProperty("idleDurationMillis") final long idleDurationMillis
)
{
this.workers = workers;
this.idleDurationMillis = idleDurationMillis;
}

@JsonProperty
Expand All @@ -43,6 +48,12 @@ public List<DartWorkerInfo> getWorkers()
return workers;
}

@JsonProperty
public long getIdleDurationMillis()
{
return idleDurationMillis;
}

@Override
public boolean equals(Object o)
{
Expand All @@ -53,12 +64,13 @@ public boolean equals(Object o)
return false;
}
GetWorkersResponse that = (GetWorkersResponse) o;
return Objects.equals(workers, that.workers);
return Objects.equals(workers, that.workers)
&& idleDurationMillis == that.idleDurationMillis;
}

@Override
public int hashCode()
{
return Objects.hashCode(workers);
return Objects.hash(workers, idleDurationMillis);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,12 @@
import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
import org.apache.druid.msq.statistics.serde.ClusterByStatisticsSnapshotSerde;
import org.apache.druid.server.http.ServletResourceUtils;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.utils.CloseableUtils;

import javax.annotation.Nullable;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
Expand Down Expand Up @@ -109,33 +108,13 @@ public Response httpGetChannelData(

asyncContext.setTimeout(GET_CHANNEL_DATA_TIMEOUT);
asyncContext.addListener(
new AsyncListener()
{
@Override
public void onComplete(AsyncEvent event)
{
}

@Override
public void onTimeout(AsyncEvent event)
{
if (responseResolved.compareAndSet(false, true)) {
HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
response.setStatus(HttpServletResponse.SC_OK);
event.getAsyncContext().complete();
}
}

@Override
public void onError(AsyncEvent event)
{
}

@Override
public void onStartAsync(AsyncEvent event)
{
ServletResourceUtils.createAsyncTimeoutListener(event -> {
if (responseResolved.compareAndSet(false, true)) {
HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
response.setStatus(HttpServletResponse.SC_OK);
event.getAsyncContext().complete();
}
}
})
);

// Save these items, since "req" becomes inaccessible in future exception handlers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public void tearDown() throws Exception
public void test_getWorkersResponse_empty()
{
final GetWorkersResponse workersResponse = workerRunner.getWorkersResponse();
Assertions.assertEquals(new GetWorkersResponse(Collections.emptyList()), workersResponse);
Assertions.assertEquals(new GetWorkersResponse(Collections.emptyList(), 0L), workersResponse);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public void test_serde() throws Exception
"localhost:8101",
DateTimes.of("2000")
)
)
),
0L
);
final GetWorkersResponse response2 =
jsonMapper.readValue(jsonMapper.writeValueAsBytes(response), GetWorkersResponse.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.DruidNode;
Expand Down Expand Up @@ -50,8 +51,8 @@ public abstract class DruidNodeDiscoveryProvider
ImmutableSet.of(NodeRole.MIDDLE_MANAGER, NodeRole.INDEXER)
);

private final ConcurrentHashMap<String, ServiceDruidNodeDiscovery> serviceDiscoveryMap =
new ConcurrentHashMap<>(SERVICE_TO_NODE_TYPES.size());
private final ConcurrentHashMap<ServiceAndRoles, ServiceDruidNodeDiscovery> serviceDiscoveryMap =
new ConcurrentHashMap<>(10);

public abstract BooleanSupplier getForNode(DruidNode node, NodeRole nodeRole);

Expand All @@ -63,15 +64,26 @@ public abstract class DruidNodeDiscoveryProvider
*/
public DruidNodeDiscovery getForService(String serviceName)
{
return serviceDiscoveryMap.computeIfAbsent(
return getForServiceAndRoles(
serviceName,
service -> {
DruidNodeDiscoveryProvider.SERVICE_TO_NODE_TYPES.get(serviceName)
);
}

Set<NodeRole> nodeRolesToWatch = DruidNodeDiscoveryProvider.SERVICE_TO_NODE_TYPES.get(service);
if (nodeRolesToWatch == null) {
throw new IAE("Unknown service [%s].", service);
/**
* Get DruidNodeDiscovery instance to discover nodes of a specific role that
* announce the given service in their metadata.
*/
public DruidNodeDiscovery getForServiceAndRoles(String serviceName, Set<NodeRole> nodeRolesToWatch)
{
return serviceDiscoveryMap.computeIfAbsent(
new ServiceAndRoles(serviceName, nodeRolesToWatch),
serviceAndRoles -> {
if (nodeRolesToWatch == null || nodeRolesToWatch.isEmpty()) {
throw InvalidInput.exception("No node role specified to watch for service[%s].", serviceName);
}
ServiceDruidNodeDiscovery serviceDiscovery = new ServiceDruidNodeDiscovery(service, nodeRolesToWatch.size());
ServiceDruidNodeDiscovery serviceDiscovery =
new ServiceDruidNodeDiscovery(serviceName, nodeRolesToWatch.size());
DruidNodeDiscovery.Listener filteringGatheringUpstreamListener =
serviceDiscovery.filteringUpstreamListener();
for (NodeRole nodeRole : nodeRolesToWatch) {
Expand All @@ -82,6 +94,14 @@ public DruidNodeDiscovery getForService(String serviceName)
);
}

/**
* Record containing serviceName and nodeRoles, used as a key in {@link #serviceDiscoveryMap}.
*/
private record ServiceAndRoles(String serviceName, Set<NodeRole> nodeRoles)
{

}

private static class ServiceDruidNodeDiscovery implements DruidNodeDiscovery
{
private static final Logger log = new Logger(ServiceDruidNodeDiscovery.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,9 @@
import org.apache.druid.messages.MessageBatch;
import org.apache.druid.messages.client.MessageListener;
import org.apache.druid.messages.client.MessageRelayClient;
import org.apache.druid.server.http.ServletResourceUtils;

import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.GET;
Expand Down Expand Up @@ -113,34 +112,14 @@ public Void httpGetMessagesFromOutbox(
final AsyncContext asyncContext = req.startAsync();
asyncContext.setTimeout(GET_MESSAGES_TIMEOUT);
asyncContext.addListener(
new AsyncListener()
{
@Override
public void onComplete(AsyncEvent event)
{
}

@Override
public void onTimeout(AsyncEvent event)
{
if (didRespond.compareAndSet(false, true)) {
HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
response.setStatus(HttpServletResponse.SC_NO_CONTENT);
event.getAsyncContext().complete();
batchFuture.cancel(true);
}
}

@Override
public void onError(AsyncEvent event)
{
}

@Override
public void onStartAsync(AsyncEvent event)
{
ServletResourceUtils.createAsyncTimeoutListener(event -> {
if (didRespond.compareAndSet(false, true)) {
HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
response.setStatus(HttpServletResponse.SC_NO_CONTENT);
event.getAsyncContext().complete();
batchFuture.cancel(true);
}
}
})
);

// Save these items, since "req" becomes inaccessible in future exception handlers.
Expand Down
Loading
Loading