Skip to content

Commit c58e1c3

Browse files
committed
Fix async tests for Collections.synchronizedList()
The `AsyncCompletableFutureRetryTopicScenarioTests` & `AsyncMonoRetryTopicScenarioTests` use `new ArrayList()` to collect consumed messages and then `CountDownLatch` to fulfill expectations. Turns out the `CountDownLatch` might be fulfilled and respective assert in the test would pass, but the next assertion for the mentioned list might fail. The idea that state of the `ArrayList` cannot be predictable in between threads. So, suggestion is to use `Collections.synchronizedList()` instead for better memory barrier management
1 parent ccf4666 commit c58e1c3

File tree

2 files changed

+32
-32
lines changed

2 files changed

+32
-32
lines changed

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncCompletableFutureRetryTopicScenarioTests.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2024 the original author or authors.
2+
* Copyright 2024-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -602,9 +602,9 @@ static class TestTopicListener0 {
602602
@Autowired
603603
CountDownLatchContainer container;
604604

605-
private final List<String> receivedMsgs = new ArrayList<>();
605+
private final List<String> receivedMsgs = Collections.synchronizedList(new ArrayList<>());
606606

607-
private final List<String> receivedTopics = new ArrayList<>();
607+
private final List<String> receivedTopics = Collections.synchronizedList(new ArrayList<>());
608608

609609
@KafkaHandler
610610
public CompletableFuture<Void> listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
@@ -634,9 +634,9 @@ static class TestTopicListener1 {
634634
@Autowired
635635
CountDownLatchContainer container;
636636

637-
private final List<String> receivedMsgs = new ArrayList<>();
637+
private final List<String> receivedMsgs = Collections.synchronizedList(new ArrayList<>());
638638

639-
private final List<String> receivedTopics = new ArrayList<>();
639+
private final List<String> receivedTopics = Collections.synchronizedList(new ArrayList<>());
640640

641641
private CountDownLatch firstRetryFailMsgLatch = new CountDownLatch(1);
642642

@@ -691,9 +691,9 @@ static class TestTopicListener2 {
691691
@Autowired
692692
CountDownLatchContainer container;
693693

694-
protected final List<String> receivedMsgs = new ArrayList<>();
694+
private final List<String> receivedMsgs = Collections.synchronizedList(new ArrayList<>());
695695

696-
private final List<String> receivedTopics = new ArrayList<>();
696+
private final List<String> receivedTopics = Collections.synchronizedList(new ArrayList<>());
697697

698698
private CountDownLatch firstRetryFailMsgLatch = new CountDownLatch(1);
699699

@@ -750,9 +750,9 @@ static class TestTopicListener3 {
750750
@Autowired
751751
CountDownLatchContainer container;
752752

753-
protected final List<String> receivedMsgs = new ArrayList<>();
753+
private final List<String> receivedMsgs = Collections.synchronizedList(new ArrayList<>());
754754

755-
private final List<String> receivedTopics = new ArrayList<>();
755+
private final List<String> receivedTopics = Collections.synchronizedList(new ArrayList<>());
756756

757757
public static final String FAIL_PREFIX = "fail";
758758

@@ -815,9 +815,9 @@ static class TestTopicListener4 {
815815
@Autowired
816816
CountDownLatchContainer container;
817817

818-
protected final List<String> receivedMsgs = new ArrayList<>();
818+
private final List<String> receivedMsgs = Collections.synchronizedList(new ArrayList<>());
819819

820-
private final List<String> receivedTopics = new ArrayList<>();
820+
private final List<String> receivedTopics = Collections.synchronizedList(new ArrayList<>());
821821

822822
public static final String LONG_SUCCESS_MSG = "success";
823823

@@ -881,9 +881,9 @@ static class TestTopicListener5 {
881881
@Autowired
882882
CountDownLatchContainer container;
883883

884-
protected final List<String> receivedMsgs = new ArrayList<>();
884+
private final List<String> receivedMsgs = Collections.synchronizedList(new ArrayList<>());
885885

886-
private final List<String> receivedTopics = new ArrayList<>();
886+
private final List<String> receivedTopics = Collections.synchronizedList(new ArrayList<>());
887887

888888
public static final String LONG_SUCCESS_MSG = "success";
889889

@@ -947,9 +947,9 @@ static class TestTopicListener6 {
947947
@Autowired
948948
CountDownLatchContainer container;
949949

950-
protected final List<String> receivedMsgs = new ArrayList<>();
950+
private final List<String> receivedMsgs = Collections.synchronizedList(new ArrayList<>());
951951

952-
private final List<String> receivedTopics = new ArrayList<>();
952+
private final List<String> receivedTopics = Collections.synchronizedList(new ArrayList<>());
953953

954954
public static final String SUCCESS_PREFIX = "success";
955955

@@ -1080,7 +1080,7 @@ static class CountDownLatchContainer {
10801080

10811081
static class MyCustomDltProcessor {
10821082

1083-
final List<String> receivedMsg = new ArrayList<>();
1083+
private final List<String> receivedMsg = Collections.synchronizedList(new ArrayList<>());
10841084

10851085
MyCustomDltProcessor(KafkaTemplate<String, String> kafkaTemplate,
10861086
CountDownLatch latch) {

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncMonoRetryTopicScenarioTests.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2024 the original author or authors.
2+
* Copyright 2024-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -598,9 +598,9 @@ static class TestTopicListener0 {
598598
@Autowired
599599
CountDownLatchContainer container;
600600

601-
private final List<String> receivedMsgs = new ArrayList<>();
601+
private final List<String> receivedMsgs = Collections.synchronizedList(new ArrayList<>());
602602

603-
private final List<String> receivedTopics = new ArrayList<>();
603+
private final List<String> receivedTopics = Collections.synchronizedList(new ArrayList<>());
604604

605605
@KafkaHandler
606606
public Mono<Void> listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
@@ -630,9 +630,9 @@ static class TestTopicListener1 {
630630
@Autowired
631631
CountDownLatchContainer container;
632632

633-
private final List<String> receivedMsgs = new ArrayList<>();
633+
private final List<String> receivedMsgs = Collections.synchronizedList(new ArrayList<>());
634634

635-
private final List<String> receivedTopics = new ArrayList<>();
635+
private final List<String> receivedTopics = Collections.synchronizedList(new ArrayList<>());
636636

637637
private CountDownLatch firstRetryFailMsgLatch = new CountDownLatch(1);
638638

@@ -687,9 +687,9 @@ static class TestTopicListener2 {
687687
@Autowired
688688
CountDownLatchContainer container;
689689

690-
protected final List<String> receivedMsgs = new ArrayList<>();
690+
protected final List<String> receivedMsgs = Collections.synchronizedList(new ArrayList<>());
691691

692-
private final List<String> receivedTopics = new ArrayList<>();
692+
private final List<String> receivedTopics = Collections.synchronizedList(new ArrayList<>());
693693

694694
private CountDownLatch firstRetryFailMsgLatch = new CountDownLatch(1);
695695

@@ -746,9 +746,9 @@ static class TestTopicListener3 {
746746
@Autowired
747747
CountDownLatchContainer container;
748748

749-
protected final List<String> receivedMsgs = new ArrayList<>();
749+
protected final List<String> receivedMsgs = Collections.synchronizedList(new ArrayList<>());
750750

751-
private final List<String> receivedTopics = new ArrayList<>();
751+
private final List<String> receivedTopics = Collections.synchronizedList(new ArrayList<>());
752752

753753
public static final String FAIL_PREFIX = "fail";
754754

@@ -811,9 +811,9 @@ static class TestTopicListener4 {
811811
@Autowired
812812
CountDownLatchContainer container;
813813

814-
protected final List<String> receivedMsgs = new ArrayList<>();
814+
protected final List<String> receivedMsgs = Collections.synchronizedList(new ArrayList<>());
815815

816-
private final List<String> receivedTopics = new ArrayList<>();
816+
private final List<String> receivedTopics = Collections.synchronizedList(new ArrayList<>());
817817

818818
public static final String LONG_SUCCESS_MSG = "success";
819819

@@ -877,9 +877,9 @@ static class TestTopicListener5 {
877877
@Autowired
878878
CountDownLatchContainer container;
879879

880-
protected final List<String> receivedMsgs = new ArrayList<>();
880+
protected final List<String> receivedMsgs = Collections.synchronizedList(new ArrayList<>());
881881

882-
private final List<String> receivedTopics = new ArrayList<>();
882+
private final List<String> receivedTopics = Collections.synchronizedList(new ArrayList<>());
883883

884884
public static final String LONG_SUCCESS_MSG = "success";
885885

@@ -943,9 +943,9 @@ static class TestTopicListener6 {
943943
@Autowired
944944
CountDownLatchContainer container;
945945

946-
protected final List<String> receivedMsgs = new ArrayList<>();
946+
protected final List<String> receivedMsgs = Collections.synchronizedList(new ArrayList<>());
947947

948-
private final List<String> receivedTopics = new ArrayList<>();
948+
private final List<String> receivedTopics = Collections.synchronizedList(new ArrayList<>());
949949

950950
public static final String SUCCESS_PREFIX = "success";
951951

@@ -1076,7 +1076,7 @@ static class CountDownLatchContainer {
10761076

10771077
static class MyCustomDltProcessor {
10781078

1079-
final List<String> receivedMsg = new ArrayList<>();
1079+
final List<String> receivedMsg = Collections.synchronizedList(new ArrayList<>());
10801080

10811081
MyCustomDltProcessor(KafkaTemplate<String, String> kafkaTemplate, CountDownLatch latch) {
10821082
this.kafkaTemplate = kafkaTemplate;

0 commit comments

Comments
 (0)