17
17
18
18
import static org .junit .Assert .assertEquals ;
19
19
import static org .junit .Assert .assertFalse ;
20
+ import static org .junit .Assert .assertSame ;
20
21
import static org .junit .Assert .assertTrue ;
21
22
import static org .junit .Assert .fail ;
22
23
import static org .mockito .Matchers .any ;
@@ -274,10 +275,18 @@ public void runConcurrencyTest() {
274
275
}
275
276
}
276
277
278
+ /**
279
+ * Test that a notification does not get delayed in the queue waiting for the next event to push it through.
280
+ *
281
+ * @throws InterruptedException
282
+ */
277
283
@ Test
278
- public void testNotificationDelay () {
284
+ public void testNotificationDelay () throws InterruptedException {
279
285
ExecutorService tp = Executors .newFixedThreadPool (2 );
280
286
287
+ final CountDownLatch onNextCount = new CountDownLatch (1 );
288
+ final CountDownLatch latch = new CountDownLatch (1 );
289
+
281
290
TestSubscriber <String > to = new TestSubscriber <String >(new Observer <String >() {
282
291
283
292
@ Override
@@ -292,12 +301,12 @@ public void onError(Throwable e) {
292
301
293
302
@ Override
294
303
public void onNext (String t ) {
295
- // force it to take time when delivering
296
- // so the second thread will asynchronously enqueue
304
+ // know when the first thread gets in
305
+ onNextCount .countDown ();
306
+ // force it to take time when delivering so the second one is enqueued
297
307
try {
298
- Thread . sleep ( 50 );
308
+ latch . await ( );
299
309
} catch (InterruptedException e ) {
300
- e .printStackTrace ();
301
310
}
302
311
}
303
312
@@ -307,10 +316,23 @@ public void onNext(String t) {
307
316
Future <?> f1 = tp .submit (new OnNextThread (o , 1 ));
308
317
Future <?> f2 = tp .submit (new OnNextThread (o , 1 ));
309
318
319
+ onNextCount .await ();
320
+
321
+ Thread t1 = to .getLastSeenThread ();
322
+ System .out .println ("first onNext on thread: " + t1 );
323
+
324
+ latch .countDown ();
325
+
310
326
waitOnThreads (f1 , f2 );
311
327
// not completed yet
312
328
313
329
assertEquals (2 , to .getOnNextEvents ().size ());
330
+
331
+ Thread t2 = to .getLastSeenThread ();
332
+ System .out .println ("second onNext on thread: " + t2 );
333
+
334
+ assertSame (t1 , t2 );
335
+
314
336
System .out .println (to .getOnNextEvents ());
315
337
o .onCompleted ();
316
338
System .out .println (to .getOnNextEvents ());
0 commit comments