Skip to content

Commit 4b29429

Browse files
committed
Merge pull request #3286 from hyleung/blocking-single
1.x: Implements BlockingSingle
2 parents c63fa2e + d55cd40 commit 4b29429

File tree

7 files changed

+380
-22
lines changed

7 files changed

+380
-22
lines changed

src/main/java/rx/Single.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import rx.internal.operators.OperatorTimeout;
4242
import rx.internal.operators.OperatorZip;
4343
import rx.internal.producers.SingleDelayedProducer;
44+
import rx.singles.BlockingSingle;
4445
import rx.observers.SafeSubscriber;
4546
import rx.plugins.RxJavaObservableExecutionHook;
4647
import rx.plugins.RxJavaPlugins;
@@ -1794,6 +1795,21 @@ public final Single<T> timeout(long timeout, TimeUnit timeUnit, Single<? extends
17941795
return lift(new OperatorTimeout<T>(timeout, timeUnit, asObservable(other), scheduler));
17951796
}
17961797

1798+
/**
1799+
* Converts a Single into a {@link BlockingSingle} (a Single with blocking operators).
1800+
* <dl>
1801+
* <dt><b>Scheduler:</b></dt>
1802+
* <dd>{@code toBlocking} does not operate by default on a particular {@link Scheduler}.</dd>
1803+
* </dl>
1804+
*
1805+
* @return a {@code BlockingSingle} version of this Single.
1806+
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX operators documentation: To</a>
1807+
*/
1808+
@Experimental
1809+
public final BlockingSingle<T> toBlocking() {
1810+
return BlockingSingle.from(this);
1811+
}
1812+
17971813
/**
17981814
* Returns a Single that emits the result of applying a specified function to the pair of items emitted by
17991815
* the source Single and another specified Single.
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.internal.util;
18+
19+
import rx.Subscription;
20+
import rx.annotations.Experimental;
21+
22+
import java.util.concurrent.CountDownLatch;
23+
24+
/**
25+
* Utility functions relating to blocking types.
26+
* <p/>
27+
* Not intended to be part of the public API.
28+
*/
29+
@Experimental
30+
public final class BlockingUtils {
31+
32+
private BlockingUtils() { }
33+
34+
/**
35+
* Blocks and waits for a {@link Subscription} to complete.
36+
*
37+
* @param latch a CountDownLatch
38+
* @param subscription the Subscription to wait on.
39+
*/
40+
@Experimental
41+
public static void awaitForComplete(CountDownLatch latch, Subscription subscription) {
42+
if (latch.getCount() == 0) {
43+
// Synchronous observable completes before awaiting for it.
44+
// Skip await so InterruptedException will never be thrown.
45+
return;
46+
}
47+
// block until the subscription completes and then return
48+
try {
49+
latch.await();
50+
} catch (InterruptedException e) {
51+
subscription.unsubscribe();
52+
// set the interrupted flag again so callers can still get it
53+
// for more information see https://github.com/ReactiveX/RxJava/pull/147#issuecomment-13624780
54+
Thread.currentThread().interrupt();
55+
// using Runtime so it is not checked
56+
throw new RuntimeException("Interrupted while waiting for subscription to complete.", e);
57+
}
58+
}
59+
}

src/main/java/rx/observables/BlockingObservable.java

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import rx.exceptions.OnErrorNotImplementedException;
2727
import rx.functions.*;
2828
import rx.internal.operators.*;
29+
import rx.internal.util.BlockingUtils;
2930
import rx.internal.util.UtilityFunctions;
3031
import rx.subscriptions.Subscriptions;
3132

@@ -123,7 +124,7 @@ public void onNext(T args) {
123124
onNext.call(args);
124125
}
125126
});
126-
awaitForComplete(latch, subscription);
127+
BlockingUtils.awaitForComplete(latch, subscription);
127128

128129
if (exceptionFromOnError.get() != null) {
129130
if (exceptionFromOnError.get() instanceof RuntimeException) {
@@ -446,7 +447,7 @@ public void onNext(final T item) {
446447
returnItem.set(item);
447448
}
448449
});
449-
awaitForComplete(latch, subscription);
450+
BlockingUtils.awaitForComplete(latch, subscription);
450451

