|
22 | 22 | import java.util.concurrent.CountDownLatch;
|
23 | 23 | import java.util.concurrent.TimeUnit;
|
24 | 24 | import java.util.concurrent.atomic.AtomicInteger;
|
| 25 | +import java.util.concurrent.atomic.AtomicLong; |
25 | 26 |
|
26 | 27 | import org.junit.Test;
|
27 | 28 | import org.mockito.InOrder;
|
@@ -310,6 +311,41 @@ public void call(Integer t1) {
|
310 | 311 | });
|
311 | 312 | }
|
312 | 313 |
|
| 314 | + @Test |
| 315 | + public void testNonBlockingOuterWhileBlockingOnNext() throws InterruptedException { |
| 316 | + |
| 317 | + final CountDownLatch latch = new CountDownLatch(1); |
| 318 | + final AtomicLong completeTime = new AtomicLong(); |
| 319 | + // use subscribeOn to make async, observeOn to move |
| 320 | + Observable.range(1, 1000).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread()).subscribe(new Observer<Integer>() { |
| 321 | + |
| 322 | + @Override |
| 323 | + public void onCompleted() { |
| 324 | + System.out.println("onCompleted"); |
| 325 | + completeTime.set(System.nanoTime()); |
| 326 | + latch.countDown(); |
| 327 | + } |
| 328 | + |
| 329 | + @Override |
| 330 | + public void onError(Throwable e) { |
| 331 | + |
| 332 | + } |
| 333 | + |
| 334 | + @Override |
| 335 | + public void onNext(Integer t) { |
| 336 | + |
| 337 | + } |
| 338 | + |
| 339 | + }); |
| 340 | + |
| 341 | + long afterSubscribeTime = System.nanoTime(); |
| 342 | + System.out.println("After subscribe: " + latch.getCount()); |
| 343 | + assertEquals(1, latch.getCount()); |
| 344 | + latch.await(); |
| 345 | + assertTrue(completeTime.get() > afterSubscribeTime); |
| 346 | + System.out.println("onComplete nanos after subscribe: " + (completeTime.get() - afterSubscribeTime)); |
| 347 | + } |
| 348 | + |
313 | 349 | @Test
|
314 | 350 | public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeNewThread() throws InterruptedException {
|
315 | 351 | testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.newThread());
|
|
0 commit comments