Skip to content

Commit a1df031

Browse files
Merge pull request #832 from duncani/fixes
[Issue #831] Fix for OperationJoin race condition
2 parents 9520d63 + a225944 commit a1df031

File tree

1 file changed

+29
-20
lines changed

1 file changed

+29
-20
lines changed

rxjava-core/src/main/java/rx/operators/OperationJoin.java

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,12 @@ protected void expire(int id, Subscription resource) {
109109

110110
@Override
111111
public void onNext(TLeft args) {
112-
int id;
112+
int id, highRightId;
113+
113114
synchronized (gate) {
114115
id = leftId++;
115116
leftMap.put(id, args);
117+
highRightId = rightId;
116118
}
117119
SerialSubscription md = new SerialSubscription();
118120
group.add(md);
@@ -129,16 +131,19 @@ public void onNext(TLeft args) {
129131
md.setSubscription(duration.subscribe(new LeftDurationObserver(id, md)));
130132

131133
synchronized (gate) {
132-
for (TRight r : rightMap.values()) {
133-
R result;
134-
try {
135-
result = resultSelector.call(args, r);
136-
} catch (Throwable t) {
137-
observer.onError(t);
138-
cancel.unsubscribe();
139-
return;
134+
for (Map.Entry<Integer, TRight> entry : rightMap.entrySet()) {
135+
if (entry.getKey() < highRightId) {
136+
TRight r = entry.getValue();
137+
R result;
138+
try {
139+
result = resultSelector.call(args, r);
140+
} catch (Throwable t) {
141+
observer.onError(t);
142+
cancel.unsubscribe();
143+
return;
144+
}
145+
observer.onNext(result);
140146
}
141-
observer.onNext(result);
142147
}
143148
}
144149
}
@@ -212,10 +217,11 @@ void expire(int id, Subscription resource) {
212217

213218
@Override
214219
public void onNext(TRight args) {
215-
int id = 0;
220+
int id = 0, highLeftId;
216221
synchronized (gate) {
217222
id = rightId++;
218223
rightMap.put(id, args);
224+
highLeftId = leftId;
219225
}
220226
SerialSubscription md = new SerialSubscription();
221227
group.add(md);
@@ -232,16 +238,19 @@ public void onNext(TRight args) {
232238
md.setSubscription(duration.subscribe(new RightDurationObserver(id, md)));
233239

234240
synchronized (gate) {
235-
for (TLeft lv : leftMap.values()) {
236-
R result;
237-
try {
238-
result = resultSelector.call(lv, args);
239-
} catch (Throwable t) {
240-
observer.onError(t);
241-
cancel.unsubscribe();
242-
return;
241+
for (Map.Entry<Integer, TLeft> entry : leftMap.entrySet()) {
242+
if (entry.getKey() < highLeftId) {
243+
TLeft lv = entry.getValue();
244+
R result;
245+
try {
246+
result = resultSelector.call(lv, args);
247+
} catch (Throwable t) {
248+
observer.onError(t);
249+
cancel.unsubscribe();
250+
return;
251+
}
252+
observer.onNext(result);
243253
}
244-
observer.onNext(result);
245254
}
246255
}
247256
}

0 commit comments

Comments
 (0)