451452
if (returnException.get() != null) {
452453
if (returnException.get() instanceof RuntimeException) {
@@ -458,25 +459,6 @@ public void onNext(final T item) {
458459

459460
return returnItem.get();
460461
}
461-
462-
private void awaitForComplete(CountDownLatch latch, Subscription subscription) {
463-
if (latch.getCount() == 0) {
464-
// Synchronous observable completes before awaiting for it.
465-
// Skip await so InterruptedException will never be thrown.
466-
return;
467-
}
468-
// block until the subscription completes and then return
469-
try {
470-
latch.await();
471-
} catch (InterruptedException e) {
472-
subscription.unsubscribe();
473-
// set the interrupted flag again so callers can still get it
474-
// for more information see https://github.com/ReactiveX/RxJava/pull/147#issuecomment-13624780
475-
Thread.currentThread().interrupt();
476-
// using Runtime so it is not checked
477-
throw new RuntimeException("Interrupted while waiting for subscription to complete.", e);
478-
}
479-
}
480462

481463
/**
482464
* Runs the source observable to a terminal event, ignoring any values and rethrowing any exception.
@@ -502,7 +484,7 @@ public void onCompleted() {
502484
}
503485
});
504486

505-
awaitForComplete(cdl, s);
487+
BlockingUtils.awaitForComplete(cdl, s);
506488
Throwable e = error[0];
507489
if (e != null) {
508490
if (e instanceof RuntimeException) {
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.singles;
18+
19+
import rx.Single;
20+
import rx.SingleSubscriber;
21+
import rx.Subscription;
22+
import rx.annotations.Experimental;
23+
import rx.internal.operators.BlockingOperatorToFuture;
24+
import rx.internal.util.BlockingUtils;
25+
26+
import java.util.concurrent.CountDownLatch;
27+
import java.util.concurrent.Future;
28+
import java.util.concurrent.atomic.AtomicReference;
29+
30+
/**
31+
* {@code BlockingSingle} is a blocking "version" of {@link Single} that provides blocking
32+
* operators.
33+
* <p/>
34+
* You construct a {@code BlockingSingle} from a {@code Single} with {@link #from(Single)}
35+
* or {@link Single#toBlocking()}.
36+
*/
37+
@Experimental
38+
public class BlockingSingle<T> {
39+
private final Single<? extends T> single;
40+
41+
private BlockingSingle(Single<? extends T> single) {
42+
this.single = single;
43+
}
44+
45+
/**
46+
* Converts a {@link Single} into a {@code BlockingSingle}.
47+
*
48+
* @param single the {@link Single} you want to convert
49+
* @return a {@code BlockingSingle} version of {@code single}
50+
*/
51+
@Experimental
52+
public static <T> BlockingSingle<T> from(Single<? extends T> single) {
53+
return new BlockingSingle<T>(single);
54+
}
55+
56+
/**
57+
* Returns the item emitted by this {@code BlockingSingle}.
58+
* <p/>
59+
* If the underlying {@link Single} returns successfully, the value emitted
60+
* by the {@link Single} is returned. If the {@link Single} emits an error,
61+
* the throwable emitted ({@link SingleSubscriber#onError(Throwable)}) is
62+
* thrown.
63+
*
64+
* @return the value emitted by this {@code BlockingSingle}
65+
*/
66+
@Experimental
67+
public T value() {
68+
final AtomicReference<T> returnItem = new AtomicReference<T>();
69+
final AtomicReference<Throwable> returnException = new AtomicReference<Throwable>();
70+
final CountDownLatch latch = new CountDownLatch(1);
71+
Subscription subscription = single.subscribe(new SingleSubscriber<T>() {
72+
@Override
73+
public void onSuccess(T value) {
74+
returnItem.set(value);
75+
latch.countDown();
76+
}
77+
78+
@Override
79+
public void onError(Throwable error) {
80+
returnException.set(error);
81+
latch.countDown();
82+
}
83+
});
84+
85+
BlockingUtils.awaitForComplete(latch, subscription);
86+
Throwable throwable = returnException.get();
87+
if (throwable != null) {
88+
if (throwable instanceof RuntimeException) {
89+
throw (RuntimeException) throwable;
90+
}
91+
throw new RuntimeException(throwable);
92+
}
93+
return returnItem.get();
94+
}
95+
96+
/**
97+
* Returns a {@link Future} representing the value emitted by this {@code BlockingSingle}.
98+
*
99+
* @return a {@link Future} that returns the value
100+
*/
101+
@Experimental
102+
public Future<T> toFuture() {
103+
return BlockingOperatorToFuture.toFuture(single.toObservable());
104+
}
105+
}
106+

src/test/java/rx/SingleTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
package rx;
1414

1515
import static org.junit.Assert.assertEquals;
16+
import static org.junit.Assert.assertNotNull;
1617
import static org.junit.Assert.assertSame;
1718
import static org.junit.Assert.assertTrue;
1819
import static org.junit.Assert.fail;
@@ -40,6 +41,7 @@
4041
import rx.functions.Func1;
4142
import rx.functions.Func2;
4243
import rx.schedulers.TestScheduler;
44+
import rx.singles.BlockingSingle;
4345
import rx.observers.TestSubscriber;
4446
import rx.schedulers.Schedulers;
4547
import rx.subscriptions.Subscriptions;
@@ -260,6 +262,14 @@ public void call(SingleSubscriber<? super String> s) {
260262
ts.assertValue("hello");
261263
}
262264

265+
@Test
266+
public void testToBlocking() {
267+
Single<String> s = Single.just("one");
268+
BlockingSingle<String> blocking = s.toBlocking();
269+
assertNotNull(blocking);
270+
assertEquals("one", blocking.value());
271+
}
272+
263273
@Test
264274
public void testUnsubscribe() throws InterruptedException {
265275
TestSubscriber<String> ts = new TestSubscriber<String>();
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.internal.util;
18+
19+
import static org.mockito.Mockito.*;
20+
import static org.junit.Assert.*;
21+
22+
import java.util.concurrent.CountDownLatch;
23+
import java.util.concurrent.atomic.AtomicReference;
24+
25+
import org.junit.Test;
26+
27+
import rx.Observable;
28+
import rx.Subscriber;
29+
import rx.Subscription;
30+
import rx.schedulers.Schedulers;
31+
32+
/**
33+
* Test suite for {@link BlockingUtils}.
34+
*/
35+
public class BlockingUtilsTest {
36+
@Test
37+
public void awaitCompleteShouldReturnIfCountIsZero() {
38+
Subscription subscription = mock(Subscription.class);
39+
CountDownLatch latch = new CountDownLatch(0);
40+
BlockingUtils.awaitForComplete(latch, subscription);
41+
verifyZeroInteractions(subscription);
42+
}
43+
44+
@Test
45+
public void awaitCompleteShouldReturnOnEmpty() {
46+
final CountDownLatch latch = new CountDownLatch(1);
47+
Subscriber<Object> subscription = createSubscription(latch);
48+
Observable<Object> observable = Observable.empty().subscribeOn(Schedulers.newThread());
49+
observable.subscribe(subscription);
50+
BlockingUtils.awaitForComplete(latch, subscription);
51+
}
52+
53+
@Test
54+
public void awaitCompleteShouldReturnOnError() {
55+
final CountDownLatch latch = new CountDownLatch(1);
56+
Subscriber<Object> subscription = createSubscription(latch);
57+
Observable<Object> observable = Observable.error(new RuntimeException()).subscribeOn(Schedulers.newThread());
58+
observable.subscribe(subscription);
59+
BlockingUtils.awaitForComplete(latch, subscription);
60+
}
61+
62+
@Test
63+
public void shouldThrowRuntimeExceptionOnThreadInterrupted() throws Exception {
64+
final CountDownLatch latch = new CountDownLatch(1);
65+
final Subscription subscription = mock(Subscription.class);
66+
final AtomicReference<Exception> caught = new AtomicReference<Exception>();
67+
Thread thread = new Thread(new Runnable() {
68+
@Override
69+
public void run() {
70+
Thread.currentThread().interrupt();
71+
try {
72+
BlockingUtils.awaitForComplete(latch, subscription);
73+
} catch (RuntimeException e) {
74+
caught.set(e);
75+
}
76+
}
77+
});
78+
thread.run();
79+
verify(subscription).unsubscribe();
80+
Exception actual = caught.get();
81+
assertNotNull(actual);
82+
assertNotNull(actual.getCause());
83+
assertTrue(actual.getCause() instanceof InterruptedException);
84+
}
85+
86+
87+
private static <T> Subscriber<T> createSubscription(final CountDownLatch latch) {
88+
return new Subscriber<T>() {
89+
@Override
90+
public void onNext(T t) {
91+
//no-oop
92+
}
93+
94+
@Override
95+
public void onError(Throwable e) {
96+
latch.countDown();
97+
}
98+
99+
@Override
100+
public void onCompleted() {
101+
latch.countDown();
102+
}
103+
};
104+
}
105+
}

0 commit comments

Comments
 (0)