Skip to content

Commit e5fdd82

Browse files
authored
[ISSUE-57] - Set unique consumerGroupId for each user session (#58)
* [ISSUE-57] - Set unique consumerGroupId for each user session * Update changelog and default configs * Fix pom * add missing test config
1 parent 4b97961 commit e5fdd82

File tree

10 files changed

+51
-11
lines changed

10 files changed

+51
-11
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@
22
The format is based on [Keep a Changelog](http://keepachangelog.com/)
33
and this project adheres to [Semantic Versioning](http://semver.org/).
44

5-
## 1.0.3 (01/26/2018)
5+
## 1.0.3 (TBD)
66
- Fix 500 error unable to find templates when running under windows.
77
- Add start.bat script for running under windows.
8+
- [Issue#57](https://github.com/SourceLabOrg/kafka-webview/issues/57) Configure consumerId and consumerGroup using a configurable prefix.
89

910
## 1.0.2 (01/26/2018)
1011
- Increase file upload limit from 15MB to 64MB.

kafka-webview-ui/src/assembly/distribution/config.yml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,9 @@ security:
66

77
## Various App Configs
88
app:
9-
key: "SuperSecretKey"
9+
## Should be unique to your installation.
10+
## This key will be used for symmetric encryption of JKS/TrustStore secrets if you configure any SSL enabled Kafka clusters.
11+
key: "SuperSecretKey"
12+
13+
## Defines a prefix prepended to the Id of all consumers.
14+
consumerIdPrefix: "KafkaWebViewConsumer"

kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/configuration/AppProperties.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ public class AppProperties {
4545
@Value("${app.maxConcurrentWebSocketConsumers}")
4646
private Integer maxConcurrentWebSocketConsumers = 100;
4747

48+
@Value("${app.consumerIdPrefix}")
49+
private String consumerIdPrefix;
50+
4851
public String getName() {
4952
return name;
5053
}
@@ -61,13 +64,18 @@ public Integer getMaxConcurrentWebSocketConsumers() {
6164
return maxConcurrentWebSocketConsumers;
6265
}
6366

67+
public String getConsumerIdPrefix() {
68+
return consumerIdPrefix;
69+
}
70+
6471
@Override
6572
public String toString() {
6673
return "AppProperties{"
6774
+ "name='" + name + '\''
6875
+ ", uploadPath='" + uploadPath + '\''
6976
+ ", appKey='" + appKey + '\''
7077
+ ", maxConcurrentWebSocketConsumers=" + maxConcurrentWebSocketConsumers
78+
+ ", consumerIdPrefix='" + consumerIdPrefix + '\''
7179
+ '}';
7280
}
7381
}

kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/configuration/PluginConfig.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,9 @@ private KafkaAdminFactory getKafkaAdminFactory(final AppProperties appProperties
123123
* For creating instances of KafkaConsumers.
124124
*/
125125
private KafkaConsumerFactory getKafkaConsumerFactory(final AppProperties appProperties) {
126-
return new KafkaConsumerFactory(appProperties.getUploadPath() + "/keyStores");
126+
return new KafkaConsumerFactory(
127+
appProperties.getUploadPath() + "/keyStores",
128+
appProperties.getConsumerIdPrefix()
129+
);
127130
}
128131
}

kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/KafkaConsumerFactory.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,28 @@
4646
*/
4747
public class KafkaConsumerFactory {
4848

49+
/**
50+
* Path on filesystem where keystores are persisted.
51+
*/
4952
private final String keyStoreRootPath;
5053

54+
/**
55+
* Static prefix to pre-pend to all consumerIds.
56+
*/
57+
private final String consumerIdPrefix;
58+
5159
/**
5260
* Constructor.
5361
* @param keyStoreRootPath Parent path to where JKS key/trust stores are saved on disk.
62+
* @param consumerIdPrefix Static prefix to pre-pend to all consumerIds.
5463
*/
55-
public KafkaConsumerFactory(final String keyStoreRootPath) {
64+
public KafkaConsumerFactory(final String keyStoreRootPath, final String consumerIdPrefix) {
65+
if (consumerIdPrefix == null) {
66+
throw new IllegalArgumentException("ConsumerIdPrefix must be configured!");
67+
}
68+
5669
this.keyStoreRootPath = keyStoreRootPath;
70+
this.consumerIdPrefix = consumerIdPrefix;
5771
}
5872

5973
/**
@@ -94,11 +108,18 @@ public KafkaConsumer createConsumerAndSubscribe(final ClientConfig clientConfig)
94108
* Build an appropriate configuration based on the passed in ClientConfig.
95109
*/
96110
private Map<String, Object> buildConsumerConfig(final ClientConfig clientConfig) {
111+
// Generate consumerId with our configured static prefix.
112+
final String prefixedConsumerId = consumerIdPrefix.concat("-").concat(clientConfig.getConsumerId());
113+
97114
// Build config
98115
final Map<String, Object> configMap = new HashMap<>();
99-
configMap.put(ConsumerConfig.CLIENT_ID_CONFIG, clientConfig.getConsumerId());
100116
configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clientConfig.getTopicConfig().getClusterConfig().getConnectString());
101117

118+
// ClientId and ConsumerGroupId are intended to be unique for each user session.
119+
// See Issue-57 https://github.com/SourceLabOrg/kafka-webview/issues/57#issuecomment-363508531
120+
configMap.put(ConsumerConfig.CLIENT_ID_CONFIG, prefixedConsumerId);
121+
configMap.put(ConsumerConfig.GROUP_ID_CONFIG, prefixedConsumerId);
122+
102123
// Set deserializer classes.
103124
configMap.put(
104125
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,

kafka-webview-ui/src/main/resources/config/base.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,4 @@ app:
4646
uploadPath: "./data/uploads"
4747
key: "SuperSecretKey"
4848
maxConcurrentWebSocketConsumers: 64
49+
consumerIdPrefix: "KafkaWebViewConsumer"

kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/manager/kafka/KafkaConsumerFactoryTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public void testBasicConsumerMultiplePartitions() {
7979
.produceRecords(maxRecordsPerPoll, topicName, 1);
8080

8181
// Create factory
82-
final KafkaConsumerFactory kafkaConsumerFactory = new KafkaConsumerFactory("not/used");
82+
final KafkaConsumerFactory kafkaConsumerFactory = new KafkaConsumerFactory("not/used", "TestPrefix");
8383

8484
// Create cluster Config
8585
final ClusterConfig clusterConfig = ClusterConfig.newBuilder()
@@ -140,7 +140,7 @@ public void testBasicConsumerExcludePartitions() {
140140
.produceRecords(maxRecordsPerPoll, topicName, 1);
141141

142142
// Create factory
143-
final KafkaConsumerFactory kafkaConsumerFactory = new KafkaConsumerFactory("not/used");
143+
final KafkaConsumerFactory kafkaConsumerFactory = new KafkaConsumerFactory("not/used", "TestPrefix");
144144

145145
// Create cluster Config
146146
final ClusterConfig clusterConfig = ClusterConfig.newBuilder()
@@ -205,7 +205,7 @@ public void testBasicConsumerWithRecordFilter() throws InterruptedException {
205205
.produceRecords(maxRecordsPerPoll, topicName, 1);
206206

207207
// Create factory
208-
final KafkaConsumerFactory kafkaConsumerFactory = new KafkaConsumerFactory("not/used");
208+
final KafkaConsumerFactory kafkaConsumerFactory = new KafkaConsumerFactory("not/used", "TestPrefix");
209209

210210
// Create cluster Config
211211
final ClusterConfig clusterConfig = ClusterConfig.newBuilder()
@@ -267,7 +267,7 @@ public void testDeserializerOptions() throws InterruptedException {
267267
.createTopic(topicName, 1);
268268

269269
// Create factory
270-
final KafkaConsumerFactory kafkaConsumerFactory = new KafkaConsumerFactory("not/used");
270+
final KafkaConsumerFactory kafkaConsumerFactory = new KafkaConsumerFactory("not/used", "TestPrefix");
271271

272272
// Create cluster Config
273273
final ClusterConfig clusterConfig = ClusterConfig.newBuilder()

kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/manager/kafka/WebKafkaConsumerFactoryTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ private WebKafkaConsumerFactory createDefaultFactory() {
251251
final PluginFactory<Deserializer> deserializerPluginFactory = new PluginFactory<>("not/used", Deserializer.class);
252252
final PluginFactory<RecordFilter> filterPluginFactoryPluginFactory = new PluginFactory<>("not/used", RecordFilter.class);
253253
final SecretManager secretManager = new SecretManager("Passphrase");
254-
final KafkaConsumerFactory kafkaConsumerFactory = new KafkaConsumerFactory("not/used");
254+
final KafkaConsumerFactory kafkaConsumerFactory = new KafkaConsumerFactory("not/used", "MyPrefix");
255255

256256
return new WebKafkaConsumerFactory(
257257
deserializerPluginFactory,

kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/manager/kafka/WebKafkaConsumerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public class WebKafkaConsumerTest {
4747

4848
private final static Logger logger = LoggerFactory.getLogger(WebKafkaConsumerTest.class);
4949

50-
private final KafkaConsumerFactory kafkaConsumerFactory = new KafkaConsumerFactory("./uploads");
50+
private final KafkaConsumerFactory kafkaConsumerFactory = new KafkaConsumerFactory("./uploads", "TestPrefix");
5151

5252
//@Test
5353
public void doTest() {

kafka-webview-ui/src/test/resources/application.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,4 @@ app:
4646
uploadPath: "./data/uploads"
4747
key: "SuperSecretKey"
4848
maxConcurrentWebSocketConsumers: 64
49+
consumerIdPrefix: "KafkaWebViewConsumer"

0 commit comments

Comments
 (0)