@@ -284,7 +284,8 @@ public void runConcurrencyTest() {
284
284
public void testNotificationDelay () throws InterruptedException {
285
285
ExecutorService tp = Executors .newFixedThreadPool (2 );
286
286
287
- final CountDownLatch onNextCount = new CountDownLatch (1 );
287
+ final CountDownLatch firstOnNext = new CountDownLatch (1 );
288
+ final CountDownLatch onNextCount = new CountDownLatch (2 );
288
289
final CountDownLatch latch = new CountDownLatch (1 );
289
290
290
291
TestSubscriber <String > to = new TestSubscriber <String >(new Observer <String >() {
@@ -301,8 +302,7 @@ public void onError(Throwable e) {
301
302
302
303
@ Override
303
304
public void onNext (String t ) {
304
- // know when the first thread gets in
305
- onNextCount .countDown ();
305
+ firstOnNext .countDown ();
306
306
// force it to take time when delivering so the second one is enqueued
307
307
try {
308
308
latch .await ();
@@ -313,10 +313,10 @@ public void onNext(String t) {
313
313
});
314
314
Observer <String > o = serializedObserver (to );
315
315
316
- Future <?> f1 = tp .submit (new OnNextThread (o , 1 ));
317
- Future <?> f2 = tp .submit (new OnNextThread (o , 1 ));
316
+ Future <?> f1 = tp .submit (new OnNextThread (o , 1 , onNextCount ));
317
+ Future <?> f2 = tp .submit (new OnNextThread (o , 1 , onNextCount ));
318
318
319
- onNextCount .await ();
319
+ firstOnNext .await ();
320
320
321
321
Thread t1 = to .getLastSeenThread ();
322
322
System .out .println ("first onNext on thread: " + t1 );
@@ -431,14 +431,24 @@ public void call(Subscriber<? super String> s) {
431
431
*/
432
432
public static class OnNextThread implements Runnable {
433
433
434
+ private final CountDownLatch latch ;
434
435
private final Observer <String > observer ;
435
436
private final int numStringsToSend ;
436
437
final AtomicInteger produced ;
437
438
439
+ OnNextThread (Observer <String > observer , int numStringsToSend , CountDownLatch latch ) {
440
+ this (observer , numStringsToSend , new AtomicInteger (), latch );
441
+ }
442
+
438
443
OnNextThread (Observer <String > observer , int numStringsToSend , AtomicInteger produced ) {
444
+ this (observer , numStringsToSend , produced , null );
445
+ }
446
+
447
+ OnNextThread (Observer <String > observer , int numStringsToSend , AtomicInteger produced , CountDownLatch latch ) {
439
448
this .observer = observer ;
440
449
this .numStringsToSend = numStringsToSend ;
441
450
this .produced = produced ;
451
+ this .latch = latch ;
442
452
}
443
453
444
454
OnNextThread (Observer <String > observer , int numStringsToSend ) {
@@ -449,6 +459,9 @@ public static class OnNextThread implements Runnable {
449
459
public void run () {
450
460
for (int i = 0 ; i < numStringsToSend ; i ++) {
451
461
observer .onNext (Thread .currentThread ().getId () + "-" + i );
462
+ if (latch != null ) {
463
+ latch .countDown ();
464
+ }
452
465
produced .incrementAndGet ();
453
466
}
454
467
}
0 commit comments