Skip to content

Commit 5ab00f9

Browse files
committed
Merge pull request #3645 from akarnokd/AmbStateFix1x
1.x: fix Amb sharing the choice among all subscribers
2 parents 4e31a5f + 0f8caf3 commit 5ab00f9

File tree

2 files changed

+24
-3
lines changed

2 files changed

+24
-3
lines changed

src/main/java/rx/internal/operators/OnSubscribeAmb.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -353,15 +353,15 @@ public void unsubscribeOthers(AmbSubscriber<T> notThis) {
353353
//give default access instead of private as a micro-optimization
354354
//for access from anonymous classes below
355355
final Iterable<? extends Observable<? extends T>> sources;
356-
final Selection<T> selection = new Selection<T>();
357-
final AtomicReference<AmbSubscriber<T>> choice = selection.choice;
358356

359357
private OnSubscribeAmb(Iterable<? extends Observable<? extends T>> sources) {
360358
this.sources = sources;
361359
}
362360

363361
@Override
364362
public void call(final Subscriber<? super T> subscriber) {
363+
final Selection<T> selection = new Selection<T>();
364+
final AtomicReference<AmbSubscriber<T>> choice = selection.choice;
365365

366366
//setup unsubscription of all the subscribers to the sources
367367
subscriber.add(Subscriptions.create(new Action0() {

src/test/java/rx/internal/operators/OnSubscribeAmbTest.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,5 +288,26 @@ public void call(Object t) {
288288
}).ambWith(Observable.just(2)).toBlocking().single();
289289
assertEquals(1, result);
290290
}
291-
291+
292+
@Test(timeout = 1000)
293+
public void testMultipleUse() {
294+
TestSubscriber<Long> ts1 = new TestSubscriber<Long>();
295+
TestSubscriber<Long> ts2 = new TestSubscriber<Long>();
296+
297+
Observable<Long> amb = Observable.timer(100, TimeUnit.MILLISECONDS).ambWith(Observable.timer(200, TimeUnit.MILLISECONDS));
298+
299+
amb.subscribe(ts1);
300+
amb.subscribe(ts2);
301+
302+
ts1.awaitTerminalEvent();
303+
ts2.awaitTerminalEvent();
304+
305+
ts1.assertValue(0L);
306+
ts1.assertCompleted();
307+
ts1.assertNoErrors();
308+
309+
ts2.assertValue(0L);
310+
ts2.assertCompleted();
311+
ts2.assertNoErrors();
312+
}
292313
}

0 commit comments

Comments
 (0)