Skip to content

Commit 0302369

Browse files
Merge pull request #1158 from akarnokd/SchedulerFixes0506
Scheduler correctness improvements.
2 parents a1e0352 + c857ba3 commit 0302369

File tree

2 files changed

+133
-120
lines changed

2 files changed

+133
-120
lines changed

rxjava-core/src/main/java/rx/schedulers/EventLoopsScheduler.java

Lines changed: 37 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@
2222
import rx.Scheduler;
2323
import rx.Subscription;
2424
import rx.functions.Action0;
25-
import rx.schedulers.NewThreadScheduler.OnActionComplete;
25+
import rx.schedulers.NewThreadScheduler.NewThreadWorker.ScheduledAction;
2626
import rx.subscriptions.CompositeSubscription;
2727
import rx.subscriptions.Subscriptions;
2828

2929
/* package */class EventLoopsScheduler extends Scheduler {
30-
31-
private static class ComputationSchedulerPool {
32-
final int cores = Runtime.getRuntime().availableProcessors();
30+
/** Manages a fixed number of workers. */
31+
static final class FixedSchedulerPool {
32+
final int cores;
3333
final ThreadFactory factory = new ThreadFactory() {
3434
final AtomicInteger counter = new AtomicInteger();
3535

@@ -41,47 +41,46 @@ public Thread newThread(Runnable r) {
4141
}
4242
};
4343

44-
final EventLoopScheduler[] eventLoops;
44+
final PoolWorker[] eventLoops;
45+
long n;
4546

46-
ComputationSchedulerPool() {
47+
FixedSchedulerPool() {
4748
// initialize event loops
48-
eventLoops = new EventLoopScheduler[cores];
49+
this.cores = Runtime.getRuntime().availableProcessors();
50+
this.eventLoops = new PoolWorker[cores];
4951
for (int i = 0; i < cores; i++) {
50-
eventLoops[i] = new EventLoopScheduler(factory);
52+
this.eventLoops[i] = new PoolWorker(factory);
5153
}
5254
}
5355

54-
private static ComputationSchedulerPool INSTANCE = new ComputationSchedulerPool();
55-
56-
long n = 0;
57-
58-
public EventLoopScheduler getEventLoop() {
59-
// round-robin selection (improvements to come)
60-
return eventLoops[(int) (n++ % cores)];
56+
public PoolWorker getEventLoop() {
57+
// simple round robin, improvements to come
58+
return eventLoops[(int)(n++ % cores)];
6159
}
62-
6360
}
6461

62+
final FixedSchedulerPool pool;
63+
64+
/**
65+
* Create a scheduler with pool size equal to the available processor
66+
* count and using least-recent worker selection policy.
67+
*/
68+
EventLoopsScheduler() {
69+
pool = new FixedSchedulerPool();
70+
}
71+
6572
@Override
6673
public Worker createWorker() {
67-
return new EventLoop();
74+
return new EventLoopWorker(pool.getEventLoop());
6875
}
6976

70-
private static class EventLoop extends Scheduler.Worker {
77+
private static class EventLoopWorker extends Scheduler.Worker {
7178
private final CompositeSubscription innerSubscription = new CompositeSubscription();
72-
private final EventLoopScheduler pooledEventLoop;
73-
private final OnActionComplete onComplete;
74-
75-
EventLoop() {
76-
pooledEventLoop = ComputationSchedulerPool.INSTANCE.getEventLoop();
77-
onComplete = new OnActionComplete() {
79+
private final PoolWorker poolWorker;
7880

79-
@Override
80-
public void complete(Subscription s) {
81-
innerSubscription.remove(s);
82-
}
83-
84-
};
81+
EventLoopWorker(PoolWorker poolWorker) {
82+
this.poolWorker = poolWorker;
83+
8584
}
8685

8786
@Override
@@ -96,29 +95,25 @@ public boolean isUnsubscribed() {
9695

9796
@Override
9897
public Subscription schedule(Action0 action) {
99-
if (innerSubscription.isUnsubscribed()) {
100-
// don't schedule, we are unsubscribed
101-
return Subscriptions.empty();
102-
}
103-
return pooledEventLoop.schedule(action, onComplete);
98+
return schedule(action, 0, null);
10499
}
105-
106100
@Override
107101
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
108102
if (innerSubscription.isUnsubscribed()) {
109103
// don't schedule, we are unsubscribed
110104
return Subscriptions.empty();
111105
}
112106

113-
return pooledEventLoop.schedule(action, delayTime, unit, onComplete);
107+
ScheduledAction s = poolWorker.scheduleActual(action, delayTime, unit);
108+
innerSubscription.add(s);
109+
s.addParent(innerSubscription);
110+
return s;
114111
}
115-
116112
}
117-
118-
private static class EventLoopScheduler extends NewThreadScheduler.EventLoopScheduler {
119-
EventLoopScheduler(ThreadFactory threadFactory) {
113+
114+
private static final class PoolWorker extends NewThreadScheduler.NewThreadWorker {
115+
PoolWorker(ThreadFactory threadFactory) {
120116
super(threadFactory);
121117
}
122118
}
123-
124119
}

rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java

Lines changed: 96 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@
1515
*/
1616
package rx.schedulers;
1717

18-
import java.util.concurrent.ExecutorService;
1918
import java.util.concurrent.Executors;
20-
import java.util.concurrent.ScheduledFuture;
19+
import java.util.concurrent.Future;
20+
import java.util.concurrent.ScheduledExecutorService;
2121
import java.util.concurrent.ThreadFactory;
2222
import java.util.concurrent.TimeUnit;
23+
import java.util.concurrent.atomic.AtomicBoolean;
2324
import java.util.concurrent.atomic.AtomicLong;
24-
import java.util.concurrent.atomic.AtomicReference;
2525

2626
import rx.Scheduler;
2727
import rx.Subscription;
@@ -56,97 +56,122 @@ private NewThreadScheduler() {
5656

5757
@Override
5858
public Worker createWorker() {
59-
return new EventLoopScheduler(THREAD_FACTORY);
59+
return new NewThreadWorker(THREAD_FACTORY);
6060
}
6161

62-
/* package */static class EventLoopScheduler extends Scheduler.Worker implements Subscription {
62+
/* package */static class NewThreadWorker extends Scheduler.Worker implements Subscription {
6363
private final CompositeSubscription innerSubscription = new CompositeSubscription();
64-
private final ExecutorService executor;
64+
private final ScheduledExecutorService executor;
6565

66-
/* package */EventLoopScheduler(ThreadFactory threadFactory) {
67-
executor = Executors.newSingleThreadExecutor(threadFactory);
66+
/* package */NewThreadWorker(ThreadFactory threadFactory) {
67+
executor = Executors.newScheduledThreadPool(1, threadFactory);
6868
}
6969

7070
@Override
7171
public Subscription schedule(final Action0 action) {
72-
return schedule(action, null);
72+
return schedule(action, 0, null);
7373
}
7474

75-
/* package */Subscription schedule(final Action0 action, final OnActionComplete onComplete) {
75+
@Override
76+
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
7677
if (innerSubscription.isUnsubscribed()) {
77-
// don't schedule, we are unsubscribed
7878
return Subscriptions.empty();
7979
}
80-
81-
final AtomicReference<Subscription> sf = new AtomicReference<Subscription>();
82-
Subscription s = Subscriptions.from(executor.submit(new Runnable() {
83-
84-
@Override
85-
public void run() {
86-
try {
87-
if (innerSubscription.isUnsubscribed()) {
88-
return;
89-
}
90-
action.call();
91-
} finally {
92-
// remove the subscription now that we're completed
93-
Subscription s = sf.get();
94-
if (s != null) {
95-
innerSubscription.remove(s);
96-
}
97-
if (onComplete != null) {
98-
onComplete.complete(s);
99-
}
100-
}
101-
}
102-
}));
103-
104-
sf.set(s);
105-
innerSubscription.add(s);
106-
return s;
80+
return scheduleActual(action, delayTime, unit);
10781
}
10882

109-
@Override
110-
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
111-
return schedule(action, delayTime, unit, null);
83+
/* package */ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
84+
ScheduledAction run = new ScheduledAction(action, innerSubscription);
85+
Future<?> f;
86+
if (delayTime <= 0) {
87+
f = executor.submit(run);
88+
} else {
89+
f = executor.schedule(run, delayTime, unit);
90+
}
91+
run.add(Subscriptions.from(f));
92+
93+
return run;
94+
}
95+
96+
/** Remove a child subscription from a composite when unsubscribing. */
97+
private static final class Remover implements Subscription {
98+
final Subscription s;
99+
final CompositeSubscription parent;
100+
final AtomicBoolean once;
101+
102+
public Remover(Subscription s, CompositeSubscription parent) {
103+
this.s = s;
104+
this.parent = parent;
105+
this.once = new AtomicBoolean();
106+
}
107+
108+
@Override
109+
public boolean isUnsubscribed() {
110+
return s.isUnsubscribed();
111+
}
112+
113+
@Override
114+
public void unsubscribe() {
115+
if (once.compareAndSet(false, true)) {
116+
parent.remove(s);
117+
}
118+
}
119+
112120
}
121+
/**
122+
* A runnable that executes an Action0 and can be cancelled
123+
* The analogue is the Subscriber in respect of an Observer.
124+
*/
125+
public static final class ScheduledAction implements Runnable, Subscription {
126+
final CompositeSubscription cancel;
127+
final Action0 action;
128+
final CompositeSubscription parent;
129+
final AtomicBoolean once;
130+
131+
public ScheduledAction(Action0 action, CompositeSubscription parent) {
132+
this.action = action;
133+
this.parent = parent;
134+
this.cancel = new CompositeSubscription();
135+
this.once = new AtomicBoolean();
136+
}
113137

114-
/* package */Subscription schedule(final Action0 action, long delayTime, TimeUnit unit, final OnActionComplete onComplete) {
115-
final AtomicReference<Subscription> sf = new AtomicReference<Subscription>();
116-
// we will use the system scheduler since it doesn't make sense to launch a new Thread and then sleep
117-
// we will instead schedule the event then launch the thread after the delay has passed
118-
ScheduledFuture<?> f = GenericScheduledExecutorService.getInstance().schedule(new Runnable() {
119-
120-
@Override
121-
public void run() {
122-
try {
123-
if (innerSubscription.isUnsubscribed()) {
124-
return;
125-
}
126-
// now that the delay is past schedule the work to be done for real on the UI thread
127-
schedule(action);
128-
} finally {
129-
// remove the subscription now that we're completed
130-
Subscription s = sf.get();
131-
if (s != null) {
132-
innerSubscription.remove(s);
133-
}
134-
if (onComplete != null) {
135-
onComplete.complete(s);
136-
}
137-
}
138+
@Override
139+
public void run() {
140+
try {
141+
action.call();
142+
} finally {
143+
unsubscribe();
138144
}
139-
}, delayTime, unit);
145+
}
140146

141-
// add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens
142-
Subscription s = Subscriptions.from(f);
143-
sf.set(s);
144-
innerSubscription.add(s);
145-
return s;
147+
@Override
148+
public boolean isUnsubscribed() {
149+
return cancel.isUnsubscribed();
150+
}
151+
152+
@Override
153+
public void unsubscribe() {
154+
if (once.compareAndSet(false, true)) {
155+
cancel.unsubscribe();
156+
parent.remove(this);
157+
}
158+
}
159+
public void add(Subscription s) {
160+
cancel.add(s);
161+
}
162+
/**
163+
* Adds a parent to this ScheduledAction so when it is
164+
* cancelled or terminates, it can remove itself from this parent.
165+
* @param parent
166+
*/
167+
public void addParent(CompositeSubscription parent) {
168+
cancel.add(new Remover(this, parent));
169+
}
146170
}
147171

148172
@Override
149173
public void unsubscribe() {
174+
executor.shutdown();
150175
innerSubscription.unsubscribe();
151176
}
152177

@@ -156,11 +181,4 @@ public boolean isUnsubscribed() {
156181
}
157182

158183
}
159-
160-
/* package */static interface OnActionComplete {
161-
162-
public void complete(Subscription s);
163-
164-
}
165-
166184
}

0 commit comments

Comments
 (0)