Skip to content

Commit ad4e4cf

Browse files
garyrussellartembilan
authored andcommitted
GH-522: Fix NPE in listener container (#524)
Fixes #522 * Polishing - PR Comment **Cherry-pick to 2.0.x & 1.3.x**
1 parent d978246 commit ad4e4cf

File tree

1 file changed

+15
-8
lines changed

1 file changed

+15
-8
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,9 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
9696

9797
private final TopicPartitionInitialOffset[] topicPartitions;
9898

99-
private ListenerConsumer listenerConsumer;
99+
private volatile ListenerConsumer listenerConsumer;
100100

101-
private ListenableFuture<?> listenerConsumerFuture;
101+
private volatile ListenableFuture<?> listenerConsumerFuture;
102102

103103
private GenericMessageListener<?> listener;
104104

@@ -153,11 +153,17 @@ public void setClientIdSuffix(String clientIdSuffix) {
153153
* either explicitly or by Kafka; may be null if not assigned yet.
154154
*/
155155
public Collection<TopicPartition> getAssignedPartitions() {
156-
if (this.listenerConsumer.definedPartitions != null) {
157-
return Collections.unmodifiableCollection(this.listenerConsumer.definedPartitions.keySet());
158-
}
159-
else if (this.listenerConsumer.assignedPartitions != null) {
160-
return Collections.unmodifiableCollection(this.listenerConsumer.assignedPartitions);
156+
ListenerConsumer listenerConsumer = this.listenerConsumer;
157+
if (listenerConsumer != null) {
158+
if (listenerConsumer.definedPartitions != null) {
159+
return Collections.unmodifiableCollection(listenerConsumer.definedPartitions.keySet());
160+
}
161+
else if (listenerConsumer.assignedPartitions != null) {
162+
return Collections.unmodifiableCollection(listenerConsumer.assignedPartitions);
163+
}
164+
else {
165+
return null;
166+
}
161167
}
162168
else {
163169
return null;
@@ -267,7 +273,8 @@ private void publishNonResponsiveConsumerEvent(long timeSinceLastPoll, Consumer<
267273
public String toString() {
268274
return "KafkaMessageListenerContainer [id=" + getBeanName()
269275
+ (this.clientIdSuffix != null ? ", clientIndex=" + this.clientIdSuffix : "")
270-
+ ", topicPartitions=" + getAssignedPartitions()
276+
+ ", topicPartitions="
277+
+ (getAssignedPartitions() == null ? "none assigned" : getAssignedPartitions())
271278
+ "]";
272279
}
273280

0 commit comments

Comments
 (0)