Skip to content

Commit bccac64

Browse files
Merge pull request #905 from benjchristensen/scheduler-defaults-plugin
RxJavaSchedulers Plugin
2 parents 44b015f + 9178d14 commit bccac64

File tree

5 files changed

+189
-8
lines changed

5 files changed

+189
-8
lines changed
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.plugins;
17+
18+
import rx.Scheduler;
19+
import rx.functions.Func0;
20+
21+
/**
22+
* Define alternate Scheduler implementations to be returned by the `Schedulers` factory methods.
23+
* <p>
24+
* See {@link RxJavaPlugins} or the RxJava GitHub Wiki for information on configuring plugins: <a
25+
* href="https://github.com/Netflix/RxJava/wiki/Plugins">https://github.com/Netflix/RxJava/wiki/Plugins</a>.
26+
*/
27+
public abstract class RxJavaDefaultSchedulers {
28+
29+
/**
30+
* Factory of Scheduler to return from {@link Schedulers.computation()} or null if default should be used.
31+
*/
32+
public abstract Func0<Scheduler> getComputationSchedulerFactory();
33+
34+
/**
35+
* Factory of Scheduler to return from {@link Schedulers.io()} or null if default should be used.
36+
*/
37+
public abstract Func0<Scheduler> getIOSchedulerFactory();
38+
39+
/**
40+
* Factory of Scheduler to return from {@link Schedulers.newThread()} or null if default should be used.
41+
*/
42+
public abstract Func0<Scheduler> getNewThreadSchedulerFactory();
43+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.plugins;
17+
18+
import rx.Scheduler;
19+
import rx.functions.Func0;
20+
21+
/**
22+
* Default implementation of {@link RxJavaErrorHandler} that does nothing.
23+
*
24+
* @ExcludeFromJavadoc
25+
*/
26+
public class RxJavaDefaultSchedulersDefault extends RxJavaDefaultSchedulers {
27+
28+
private static RxJavaDefaultSchedulersDefault INSTANCE = new RxJavaDefaultSchedulersDefault();
29+
30+
public Func0<Scheduler> getComputationSchedulerFactory() {
31+
return null;
32+
}
33+
34+
public Func0<Scheduler> getIOSchedulerFactory() {
35+
return null;
36+
}
37+
38+
public Func0<Scheduler> getNewThreadSchedulerFactory() {
39+
return null;
40+
}
41+
42+
public static RxJavaDefaultSchedulers getInstance() {
43+
return INSTANCE;
44+
}
45+
46+
}

rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public class RxJavaPlugins {
3131

3232
private final AtomicReference<RxJavaErrorHandler> errorHandler = new AtomicReference<RxJavaErrorHandler>();
3333
private final AtomicReference<RxJavaObservableExecutionHook> observableExecutionHook = new AtomicReference<RxJavaObservableExecutionHook>();
34+
private final AtomicReference<RxJavaDefaultSchedulers> schedulerOverrides = new AtomicReference<RxJavaDefaultSchedulers>();
3435

3536
public static RxJavaPlugins getInstance() {
3637
return INSTANCE;
@@ -149,4 +150,43 @@ private static Object getPluginImplementationViaProperty(Class<?> pluginClass) {
149150
return null;
150151
}
151152
}
153+
154+
/**
155+
* Retrieve instance of {@link RxJavaDefaultSchedulers} to use based on order of precedence as defined in {@link RxJavaPlugins} class header.
156+
* <p>
157+
* Override default by using {@link #registerDefaultSchedulers(RxJavaDefaultSchedulers)} or setting property: <code>rxjava.plugin.RxJavaDefaultSchedulers.implementation</code> with the full
158+
* classname to
159+
* load.
160+
*
161+
* @return {@link RxJavaErrorHandler} implementation to use
162+
*/
163+
public RxJavaDefaultSchedulers getDefaultSchedulers() {
164+
if (schedulerOverrides.get() == null) {
165+
// check for an implementation from System.getProperty first
166+
Object impl = getPluginImplementationViaProperty(RxJavaDefaultSchedulers.class);
167+
if (impl == null) {
168+
// nothing set via properties so initialize with default
169+
schedulerOverrides.compareAndSet(null, RxJavaDefaultSchedulersDefault.getInstance());
170+
// we don't return from here but call get() again in case of thread-race so the winner will always get returned
171+
} else {
172+
// we received an implementation from the system property so use it
173+
schedulerOverrides.compareAndSet(null, (RxJavaDefaultSchedulers) impl);
174+
}
175+
}
176+
return schedulerOverrides.get();
177+
}
178+
179+
/**
180+
* Register a {@link RxJavaDefaultSchedulers} implementation as a global override of any injected or default implementations.
181+
*
182+
* @param impl
183+
* {@link RxJavaDefaultSchedulers} implementation
184+
* @throws IllegalStateException
185+
* if called more than once or after the default was initialized (if usage occurs before trying to register)
186+
*/
187+
public void registerDefaultSchedulers(RxJavaDefaultSchedulers impl) {
188+
if (!schedulerOverrides.compareAndSet(null, impl)) {
189+
throw new IllegalStateException("Another strategy was already registered: " + schedulerOverrides.get());
190+
}
191+
}
152192
}

rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,14 @@ public Thread newThread(Runnable r) {
4646
}
4747
};
4848

