Skip to content

KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams #20149

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 92 additions & 104 deletions streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -874,16 +874,6 @@ public class StreamsConfig extends AbstractConfig {
public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG = "windowstore.changelog.additional.retention.ms";
private static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. Default is 1 day";

private static final String[] NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS =
new String[] {ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ConsumerConfig.GROUP_PROTOCOL_CONFIG};
private static final String[] NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS =
new String[] {ConsumerConfig.ISOLATION_LEVEL_CONFIG};
private static final String[] NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS =
new String[] {
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,
ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
ProducerConfig.TRANSACTIONAL_ID_CONFIG
};

static {
CONFIG = new ConfigDef()
Expand Down Expand Up @@ -1282,33 +1272,59 @@ public class StreamsConfig extends AbstractConfig {

// this is the list of configs for underlying clients
// that streams prefer different default values
private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES = Map.of(ProducerConfig.LINGER_MS_CONFIG, "100");
private static final Map<String, Object> KS_DEFAULT_PRODUCER_CONFIGS;
static {
final Map<String, Object> tempProducerDefaultOverrides = new HashMap<>();
tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, "100");

KS_DEFAULT_PRODUCER_CONFIGS = Collections.unmodifiableMap(tempProducerDefaultOverrides);
}

private static final Map<String, Object> KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED;
static {
final Map<String, Object> tempProducerDefaultOverrides = new HashMap<>(KS_DEFAULT_PRODUCER_CONFIGS);
tempProducerDefaultOverrides.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
// Reduce the transaction timeout for quicker pending offset expiration on broker side.
tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, DEFAULT_TRANSACTION_TIMEOUT);

KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED = Collections.unmodifiableMap(tempProducerDefaultOverrides);
}

private static final Map<String, Object> PRODUCER_EOS_OVERRIDES;
private static final Map<String, Object> KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED;
static {
final Map<String, Object> tempProducerDefaultOverrides = new HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
tempProducerDefaultOverrides.putAll(Map.of(
ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE,
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true,
// Reduce the transaction timeout for quicker pending offset expiration on broker side.
ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, DEFAULT_TRANSACTION_TIMEOUT
));
PRODUCER_EOS_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides);
final Map<String, Object> tempProducerDefaultOverrides = new HashMap<>();
tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, null);

KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED = Collections.unmodifiableMap(tempProducerDefaultOverrides);
}

private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES = Map.of(
ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
"internal.leave.group.on.close", false,
ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic"
);
private static final Map<String, Object> KS_DEFAULT_CONSUMER_CONFIGS;
static {
final Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>();
tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
tempConsumerDefaultOverrides.put("internal.leave.group.on.close", false);
tempConsumerDefaultOverrides.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic");

KS_DEFAULT_CONSUMER_CONFIGS = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
}

private static final Map<String, Object> CONSUMER_EOS_OVERRIDES;
private static final Map<String, Object> KS_CONTROLLED_CONSUMER_CONFIGS;
static {
final Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>(CONSUMER_DEFAULT_OVERRIDES);
final Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>();
tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
tempConsumerDefaultOverrides.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");

KS_CONTROLLED_CONSUMER_CONFIGS = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
}

private static final Map<String, Object> KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED;
static {
final Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>(KS_CONTROLLED_CONSUMER_CONFIGS);
tempConsumerDefaultOverrides.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, READ_COMMITTED.toString());
CONSUMER_EOS_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides);

KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
}

private static final Map<String, Object> ADMIN_CLIENT_OVERRIDES =
Expand Down Expand Up @@ -1676,10 +1692,7 @@ private Map<String, Object> getCommonConsumerConfigs() {

clientProvidedProps.remove(GROUP_PROTOCOL_CONFIG);

checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);

final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ? CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
final Map<String, Object> consumerProps = new HashMap<>(KS_DEFAULT_CONSUMER_CONFIGS);
if (StreamsConfigUtils.eosEnabled(this)) {
consumerProps.put("internal.throw.on.fetch.stable.offset.unsupported", true);
}
Expand All @@ -1692,74 +1705,46 @@ private Map<String, Object> getCommonConsumerConfigs() {
return consumerProps;
}

