Skip to content

Commit 09df08a

Browse files
authored
2.x: make withLatestFrom conditional subscriber, test cold consumption (#5495)
* 2.x: make withLatestFrom conditional subscriber, test cold consumption * Cleanup internal logic, improve coverage
1 parent a85b0db commit 09df08a

File tree

3 files changed

+145
-21
lines changed

3 files changed

+145
-21
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFrom.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.reactivex.exceptions.Exceptions;
2222
import io.reactivex.functions.BiFunction;
2323
import io.reactivex.internal.functions.ObjectHelper;
24+
import io.reactivex.internal.fuseable.ConditionalSubscriber;
2425
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2526
import io.reactivex.subscribers.SerializedSubscriber;
2627

@@ -45,7 +46,8 @@ protected void subscribeActual(Subscriber<? super R> s) {
4546
source.subscribe(wlf);
4647
}
4748

48-
static final class WithLatestFromSubscriber<T, U, R> extends AtomicReference<U> implements FlowableSubscriber<T>, Subscription {
49+
static final class WithLatestFromSubscriber<T, U, R> extends AtomicReference<U>
50+
implements ConditionalSubscriber<T>, Subscription {
4951

5052
private static final long serialVersionUID = -312246233408980075L;
5153

@@ -69,6 +71,13 @@ public void onSubscribe(Subscription s) {
6971

7072
@Override
7173
public void onNext(T t) {
74+
if (!tryOnNext(t)) {
75+
s.get().request(1);
76+
}
77+
}
78+
79+
@Override
80+
public boolean tryOnNext(T t) {
7281
U u = get();
7382
if (u != null) {
7483
R r;
@@ -78,11 +87,12 @@ public void onNext(T t) {
7887
Exceptions.throwIfFatal(e);
7988
cancel();
8089
actual.onError(e);
81-
return;
90+
return false;
8291
}
8392
actual.onNext(r);
84-
} else{
85-
request(1);
93+
return true;
94+
} else {
95+
return false;
8696
}
8797
}
8898

src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919

2020
import io.reactivex.*;
2121
import io.reactivex.annotations.*;
22-
import io.reactivex.disposables.Disposable;
2322
import io.reactivex.exceptions.Exceptions;
2423
import io.reactivex.functions.Function;
2524
import io.reactivex.internal.functions.ObjectHelper;
25+
import io.reactivex.internal.fuseable.ConditionalSubscriber;
2626
import io.reactivex.internal.subscriptions.*;
2727
import io.reactivex.internal.util.*;
2828
import io.reactivex.plugins.RxJavaPlugins;
@@ -95,7 +95,7 @@ protected void subscribeActual(Subscriber<? super R> s) {
9595

9696
static final class WithLatestFromSubscriber<T, R>
9797
extends AtomicInteger
98-
implements FlowableSubscriber<T>, Subscription {
98+
implements ConditionalSubscriber<T>, Subscription {
9999

100100
private static final long serialVersionUID = 1577321883966341961L;
101101

@@ -133,7 +133,7 @@ void subscribe(Publisher<?>[] others, int n) {
133133
WithLatestInnerSubscriber[] subscribers = this.subscribers;
134134
AtomicReference<Subscription> s = this.s;
135135
for (int i = 0; i < n; i++) {
136-
if (SubscriptionHelper.isCancelled(s.get()) || done) {
136+
if (SubscriptionHelper.isCancelled(s.get())) {
137137
return;
138138
}
139139
others[i].subscribe(subscribers[i]);
@@ -147,8 +147,15 @@ public void onSubscribe(Subscription s) {
147147

148148
@Override
149149
public void onNext(T t) {
150+
if (!tryOnNext(t) && !done) {
151+
s.get().request(1);
152+
}
153+
}
154+
155+
@Override
156+
public boolean tryOnNext(T t) {
150157
if (done) {
151-
return;
158+
return false;
152159
}
153160
AtomicReferenceArray<Object> ara = values;
154161
int n = ara.length();
@@ -159,8 +166,7 @@ public void onNext(T t) {
159166
Object o = ara.get(i);
160167
if (o == null) {
161168
// somebody hasn't signalled yet, skip this T
162-
s.get().request(1);
163-
return;
169+
return false;
164170
}
165171
objects[i + 1] = o;
166172
}
@@ -173,10 +179,11 @@ public void onNext(T t) {
173179
Exceptions.throwIfFatal(ex);
174180
cancel();
175181
onError(ex);
176-
return;
182+
return false;
177183
}
178184

179185
HalfSerializer.onNext(actual, v, this, error);
186+
return true;
180187
}
181188

182189
@Override
@@ -207,7 +214,7 @@ public void request(long n) {
207214
@Override
208215
public void cancel() {
209216
SubscriptionHelper.cancel(s);
210-
for (Disposable s : subscribers) {
217+
for (WithLatestInnerSubscriber s : subscribers) {
211218
s.dispose();
212219
}
213220
}
@@ -226,6 +233,7 @@ void innerError(int index, Throwable t) {
226233
void innerComplete(int index, boolean nonEmpty) {
227234
if (!nonEmpty) {
228235
done = true;
236+
SubscriptionHelper.cancel(s);
229237
cancelAllBut(index);
230238
HalfSerializer.onComplete(actual, this, error);
231239
}
@@ -243,7 +251,7 @@ void cancelAllBut(int index) {
243251

244252
static final class WithLatestInnerSubscriber
245253
extends AtomicReference<Subscription>
246-
implements FlowableSubscriber<Object>, Disposable {
254+
implements FlowableSubscriber<Object> {
247255

248256
private static final long serialVersionUID = 3256684027868224024L;
249257

@@ -283,13 +291,7 @@ public void onComplete() {
283291
parent.innerComplete(index, hasValue);
284292
}
285293

286-
@Override
287-
public boolean isDisposed() {
288-
return SubscriptionHelper.isCancelled(get());
289-
}
290-
291-
@Override
292-
public void dispose() {
294+
void dispose() {
293295
SubscriptionHelper.cancel(this);
294296
}
295297
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromTest.java

Lines changed: 113 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -726,7 +726,7 @@ public void zeroOtherCombinerReturnsNull() {
726726
}
727727

728728
@Test
729-
public void testSingleRequestNotForgottenWhenNoData() {
729+
public void singleRequestNotForgottenWhenNoData() {
730730
PublishProcessor<Integer> source = PublishProcessor.create();
731731
PublishProcessor<Integer> other = PublishProcessor.create();
732732

@@ -750,4 +750,116 @@ public void testSingleRequestNotForgottenWhenNoData() {
750750

751751
ts.assertValue((2 << 8) + 1);
752752
}
753+
754+
@Test
755+
public void coldSourceConsumedWithoutOther() {
756+
Flowable.range(1, 10).withLatestFrom(Flowable.never(),
757+
new BiFunction<Integer, Object, Object>() {
758+
@Override
759+
public Object apply(Integer a, Object b) throws Exception {
760+
return a;
761+
}
762+
})
763+
.test(1)
764+
.assertResult();
765+
}
766+
767+
@Test
768+
public void coldSourceConsumedWithoutManyOthers() {
769+
Flowable.range(1, 10).withLatestFrom(Flowable.never(), Flowable.never(), Flowable.never(),
770+
new Function4<Integer, Object, Object, Object, Object>() {
771+
@Override
772+
public Object apply(Integer a, Object b, Object c, Object d) throws Exception {
773+
return a;
774+
}
775+
})
776+
.test(1)
777+
.assertResult();
778+
}
779+
780+
@Test
781+
public void otherOnSubscribeRace() {
782+
for (int i = 0; i < 1000; i++) {
783+
final PublishProcessor<Integer> pp0 = PublishProcessor.create();
784+
final PublishProcessor<Integer> pp1 = PublishProcessor.create();
785+
final PublishProcessor<Integer> pp2 = PublishProcessor.create();
786+
final PublishProcessor<Integer> pp3 = PublishProcessor.create();
787+
788+
final Flowable<Object> source = pp0.withLatestFrom(pp1, pp2, pp3, new Function4<Object, Integer, Integer, Integer, Object>() {
789+
@Override
790+
public Object apply(Object a, Integer b, Integer c, Integer d)
791+
throws Exception {
792+
return a;
793+
}
794+
});
795+
796+
final TestSubscriber<Object> ts = new TestSubscriber<Object>();
797+
798+
Runnable r1 = new Runnable() {
799+
@Override
800+
public void run() {
801+
source.subscribe(ts);
802+
}
803+
};
804+
805+
Runnable r2 = new Runnable() {
806+
@Override
807+
public void run() {
808+
ts.cancel();
809+
}
810+
};
811+
812+
TestHelper.race(r1, r2);
813+
814+
ts.assertEmpty();
815+
816+
assertFalse(pp0.hasSubscribers());
817+
assertFalse(pp1.hasSubscribers());
818+
assertFalse(pp2.hasSubscribers());
819+
assertFalse(pp3.hasSubscribers());
820+
}
821+
}
822+
823+
@Test
824+
public void otherCompleteRace() {
825+
for (int i = 0; i < 1000; i++) {
826+
final PublishProcessor<Integer> pp0 = PublishProcessor.create();
827+
final PublishProcessor<Integer> pp1 = PublishProcessor.create();
828+
final PublishProcessor<Integer> pp2 = PublishProcessor.create();
829+
final PublishProcessor<Integer> pp3 = PublishProcessor.create();
830+
831+
final Flowable<Object> source = pp0.withLatestFrom(pp1, pp2, pp3, new Function4<Object, Integer, Integer, Integer, Object>() {
832+
@Override
833+
public Object apply(Object a, Integer b, Integer c, Integer d)
834+
throws Exception {
835+
return a;
836+
}
837+
});
838+
839+
final TestSubscriber<Object> ts = new TestSubscriber<Object>();
840+
841+
Runnable r1 = new Runnable() {
842+
@Override
843+
public void run() {
844+
source.subscribe(ts);
845+
}
846+
};
847+
848+
Runnable r2 = new Runnable() {
849+
@Override
850+
public void run() {
851+
pp1.onComplete();
852+
}
853+
};
854+
855+
TestHelper.race(r1, r2);
856+
857+
ts.assertResult();
858+
859+
assertFalse(pp0.hasSubscribers());
860+
assertFalse(pp1.hasSubscribers());
861+
assertFalse(pp2.hasSubscribers());
862+
assertFalse(pp3.hasSubscribers());
863+
}
864+
}
753865
}

0 commit comments

Comments
 (0)