Skip to content

Commit 8fa3a37

Browse files
committed
Scheduler correctness improvements.
1 parent 29f564a commit 8fa3a37

File tree

2 files changed

+138
-113
lines changed

2 files changed

+138
-113
lines changed

rxjava-core/src/main/java/rx/schedulers/EventLoopsScheduler.java

Lines changed: 39 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,15 @@
77
import rx.Scheduler;
88
import rx.Subscription;
99
import rx.functions.Action0;
10-
import rx.schedulers.NewThreadScheduler.OnActionComplete;
10+
import rx.schedulers.NewThreadScheduler.NewThreadWorker.Remover;
11+
import rx.schedulers.NewThreadScheduler.NewThreadWorker.ScheduledAction;
1112
import rx.subscriptions.CompositeSubscription;
1213
import rx.subscriptions.Subscriptions;
1314

1415
/* package */class EventLoopsScheduler extends Scheduler {
15-
16-
private static class ComputationSchedulerPool {
17-
final int cores = Runtime.getRuntime().availableProcessors();
16+
/** Manages a fixed number of workers. */
17+
static final class FixedSchedulerPool {
18+
final int cores;
1819
final ThreadFactory factory = new ThreadFactory() {
1920
final AtomicInteger counter = new AtomicInteger();
2021

@@ -26,47 +27,46 @@ public Thread newThread(Runnable r) {
2627
}
2728
};
2829

29-
final EventLoopScheduler[] eventLoops;
30+
final PoolWorker[] eventLoops;
31+
long n;
3032

31-
ComputationSchedulerPool() {
33+
FixedSchedulerPool() {
3234
// initialize event loops
33-
eventLoops = new EventLoopScheduler[cores];
35+
this.cores = Runtime.getRuntime().availableProcessors();
36+
this.eventLoops = new PoolWorker[cores];
3437
for (int i = 0; i < cores; i++) {
35-
eventLoops[i] = new EventLoopScheduler(factory);
38+
this.eventLoops[i] = new PoolWorker(factory);
3639
}
3740
}
3841

39-
private static ComputationSchedulerPool INSTANCE = new ComputationSchedulerPool();
40-
41-
long n = 0;
42-
43-
public EventLoopScheduler getEventLoop() {
44-
// round-robin selection (improvements to come)
45-
return eventLoops[(int) (n++ % cores)];
42+
public PoolWorker getEventLoop() {
43+
// simple round robin, improvements to come
44+
return eventLoops[(int)(n++ % cores)];
4645
}
47-
4846
}
4947

48+
final FixedSchedulerPool pool;
49+
50+
/**
51+
* Create a scheduler with pool size equal to the available processor
52+
* count and using least-recent worker selection policy.
53+
*/
54+
EventLoopsScheduler() {
55+
pool = new FixedSchedulerPool();
56+
}
57+
5058
@Override
5159
public Worker createWorker() {
52-
return new EventLoop();
60+
return new EventLoopWorker(pool.getEventLoop());
5361
}
5462

55-
private static class EventLoop extends Scheduler.Worker {
63+
private static class EventLoopWorker extends Scheduler.Worker {
5664
private final CompositeSubscription innerSubscription = new CompositeSubscription();
57-
private final EventLoopScheduler pooledEventLoop;
58-
private final OnActionComplete onComplete;
59-
60-
EventLoop() {
61-
pooledEventLoop = ComputationSchedulerPool.INSTANCE.getEventLoop();
62-
onComplete = new OnActionComplete() {
65+
private final PoolWorker poolWorker;
6366

64-
@Override
65-
public void complete(Subscription s) {
66-
innerSubscription.remove(s);
67-
}
68-
69-
};
67+
EventLoopWorker(PoolWorker poolWorker) {
68+
this.poolWorker = poolWorker;
69+
7070
}
7171

7272
@Override
@@ -81,29 +81,25 @@ public boolean isUnsubscribed() {
8181

8282
@Override
8383
public Subscription schedule(Action0 action) {
84-
if (innerSubscription.isUnsubscribed()) {
85-
// don't schedule, we are unsubscribed
86-
return Subscriptions.empty();
87-
}
88-
return pooledEventLoop.schedule(action, onComplete);
84+
return schedule(action, 0, null);
8985
}
90-
9186
@Override
9287
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
9388
if (innerSubscription.isUnsubscribed()) {
9489
// don't schedule, we are unsubscribed
9590
return Subscriptions.empty();
9691
}
9792

98-
return pooledEventLoop.schedule(action, delayTime, unit, onComplete);
93+
ScheduledAction s = poolWorker.scheduleActual(action, delayTime, unit);
94+
innerSubscription.add(s);
95+
s.addParent(innerSubscription);
96+
return s;
9997
}
100-
10198
}
102-
103-
private static class EventLoopScheduler extends NewThreadScheduler.EventLoopScheduler {
104-
EventLoopScheduler(ThreadFactory threadFactory) {
99+
100+
private static final class PoolWorker extends NewThreadScheduler.NewThreadWorker {
101+
PoolWorker(ThreadFactory threadFactory) {
105102
super(threadFactory);
106103
}
107104
}
108-
109-
}
105+
}

rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java

Lines changed: 99 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@
1515
*/
1616
package rx.schedulers;
1717

18-
import java.util.concurrent.ExecutorService;
1918
import java.util.concurrent.Executors;
20-
import java.util.concurrent.ScheduledFuture;
19+
import java.util.concurrent.Future;
20+
import java.util.concurrent.ScheduledExecutorService;
2121
import java.util.concurrent.ThreadFactory;
2222
import java.util.concurrent.TimeUnit;
23+
import java.util.concurrent.atomic.AtomicBoolean;
2324
import java.util.concurrent.atomic.AtomicLong;
24-
import java.util.concurrent.atomic.AtomicReference;
2525

2626
import rx.Scheduler;
2727
import rx.Subscription;
@@ -56,15 +56,15 @@ private NewThreadScheduler() {
5656

5757
@Override
5858
public Worker createWorker() {
59-
return new EventLoopScheduler(THREAD_FACTORY);
59+
return new NewThreadWorker(THREAD_FACTORY);
6060
}
6161

62-
/* package */static class EventLoopScheduler extends Scheduler.Worker implements Subscription {
62+
/* package */static class NewThreadWorker extends Scheduler.Worker implements Subscription {
6363
private final CompositeSubscription innerSubscription = new CompositeSubscription();
64-
private final ExecutorService executor;
64+
private final ScheduledExecutorService executor;
6565

66-
/* package */EventLoopScheduler(ThreadFactory threadFactory) {
67-
executor = Executors.newSingleThreadExecutor(threadFactory);
66+
/* package */NewThreadWorker(ThreadFactory threadFactory) {
67+
executor = Executors.newScheduledThreadPool(1, threadFactory);
6868
}
6969

7070
@Override
@@ -73,80 +73,109 @@ public Subscription schedule(final Action0 action) {
7373
}
7474

7575
/* package */Subscription schedule(final Action0 action, final OnActionComplete onComplete) {
76+
return scheduleActual(action, 0, null);
77+
}
78+
79+
@Override
80+
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
7681
if (innerSubscription.isUnsubscribed()) {
77-
// don't schedule, we are unsubscribed
7882
return Subscriptions.empty();
7983
}
80-
81-
final AtomicReference<Subscription> sf = new AtomicReference<Subscription>();
82-
Subscription s = Subscriptions.from(executor.submit(new Runnable() {
83-
84-
@Override
85-
public void run() {
86-
try {
87-
if (innerSubscription.isUnsubscribed()) {
88-
return;
89-
}
90-
action.call();
91-
} finally {
92-
// remove the subscription now that we're completed
93-
Subscription s = sf.get();
94-
if (s != null) {
95-
innerSubscription.remove(s);
96-
}
97-
if (onComplete != null) {
98-
onComplete.complete(s);
99-
}
100-
}
101-
}
102-
}));
103-
104-
sf.set(s);
105-
innerSubscription.add(s);
106-
return s;
84+
return scheduleActual(action, delayTime, unit);
10785
}
10886

109-
@Override
110-
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
111-
return schedule(action, delayTime, unit, null);
87+
/* package */ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
88+
ScheduledAction run = new ScheduledAction(action, innerSubscription);
89+
Future<?> f;
90+
if (delayTime <= 0) {
91+
f = executor.submit(run);
92+
} else {
93+
f = executor.schedule(run, delayTime, unit);
94+
}
95+
run.add(Subscriptions.from(f));
96+
97+
return run;
11298
}
99+
100+
/** Remove a child subscription from a composite when unsubscribing. */
101+
public static final class Remover implements Subscription {
102+
final Subscription s;
103+
final CompositeSubscription parent;
104+
final AtomicBoolean once;
105+
106+
public Remover(Subscription s, CompositeSubscription parent) {
107+
this.s = s;
108+
this.parent = parent;
109+
this.once = new AtomicBoolean();
110+
}
111+
112+
@Override
113+
public boolean isUnsubscribed() {
114+
return s.isUnsubscribed();
115+
}
116+
117+
@Override
118+
public void unsubscribe() {
119+
if (once.compareAndSet(false, true)) {
120+
parent.remove(s);
121+
}
122+
}
123+
124+
}
125+
/**
126+
* A runnable that executes an Action0 and can be cancelled
127+
* The analogue is the Subscriber in respect of an Observer.
128+
*/
129+
public static final class ScheduledAction implements Runnable, Subscription {
130+
final CompositeSubscription cancel;
131+
final Action0 action;
132+
final CompositeSubscription parent;
133+
final AtomicBoolean once;
134+
135+
public ScheduledAction(Action0 action, CompositeSubscription parent) {
136+
this.action = action;
137+
this.parent = parent;
138+
this.cancel = new CompositeSubscription();
139+
this.once = new AtomicBoolean();
140+
}
113141

114-
/* package */Subscription schedule(final Action0 action, long delayTime, TimeUnit unit, final OnActionComplete onComplete) {
115-
final AtomicReference<Subscription> sf = new AtomicReference<Subscription>();
116-
// we will use the system scheduler since it doesn't make sense to launch a new Thread and then sleep
117-
// we will instead schedule the event then launch the thread after the delay has passed
118-
ScheduledFuture<?> f = GenericScheduledExecutorService.getInstance().schedule(new Runnable() {
119-
120-
@Override
121-
public void run() {
122-
try {
123-
if (innerSubscription.isUnsubscribed()) {
124-
return;
125-
}
126-
// now that the delay is past schedule the work to be done for real on the UI thread
127-
schedule(action);
128-
} finally {
129-
// remove the subscription now that we're completed
130-
Subscription s = sf.get();
131-
if (s != null) {
132-
innerSubscription.remove(s);
133-
}
134-
if (onComplete != null) {
135-
onComplete.complete(s);
136-
}
137-
}
142+
@Override
143+
public void run() {
144+
try {
145+
action.call();
146+
} finally {
147+
unsubscribe();
138148
}
139-
}, delayTime, unit);
149+
}
140150

141-
// add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens
142-
Subscription s = Subscriptions.from(f);
143-
sf.set(s);
144-
innerSubscription.add(s);
145-
return s;
151+
@Override
152+
public boolean isUnsubscribed() {
153+
return cancel.isUnsubscribed();
154+
}
155+
156+
@Override
157+
public void unsubscribe() {
158+
if (once.compareAndSet(false, true)) {
159+
cancel.unsubscribe();
160+
parent.remove(this);
161+
}
162+
}
163+
public void add(Subscription s) {
164+
cancel.add(s);
165+
}
166+
/**
167+
* Adds a parent to this ScheduledAction so when it is
168+
* cancelled or terminates, it can remove itself from this parent.
169+
* @param parent
170+
*/
171+
public void addParent(CompositeSubscription parent) {
172+
cancel.add(new Remover(this, parent));
173+
}
146174
}
147175

148176
@Override
149177
public void unsubscribe() {
178+
executor.shutdown();
150179
innerSubscription.unsubscribe();
151180
}
152181

@@ -163,4 +192,4 @@ public boolean isUnsubscribed() {
163192

164193
}
165194

166-
}
195+
}

0 commit comments

Comments
 (0)