Skip to content

Commit 7668f52

Browse files
garyrussellartembilan
authored andcommitted
GH-1618: Remove Duplicate Parameter
Resolves #1618 Incorrectly added the child container as a parameter for the `ConsumerStoppedEvent`. The child container is already passed in as the `source`. **cherry-pick to 2.5.x**
1 parent 5c1d290 commit 7668f52

File tree

4 files changed

+24
-27
lines changed

4 files changed

+24
-27
lines changed

spring-kafka/src/main/java/org/springframework/kafka/event/ConsumerStoppedEvent.java

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
package org.springframework.kafka.event;
1818

19-
import org.springframework.lang.Nullable;
20-
2119
/**
2220
* An event published when a consumer is stopped. While it is best practice to use
2321
* stateless listeners, you can consume this event to clean up any thread-based resources
@@ -55,8 +53,6 @@ public enum Reason {
5553

5654
private final Reason reason;
5755

58-
private final Object container;
59-
6056
/**
6157
* Construct an instance with the provided source.
6258
* @param source the container.
@@ -74,22 +70,18 @@ public ConsumerStoppedEvent(Object source) {
7470
*/
7571
@Deprecated
7672
public ConsumerStoppedEvent(Object source, Object container) {
77-
this(source, container, null, Reason.NORMAL);
73+
this(source, container, Reason.NORMAL);
7874
}
7975

8076
/**
8177
* Construct an instance with the provided source and container.
8278
* @param source the container instance that generated the event.
8379
* @param container the container or the parent container if the container is a child.
84-
* @param childContainer the child container, or null.
8580
* @param reason the reason.
8681
* @since 2.5.8
8782
*/
88-
public ConsumerStoppedEvent(Object source, Object container, @Nullable Object childContainer,
89-
Reason reason) {
90-
83+
public ConsumerStoppedEvent(Object source, Object container, Reason reason) {
9184
super(source, container);
92-
this.container = childContainer;
9385
this.reason = reason;
9486
}
9587

@@ -102,22 +94,9 @@ public Reason getReason() {
10294
return this.reason;
10395
}
10496

105-
/**
106-
* Return the container that the Consumer belonged to.
107-
* @param <T> the container type.
108-
* @return the container.
109-
* @since 2.5.8
110-
*/
111-
@SuppressWarnings("unchecked")
112-
public <T> T getContainer() {
113-
return this.container == null ? (T) getSource() : (T) this.container;
114-
}
115-
11697
@Override
11798
public String toString() {
118-
return "ConsumerStoppedEvent [source=" + getSource()
119-
+ (this.container == null ? "" : (", container=" + this.container))
120-
+ ", reason=" + this.reason + "]";
99+
return "ConsumerStoppedEvent [source=" + getSource() + ", reason=" + this.reason + "]";
121100
}
122101

123102
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,6 @@ private void publishConsumerStoppingEvent(Consumer<?, ?> consumer) {
405405
private void publishConsumerStoppedEvent(@Nullable Throwable throwable) {
406406
if (getApplicationEventPublisher() != null) {
407407
getApplicationEventPublisher().publishEvent(new ConsumerStoppedEvent(this, this.thisOrParentContainer,
408-
this.thisOrParentContainer.equals(this) ? null : this,
409408
throwable instanceof Error
410409
? Reason.ERROR
411410
: throwable instanceof StopAfterFenceException

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -333,8 +333,7 @@ public void onMessage(Object data) {
333333
assertThat(stopEventLatch.await(10, TimeUnit.SECONDS)).isTrue();
334334
assertThat(stopEvent.get().getReason()).isEqualTo(Reason.NORMAL);
335335
}
336-
MessageListenerContainer stoppedContainer = stopEvent.get().getContainer();
337-
assertThat(stoppedContainer).isSameAs(container);
336+
assertThat(stopEvent.get().getSource()).isSameAs(container);
338337
}
339338

340339
@SuppressWarnings({ "rawtypes", "unchecked" })

src/reference/asciidoc/kafka.adoc

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2598,6 +2598,24 @@ The `ConsumerStartingEvent`, `ConsumerStartingEvent`, `ConsumerFailedToStartEven
25982598
All containers (whether a child or a parent) publish `ContainerStoppedEvent`.
25992599
For a parent container, the source and container properties are identical.
26002600

2601+
In addition, the `ConsumerStoppedEvent` has the following additional property:
2602+
2603+
* `reason`
2604+
** `NORMAL` - the consumer stopped normally (container was stopped).
2605+
** `ERROR` - a `java.lang.Error` was thrown.
2606+
** `FENCED` - the transactional producer was fenced and the `stopContainerWhenFenced` container property is `true`.
2607+
2608+
You can use this event to restart the container after such a condition:
2609+
2610+
====
2611+
[source, java]
2612+
----
2613+
if (event.getReason.equals(Reason.FENCED)) {
2614+
event.getSource(MessageListenerContainer.class).start();
2615+
}
2616+
----
2617+
====
2618+
26012619
[[idle-containers]]
26022620
===== Detecting Idle and Non-Responsive Consumers
26032621

@@ -5015,6 +5033,8 @@ IMPORTANT: With current `kafka-clients`, the container cannot detect whether a `
50155033
Because, in most cases, it is caused by a rebalance, the container does not call the `AfterRollbackProcessor` (because it's not appropriate to seek the partitions because we no longer are assigned them).
50165034
If you ensure the timeout is large enough to process each transaction and periodically perform an "empty" transaction (e.g. via a `ListenerContainerIdleEvent`) you can avoid fencing due to timeout and expiry.
50175035
Or, you can set the `stopContainerWhenFenced` container property to `true` and the container will stop, avoiding the loss of records.
5036+
You can consume a `ConsumerStoppedEvent` and check the `Reason` property for `FENCED` to detect this condition.
5037+
Since the event also has a reference to the container, you can restart the container using this event.
50185038

50195039
[[delivery-header]]
50205040
===== Delivery Attempts Header

0 commit comments

Comments
 (0)