Skip to content

Commit 76b6414

Browse files
quentingodeauQuentin Godeau
authored and
Quentin Godeau
committed
Add de-serialization autoconfiguration
1 parent af35921 commit 76b6414

File tree

28 files changed

+509
-44
lines changed

28 files changed

+509
-44
lines changed

.gitattributes

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
*.sh text eol=lf

kafka-webview-plugin/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111
<artifactId>kafka-webview-plugin</artifactId>
12-
<version>1.0.0</version>
12+
<version>1.1.0</version>
1313

1414
<!-- Module Description and Ownership -->
1515
<name>Kafka WebView Plugin</name>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/**
2+
* MIT License
3+
*
4+
* Copyright (c) 2017, 2018, 2019 SourceLab.org (https://github.com/SourceLabOrg/kafka-webview/)
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*/
24+
25+
package org.sourcelab.kafka.webview.ui.plugin.deserializer;
26+
27+
import java.util.List;
28+
29+
/**
30+
* Discovery service for deserializer auto configuration.
31+
*/
32+
public interface DeserializerDiscoveryService {
33+
34+
public List<DeserializerInformation> getDeserializersInformation();
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/**
2+
* MIT License
3+
*
4+
* Copyright (c) 2017, 2018, 2019 SourceLab.org (https://github.com/SourceLabOrg/kafka-webview/)
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*/
24+
25+
package org.sourcelab.kafka.webview.ui.plugin.deserializer;
26+
27+
import java.util.HashMap;
28+
import java.util.Map;
29+
30+
/**
31+
* POJO deserializer information.
32+
*/
33+
public class DeserializerInformation {
34+
35+
private final String name;
36+
private final String classpath;
37+
private final Map<String, String> defaultConfig;
38+
39+
public DeserializerInformation(String name, Class<?> clazz) {
40+
this(name, clazz.getName());
41+
}
42+
43+
public DeserializerInformation(String name, String classpath) {
44+
this(name, classpath, new HashMap<>());
45+
}
46+
47+
public DeserializerInformation(String name, Class<?> clazz, Map<String, String> defaultConfig) {
48+
this(name, clazz.getName(), defaultConfig);
49+
}
50+
51+
/**
52+
* Constructor.
53+
*
54+
* @param name Deserializer name.
55+
* @param classpath Deserializer class full name, must implement org.apache.kafka.common.serialization.Deserializer.
56+
* @param defaultConfig Default configuration of the deserializer.
57+
*/
58+
public DeserializerInformation(String name, String classpath, Map<String, String> defaultConfig) {
59+
this.name = name;
60+
this.classpath = classpath;
61+
this.defaultConfig = defaultConfig;
62+
}
63+
64+
public String getName() {
65+
return name;
66+
}
67+
68+
public String getClasspath() {
69+
return classpath;
70+
}
71+
72+
public Map<String, String> getDefaultConfig() {
73+
return defaultConfig;
74+
}
75+
76+
@Override
77+
public String toString() {
78+
return "DeserializerInformation{"
79+
+ "name=" + name
80+
+ ", classpath=" + classpath
81+
+ ", defaultConfig=" + defaultConfig
82+
+ '}';
83+
}
84+
}

kafka-webview-ui/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
<dependency>
4444
<groupId>org.sourcelab</groupId>
4545
<artifactId>kafka-webview-plugin</artifactId>
46-
<version>1.0.0</version>
46+
<version>1.1.0</version>
4747
</dependency>
4848

4949
<!-- Kafka -->
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
1-
## Define configuration
1+
@echo off
2+
3+
cd /D %~dp0
4+
5+
:: Define configuration
26
set SPRING_CONFIG_LOCATION=classpath:/config/base.yml,config.yml
37

4-
## launch webapp
5-
java -jar kafka-webview-ui-*.jar
8+
:: launch webapp
9+
FOR /F "tokens=* USEBACKQ" %%F IN (`dir /b kafka-webview-ui-*.jar`) DO SET "jar=%%F"
10+
java -jar %jar%

kafka-webview-ui/src/assembly/distribution/start.sh

+1-4
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,4 @@ fi
1717
export SPRING_CONFIG_LOCATION=classpath:/config/base.yml,config.yml
1818

1919
## launch webapp
20-
java -jar kafka-webview-ui-*.jar $HEAP_OPTS $LOG_OPTS
21-
22-
## Change back to previous directory
23-
cd $CWD
20+
exec java -jar kafka-webview-ui-*.jar $HEAP_OPTS $LOG_OPTS

kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/configuration/AppProperties.java

+7
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ public class AppProperties {
3939

4040
@Value("${app.uploadPath}")
4141
private String uploadPath;
42+
43+
@Value("${app.deserializerPath}")
44+
private String deserializerPath;
4245

4346
@Value("${app.key}")
4447
private String appKey;
@@ -76,6 +79,10 @@ public String getUploadPath() {
7679
return uploadPath;
7780
}
7881

82+
public String getDeserializerPath() {
83+
return deserializerPath;
84+
}
85+
7986
public String getAppKey() {
8087
return appKey;
8188
}

kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/configuration/DataLoaderConfig.java

+86-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,15 @@
2424

2525
package org.sourcelab.kafka.webview.ui.configuration;
2626

27+
import com.fasterxml.jackson.core.JsonProcessingException;
28+
import com.fasterxml.jackson.databind.ObjectMapper;
29+
30+
import java.io.File;
31+
32+
import java.net.MalformedURLException;
33+
import java.net.URL;
34+
import java.net.URLClassLoader;
35+
2736
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
2837
import org.apache.kafka.common.serialization.BytesDeserializer;
2938
import org.apache.kafka.common.serialization.DoubleDeserializer;
@@ -43,24 +52,43 @@
4352
import org.springframework.stereotype.Component;
4453

4554
import java.util.HashMap;
55+
import java.util.List;
4656
import java.util.Map;
57+
import java.util.ServiceLoader;
58+
59+
import org.slf4j.Logger;
60+
import org.slf4j.LoggerFactory;
61+
62+
import org.sourcelab.kafka.webview.ui.plugin.deserializer.DeserializerDiscoveryService;
63+
import org.sourcelab.kafka.webview.ui.plugin.deserializer.DeserializerInformation;
64+
import org.sourcelab.kafka.webview.ui.model.MessageFormatType;
4765

4866
/**
4967
* Called on startup to ensure we have sane default data loaded.
5068
*/
5169
@Component
5270
public final class DataLoaderConfig implements ApplicationRunner {
5371

72+
private static final Logger logger = LoggerFactory.getLogger(DataLoaderConfig.class);
73+
74+
private final AppProperties appProperties;
5475
private final MessageFormatRepository messageFormatRepository;
5576
private final UserRepository userRepository;
77+
private final ObjectMapper mapper;
5678

5779
/**
5880
* Constructor.
5981
*/
6082
@Autowired
61-
private DataLoaderConfig(final MessageFormatRepository messageFormatRepository, final UserRepository userRepository) {
83+
private DataLoaderConfig(
84+
final AppProperties appProperties,
85+
final MessageFormatRepository messageFormatRepository,
86+
final UserRepository userRepository,
87+
final ObjectMapper mapper) {
88+
this.appProperties = appProperties;
6289
this.messageFormatRepository = messageFormatRepository;
6390
this.userRepository = userRepository;
91+
this.mapper = mapper;
6492
}
6593

6694
/**
@@ -69,6 +97,7 @@ private DataLoaderConfig(final MessageFormatRepository messageFormatRepository,
6997
private void createData() {
7098
createDefaultUser();
7199
createDefaultMessageFormats();
100+
discoverDeserializers();
72101
}
73102

74103
/**
@@ -112,11 +141,66 @@ private void createDefaultMessageFormats() {
112141
messageFormat.setName(entry.getKey());
113142
messageFormat.setClasspath(entry.getValue());
114143
messageFormat.setJar("n/a");
115-
messageFormat.setDefaultFormat(true);
144+
messageFormat.setMessageFormatType(MessageFormatType.DEFAULT);
116145
messageFormatRepository.save(messageFormat);
117146
}
118147
}
119148

149+
private void discoverDeserializers() {
150+
final File deserializerPath = new File(appProperties.getDeserializerPath());
151+
152+
// Check preconditions
153+
if (!deserializerPath.exists()) {
154+
logger.warn("Directory {} doesn't exists.", deserializerPath);
155+
return;
156+
}
157+
if (!deserializerPath.isDirectory()) {
158+
logger.error("{} is not a directory.", deserializerPath);
159+
return;
160+
}
161+
162+
File[] jars = deserializerPath.listFiles((File dir, String name) -> name.endsWith(".jar"));
163+
logger.info("Analyse {} for deserializer", (Object) jars);
164+
for (File jar : jars) {
165+
try {
166+
URL jarUrl = jar.toURI().toURL();
167+
ClassLoader cl = new URLClassLoader(new URL[]{jarUrl}, getClass().getClassLoader());
168+
ServiceLoader<DeserializerDiscoveryService> services = ServiceLoader.load(DeserializerDiscoveryService.class, cl);
169+
loadDeserializers(jar, services);
170+
} catch (MalformedURLException ex) {
171+
logger.error("Failed to load {}, error: {}", jar, ex.getMessage());
172+
}
173+
}
174+
}
175+
176+
private void loadDeserializers(File jar, ServiceLoader<DeserializerDiscoveryService> services) {
177+
for (DeserializerDiscoveryService service : services) {
178+
List<DeserializerInformation> deserializersInformation = service.getDeserializersInformation();
179+
for (DeserializerInformation deserializerInformation : deserializersInformation) {
180+
try {
181+
MessageFormat messageFormat = messageFormatRepository.findByName(deserializerInformation.getName());
182+
if (messageFormat == null) {
183+
messageFormat = new MessageFormat();
184+
} else if (MessageFormatType.AUTOCONF != messageFormat.getMessageFormatType()) {
185+
logger.error("Try to register the formatter {} but this name is already register as a {}.",
186+
messageFormat.getName(), messageFormat.getMessageFormatType());
187+
continue;
188+
}
189+
messageFormat.setName(deserializerInformation.getName());
190+
messageFormat.setClasspath(deserializerInformation.getClasspath());
191+
messageFormat.setJar(jar.getName());
192+
messageFormat.setMessageFormatType(MessageFormatType.AUTOCONF);
193+
messageFormat.setOptionParameters(
194+
mapper.writeValueAsString(deserializerInformation.getDefaultConfig()));
195+
messageFormatRepository.save(messageFormat);
196+
} catch (JsonProcessingException ex) {
197+
logger.error("Failed to load {}, because the default config are invalid ({})",
198+
deserializerInformation.getName(), ex.getMessage());
199+
}
200+
}
201+
}
202+
}
203+
120204
/**
121205
* Run on startup.
122206
*/

kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/configuration/PluginConfig.java

+6
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,11 @@ public PluginFactory<Deserializer> getDeserializerPluginFactory(final AppPropert
6868
final String jarDirectory = appProperties.getUploadPath() + "/deserializers";
6969
return new PluginFactory<>(jarDirectory, Deserializer.class);
7070
}
71+
72+
private PluginFactory<Deserializer> getAutoconfDeserializerPluginFactory(final AppProperties appProperties) {
73+
final String jarDirectory = appProperties.getDeserializerPath();
74+
return new PluginFactory<>(jarDirectory, Deserializer.class);
75+
}
7176

7277
/**
7378
* PluginFactory for creating instances of Record Filters.
@@ -99,6 +104,7 @@ public SecretManager getSecretManager(final AppProperties appProperties) {
99104
public WebKafkaConsumerFactory getWebKafkaConsumerFactory(final AppProperties appProperties, final KafkaClientConfigUtil configUtil) {
100105
return new WebKafkaConsumerFactory(
101106
getDeserializerPluginFactory(appProperties),
107+
getAutoconfDeserializerPluginFactory(appProperties),
102108
getRecordFilterPluginFactory(appProperties),
103109
getSecretManager(appProperties),
104110
getKafkaConsumerFactory(configUtil)

0 commit comments

Comments
 (0)