49+
@Deprecated
4950
public static NewThreadScheduler getInstance() {
5051
return INSTANCE;
5152
}
53+
54+
/* package */ static NewThreadScheduler instance() {
55+
return INSTANCE;
56+
}
5257

5358
private NewThreadScheduler() {
5459

rxjava-core/src/main/java/rx/schedulers/Schedulers.java

Lines changed: 55 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,62 @@
2323
import java.util.concurrent.atomic.AtomicLong;
2424

2525
import rx.Scheduler;
26+
import rx.functions.Func0;
27+
import rx.plugins.RxJavaPlugins;
2628

2729
/**
2830
* Static factory methods for creating Schedulers.
2931
*/
3032
public class Schedulers {
31-
private static final ScheduledExecutorService COMPUTATION_EXECUTOR = createComputationExecutor();
32-
private static final Executor IO_EXECUTOR = createIOExecutor();
33+
34+
private final Func0<Scheduler> computationScheduler;
35+
private final Func0<Scheduler> ioScheduler;
36+
private final Func0<Scheduler> newThreadScheduler;
37+
38+
private static final Schedulers INSTANCE = new Schedulers();
3339

3440
private Schedulers() {
41+
Func0<Scheduler> c = RxJavaPlugins.getInstance().getDefaultSchedulers().getComputationSchedulerFactory();
42+
if (c != null) {
43+
computationScheduler = c;
44+
} else {
45+
computationScheduler = new Func0<Scheduler>() {
46+
47+
@Override
48+
public Scheduler call() {
49+
return executor(createComputationExecutor());
50+
}
51+
52+
};
53+
}
54+
55+
Func0<Scheduler> io = RxJavaPlugins.getInstance().getDefaultSchedulers().getIOSchedulerFactory();
56+
if (io != null) {
57+
ioScheduler = io;
58+
} else {
59+
ioScheduler = new Func0<Scheduler>() {
60+
61+
@Override
62+
public Scheduler call() {
63+
return executor(createIOExecutor());
64+
}
65+
66+
};
67+
}
68+
69+
Func0<Scheduler> nt = RxJavaPlugins.getInstance().getDefaultSchedulers().getNewThreadSchedulerFactory();
70+
if (nt != null) {
71+
newThreadScheduler = nt;
72+
} else {
73+
newThreadScheduler = new Func0<Scheduler>() {
74+
75+
@Override
76+
public Scheduler call() {
77+
return NewThreadScheduler.instance();
78+
}
79+
80+
};
81+
}
3582

3683
}
3784

@@ -63,14 +110,14 @@ public static Scheduler currentThread() {
63110
public static Scheduler trampoline() {
64111
return TrampolineScheduler.getInstance();
65112
}
66-
113+
67114
/**
68115
* {@link Scheduler} that creates a new {@link Thread} for each unit of work.
69116
*
70117
* @return {@link NewThreadScheduler} instance
71118
*/
72119
public static Scheduler newThread() {
73-
return NewThreadScheduler.getInstance();
120+
return INSTANCE.newThreadScheduler.call();
74121
}
75122

76123
/**
@@ -107,7 +154,7 @@ public static Scheduler executor(ScheduledExecutorService executor) {
107154
*/
108155
@Deprecated
109156
public static Scheduler threadPoolForComputation() {
110-
return executor(COMPUTATION_EXECUTOR);
157+
return computation();
111158
}
112159

113160
/**
@@ -120,7 +167,7 @@ public static Scheduler threadPoolForComputation() {
120167
* @return {@link Scheduler} for computation-bound work.
121168
*/
122169
public static Scheduler computation() {
123-
return executor(COMPUTATION_EXECUTOR);
170+
return INSTANCE.computationScheduler.call();
124171
}
125172

126173
/**
@@ -137,7 +184,7 @@ public static Scheduler computation() {
137184
*/
138185
@Deprecated
139186
public static Scheduler threadPoolForIO() {
140-
return executor(IO_EXECUTOR);
187+
return io();
141188
}
142189

143190
/**
@@ -152,7 +199,7 @@ public static Scheduler threadPoolForIO() {
152199
* @return {@link ExecutorScheduler} for IO-bound work.
153200
*/
154201
public static Scheduler io() {
155-
return executor(IO_EXECUTOR);
202+
return INSTANCE.ioScheduler.call();
156203
}
157204

158205
private static ScheduledExecutorService createComputationExecutor() {

0 commit comments

Comments
 (0)