Skip to content

Commit 3c60dac

Browse files
Merge pull request #1729 from benjchristensen/combineLatest-1717
CombineLatest: Request Up When Dropping Values
2 parents 80f7f30 + fcb06c5 commit 3c60dac

File tree

2 files changed

+60
-10
lines changed

2 files changed

+60
-10
lines changed

src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java

+18-10
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,10 @@ public void onCompleted(int index, boolean hadValue) {
183183
}
184184
}
185185

186-
public void onNext(int index, T t) {
186+
/**
187+
* @return boolean true if propagated value
188+
*/
189+
public boolean onNext(int index, T t) {
187190
synchronized (this) {
188191
if (!haveValues.get(index)) {
189192
haveValues.set(index);
@@ -192,17 +195,19 @@ public void onNext(int index, T t) {
192195
collectedValues[index] = t;
193196
if (haveValuesCount != collectedValues.length) {
194197
// haven't received value from each source yet so won't emit
195-
return;
196-
}
197-
try {
198-
buffer.onNext(combinator.call(collectedValues));
199-
} catch (MissingBackpressureException e) {
200-
onError(e);
201-
} catch (Throwable e) {
202-
onError(e);
198+
return false;
199+
} else {
200+
try {
201+
buffer.onNext(combinator.call(collectedValues));
202+
} catch (MissingBackpressureException e) {
203+
onError(e);
204+
} catch (Throwable e) {
205+
onError(e);
206+
}
203207
}
204208
}
205209
tick();
210+
return true;
206211
}
207212

208213
public void onError(Throwable e) {
@@ -244,7 +249,10 @@ public void onError(Throwable e) {
244249
public void onNext(T t) {
245250
hasValue = true;
246251
emitted.incrementAndGet();
247-
producer.onNext(index, t);
252+
boolean emitted = producer.onNext(index, t);
253+
if (!emitted) {
254+
request(1);
255+
}
248256
}
249257

250258
}

src/test/java/rx/internal/operators/OnSubscribeCombineLatestTest.java

+42
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,18 @@
2929
import java.util.Collections;
3030
import java.util.List;
3131
import java.util.concurrent.CountDownLatch;
32+
import java.util.concurrent.TimeUnit;
33+
import java.util.concurrent.atomic.AtomicInteger;
3234

3335
import org.junit.Test;
3436
import org.mockito.InOrder;
3537
import org.mockito.Matchers;
3638

39+
import rx.Notification;
3740
import rx.Observable;
3841
import rx.Observer;
3942
import rx.Subscriber;
43+
import rx.functions.Action1;
4044
import rx.functions.Func2;
4145
import rx.functions.Func3;
4246
import rx.functions.Func4;
@@ -803,4 +807,42 @@ public void testBackpressure() {
803807
assertEquals("two4", events.get(2));
804808
assertEquals(NUM, events.size());
805809
}
810+
811+
@Test
812+
public void testWithCombineLatestIssue1717() throws InterruptedException {
813+
final CountDownLatch latch = new CountDownLatch(1);
814+
final AtomicInteger count = new AtomicInteger();
815+
final int SIZE = 2000;
816+
Observable<Long> timer = Observable.timer(0, 1, TimeUnit.MILLISECONDS)
817+
.observeOn(Schedulers.newThread())
818+
.doOnEach(new Action1<Notification<? super Long>>() {
819+
820+
@Override
821+
public void call(Notification<? super Long> n) {
822+
// System.out.println(n);
823+
if (count.incrementAndGet() >= SIZE) {
824+
latch.countDown();
825+
}
826+
}
827+
828+
}).take(SIZE);
829+
830+
TestSubscriber<Long> ts = new TestSubscriber<Long>();
831+
832+
Observable.combineLatest(timer, Observable.<Integer> never(), new Func2<Long, Integer, Long>() {
833+
834+
@Override
835+
public Long call(Long t1, Integer t2) {
836+
return t1;
837+
}
838+
839+
}).subscribe(ts);
840+
841+
if (!latch.await(SIZE + 1000, TimeUnit.MILLISECONDS)) {
842+
fail("timed out");
843+
}
844+
845+
assertEquals(SIZE, count.get());
846+
}
847+
806848
}

0 commit comments

Comments
 (0)