Skip to content

Commit 318bf43

Browse files
authored
2.x: cleanup & coverage 10/24-2 (#4763)
1 parent 3634c92 commit 318bf43

33 files changed

+1254
-338
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupJoin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -484,7 +484,7 @@ public void onNext(Object t) {
484484

485485
@Override
486486
public void onError(Throwable t) {
487-
parent.innerError(t);
487+
parent.innerCloseError(t);
488488
}
489489

490490
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableInterval.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -82,15 +82,10 @@ public void run() {
8282

8383
if (r != 0L) {
8484
actual.onNext(count++);
85-
if (r != Long.MAX_VALUE) {
86-
decrementAndGet();
87-
}
85+
BackpressureHelper.produced(this, 1);
8886
} else {
89-
try {
90-
actual.onError(new MissingBackpressureException("Can't deliver value " + count + " due to lack of requests"));
91-
} finally {
92-
DisposableHelper.dispose(resource);
93-
}
87+
actual.onError(new MissingBackpressureException("Can't deliver value " + count + " due to lack of requests"));
88+
DisposableHelper.dispose(resource);
9489
}
9590
}
9691
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableJoin.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ public FlowableJoin(
5555
@Override
5656
protected void subscribeActual(Subscriber<? super R> s) {
5757

58-
GroupJoinSubscription<TLeft, TRight, TLeftEnd, TRightEnd, R> parent =
59-
new GroupJoinSubscription<TLeft, TRight, TLeftEnd, TRightEnd, R>(s, leftEnd, rightEnd, resultSelector);
58+
JoinSubscription<TLeft, TRight, TLeftEnd, TRightEnd, R> parent =
59+
new JoinSubscription<TLeft, TRight, TLeftEnd, TRightEnd, R>(s, leftEnd, rightEnd, resultSelector);
6060

6161
s.onSubscribe(parent);
6262

@@ -69,7 +69,7 @@ protected void subscribeActual(Subscriber<? super R> s) {
6969
other.subscribe(right);
7070
}
7171

72-
static final class GroupJoinSubscription<TLeft, TRight, TLeftEnd, TRightEnd, R>
72+
static final class JoinSubscription<TLeft, TRight, TLeftEnd, TRightEnd, R>
7373
extends AtomicInteger implements Subscription, JoinSupport {
7474

7575

@@ -111,7 +111,7 @@ static final class GroupJoinSubscription<TLeft, TRight, TLeftEnd, TRightEnd, R>
111111

112112
static final Integer RIGHT_CLOSE = 4;
113113

114-
GroupJoinSubscription(Subscriber<? super R> actual, Function<? super TLeft, ? extends Publisher<TLeftEnd>> leftEnd,
114+
JoinSubscription(Subscriber<? super R> actual, Function<? super TLeft, ? extends Publisher<TLeftEnd>> leftEnd,
115115
Function<? super TRight, ? extends Publisher<TRightEnd>> rightEnd,
116116
BiFunction<? super TLeft, ? super TRight, ? extends R> resultSelector) {
117117
this.actual = actual;

src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureDrop.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,7 @@ public void onNext(T t) {
8181
long r = get();
8282
if (r != 0L) {
8383
actual.onNext(t);
84-
if (r != Long.MAX_VALUE) {
85-
decrementAndGet();
86-
}
84+
BackpressureHelper.produced(this, 1);
8785
} else {
8886
try {
8987
onDrop.accept(t);

src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureError.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,7 @@ public void onNext(T t) {
6565
long r = get();
6666
if (r != 0L) {
6767
actual.onNext(t);
68-
if (r != Long.MAX_VALUE) {
69-
decrementAndGet();
70-
}
68+
BackpressureHelper.produced(this, 1);
7169
} else {
7270
onError(new MissingBackpressureException("could not emit value due to lack of requests"));
7371
}

src/main/java/io/reactivex/internal/operators/flowable/FlowablePublishMulticast.java

Lines changed: 9 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -102,20 +102,14 @@ public void onNext(R t) {
102102

103103
@Override
104104
public void onError(Throwable t) {
105-
try {
106-
actual.onError(t);
107-
} finally {
108-
processor.dispose();
109-
}
105+
actual.onError(t);
106+
processor.dispose();
110107
}
111108

112109
@Override
113110
public void onComplete() {
114-
try {
115-
actual.onComplete();
116-
} finally {
117-
processor.dispose();
118-
}
111+
actual.onComplete();
112+
processor.dispose();
119113
}
120114

121115
@Override
@@ -214,12 +208,10 @@ public void onNext(T t) {
214208
if (done) {
215209
return;
216210
}
217-
if (sourceMode == QueueSubscription.NONE) {
218-
if (!queue.offer(t)) {
219-
SubscriptionHelper.cancel(s);
220-
onError(new MissingBackpressureException());
221-
return;
222-
}
211+
if (sourceMode == QueueSubscription.NONE && !queue.offer(t)) {
212+
s.get().cancel();
213+
onError(new MissingBackpressureException());
214+
return;
223215
}
224216
drain();
225217
}
@@ -473,20 +465,7 @@ static final class MulticastSubscription<T>
473465
@Override
474466
public void request(long n) {
475467
if (SubscriptionHelper.validate(n)) {
476-
for (;;) {
477-
long r = get();
478-
if (r == Long.MIN_VALUE) {
479-
return;
480-
}
481-
if (r != Long.MAX_VALUE) {
482-
long u = BackpressureHelper.addCap(r, n);
483-
if (compareAndSet(r, u)) {
484-
break;
485-
}
486-
} else {
487-
break;
488-
}
489-
}
468+
BackpressureHelper.addCancel(this, n);
490469
parent.drain();
491470
}
492471
}

0 commit comments

Comments
 (0)