Skip to content

Commit

Permalink
[allow_configuring_avro_consumer] Read avro consumer configs from env…
Browse files Browse the repository at this point in the history
… var
  • Loading branch information
Yevgeniy Magdel committed Mar 21, 2022
1 parent 6ab52cf commit ad8aa0f
Showing 1 changed file with 21 additions and 2 deletions.
23 changes: 21 additions & 2 deletions src/main/java/kafdrop/util/AvroMessageDeserializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import java.nio.*;
import java.util.*;
import java.util.stream.Collectors;


public final class AvroMessageDeserializer implements MessageDeserializer {
Expand All @@ -12,7 +13,7 @@ public final class AvroMessageDeserializer implements MessageDeserializer {

public AvroMessageDeserializer(String topicName, String schemaRegistryUrl, String schemaRegistryAuth) {
this.topicName = topicName;
this.deserializer = getDeserializer(schemaRegistryUrl, schemaRegistryAuth);
this.deserializer = getDeserializer(schemaRegistryUrl, schemaRegistryAuth, topicName);
}

@Override
Expand All @@ -22,15 +23,33 @@ public String deserializeMessage(ByteBuffer buffer) {
return deserializer.deserialize(topicName, bytes).toString();
}

private static KafkaAvroDeserializer getDeserializer(String schemaRegistryUrl, String schemaRegistryAuth) {
private static KafkaAvroDeserializer getDeserializer(String schemaRegistryUrl, String schemaRegistryAuth, String topicName) {
final var config = new HashMap<String, Object>();
config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
if (schemaRegistryAuth != null) {
config.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
config.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG, schemaRegistryAuth);
}
setConfigFromEnvIfAvailable(topicName, AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, config);
final var kafkaAvroDeserializer = new KafkaAvroDeserializer();
kafkaAvroDeserializer.configure(config, false);
return kafkaAvroDeserializer;
}

private static void setConfigFromEnvIfAvailable(String topicName, String configPath, Map<String,Object> config){

String configPrefix = "SCHEMA_REGISTRY";
String topicScopedEnvPath = Arrays.stream(new String[]{configPrefix, configPath.replace(".", "_"), topicName.replace("-", "_") } )
.map(String::toUpperCase).collect(Collectors.joining("_"));

String noTopicScopedEnvPath = Arrays.stream(new String[]{ "SCHEMA_REGISTRY", configPath.replace(".", "_") })
.map(String::toUpperCase).collect(Collectors.joining("_"));

for(String envPath : new String[]{topicScopedEnvPath, noTopicScopedEnvPath}) {
String namingStrategyValue = System.getenv(envPath);
if (namingStrategyValue != null) {
config.put(envPath, namingStrategyValue);
}
}
}
}

0 comments on commit ad8aa0f

Please sign in to comment.