Skip to content

Commit 59ded83

Browse files
committed
Func0 can transparently implement java.util.concurrent.Callable.
This change doesn't change the API at all for users of Func0, but it makes all Func0 objects immediately reusable with any JDK API that accepts Callables. For example, a Func0 can now be submitted directly to an ExecutorService for asynchronous execution. It also allows the elimination of a small amount of redundant code within RxJava itself.
1 parent 2180f39 commit 59ded83

File tree

4 files changed

+43
-37
lines changed

4 files changed

+43
-37
lines changed

rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/Async.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1596,9 +1596,13 @@ public static <R> Observable<R> fromAction(Action0 action, R result) {
15961596
* @see #start(rx.functions.Func0)
15971597
* @see #fromCallable(java.util.concurrent.Callable)
15981598
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#fromfunc0">RxJava Wiki: fromFunc0()</a>
1599+
*
1600+
* @deprecated Unnecessary now that Func0 extends Callable. Just call
1601+
* {@link #fromCallable(Callable)} instead.
15991602
*/
1603+
@Deprecated
16001604
public static <R> Observable<R> fromFunc0(Func0<? extends R> function) {
1601-
return fromFunc0(function, Schedulers.computation());
1605+
return fromCallable(function);
16021606
}
16031607

16041608
/**
@@ -1674,9 +1678,13 @@ public static <R> Observable<R> fromAction(Action0 action, R result, Scheduler s
16741678
* @see #start(rx.functions.Func0)
16751679
* @see #fromCallable(java.util.concurrent.Callable)
16761680
* @see <a href="https://github.com/Netflix/RxJava/wiki/Async-Operators#fromfunc0">RxJava Wiki: fromFunc0()</a>
1681+
*
1682+
* @deprecated Unnecessary now that Func0 extends Callable. Just call
1683+
* {@link #fromCallable(Callable, Scheduler)} instead.
16771684
*/
1685+
@Deprecated
16781686
public static <R> Observable<R> fromFunc0(Func0<? extends R> function, Scheduler scheduler) {
1679-
return Observable.create(OperationFromFunctionals.fromFunc0(function)).subscribeOn(scheduler);
1687+
return fromCallable(function, scheduler);
16801688
}
16811689

16821690
/**

rxjava-contrib/rxjava-async-util/src/main/java/rx/util/async/operators/OperationFromFunctionals.java

Lines changed: 13 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -38,22 +38,28 @@ public final class OperationFromFunctionals {
3838
public static <R> OnSubscribeFunc<R> fromAction(Action0 action, R result) {
3939
return new InvokeAsync<R>(Actions.toFunc(action, result));
4040
}
41-
42-
/** Subscriber function that invokes a function and returns its value. */
41+
42+
/**
43+
* Subscriber function that invokes a function and returns its value.
44+
*
45+
* @deprecated Unnecessary now that Func0 extends Callable. Just call
46+
* {@link #fromCallable(Callable)} instead.
47+
*/
48+
@Deprecated
4349
public static <R> OnSubscribeFunc<R> fromFunc0(Func0<? extends R> function) {
44-
return new InvokeAsync<R>(function);
50+
return fromCallable(function);
4551
}
4652

4753
/**
4854
* Subscriber function that invokes the callable and returns its value or
4955
* propagates its checked exception.
5056
*/
5157
public static <R> OnSubscribeFunc<R> fromCallable(Callable<? extends R> callable) {
52-
return new InvokeAsyncCallable<R>(callable);
58+
return new InvokeAsync<R>(callable);
5359
}
5460
/** Subscriber function that invokes a runnable and returns the given result. */
5561
public static <R> OnSubscribeFunc<R> fromRunnable(final Runnable run, final R result) {
56-
return new InvokeAsync(new Func0<R>() {
62+
return new InvokeAsync<R>(new Func0<R>() {
5763
@Override
5864
public R call() {
5965
run.run();
@@ -62,38 +68,13 @@ public R call() {
6268
});
6369
}
6470

65-
/**
66-
* Invokes a function when an observer subscribes.
67-
* @param <R> the return type
68-
*/
69-
static final class InvokeAsync<R> implements OnSubscribeFunc<R> {
70-
final Func0<? extends R> function;
71-
public InvokeAsync(Func0<? extends R> function) {
72-
if (function == null) {
73-
throw new NullPointerException("function");
74-
}
75-
this.function = function;
76-
}
77-
@Override
78-
public Subscription onSubscribe(Observer<? super R> t1) {
79-
Subscription s = Subscriptions.empty();
80-
try {
81-
t1.onNext(function.call());
82-
} catch (Throwable t) {
83-
t1.onError(t);
84-
return s;
85-
}
86-
t1.onCompleted();
87-
return s;
88-
}
89-
}
9071
/**
9172
* Invokes a java.util.concurrent.Callable when an observer subscribes.
9273
* @param <R> the return type
9374
*/
94-
static final class InvokeAsyncCallable<R> implements OnSubscribeFunc<R> {
75+
static final class InvokeAsync<R> implements OnSubscribeFunc<R> {
9576
final Callable<? extends R> callable;
96-
public InvokeAsyncCallable(Callable<? extends R> callable) {
77+
public InvokeAsync(Callable<? extends R> callable) {
9778
if (callable == null) {
9879
throw new NullPointerException("function");
9980
}

rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/operators/OperationFromFunctionalsTest.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,13 @@ public void call() {
110110

111111
testRunShouldThrow(source, RuntimeException.class);
112112
}
113+
114+
/**
115+
* @deprecated {@link Func0} now extends {@link Callable}, so
116+
* {@link Async#fromFunc0(Func0)} is unnecessary. Once it's
117+
* removed, this test can be removed as well.
118+
*/
119+
@Deprecated
113120
@Test
114121
public void testFromFunc0() {
115122
Func0<Integer> func = new Func0<Integer>() {
@@ -139,7 +146,14 @@ public Integer call() {
139146
verify(observer, never()).onError(any(Throwable.class));
140147
}
141148
}
142-
149+
150+
/**
151+
* @deprecated {@link Func0} now extends {@link Callable}, so
152+
* {@link Async#fromFunc0(Func0, rx.Scheduler)} is
153+
* unnecessary. Once it's removed, this test can be removed
154+
* as well.
155+
*/
156+
@Deprecated
143157
@Test
144158
public void testFromFunc0Throws() {
145159
Func0<Integer> func = new Func0<Integer>() {

rxjava-core/src/main/java/rx/functions/Func0.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
*/
1616
package rx.functions;
1717

18-
public interface Func0<R> extends Function {
18+
import java.util.concurrent.Callable;
19+
20+
public interface Func0<R> extends Function, Callable<R> {
21+
@Override
1922
public R call();
20-
}
23+
}

0 commit comments

Comments
 (0)