Skip to content

Commit 84bf7c8

Browse files
Merge pull request #928 from akarnokd/SubscribeOnBoundedFix
Fix deadlock in SubscribeOnBounded
2 parents 378772c + c32209b commit 84bf7c8

File tree

2 files changed

+15
-3
lines changed

2 files changed

+15
-3
lines changed

rxjava-core/src/main/java/rx/operators/OperatorSubscribeOnBounded.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,14 +88,13 @@ public void onNext(final Observable<T> o) {
8888
if (checkNeedBuffer(o)) {
8989
// use buffering (possibly blocking) for a possibly synchronous subscribe
9090
final BufferUntilSubscriber<T> bus = new BufferUntilSubscriber<T>(bufferSize, subscriber);
91-
o.subscribe(bus);
9291
subscriber.add(scheduler.schedule(new Action1<Inner>() {
9392
@Override
9493
public void call(final Inner inner) {
9594
bus.enterPassthroughMode();
9695
}
9796
}));
98-
return;
97+
o.subscribe(bus);
9998
} else {
10099
// no buffering (async subscribe)
101100
subscriber.add(scheduler.schedule(new Action1<Inner>() {

rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnBoundedTest.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -399,5 +399,18 @@ public void call(Subscriber<? super Integer> sub) {
399399
ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
400400
assertEquals(10, count.get());
401401
}
402-
402+
@Test(timeout = 2000)
403+
public void testNoDeadlock() {
404+
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
405+
Observable<Integer> source = Observable.from(data);
406+
407+
Observable<Integer> result = source.nest().lift(new OperatorSubscribeOnBounded<Integer>(Schedulers.newThread(), 1));
408+
409+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
410+
411+
result.subscribe(ts);
412+
413+
ts.awaitTerminalEvent();
414+
ts.assertReceivedOnNext(data);
415+
}
403416
}

0 commit comments

Comments
 (0)