Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 26 additions & 9 deletions docs/api-reference/dynamic-configuration-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,13 @@ Host: http://ROUTER_IP:ROUTER_PORT
## Broker dynamic configuration

Broker dynamic configuration is managed through the Coordinator but consumed by Brokers.
These settings control broker behavior such as query blocking rules.
These settings control broker behavior such as query blocking rules and default query context values.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the blocking we have this warning:

Note: Query blocking is best-effort. Queries may not be blocked in certain cases, such as when a Broker has recently started and hasn't received the config yet, or if the Broker cannot contact the Coordinator. Brokers poll the configuration periodically (default every 1 minute) and also receive push updates from the Coordinator for immediate propagation.

I think we need a similar warning for context. Maybe put this warning at the top of the "Broker dynamic configuration" section and make clear it applies to everything in here. I say this because in case default query context properties are "important" (i.e. operators really want them to always be set) they may be surprised that they are in fact not necessarily always going to be set.

This also makes me think at some point we might want to add a mode to make them actually guaranteed, i.e., block Broker startup until first sync of dynamic config. Doesn't have to be done in this PR.

Copy link
Contributor Author

@jtuglu1 jtuglu1 Mar 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. I think we can make this an opt-in config on the broker, but I think it should be a separate change.


> **Note:** Broker dynamic configuration is best-effort. Settings may not be applied in certain
> cases, such as when a Broker has recently started and hasn't received the config yet, or if the
> Broker cannot contact the Coordinator. Brokers poll the configuration periodically (default every
> 1 minute) and also receive push updates from the Coordinator for immediate propagation. If a
> setting is critical and must always be applied, use the equivalent static runtime property instead.

### Get broker dynamic configuration

Expand Down Expand Up @@ -366,7 +372,11 @@ Host: http://ROUTER_IP:ROUTER_PORT
"dataSources": ["large_table"],
"queryTypes": ["scan"]
}
]
],
"queryContext": {
"priority": 0,
"timeout": 300000
}
}
```

Expand Down Expand Up @@ -417,7 +427,7 @@ The endpoint supports a set of optional header parameters to populate the audit
curl -X POST "http://ROUTER_IP:ROUTER_PORT/druid/coordinator/v1/broker/config" \
-H "Content-Type: application/json" \
-H "X-Druid-Author: admin" \
-H "X-Druid-Comment: Add query blocklist rules" \
-H "X-Druid-Comment: Add query blocklist rules and set default context" \
-d '{
"queryBlocklist": [
{
Expand All @@ -431,7 +441,11 @@ curl -X POST "http://ROUTER_IP:ROUTER_PORT/druid/coordinator/v1/broker/config" \
"debug": "true"
}
}
]
],
"queryContext": {
"priority": 0,
"timeout": 300000
}
}'
```

Expand All @@ -444,7 +458,7 @@ POST /druid/coordinator/v1/broker/config HTTP/1.1
Host: http://ROUTER_IP:ROUTER_PORT
Content-Type: application/json
X-Druid-Author: admin
X-Druid-Comment: Add query blocklist rules
X-Druid-Comment: Add query blocklist rules and set default context

