Skip to content

Commit ea03b91

Browse files
authored
2.x: allow subscribeOn to work with blocking create (#4770)
1 parent 07d24c2 commit ea03b91

File tree

3 files changed

+68
-6
lines changed

3 files changed

+68
-6
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12251,7 +12251,7 @@ public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {
1225112251
@SchedulerSupport(SchedulerSupport.CUSTOM)
1225212252
public final Flowable<T> subscribeOn(Scheduler scheduler) {
1225312253
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
12254-
return RxJavaPlugins.onAssembly(new FlowableSubscribeOn<T>(this, scheduler));
12254+
return RxJavaPlugins.onAssembly(new FlowableSubscribeOn<T>(this, scheduler, this instanceof FlowableCreate));
1225512255
}
1225612256

1225712257
/**

src/main/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOn.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,28 @@
2121
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2222
import io.reactivex.internal.util.BackpressureHelper;
2323

24+
/**
25+
* Subscribes to the source Flowable on the specified Scheduler and makes
26+
* sure downstream requests are scheduled there as well.
27+
*
28+
* @param <T> the value type emitted
29+
*/
2430
public final class FlowableSubscribeOn<T> extends AbstractFlowableWithUpstream<T , T> {
31+
2532
final Scheduler scheduler;
2633

27-
public FlowableSubscribeOn(Publisher<T> source, Scheduler scheduler) {
34+
final boolean nonScheduledRequests;
35+
36+
public FlowableSubscribeOn(Publisher<T> source, Scheduler scheduler, boolean nonScheduledRequests) {
2837
super(source);
2938
this.scheduler = scheduler;
39+
this.nonScheduledRequests = nonScheduledRequests;
3040
}
3141

3242
@Override
3343
public void subscribeActual(final Subscriber<? super T> s) {
3444
Scheduler.Worker w = scheduler.createWorker();
35-
final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<T>(s, w, source);
45+
final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<T>(s, w, source, nonScheduledRequests);
3646
s.onSubscribe(sos);
3747

3848
w.schedule(sos);
@@ -42,21 +52,26 @@ static final class SubscribeOnSubscriber<T> extends AtomicReference<Thread>
4252
implements Subscriber<T>, Subscription, Runnable {
4353

4454
private static final long serialVersionUID = 8094547886072529208L;
55+
4556
final Subscriber<? super T> actual;
57+
4658
final Scheduler.Worker worker;
4759

4860
final AtomicReference<Subscription> s;
4961

5062
final AtomicLong requested;
5163

64+
final boolean nonScheduledRequests;
65+
5266
Publisher<T> source;
5367

54-
SubscribeOnSubscriber(Subscriber<? super T> actual, Scheduler.Worker worker, Publisher<T> source) {
68+
SubscribeOnSubscriber(Subscriber<? super T> actual, Scheduler.Worker worker, Publisher<T> source, boolean nonScheduledRequests) {
5569
this.actual = actual;
5670
this.worker = worker;
5771
this.source = source;
5872
this.s = new AtomicReference<Subscription>();
5973
this.requested = new AtomicLong();
74+
this.nonScheduledRequests = nonScheduledRequests;
6075
}
6176

6277
@Override
@@ -114,7 +129,7 @@ public void request(final long n) {
114129
}
115130

116131
void requestUpstream(final long n, final Subscription s) {
117-
if (Thread.currentThread() == get()) {
132+
if (nonScheduledRequests || Thread.currentThread() == get()) {
118133
s.request(n);
119134
} else {
120135
worker.schedule(new Runnable() {

src/test/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOnTest.java

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.reactivex.*;
2525
import io.reactivex.Scheduler.Worker;
2626
import io.reactivex.disposables.Disposable;
27+
import io.reactivex.internal.functions.Functions;
2728
import io.reactivex.internal.operators.flowable.FlowableSubscribeOn.SubscribeOnSubscriber;
2829
import io.reactivex.internal.subscriptions.BooleanSubscription;
2930
import io.reactivex.schedulers.*;
@@ -295,7 +296,7 @@ public void deferredRequestRace() {
295296

296297
Worker w = Schedulers.computation().createWorker();
297298

298-
final SubscribeOnSubscriber<Integer> so = new SubscribeOnSubscriber<Integer>(ts, w, Flowable.<Integer>never());
299+
final SubscribeOnSubscriber<Integer> so = new SubscribeOnSubscriber<Integer>(ts, w, Flowable.<Integer>never(), true);
299300
ts.onSubscribe(so);
300301

301302
final BooleanSubscription bs = new BooleanSubscription();
@@ -321,4 +322,50 @@ public void run() {
321322
}
322323
}
323324
}
325+
326+
@Test
327+
public void nonScheduledRequests() {
328+
TestSubscriber<Object> ts = Flowable.create(new FlowableOnSubscribe<Object>() {
329+
@Override
330+
public void subscribe(FlowableEmitter<Object> s) throws Exception {
331+
for (int i = 1; i < 1001; i++) {
332+
s.onNext(i);
333+
Thread.sleep(1);
334+
}
335+
s.onComplete();
336+
}
337+
}, BackpressureStrategy.DROP)
338+
.subscribeOn(Schedulers.single())
339+
.observeOn(Schedulers.computation())
340+
.test()
341+
.awaitDone(5, TimeUnit.SECONDS)
342+
.assertNoErrors()
343+
.assertComplete();
344+
345+
int c = ts.valueCount();
346+
347+
assertTrue("" + c, c > Flowable.bufferSize());
348+
}
349+
350+
@Test
351+
public void scheduledRequests() {
352+
Flowable.create(new FlowableOnSubscribe<Object>() {
353+
@Override
354+
public void subscribe(FlowableEmitter<Object> s) throws Exception {
355+
for (int i = 1; i < 1001; i++) {
356+
s.onNext(i);
357+
Thread.sleep(1);
358+
}
359+
s.onComplete();
360+
}
361+
}, BackpressureStrategy.DROP)
362+
.map(Functions.identity())
363+
.subscribeOn(Schedulers.single())
364+
.observeOn(Schedulers.computation())
365+
.test()
366+
.awaitDone(5, TimeUnit.SECONDS)
367+
.assertValueCount(Flowable.bufferSize())
368+
.assertNoErrors()
369+
.assertComplete();
370+
}
324371
}

0 commit comments

Comments
 (0)