Skip to content

Commit 63a058e

Browse files
Merge pull request #1819 from benjchristensen/issuje-1818-concat
Fix Concat Breaks with Double onCompleted
2 parents 3722fe0 + cade58c commit 63a058e

File tree

2 files changed

+34
-4
lines changed

2 files changed

+34
-4
lines changed

src/main/java/rx/internal/operators/OperatorConcat.java

+12-4
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,10 @@ static class ConcatInnerSubscriber<T> extends Subscriber<T> {
176176

177177
private final Subscriber<T> child;
178178
private final ConcatSubscriber<T> parent;
179+
@SuppressWarnings("unused")
180+
private volatile int once = 0;
181+
@SuppressWarnings("rawtypes")
182+
private final static AtomicIntegerFieldUpdater<ConcatInnerSubscriber> ONCE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ConcatInnerSubscriber.class, "once");
179183

180184
public ConcatInnerSubscriber(ConcatSubscriber<T> parent, Subscriber<T> child, long initialRequest) {
181185
this.parent = parent;
@@ -195,14 +199,18 @@ public void onNext(T t) {
195199

196200
@Override
197201
public void onError(Throwable e) {
198-
// terminal error through parent so everything gets cleaned up, including this inner
199-
parent.onError(e);
202+
if (ONCE_UPDATER.compareAndSet(this, 0, 1)) {
203+
// terminal error through parent so everything gets cleaned up, including this inner
204+
parent.onError(e);
205+
}
200206
}
201207

202208
@Override
203209
public void onCompleted() {
204-
// terminal completion to parent so it continues to the next
205-
parent.completeInner();
210+
if (ONCE_UPDATER.compareAndSet(this, 0, 1)) {
211+
// terminal completion to parent so it continues to the next
212+
parent.completeInner();
213+
}
206214
}
207215

208216
};

src/test/java/rx/internal/operators/OperatorConcatTest.java

+22
Original file line numberDiff line numberDiff line change
@@ -695,5 +695,27 @@ public void testInnerBackpressureWithoutAlignedBoundaries() {
695695
ts.assertNoErrors();
696696
assertEquals((RxRingBuffer.SIZE * 4) + 20, ts.getOnNextEvents().size());
697697
}
698+
699+
// https://github.com/ReactiveX/RxJava/issues/1818
700+
@Test
701+
public void testConcatWithNonCompliantSourceDoubleOnComplete() {
702+
Observable<String> o = Observable.create(new OnSubscribe<String>() {
703+
704+
@Override
705+
public void call(Subscriber<? super String> s) {
706+
s.onNext("hello");
707+
s.onCompleted();
708+
s.onCompleted();
709+
}
710+
711+
});
712+
713+
TestSubscriber<String> ts = new TestSubscriber<String>();
714+
Observable.concat(o, o).subscribe(ts);
715+
ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
716+
ts.assertTerminalEvent();
717+
ts.assertNoErrors();
718+
ts.assertReceivedOnNext(Arrays.asList("hello", "hello"));
719+
}
698720

699721
}

0 commit comments

Comments
 (0)