Skip to content

Commit 8f1f6f2

Browse files
Merge pull request #1463 from benjchristensen/merge-1420
Merge Bug: Missing Emissions
2 parents 3f6ee36 + a720f58 commit 8f1f6f2

File tree

3 files changed

+46
-4
lines changed

3 files changed

+46
-4
lines changed

rxjava-core/src/main/java/rx/internal/operators/BlockingOperatorToIterator.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.NoSuchElementException;
2020
import java.util.concurrent.BlockingQueue;
2121
import java.util.concurrent.LinkedBlockingQueue;
22+
import java.util.concurrent.TimeUnit;
2223

2324
import rx.Notification;
2425
import rx.Observable;
@@ -89,7 +90,13 @@ public T next() {
8990

9091
private Notification<? extends T> take() {
9192
try {
92-
return notifications.take();
93+
Notification<? extends T> n = notifications.poll(10000, TimeUnit.MILLISECONDS);
94+
if(n == null) {
95+
System.err.println("Timed out waiting for value. File a bug at github.com/Netflix/RxJava");
96+
throw new RuntimeException("Timed out waiting for value. File a bug at github.com/Netflix/RxJava");
97+
} else {
98+
return n;
99+
}
93100
} catch (InterruptedException e) {
94101
throw Exceptions.propagate(e);
95102
}

rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java

+33
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,39 @@
4444
*/
4545
public class OperatorMerge<T> implements Operator<T, Observable<? extends T>> {
4646

47+
/*
48+
* benjchristensen => This class is complex and I'm not a fan of it despite writing it. I want to give some background
49+
* as to why for anyone who wants to try and help improve it.
50+
*
51+
* One of my first implementations that added backpressure support (Producer.request) was fairly elegant and used a simple
52+
* queue draining approach. It was simple to understand as all onNext were added to their queues, then a single winner
53+
* would drain the queues, similar to observeOn. It killed the Netflix API when I canaried it. There were two problems:
54+
* (1) performance and (2) object allocation overhead causing massive GC pressure. Remember that merge is one of the most
55+
* used operators (mostly due to flatmap) and is therefore critical to and a limiter of performance in any application.
56+
*
57+
* All subsequent work on this class and the various fast-paths and branches within it have been to achieve the needed functionality
58+
* while reducing or eliminating object allocation and keeping performance acceptable.
59+
*
60+
* This has meant adopting strategies such as:
61+
*
62+
* - ring buffers instead of growable queues
63+
* - object pooling
64+
* - skipping request logic when downstream does not need backpressure
65+
* - ScalarValueQueue for optimizing synchronous single-value Observables
66+
* - adopting data structures that use Unsafe (and gating them based on environment so non-Oracle JVMs still work)
67+
*
68+
* It has definitely increased the complexity and maintenance cost of this class, but the performance gains have been significant.
69+
*
70+
* The biggest cost of the increased complexity is concurrency bugs and reasoning through what's going on.
71+
*
72+
* I'd love to have contributions that improve this class, but keep in mind the performance and GC pressure.
73+
* The benchmarks I use are in the JMH OperatorMergePerf class. GC memory pressure is tested using Java Flight Recorder
74+
* to track object allocation.
75+
*
76+
* TODO There is still a known concurrency bug somewhere either in this class, in SubscriptionIndexedRingBuffer or their relationship.
77+
* See https://github.com/Netflix/RxJava/issues/1420 for more information on this.
78+
*/
79+
4780
public OperatorMerge() {
4881
this.delayErrors = false;
4982
}

rxjava-core/src/main/java/rx/internal/util/SubscriptionIndexedRingBuffer.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ public boolean isUnsubscribed() {
5757
*
5858
* @return int index that can be used to remove a Subscription
5959
*/
60-
public int add(final T s) {
60+
public synchronized int add(final T s) {
61+
// TODO figure out how to remove synchronized here. See https://github.com/Netflix/RxJava/issues/1420
6162
if (unsubscribed == 1 || subscriptions == null) {
6263
s.unsubscribe();
6364
return -1;
@@ -116,13 +117,14 @@ public void unsubscribe() {
116117
public int forEach(Func1<T, Boolean> action) {
117118
return forEach(action, 0);
118119
}
119-
120+
120121
/**
121122
*
122123
* @param action
123124
* @return int of last index seen if forEach exited early
124125
*/
125-
public int forEach(Func1<T, Boolean> action, int startIndex) {
126+
public synchronized int forEach(Func1<T, Boolean> action, int startIndex) {
127+
// TODO figure out how to remove synchronized here. See https://github.com/Netflix/RxJava/issues/1420
126128
if (unsubscribed == 1 || subscriptions == null) {
127129
return 0;
128130
}

0 commit comments

Comments
 (0)