{
"queryBlocklist": [
Expand All @@ -459,7 +473,11 @@ X-Druid-Comment: Add query blocklist rules
"debug": "true"
}
}
]
],
"queryContext": {
"priority": 0,
"timeout": 300000
}
}
```

Expand All @@ -477,6 +495,7 @@ The following table shows the dynamic configuration properties for the Broker.
|Property|Description|Default|
|--------|-----------|-------|
|`queryBlocklist`| List of rules to block queries based on datasource, query type, and/or query context parameters. Each rule defines criteria that are combined with AND logic. Blocked queries return an HTTP 403 error. See [Query blocklist rules](#query-blocklist-rules) for details.|none|
|`queryContext`| Map of default query context key-value pairs applied to all queries on this broker. These values override static defaults set via runtime properties (`druid.query.default.context.*`) but are overridden by context values supplied in individual query payloads. Useful for setting cluster-wide defaults such as `priority` or `timeout` without restarting. See [Query context reference](../querying/query-context-reference.md) for available keys.|none|

#### Query blocklist rules

Expand All @@ -499,8 +518,6 @@ Each rule in the `queryBlocklist` array is a JSON object with the following prop
- At least one criterion must be specified per rule to prevent accidentally blocking all queries
- A query is blocked if it matches ANY rule in the blocklist (OR logic between rules)

> **Note:** Query blocking is best-effort. Queries may not be blocked in certain cases, such as when a Broker has recently started and hasn't received the config yet, or if the Broker cannot contact the Coordinator. Brokers poll the configuration periodically (default every 1 minute) and also receive push updates from the Coordinator for immediate propagation.

**Error response:**

When a query is blocked, the Broker returns an HTTP 403 error with a message indicating the query ID and the rule that blocked it:
Expand Down Expand Up @@ -586,7 +603,7 @@ Host: http://ROUTER_IP:ROUTER_PORT
"comment": "Add query blocklist rules",
"ip": "127.0.0.1"
},
"payload": "{\"queryBlocklist\":[{\"ruleName\":\"block-expensive-scans\",\"dataSources\":[\"large_table\"],\"queryTypes\":[\"scan\"]}]}",
"payload": "{\"queryBlocklist\":[{\"ruleName\":\"block-expensive-scans\",\"dataSources\":[\"large_table\"],\"queryTypes\":[\"scan\"]}],\"queryContext\":{\"priority\":0,\"timeout\":300000}}",
"auditTime": "2024-03-06T12:00:00.000Z"
}
]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.testing.embedded.server;

import org.apache.druid.audit.AuditInfo;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.indexing.common.task.TaskBuilder;
import org.apache.druid.server.QueryBlocklistRule;
import org.apache.druid.server.broker.BrokerDynamicConfig;
import org.apache.druid.server.http.BrokerDynamicConfigSyncer;
import org.apache.druid.testing.embedded.EmbeddedBroker;
import org.apache.druid.testing.embedded.EmbeddedClusterApis;
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
import org.apache.druid.testing.embedded.EmbeddedHistorical;
import org.apache.druid.testing.embedded.EmbeddedIndexer;
import org.apache.druid.testing.embedded.EmbeddedOverlord;
import org.apache.druid.testing.embedded.indexing.Resources;
import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Integration test for broker dynamic configuration
*/
public class EmbeddedBrokerDynamicConfigTest extends EmbeddedClusterTestBase
{
// Fixed datasource ingested once for all tests; restored before each test since the
// base class @BeforeEach would otherwise assign a fresh name.
private String fixedDataSource;

private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
private final EmbeddedOverlord overlord = new EmbeddedOverlord();
private final EmbeddedIndexer indexer = new EmbeddedIndexer();
private final EmbeddedHistorical historical = new EmbeddedHistorical();
private final EmbeddedBroker broker = new EmbeddedBroker();

@Override
protected EmbeddedDruidCluster createCluster()
{
indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s");

return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
.useLatchableEmitter()
.addServer(overlord)
.addServer(coordinator)
.addServer(indexer)
.addServer(historical)
.addServer(broker);
}

@BeforeAll
@Override
public void setup() throws Exception
{
fixedDataSource = EmbeddedClusterApis.createTestDatasourceName();
dataSource = fixedDataSource;
super.setup();
ingestData();
cluster.callApi().waitForAllSegmentsToBeAvailable(fixedDataSource, coordinator, broker);
}

@BeforeEach
@Override
protected void refreshDatasourceName()
{
dataSource = fixedDataSource;
}

@Test
@Timeout(30)
public void testQueryBlocklistBlocksMatchingQueries()
{
// Baseline: query succeeds before blocklist is applied
String initialResult = cluster.callApi().runSql("SELECT COUNT(*) FROM %s", dataSource);
Assertions.assertFalse(initialResult.isBlank());

// Apply blocklist rule that matches all queries on this datasource
QueryBlocklistRule blockRule = new QueryBlocklistRule(
"block-test-datasource",
Set.of(dataSource),
null,
null
);
updateBrokerDynamicConfig(
BrokerDynamicConfig.builder()
.withQueryBlocklist(List.of(blockRule))
.build()
);

// Query should now throw due to FORBIDDEN blocklist rule
Assertions.assertThrows(
RuntimeException.class,
() -> cluster.callApi().runSql("SELECT COUNT(*) FROM %s", dataSource)
);

// Clear the blocklist and verify queries resume
updateBrokerDynamicConfig(BrokerDynamicConfig.builder().build());
String finalResult = cluster.callApi().runSql("SELECT COUNT(*) FROM %s", dataSource);
Assertions.assertFalse(finalResult.isBlank());
}

@Test
@Timeout(30)
public void testDynamicQueryContextTimeoutCausesQueryToFail()
{
// Baseline: query succeeds before a timeout context is applied
String initialResult = cluster.callApi().runSql("SELECT COUNT(*) FROM %s", dataSource);
Assertions.assertFalse(initialResult.isBlank());

// Apply a 1ms timeout via dynamic query context — any real query will expire before responding
updateBrokerDynamicConfig(
BrokerDynamicConfig.builder()
.withQueryContext(Map.of("timeout", 1))
.build()
);

// Query should now throw due to the timeout being exceeded
Assertions.assertThrows(
RuntimeException.class,
() -> cluster.callApi().runSql("SELECT COUNT(*) FROM %s", dataSource)
);

// Clear the dynamic context and verify queries resume
updateBrokerDynamicConfig(BrokerDynamicConfig.builder().build());
String finalResult = cluster.callApi().runSql("SELECT COUNT(*) FROM %s", dataSource);
Assertions.assertFalse(finalResult.isBlank());
}

private void ingestData()
{
cluster.callApi().runTask(
TaskBuilder.ofTypeIndex()
.dataSource(dataSource)
.isoTimestampColumn("time")
.csvInputFormatWithColumns("time", "item", "value")
.inlineInputSourceWithData(Resources.InlineData.CSV_10_DAYS)
.segmentGranularity("DAY")
.dimensions()
.withId(IdUtils.getRandomId()),
overlord
);
}

/**
* Updates the broker dynamic config on the coordinator and synchronously broadcasts it
* to all brokers.
*
* Uses {@link JacksonConfigManager} directly to avoid the HTTP endpoint's builder-merge
* semantics, which cannot distinguish "clear to empty" from "not specified" when fields
* are omitted via {@code @JsonInclude(NON_EMPTY)}.
*/
private void updateBrokerDynamicConfig(BrokerDynamicConfig config)
{
coordinator.bindings()
.getInstance(JacksonConfigManager.class)
.set(BrokerDynamicConfig.CONFIG_KEY, config, new AuditInfo("test", "test@test.com", "Testing", "127.0.0.1"));
coordinator.bindings()
.getInstance(BrokerDynamicConfigSyncer.class)
.broadcastConfigToBrokers();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
* @see org.apache.druid.query.scan.ScanQueryConfig
*
*/
public class DefaultQueryConfig
public class DefaultQueryConfig implements QueryContextProvider
{
/**
* Config that does nothing.
Expand All @@ -54,6 +54,7 @@ public class DefaultQueryConfig
@JsonProperty
private final Map<String, Object> context;

@Override
@Nonnull
public Map<String, Object> getContext()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query;

import java.util.Map;

/**
* Provides the default query context applied to all incoming queries before per-query overrides are merged in.
*
* <p>On non-broker nodes this is backed by static runtime properties ({@link DefaultQueryConfig}).
* On brokers it is backed by {@code BrokerViewOfBrokerConfig}, which merges the static defaults with
* operator-supplied overrides pushed dynamically from the Coordinator.
*/
public interface QueryContextProvider
{
Map<String, Object> getContext();
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public synchronized DynamicConfig getDynamicConfig()
public synchronized void setDynamicConfig(@NotNull DynamicConfig updatedConfig)
{
config = updatedConfig;
log.info("Updated %s to [%s]", getConfigTypeName(), updatedConfig);
log.info("Updated [%s] dynamic config to [%s]", getConfigTypeName(), updatedConfig);
}

/**
Expand Down
Loading
Loading