19
19
import static org .assertj .core .api .Assertions .assertThat ;
20
20
21
21
import java .util .Collections ;
22
+ import java .util .HashMap ;
22
23
import java .util .List ;
23
24
import java .util .Map ;
24
25
import java .util .concurrent .CountDownLatch ;
27
28
import org .apache .pulsar .client .admin .PulsarAdmin ;
28
29
import org .apache .pulsar .client .api .PulsarClient ;
29
30
import org .apache .pulsar .client .api .PulsarClientException ;
30
- import org .junit .jupiter .api .Disabled ;
31
31
32
32
import org .springframework .context .annotation .AnnotationConfigApplicationContext ;
33
33
import org .springframework .context .annotation .Bean ;
64
64
* @author Chris Bono
65
65
* @see SampleTestRunner
66
66
*/
67
- @ Disabled
68
67
public class ObservationIntegrationTests extends SampleTestRunner implements PulsarTestContainerSupport {
69
68
70
69
@ SuppressWarnings ("unchecked" )
@@ -73,13 +72,35 @@ public SampleTestRunnerConsumer yourCode() {
73
72
// template -> listener -> template -> listener
74
73
return (bb , meterRegistry ) -> {
75
74
ObservationRegistry observationRegistry = getObservationRegistry ();
76
- try (AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext ()) {
77
- applicationContext .registerBean (ObservationRegistry .class , () -> observationRegistry );
78
- applicationContext .register (ObservationIntegrationTestAppConfig .class );
79
- applicationContext .refresh ();
80
- applicationContext .getBean (PulsarTemplate .class ).send ("obs1-topic" , "hello" );
81
- CountDownLatch latch = applicationContext .getBean (ObservationIntegrationTestAppListeners .class ).latch ;
82
- assertThat (latch .await (10 , TimeUnit .SECONDS )).isTrue ();
75
+ try (AnnotationConfigApplicationContext appContext = new AnnotationConfigApplicationContext ()) {
76
+ appContext .registerBean (ObservationRegistry .class , () -> observationRegistry );
77
+ appContext .register (ObservationIntegrationTestAppConfig .class );
78
+ appContext .refresh ();
79
+
80
+ ObservationIntegrationTestAppListeners listeners = appContext
81
+ .getBean (ObservationIntegrationTestAppListeners .class );
82
+ PulsarTemplate <String > template = appContext .getBean (PulsarTemplate .class );
83
+
84
+ String msg = "hello-" + System .currentTimeMillis ();
85
+ listeners .expectMessage (msg );
86
+ template .send ("obs1-topic" , msg );
87
+
88
+ boolean listen1Completed = listeners .latchesByMessageListen1 .get (msg ).await (10 , TimeUnit .SECONDS );
89
+ boolean listen2Completed = listeners .latchesByMessageListen2 .get (msg ).await (10 , TimeUnit .SECONDS );
90
+ assertThat (listen1Completed ).withFailMessage (
91
+ "Message %s not received in listen1 (latchesByMessageListen1 = %s and latchesByMessageListen2 = %s)" ,
92
+ msg , listeners .latchesByMessageListen1 , listeners .latchesByMessageListen2 ).isTrue ();
93
+ assertThat (listen2Completed ).withFailMessage (
94
+ "Message %s not received in listen2 (latchesByMessageListen1 = %s and latchesByMessageListen2 = %s)" ,
95
+ msg , listeners .latchesByMessageListen1 , listeners .latchesByMessageListen2 ).isTrue ();
96
+
97
+ // Without this sleep, the 2nd tracingSetup run sometimes fails due to
98
+ // messages from 1st run being
99
+ // delivered during the 2nd run. The test runs share the same listener
100
+ // config, including the
101
+ // same subscription names. Seems like the listener in run2 is getting
102
+ // message from run1.
103
+ Thread .sleep (5000 );
83
104
}
84
105
85
106
List <FinishedSpan > finishedSpans = bb .getFinishedSpans ();
@@ -165,20 +186,30 @@ static class ObservationIntegrationTestAppListeners {
165
186
166
187
private PulsarTemplate <String > template ;
167
188
168
- CountDownLatch latch = new CountDownLatch (1 );
189
+ Map <String , CountDownLatch > latchesByMessageListen1 = new HashMap <>();
190
+
191
+ Map <String , CountDownLatch > latchesByMessageListen2 = new HashMap <>();
169
192
170
193
ObservationIntegrationTestAppListeners (PulsarTemplate <String > template ) {
171
194
this .template = template ;
172
195
}
173
196
197
+ void expectMessage (String message ) {
198
+ latchesByMessageListen1 .put (message , new CountDownLatch (1 ));
199
+ latchesByMessageListen2 .put (message , new CountDownLatch (1 ));
200
+ }
201
+
174
202
@ PulsarListener (id = "obs1-id" , properties = { "subscriptionName=obs1-sub" , "topicNames=obs1-topic" })
175
203
void listen1 (String message ) throws PulsarClientException {
204
+ assertThat (latchesByMessageListen1 ).containsKey (message );
205
+ latchesByMessageListen1 .get (message ).countDown ();
176
206
this .template .send ("obs2-topic" , message );
177
207
}
178
208
179
209
@ PulsarListener (id = "obs2-id" , properties = { "subscriptionName=obs2-sub" , "topicNames=obs2-topic" })
180
210
void listen2 (String message ) {
181
- latch .countDown ();
211
+ assertThat (latchesByMessageListen2 ).containsKey (message );
212
+ latchesByMessageListen2 .get (message ).countDown ();
182
213
}
183
214
184
215
}
0 commit comments