Skip to content

Commit 55941d9

Browse files
zireael-0endboss-leftyakarnokd
authored
3.x: Allow Single.zip and Maybe.zip result to be garbage collected (#7196)
* #7195 Allow zip result to be garbage collected * Also `null` collected values on error and complete * Update MaybeZipArray.java * Update SingleZipArray.java * Update MaybeZipArray.java * Update SingleZipArray.java * Update MaybeZipArray.java * Update MaybeZipArray.java Co-authored-by: Alex <[email protected]> Co-authored-by: David Karnok <[email protected]>
1 parent 22c5e0b commit 55941d9

File tree

2 files changed

+23
-4
lines changed

2 files changed

+23
-4
lines changed

src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeZipArray.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package io.reactivex.rxjava3.internal.operators.maybe;
1515

16+
import java.util.Arrays;
1617
import java.util.Objects;
1718
import java.util.concurrent.atomic.*;
1819

@@ -73,7 +74,7 @@ static final class ZipCoordinator<T, R> extends AtomicInteger implements Disposa
7374

7475
final ZipMaybeObserver<T>[] observers;
7576

76-
final Object[] values;
77+
Object[] values;
7778

7879
@SuppressWarnings("unchecked")
7980
ZipCoordinator(MaybeObserver<? super R> observer, int n, Function<? super Object[], ? extends R> zipper) {
@@ -99,22 +100,29 @@ public void dispose() {
99100
for (ZipMaybeObserver<?> d : observers) {
100101
d.dispose();
101102
}
103+
104+
values = null;
102105
}
103106
}
104107

105108
void innerSuccess(T value, int index) {
106-
values[index] = value;
109+
Object[] values = this.values;
110+
if (values != null) {
111+
values[index] = value;
112+
}
107113
if (decrementAndGet() == 0) {
108114
R v;
109115

110116
try {
111117
v = Objects.requireNonNull(zipper.apply(values), "The zipper returned a null value");
112118
} catch (Throwable ex) {
113119
Exceptions.throwIfFatal(ex);
120+
this.values = null;
114121
downstream.onError(ex);
115122
return;
116123
}
117124

125+
this.values = null;
118126
downstream.onSuccess(v);
119127
}
120128
}
@@ -133,6 +141,7 @@ void disposeExcept(int index) {
133141
void innerError(Throwable ex, int index) {
134142
if (getAndSet(0) > 0) {
135143
disposeExcept(index);
144+
values = null;
136145
downstream.onError(ex);
137146
} else {
138147
RxJavaPlugins.onError(ex);
@@ -142,6 +151,7 @@ void innerError(Throwable ex, int index) {
142151
void innerComplete(int index) {
143152
if (getAndSet(0) > 0) {
144153
disposeExcept(index);
154+
values = null;
145155
downstream.onComplete();
146156
}
147157
}

src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleZipArray.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package io.reactivex.rxjava3.internal.operators.single;
1515

16+
import java.util.Arrays;
1617
import java.util.Objects;
1718
import java.util.concurrent.atomic.*;
1819

@@ -74,7 +75,7 @@ static final class ZipCoordinator<T, R> extends AtomicInteger implements Disposa
7475

7576
final ZipSingleObserver<T>[] observers;
7677

77-
final Object[] values;
78+
Object[] values;
7879

7980
@SuppressWarnings("unchecked")
8081
ZipCoordinator(SingleObserver<? super R> observer, int n, Function<? super Object[], ? extends R> zipper) {
@@ -100,22 +101,29 @@ public void dispose() {
100101
for (ZipSingleObserver<?> d : observers) {
101102
d.dispose();
102103
}
104+
105+
values = null;
103106
}
104107
}
105108

106109
void innerSuccess(T value, int index) {
107-
values[index] = value;
110+
Object[] values = this.values;
111+
if (values != null) {
112+
values[index] = value;
113+
}
108114
if (decrementAndGet() == 0) {
109115
R v;
110116

111117
try {
112118
v = Objects.requireNonNull(zipper.apply(values), "The zipper returned a null value");
113119
} catch (Throwable ex) {
114120
Exceptions.throwIfFatal(ex);
121+
this.values = null;
115122
downstream.onError(ex);
116123
return;
117124
}
118125

126+
this.values = null;
119127
downstream.onSuccess(v);
120128
}
121129
}
@@ -134,6 +142,7 @@ void disposeExcept(int index) {
134142
void innerError(Throwable ex, int index) {
135143
if (getAndSet(0) > 0) {
136144
disposeExcept(index);
145+
values = null;
137146
downstream.onError(ex);
138147
} else {
139148
RxJavaPlugins.onError(ex);

0 commit comments

Comments
 (0)