Skip to content

Commit 745a922

Browse files
authored
Merge pull request #4849 from akarnokd/CompletableConcatFix
1.x: fix Completable.concat & merge hanging in async situations
2 parents dc63e53 + fb3d0e8 commit 745a922

File tree

4 files changed

+132
-34
lines changed

4 files changed

+132
-34
lines changed

src/main/java/rx/internal/operators/CompletableOnSubscribeConcat.java

Lines changed: 41 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121
import rx.*;
2222
import rx.Completable.OnSubscribe;
2323
import rx.exceptions.MissingBackpressureException;
24+
import rx.internal.subscriptions.SequentialSubscription;
2425
import rx.internal.util.unsafe.SpscArrayQueue;
2526
import rx.plugins.RxJavaHooks;
26-
import rx.subscriptions.SerialSubscription;
2727

2828
public final class CompletableOnSubscribeConcat implements OnSubscribe {
2929
final Observable<Completable> sources;
@@ -39,30 +39,29 @@ public CompletableOnSubscribeConcat(Observable<? extends Completable> sources, i
3939
public void call(CompletableSubscriber s) {
4040
CompletableConcatSubscriber parent = new CompletableConcatSubscriber(s, prefetch);
4141
s.onSubscribe(parent);
42-
sources.subscribe(parent);
42+
sources.unsafeSubscribe(parent);
4343
}
4444

4545
static final class CompletableConcatSubscriber
4646
extends Subscriber<Completable> {
4747
final CompletableSubscriber actual;
48-
final SerialSubscription sr;
48+
final SequentialSubscription sr;
4949

5050
final SpscArrayQueue<Completable> queue;
5151

52-
volatile boolean done;
52+
final ConcatInnerSubscriber inner;
5353

5454
final AtomicBoolean once;
5555

56-
final ConcatInnerSubscriber inner;
56+
volatile boolean done;
5757

58-
final AtomicInteger wip;
58+
volatile boolean active;
5959

6060
public CompletableConcatSubscriber(CompletableSubscriber actual, int prefetch) {
6161
this.actual = actual;
6262
this.queue = new SpscArrayQueue<Completable>(prefetch);
63-
this.sr = new SerialSubscription();
63+
this.sr = new SequentialSubscription();
6464
this.inner = new ConcatInnerSubscriber();
65-
this.wip = new AtomicInteger();
6665
this.once = new AtomicBoolean();
6766
add(sr);
6867
request(prefetch);
@@ -74,9 +73,7 @@ public void onNext(Completable t) {
7473
onError(new MissingBackpressureException());
7574
return;
7675
}
77-
if (wip.getAndIncrement() == 0) {
78-
next();
79-
}
76+
drain();
8077
}
8178

8279
@Override
@@ -94,9 +91,7 @@ public void onCompleted() {
9491
return;
9592
}
9693
done = true;
97-
if (wip.getAndIncrement() == 0) {
98-
next();
99-
}
94+
drain();
10095
}
10196

10297
void innerError(Throwable e) {
@@ -105,32 +100,45 @@ void innerError(Throwable e) {
105100
}
106101

107102
void innerComplete() {
108-
if (wip.decrementAndGet() != 0) {
109-
next();
110-
}
111-
if (!done) {
112-
request(1);
113-
}
103+
active = false;
104+
drain();
114105
}
115106

116-
void next() {
117-
boolean d = done;
118-
Completable c = queue.poll();
119-
if (c == null) {
120-
if (d) {
121-
if (once.compareAndSet(false, true)) {
122-
actual.onCompleted();
123-
}
124-
return;
125-
}
126-
RxJavaHooks.onError(new IllegalStateException("Queue is empty?!"));
107+
void drain() {
108+
ConcatInnerSubscriber inner = this.inner;
109+
if (inner.getAndIncrement() != 0) {
127110
return;
128111
}
129112

130-
c.unsafeSubscribe(inner);
113+
do {
114+
if (isUnsubscribed()) {
115+
return;
116+
}
117+
if (!active) {
118+
boolean d = done;
119+
Completable c = queue.poll();
120+
boolean empty = c == null;
121+
122+
if (d && empty) {
123+
actual.onCompleted();
124+
return;
125+
}
126+
127+
if (!empty) {
128+
active = true;
129+
c.subscribe(inner);
130+
131+
request(1);
132+
}
133+
}
134+
} while (inner.decrementAndGet() != 0);
131135
}
132136

133-
final class ConcatInnerSubscriber implements CompletableSubscriber {
137+
final class ConcatInnerSubscriber
138+
extends AtomicInteger
139+
implements CompletableSubscriber {
140+
private static final long serialVersionUID = 7233503139645205620L;
141+
134142
@Override
135143
public void onSubscribe(Subscription d) {
136144
sr.set(d);

src/main/java/rx/internal/operators/CompletableOnSubscribeMerge.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public CompletableOnSubscribeMerge(Observable<? extends Completable> source, int
4343
public void call(CompletableSubscriber s) {
4444
CompletableMergeSubscriber parent = new CompletableMergeSubscriber(s, maxConcurrency, delayErrors);
4545
s.onSubscribe(parent);
46-
source.subscribe(parent);
46+
source.unsafeSubscribe(parent);
4747
}
4848

4949
static final class CompletableMergeSubscriber
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package rx.internal.operators;
2+
3+
import java.util.concurrent.TimeUnit;
4+
5+
import org.junit.*;
6+
7+
import rx.*;
8+
import rx.functions.*;
9+
import rx.schedulers.Schedulers;
10+
11+
public class CompletableConcatTest {
12+
13+
@Test
14+
public void asyncObservables() {
15+
16+
final int[] calls = { 0 };
17+
18+
Completable.concat(Observable.range(1, 5).map(new Func1<Integer, Completable>() {
19+
@Override
20+
public Completable call(final Integer v) {
21+
System.out.println("Mapping " + v);
22+
return Completable.fromAction(new Action0() {
23+
@Override
24+
public void call() {
25+
System.out.println("Processing " + (calls[0] + 1));
26+
calls[0]++;
27+
}
28+
})
29+
.subscribeOn(Schedulers.io())
30+
.doOnCompleted(new Action0() {
31+
@Override
32+
public void call() {
33+
System.out.println("Inner complete " + v);
34+
}
35+
})
36+
.observeOn(Schedulers.computation());
37+
}
38+
})
39+
).test()
40+
.awaitTerminalEventAndUnsubscribeOnTimeout(5, TimeUnit.SECONDS)
41+
.assertResult();
42+
43+
Assert.assertEquals(5, calls[0]);
44+
}
45+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package rx.internal.operators;
2+
3+
import java.util.concurrent.TimeUnit;
4+
5+
import org.junit.*;
6+
7+
import rx.*;
8+
import rx.functions.*;
9+
import rx.schedulers.Schedulers;
10+
11+
public class CompletableMergeTest {
12+
13+
@Test
14+
public void asyncObservables() {
15+
16+
final int[] calls = { 0 };
17+
18+
Completable.merge(Observable.range(1, 5).map(new Func1<Integer, Completable>() {
19+
@Override
20+
public Completable call(final Integer v) {
21+
System.out.println("Mapping " + v);
22+
return Completable.fromAction(new Action0() {
23+
@Override
24+
public void call() {
25+
System.out.println("Processing " + (calls[0] + 1));
26+
calls[0]++;
27+
}
28+
})
29+
.subscribeOn(Schedulers.io())
30+
.doOnCompleted(new Action0() {
31+
@Override
32+
public void call() {
33+
System.out.println("Inner complete " + v);
34+
}
35+
})
36+
.observeOn(Schedulers.computation());
37+
}
38+
}), 1
39+
).test()
40+
.awaitTerminalEventAndUnsubscribeOnTimeout(5, TimeUnit.SECONDS)
41+
.assertResult();
42+
43+
Assert.assertEquals(5, calls[0]);
44+
}
45+
}

0 commit comments

Comments
 (0)