diff --git a/src/main/java/kafdrop/util/AvroMessageDeserializer.java b/src/main/java/kafdrop/util/AvroMessageDeserializer.java index b01ea072..c42ee671 100644 --- a/src/main/java/kafdrop/util/AvroMessageDeserializer.java +++ b/src/main/java/kafdrop/util/AvroMessageDeserializer.java @@ -4,6 +4,7 @@ import java.nio.*; import java.util.*; +import java.util.stream.Collectors; public final class AvroMessageDeserializer implements MessageDeserializer { @@ -12,7 +13,7 @@ public final class AvroMessageDeserializer implements MessageDeserializer { public AvroMessageDeserializer(String topicName, String schemaRegistryUrl, String schemaRegistryAuth) { this.topicName = topicName; - this.deserializer = getDeserializer(schemaRegistryUrl, schemaRegistryAuth); + this.deserializer = getDeserializer(schemaRegistryUrl, schemaRegistryAuth, topicName); } @Override @@ -22,15 +23,33 @@ public String deserializeMessage(ByteBuffer buffer) { return deserializer.deserialize(topicName, bytes).toString(); } - private static KafkaAvroDeserializer getDeserializer(String schemaRegistryUrl, String schemaRegistryAuth) { + private static KafkaAvroDeserializer getDeserializer(String schemaRegistryUrl, String schemaRegistryAuth, String topicName) { final var config = new HashMap(); config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); if (schemaRegistryAuth != null) { config.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO"); config.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG, schemaRegistryAuth); } + setConfigFromEnvIfAvailable(topicName, AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, config); final var kafkaAvroDeserializer = new KafkaAvroDeserializer(); kafkaAvroDeserializer.configure(config, false); return kafkaAvroDeserializer; } + + private static void setConfigFromEnvIfAvailable(String topicName, String configPath, Map config){ + + String configPrefix = "SCHEMA_REGISTRY"; + String topicScopedEnvPath = Arrays.stream(new String[]{configPrefix, configPath.replace(".", "_"), topicName.replace("-", "_") } ) + .map(String::toUpperCase).collect(Collectors.joining("_")); + + String noTopicScopedEnvPath = Arrays.stream(new String[]{ "SCHEMA_REGISTRY", configPath.replace(".", "_") }) + .map(String::toUpperCase).collect(Collectors.joining("_")); + + for(String envPath : new String[]{topicScopedEnvPath, noTopicScopedEnvPath}) { + String namingStrategyValue = System.getenv(envPath); + if (namingStrategyValue != null) { + config.put(envPath, namingStrategyValue); + } + } + } }