Skip to content

WIP - Add de-serialization autoconfiguration #162

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.sh text eol=lf
2 changes: 1 addition & 1 deletion kafka-webview-plugin/pom.xml
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>kafka-webview-plugin</artifactId>
<version>1.0.0</version>
<version>1.1.0</version>

<!-- Module Description and Ownership -->
<name>Kafka WebView Plugin</name>
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* MIT License
*
* Copyright (c) 2017, 2018, 2019 SourceLab.org (https://github.com/SourceLabOrg/kafka-webview/)
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package org.sourcelab.kafka.webview.ui.plugin.deserializer;

import java.util.List;

/**
* Discovery service for deserializer auto configuration.
*/
public interface DeserializerDiscoveryService {

public List<DeserializerInformation> getDeserializersInformation();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/**
* MIT License
*
* Copyright (c) 2017, 2018, 2019 SourceLab.org (https://github.com/SourceLabOrg/kafka-webview/)
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package org.sourcelab.kafka.webview.ui.plugin.deserializer;

import java.util.HashMap;
import java.util.Map;

/**
* POJO deserializer information.
*/
public class DeserializerInformation {

private final String name;
private final String classpath;
private final Map<String, String> defaultConfig;

public DeserializerInformation(String name, Class<?> clazz) {
this(name, clazz.getName());
}

public DeserializerInformation(String name, String classpath) {
this(name, classpath, new HashMap<>());
}

public DeserializerInformation(String name, Class<?> clazz, Map<String, String> defaultConfig) {
this(name, clazz.getName(), defaultConfig);
}

/**
* Constructor.
*
* @param name Deserializer name.
* @param classpath Deserializer class full name, must implement org.apache.kafka.common.serialization.Deserializer.
* @param defaultConfig Default configuration of the deserializer.
*/
public DeserializerInformation(String name, String classpath, Map<String, String> defaultConfig) {
this.name = name;
this.classpath = classpath;
this.defaultConfig = defaultConfig;
}

public String getName() {
return name;
}

public String getClasspath() {
return classpath;
}

public Map<String, String> getDefaultConfig() {
return defaultConfig;
}

@Override
public String toString() {
return "DeserializerInformation{"
+ "name=" + name
+ ", classpath=" + classpath
+ ", defaultConfig=" + defaultConfig
+ '}';
}
}
2 changes: 1 addition & 1 deletion kafka-webview-ui/pom.xml
Original file line number Diff line number Diff line change
@@ -43,7 +43,7 @@
<dependency>
<groupId>org.sourcelab</groupId>
<artifactId>kafka-webview-plugin</artifactId>
<version>1.0.0</version>
<version>1.1.0</version>
</dependency>

<!-- Kafka -->
11 changes: 8 additions & 3 deletions kafka-webview-ui/src/assembly/distribution/start.bat
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
## Define configuration
@echo off

cd /D %~dp0

:: Define configuration
set SPRING_CONFIG_LOCATION=classpath:/config/base.yml,config.yml

## launch webapp
java -jar kafka-webview-ui-*.jar
:: launch webapp
FOR /F "tokens=* USEBACKQ" %%F IN (`dir /b kafka-webview-ui-*.jar`) DO SET "jar=%%F"
java -jar %jar%
5 changes: 1 addition & 4 deletions kafka-webview-ui/src/assembly/distribution/start.sh
Original file line number Diff line number Diff line change
@@ -17,7 +17,4 @@ fi
export SPRING_CONFIG_LOCATION=classpath:/config/base.yml,config.yml

## launch webapp
java -jar kafka-webview-ui-*.jar $HEAP_OPTS $LOG_OPTS

## Change back to previous directory
cd $CWD
exec java -jar kafka-webview-ui-*.jar $HEAP_OPTS $LOG_OPTS
Original file line number Diff line number Diff line change
@@ -39,6 +39,9 @@ public class AppProperties {

@Value("${app.uploadPath}")
private String uploadPath;

@Value("${app.deserializerPath}")
private String deserializerPath;

@Value("${app.key}")
private String appKey;
@@ -76,6 +79,10 @@ public String getUploadPath() {
return uploadPath;
}

public String getDeserializerPath() {
return deserializerPath;
}

public String getAppKey() {
return appKey;
}
Original file line number Diff line number Diff line change
@@ -24,6 +24,15 @@

package org.sourcelab.kafka.webview.ui.configuration;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.File;

import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;

import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.DoubleDeserializer;
@@ -43,24 +52,43 @@
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.sourcelab.kafka.webview.ui.plugin.deserializer.DeserializerDiscoveryService;
import org.sourcelab.kafka.webview.ui.plugin.deserializer.DeserializerInformation;
import org.sourcelab.kafka.webview.ui.model.MessageFormatType;

/**
* Called on startup to ensure we have sane default data loaded.
*/
@Component
public final class DataLoaderConfig implements ApplicationRunner {

private static final Logger logger = LoggerFactory.getLogger(DataLoaderConfig.class);

private final AppProperties appProperties;
private final MessageFormatRepository messageFormatRepository;
private final UserRepository userRepository;
private final ObjectMapper mapper;

/**
* Constructor.
*/
@Autowired
private DataLoaderConfig(final MessageFormatRepository messageFormatRepository, final UserRepository userRepository) {
private DataLoaderConfig(
final AppProperties appProperties,
final MessageFormatRepository messageFormatRepository,
final UserRepository userRepository,
final ObjectMapper mapper) {
this.appProperties = appProperties;
this.messageFormatRepository = messageFormatRepository;
this.userRepository = userRepository;
this.mapper = mapper;
}

/**
@@ -69,6 +97,7 @@ private DataLoaderConfig(final MessageFormatRepository messageFormatRepository,
private void createData() {
createDefaultUser();
createDefaultMessageFormats();
discoverDeserializers();
}

/**
@@ -112,11 +141,66 @@ private void createDefaultMessageFormats() {
messageFormat.setName(entry.getKey());
messageFormat.setClasspath(entry.getValue());
messageFormat.setJar("n/a");
messageFormat.setDefaultFormat(true);
messageFormat.setMessageFormatType(MessageFormatType.DEFAULT);
messageFormatRepository.save(messageFormat);
}
}

private void discoverDeserializers() {
final File deserializerPath = new File(appProperties.getDeserializerPath());

// Check preconditions
if (!deserializerPath.exists()) {
logger.warn("Directory {} doesn't exists.", deserializerPath);
return;
}
if (!deserializerPath.isDirectory()) {
logger.error("{} is not a directory.", deserializerPath);
return;
}

File[] jars = deserializerPath.listFiles((File dir, String name) -> name.endsWith(".jar"));
logger.info("Analyse {} for deserializer", (Object) jars);
for (File jar : jars) {
try {
URL jarUrl = jar.toURI().toURL();
ClassLoader cl = new URLClassLoader(new URL[]{jarUrl}, getClass().getClassLoader());
ServiceLoader<DeserializerDiscoveryService> services = ServiceLoader.load(DeserializerDiscoveryService.class, cl);
loadDeserializers(jar, services);
} catch (MalformedURLException ex) {
logger.error("Failed to load {}, error: {}", jar, ex.getMessage());
}
}
}

private void loadDeserializers(File jar, ServiceLoader<DeserializerDiscoveryService> services) {
for (DeserializerDiscoveryService service : services) {
List<DeserializerInformation> deserializersInformation = service.getDeserializersInformation();
for (DeserializerInformation deserializerInformation : deserializersInformation) {
try {
MessageFormat messageFormat = messageFormatRepository.findByName(deserializerInformation.getName());
if (messageFormat == null) {
messageFormat = new MessageFormat();
} else if (MessageFormatType.AUTOCONF != messageFormat.getMessageFormatType()) {
logger.error("Try to register the formatter {} but this name is already register as a {}.",
messageFormat.getName(), messageFormat.getMessageFormatType());
continue;
}
messageFormat.setName(deserializerInformation.getName());
messageFormat.setClasspath(deserializerInformation.getClasspath());
messageFormat.setJar(jar.getName());
messageFormat.setMessageFormatType(MessageFormatType.AUTOCONF);
messageFormat.setOptionParameters(
mapper.writeValueAsString(deserializerInformation.getDefaultConfig()));
messageFormatRepository.save(messageFormat);
} catch (JsonProcessingException ex) {
logger.error("Failed to load {}, because the default config are invalid ({})",
deserializerInformation.getName(), ex.getMessage());
}
}
}
}

/**
* Run on startup.
*/
Original file line number Diff line number Diff line change
@@ -68,6 +68,11 @@ public PluginFactory<Deserializer> getDeserializerPluginFactory(final AppPropert
final String jarDirectory = appProperties.getUploadPath() + "/deserializers";
return new PluginFactory<>(jarDirectory, Deserializer.class);
}

private PluginFactory<Deserializer> getAutoconfDeserializerPluginFactory(final AppProperties appProperties) {
final String jarDirectory = appProperties.getDeserializerPath();
return new PluginFactory<>(jarDirectory, Deserializer.class);
}

/**
* PluginFactory for creating instances of Record Filters.
@@ -99,6 +104,7 @@ public SecretManager getSecretManager(final AppProperties appProperties) {
public WebKafkaConsumerFactory getWebKafkaConsumerFactory(final AppProperties appProperties, final KafkaClientConfigUtil configUtil) {
return new WebKafkaConsumerFactory(
getDeserializerPluginFactory(appProperties),
getAutoconfDeserializerPluginFactory(appProperties),
getRecordFilterPluginFactory(appProperties),
getSecretManager(appProperties),
getKafkaConsumerFactory(configUtil)
Loading