Skip to content

Commit 869c855

Browse files
authored
2.x: coverage, minor fixes, cleanup 10/19-2 (#4732)
1 parent 3e7d63c commit 869c855

29 files changed

+890
-294
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableDoOnLifecycle.java

Lines changed: 79 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,13 @@
1212
*/
1313
package io.reactivex.internal.operators.flowable;
1414

15+
import org.reactivestreams.*;
16+
1517
import io.reactivex.Flowable;
18+
import io.reactivex.exceptions.Exceptions;
1619
import io.reactivex.functions.*;
17-
import io.reactivex.internal.subscribers.SubscriptionLambdaSubscriber;
18-
19-
import org.reactivestreams.Subscriber;
20-
import org.reactivestreams.Subscription;
20+
import io.reactivex.internal.subscriptions.*;
21+
import io.reactivex.plugins.RxJavaPlugins;
2122

2223
public final class FlowableDoOnLifecycle<T> extends AbstractFlowableWithUpstream<T, T> {
2324
private final Consumer<? super Subscription> onSubscribe;
@@ -36,4 +37,78 @@ public FlowableDoOnLifecycle(Flowable<T> source, Consumer<? super Subscription>
3637
protected void subscribeActual(Subscriber<? super T> s) {
3738
source.subscribe(new SubscriptionLambdaSubscriber<T>(s, onSubscribe, onRequest, onCancel));
3839
}
40+
41+
static final class SubscriptionLambdaSubscriber<T> implements Subscriber<T>, Subscription {
42+
final Subscriber<? super T> actual;
43+
final Consumer<? super Subscription> onSubscribe;
44+
final LongConsumer onRequest;
45+
final Action onCancel;
46+
47+
Subscription s;
48+
49+
public SubscriptionLambdaSubscriber(Subscriber<? super T> actual,
50+
Consumer<? super Subscription> onSubscribe,
51+
LongConsumer onRequest,
52+
Action onCancel) {
53+
this.actual = actual;
54+
this.onSubscribe = onSubscribe;
55+
this.onCancel = onCancel;
56+
this.onRequest = onRequest;
57+
}
58+
59+
@Override
60+
public void onSubscribe(Subscription s) {
61+
// this way, multiple calls to onSubscribe can show up in tests that use doOnSubscribe to validate behavior
62+
try {
63+
onSubscribe.accept(s);
64+
} catch (Throwable e) {
65+
Exceptions.throwIfFatal(e);
66+
s.cancel();
67+
RxJavaPlugins.onError(e);
68+
EmptySubscription.error(e, actual);
69+
return;
70+
}
71+
if (SubscriptionHelper.validate(this.s, s)) {
72+
this.s = s;
73+
actual.onSubscribe(this);
74+
}
75+
}
76+
77+
@Override
78+
public void onNext(T t) {
79+
actual.onNext(t);
80+
}
81+
82+
@Override
83+
public void onError(Throwable t) {
84+
actual.onError(t);
85+
}
86+
87+
@Override
88+
public void onComplete() {
89+
actual.onComplete();
90+
}
91+
92+
@Override
93+
public void request(long n) {
94+
try {
95+
onRequest.accept(n);
96+
} catch (Throwable e) {
97+
Exceptions.throwIfFatal(e);
98+
RxJavaPlugins.onError(e);
99+
}
100+
s.request(n);
101+
}
102+
103+
@Override
104+
public void cancel() {
105+
try {
106+
onCancel.run();
107+
} catch (Throwable e) {
108+
Exceptions.throwIfFatal(e);
109+
RxJavaPlugins.onError(e);
110+
}
111+
s.cancel();
112+
}
113+
}
39114
}

src/main/java/io/reactivex/internal/operators/observable/BlockingObservableIterable.java

Lines changed: 121 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,15 @@
1313

1414
package io.reactivex.internal.operators.observable;
1515

16-
import java.util.Iterator;
16+
import java.util.*;
17+
import java.util.concurrent.atomic.AtomicReference;
18+
import java.util.concurrent.locks.*;
1719

1820
import io.reactivex.ObservableSource;
21+
import io.reactivex.disposables.Disposable;
22+
import io.reactivex.internal.disposables.DisposableHelper;
23+
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
24+
import io.reactivex.internal.util.ExceptionHelper;
1925

2026
public final class BlockingObservableIterable<T> implements Iterable<T> {
2127
final ObservableSource<? extends T> source;
@@ -33,4 +39,118 @@ public Iterator<T> iterator() {
3339
source.subscribe(it);
3440
return it;
3541
}
42+
43+
static final class BlockingObservableIterator<T>
44+
extends AtomicReference<Disposable>
45+
implements io.reactivex.Observer<T>, Iterator<T>, Disposable {
46+
47+
48+
private static final long serialVersionUID = 6695226475494099826L;
49+
50+
final SpscLinkedArrayQueue<T> queue;
51+
52+
final Lock lock;
53+
54+
final Condition condition;
55+
56+
volatile boolean done;
57+
Throwable error;
58+
59+
public BlockingObservableIterator(int batchSize) {
60+
this.queue = new SpscLinkedArrayQueue<T>(batchSize);
61+
this.lock = new ReentrantLock();
62+
this.condition = lock.newCondition();
63+
}
64+
65+
@Override
66+
public boolean hasNext() {
67+
for (;;) {
68+
boolean d = done;
69+
boolean empty = queue.isEmpty();
70+
if (d) {
71+
Throwable e = error;
72+
if (e != null) {
73+
throw ExceptionHelper.wrapOrThrow(e);
74+
} else
75+
if (empty) {
76+
return false;
77+
}
78+
}
79+
if (empty) {
80+
try {
81+
lock.lock();
82+
try {
83+
while (!done && queue.isEmpty()) {
84+
condition.await();
85+
}
86+
} finally {
87+
lock.unlock();
88+
}
89+
} catch (InterruptedException ex) {
90+
DisposableHelper.dispose(this);
91+
signalConsumer();
92+
throw ExceptionHelper.wrapOrThrow(ex);
93+
}
94+
} else {
95+
return true;
96+
}
97+
}
98+
}
99+
100+
@Override
101+
public T next() {
102+
if (hasNext()) {
103+
return queue.poll();
104+
}
105+
throw new NoSuchElementException();
106+
}
107+
108+
@Override
109+
public void onSubscribe(Disposable s) {
110+
DisposableHelper.setOnce(this, s);
111+
}
112+
113+
@Override
114+
public void onNext(T t) {
115+
queue.offer(t);
116+
signalConsumer();
117+
}
118+
119+
@Override
120+
public void onError(Throwable t) {
121+
error = t;
122+
done = true;
123+
signalConsumer();
124+
}
125+
126+
@Override
127+
public void onComplete() {
128+
done = true;
129+
signalConsumer();
130+
}
131+
132+
void signalConsumer() {
133+
lock.lock();
134+
try {
135+
condition.signalAll();
136+
} finally {
137+
lock.unlock();
138+
}
139+
}
140+
141+
@Override // otherwise default method which isn't available in Java 7
142+
public void remove() {
143+
throw new UnsupportedOperationException("remove");
144+
}
145+
146+
@Override
147+
public void dispose() {
148+
DisposableHelper.dispose(this);
149+
}
150+
151+
@Override
152+
public boolean isDisposed() {
153+
return DisposableHelper.isDisposed(get());
154+
}
155+
}
36156
}

src/main/java/io/reactivex/internal/operators/observable/BlockingObservableIterator.java

Lines changed: 0 additions & 137 deletions
This file was deleted.

src/main/java/io/reactivex/internal/operators/observable/ObservableFlattenIterable.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ public void onError(Throwable e) {
121121
RxJavaPlugins.onError(e);
122122
return;
123123
}
124+
d = DisposableHelper.DISPOSED;
124125
actual.onError(e);
125126
}
126127

@@ -129,6 +130,7 @@ public void onComplete() {
129130
if (d == DisposableHelper.DISPOSED) {
130131
return;
131132
}
133+
d = DisposableHelper.DISPOSED;
132134
actual.onComplete();
133135
}
134136

0 commit comments

Comments
 (0)