private void checkIfUnexpectedUserSpecifiedConsumerConfig(final Map<String, Object> clientProvidedProps,
final String[] nonConfigurableConfigs) {
// Streams does not allow users to configure certain consumer/producer configurations, for example,
// enable.auto.commit. In cases where user tries to override such non-configurable
// consumer/producer configurations, log a warning and remove the user defined value from the Map.
// Thus, the default values for these consumer/producer configurations that are suitable for
// Streams will be used instead.

final String nonConfigurableConfigMessage = "Unexpected user-specified {} config '{}' found. {} setting ({}) will be ignored and the Streams default setting ({}) will be used.";
final String eosMessage = "'" + PROCESSING_GUARANTEE_CONFIG + "' is set to \"" + getString(PROCESSING_GUARANTEE_CONFIG) + "\". Hence, user";

for (final String config: nonConfigurableConfigs) {
if (clientProvidedProps.containsKey(config)) {

if (CONSUMER_DEFAULT_OVERRIDES.containsKey(config)) {
if (!clientProvidedProps.get(config).equals(CONSUMER_DEFAULT_OVERRIDES.get(config))) {
log.error(
nonConfigurableConfigMessage,
"consumer",
config,
"User",
clientProvidedProps.get(config),
CONSUMER_DEFAULT_OVERRIDES.get(config)
);
clientProvidedProps.remove(config);
}
} else if (eosEnabled) {
if (CONSUMER_EOS_OVERRIDES.containsKey(config)) {
if (!clientProvidedProps.get(config).equals(CONSUMER_EOS_OVERRIDES.get(config))) {
log.warn(
nonConfigurableConfigMessage,
"consumer",
config,
eosMessage,
clientProvidedProps.get(config),
CONSUMER_EOS_OVERRIDES.get(config)
);
clientProvidedProps.remove(config);
}
} else if (PRODUCER_EOS_OVERRIDES.containsKey(config)) {
if (!clientProvidedProps.get(config).equals(PRODUCER_EOS_OVERRIDES.get(config))) {
log.warn(
nonConfigurableConfigMessage,
"producer",
config,
eosMessage,
clientProvidedProps.get(config),
PRODUCER_EOS_OVERRIDES.get(config)
);
clientProvidedProps.remove(config);
}
} else if (ProducerConfig.TRANSACTIONAL_ID_CONFIG.equals(config)) {
log.warn(
nonConfigurableConfigMessage,
"producer",
config,
eosMessage,
clientProvidedProps.get(config),
"<appId>-<generatedSuffix>"
);
clientProvidedProps.remove(config);
}
}
private void overwritePropertyMap(final Map<String, Object> props, final String configName, final Object unmodifiableValue, final Object unmodifiableLogValue, final String clientType) {
final String overwritePropertyLogMessage = "Unexpected {} config `{}` found. User setting ({}) will be ignored and the Kafka Streams setting ({}) will be used";

if (props.containsKey(configName) && (!Objects.equals(props.get(configName), unmodifiableValue))) {
log.warn(overwritePropertyLogMessage, clientType, configName, props.get(configName), unmodifiableLogValue);
}

props.put(configName, unmodifiableValue);
}

private void overwritePropertyMap(final Map<String, Object> props, final String configName, final Object unmodifiableValue, final String clientType) {
overwritePropertyMap(props, configName, unmodifiableValue, unmodifiableValue, clientType);
}

private void validateConsumerPropertyMap(final Map<String, Object> props) {
if (eosEnabled) {
for (final Map.Entry<String, Object> entry : KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED.entrySet()) {
overwritePropertyMap(props, entry.getKey(), entry.getValue(), "consumer");
}
} else {
for (final Map.Entry<String, Object> entry : KS_CONTROLLED_CONSUMER_CONFIGS.entrySet()) {
overwritePropertyMap(props, entry.getKey(), entry.getValue(), "consumer");
}
}
}

private void validateProducerPropertyMap(final Map<String, Object> props) {
if (eosEnabled) {
verifyMaxInFlightRequestPerConnection(clientProvidedProps.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION));
for (final Map.Entry<String, Object> entry : KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED.entrySet()) {
if (ProducerConfig.TRANSACTIONAL_ID_CONFIG.equals(entry.getKey())) {
// Special handling for transactional.id logging
final String transactionalIdExpression = getString(PROCESSING_GUARANTEE_CONFIG).equals(EXACTLY_ONCE_V2) ?
"<application.id>-<processId>-<threadIdx>" : "<application.id>-<task_id>";
overwritePropertyMap(props, entry.getKey(), entry.getValue(), transactionalIdExpression, "producer");
} else {
overwritePropertyMap(props, entry.getKey(), entry.getValue(), "producer");
}
}
// Verify max.in.flight.requests.per.connection
verifyMaxInFlightRequestPerConnection(props.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION));
}
}

Expand Down Expand Up @@ -1807,6 +1792,8 @@ public Map<String, Object> getMainConsumerConfigs(final String groupId, final St
final Map<String, Object> mainConsumerProps = originalsWithPrefix(MAIN_CONSUMER_PREFIX);
consumerProps.putAll(mainConsumerProps);

validateConsumerPropertyMap(consumerProps);

// this is a hack to work around StreamsConfig constructor inside StreamsPartitionAssignor to avoid casting
consumerProps.put(APPLICATION_ID_CONFIG, groupId);

Expand Down Expand Up @@ -1835,9 +1822,6 @@ public Map<String, Object> getMainConsumerConfigs(final String groupId, final St
consumerProps.put(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, getInt(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG));
consumerProps.put(TASK_ASSIGNOR_CLASS_CONFIG, getString(TASK_ASSIGNOR_CLASS_CONFIG));

// disable auto topic creation
consumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");

// verify that producer batch config is no larger than segment size, then add topic configs required for creating topics
final Map<String, Object> topicProps = originalsWithPrefix(TOPIC_PREFIX, false);
final Map<String, Object> producerProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
Expand Down Expand Up @@ -1882,6 +1866,8 @@ public Map<String, Object> getRestoreConsumerConfigs(final String clientId) {
final Map<String, Object> restoreConsumerProps = originalsWithPrefix(RESTORE_CONSUMER_PREFIX);
baseConsumerProps.putAll(restoreConsumerProps);

validateConsumerPropertyMap(baseConsumerProps);

// no need to set group id for a restore consumer
baseConsumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG);
// no need to set instance id for a restore consumer
Expand Down Expand Up @@ -1915,6 +1901,8 @@ public Map<String, Object> getGlobalConsumerConfigs(final String clientId) {
final Map<String, Object> globalConsumerProps = originalsWithPrefix(GLOBAL_CONSUMER_PREFIX);
baseConsumerProps.putAll(globalConsumerProps);

validateConsumerPropertyMap(baseConsumerProps);

// no need to set group id for a global consumer
baseConsumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG);
// no need to set instance id for a restore consumer
Expand All @@ -1939,13 +1927,13 @@ public Map<String, Object> getGlobalConsumerConfigs(final String clientId) {
public Map<String, Object> getProducerConfigs(final String clientId) {
final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());

checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS);

// generate producer configs from original properties and overridden maps
final Map<String, Object> props = new HashMap<>(eosEnabled ? PRODUCER_EOS_OVERRIDES : PRODUCER_DEFAULT_OVERRIDES);
final Map<String, Object> props = new HashMap<>(eosEnabled ? KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED : KS_DEFAULT_PRODUCER_CONFIGS);
props.putAll(getClientCustomProps());
props.putAll(clientProvidedProps);

validateProducerPropertyMap(props);

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, originals().get(BOOTSTRAP_SERVERS_CONFIG));
// add client id with stream client id prefix
props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,66 @@ public void testGetRestoreConsumerConfigs() {
assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
}

@Test
public void shouldResetToDefaultIfMainConsumerAllowAutoCreateTopicsIsOverridden() {
props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true");

try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
appender.setClassLogger(StreamsConfig.class, Level.WARN);

final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);

// Verify the controlled value is enforced
assertEquals("false", consumerConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG));

// Verify warning is logged when user tries to override controlled config
assertTrue(appender.getMessages().stream()
.anyMatch(msg -> msg.contains("Unexpected consumer config `allow.auto.create.topics` found") &&
msg.contains("User setting (true) will be ignored")));
}
}

@Test
public void shouldResetToDefaultIfRestoreConsumerAllowAutoCreateTopicsIsOverridden() {
props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true");

try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
appender.setClassLogger(StreamsConfig.class, Level.WARN);

final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs(clientId);

// Verify the controlled value is enforced
assertEquals("false", consumerConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG));

// Verify warning is logged when user tries to override controlled config
assertTrue(appender.getMessages().stream()
.anyMatch(msg -> msg.contains("Unexpected consumer config `allow.auto.create.topics` found") &&
msg.contains("User setting (true) will be ignored")));
}
}

@Test
public void shouldResetToDefaultIfGlobalConsumerAllowAutoCreateTopicsIsOverridden() {
props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true");

try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
appender.setClassLogger(StreamsConfig.class, Level.WARN);

final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs(clientId);

// Verify the controlled value is enforced
assertEquals("false", consumerConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG));

// Verify warning is logged when user tries to override controlled config
assertTrue(appender.getMessages().stream()
.anyMatch(msg -> msg.contains("Unexpected consumer config `allow.auto.create.topics` found") &&
msg.contains("User setting (true) will be ignored")));
}
}

@SuppressWarnings("resource")
@Test
public void defaultSerdeShouldBeConfigured() {
Expand Down