Skip to content

Commit 4e5c17c

Browse files
committed
1.x: proposal: onTerminateDetach - detach upstream/downstream for GC (#3888)
1 parent 04ef4e6 commit 4e5c17c

File tree

3 files changed

+350
-0
lines changed

3 files changed

+350
-0
lines changed

src/main/java/rx/Observable.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6611,6 +6611,23 @@ public final Observable<T> onExceptionResumeNext(final Observable<? extends T> r
66116611
return lift((Operator<T, T>)OperatorOnErrorResumeNextViaFunction.withException(resumeSequence));
66126612
}
66136613

6614+
6615+
/**
6616+
* Nulls out references to the upstream producer and downstream Subscriber if
6617+
* the sequence is terminated or downstream unsubscribes.
6618+
* <dl>
6619+
* <dt><b>Scheduler:</b></dt>
6620+
* <dd>{@code onTerminateDetach} does not operate by default on a particular {@link Scheduler}.</dd>
6621+
* </dl>
6622+
* @return an Observable which out references to the upstream producer and downstream Subscriber if
6623+
* the sequence is terminated or downstream unsubscribes
6624+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
6625+
*/
6626+
@Experimental
6627+
public final Observable<T> onTerminateDetach() {
6628+
return create(new OnSubscribeDetach<T>(this));
6629+
}
6630+
66146631
/**
66156632
* Returns a {@link ConnectableObservable}, which is a variety of Observable that waits until its
66166633
* {@link ConnectableObservable#connect connect} method is called before it begins emitting items to those
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import java.util.concurrent.atomic.*;
19+
20+
import rx.*;
21+
import rx.Observable.OnSubscribe;
22+
import rx.internal.util.RxJavaPluginUtils;
23+
24+
/**
25+
* Nulls out references to upstream data structures when the source terminates or
26+
* the child unsubscribes.
27+
* @param <T> the value type
28+
*/
29+
public final class OnSubscribeDetach<T> implements OnSubscribe<T> {
30+
31+
final Observable<T> source;
32+
33+
public OnSubscribeDetach(Observable<T> source) {
34+
this.source = source;
35+
}
36+
37+
@Override
38+
public void call(Subscriber<? super T> t) {
39+
DetachSubscriber<T> parent = new DetachSubscriber<T>(t);
40+
DetachProducer<T> producer = new DetachProducer<T>(parent);
41+
42+
t.add(producer);
43+
t.setProducer(producer);
44+
45+
source.unsafeSubscribe(parent);
46+
}
47+
48+
/**
49+
* The parent subscriber that forwards events and cleans up on a terminal state.
50+
* @param <T> the value type
51+
*/
52+
static final class DetachSubscriber<T> extends Subscriber<T> {
53+
54+
final AtomicReference<Subscriber<? super T>> actual;
55+
56+
final AtomicReference<Producer> producer;
57+
58+
final AtomicLong requested;
59+
60+
public DetachSubscriber(Subscriber<? super T> actual) {
61+
this.actual = new AtomicReference<Subscriber<? super T>>(actual);
62+
this.producer = new AtomicReference<Producer>();
63+
this.requested = new AtomicLong();
64+
}
65+
66+
@Override
67+
public void onNext(T t) {
68+
Subscriber<? super T> a = actual.get();
69+
70+
if (a != null) {
71+
a.onNext(t);
72+
}
73+
}
74+
75+
@Override
76+
public void onError(Throwable e) {
77+
producer.lazySet(TerminatedProducer.INSTANCE);
78+
Subscriber<? super T> a = actual.getAndSet(null);
79+
80+
if (a != null) {
81+
a.onError(e);
82+
} else {
83+
RxJavaPluginUtils.handleException(e);
84+
}
85+
}
86+
87+
88+
@Override
89+
public void onCompleted() {
90+
producer.lazySet(TerminatedProducer.INSTANCE);
91+
Subscriber<? super T> a = actual.getAndSet(null);
92+
93+
if (a != null) {
94+
a.onCompleted();
95+
}
96+
}
97+
98+
void innerRequest(long n) {
99+
if (n < 0L) {
100+
throw new IllegalArgumentException("n >= 0 required but it was " + n);
101+
}
102+
Producer p = producer.get();
103+
if (p != null) {
104+
p.request(n);
105+
} else {
106+
BackpressureUtils.getAndAddRequest(requested, n);
107+
p = producer.get();
108+
if (p != null && p != TerminatedProducer.INSTANCE) {
109+
long r = requested.getAndSet(0L);
110+
p.request(r);
111+
}
112+
}
113+
}
114+
115+
@Override
116+
public void setProducer(Producer p) {
117+
if (producer.compareAndSet(null, p)) {
118+
long r = requested.getAndSet(0L);
119+
p.request(r);
120+
} else {
121+
if (producer.get() != TerminatedProducer.INSTANCE) {
122+
throw new IllegalStateException("Producer already set!");
123+
}
124+
}
125+
}
126+
127+
void innerUnsubscribe() {
128+
producer.lazySet(TerminatedProducer.INSTANCE);
129+
actual.lazySet(null);
130+
// full barrier in unsubscribe()
131+
unsubscribe();
132+
}
133+
}
134+
135+
/**
136+
* Callbacks from the child Subscriber.
137+
* @param <T> the value type
138+
*/
139+
static final class DetachProducer<T> implements Producer, Subscription {
140+
final DetachSubscriber<T> parent;
141+
142+
public DetachProducer(DetachSubscriber<T> parent) {
143+
this.parent = parent;
144+
}
145+
146+
@Override
147+
public void request(long n) {
148+
parent.innerRequest(n);
149+
}
150+
151+
@Override
152+
public boolean isUnsubscribed() {
153+
return parent.isUnsubscribed();
154+
}
155+
156+
@Override
157+
public void unsubscribe() {
158+
parent.innerUnsubscribe();
159+
}
160+
}
161+
162+
/**
163+
* Singleton instance via enum.
164+
*/
165+
enum TerminatedProducer implements Producer {
166+
INSTANCE;
167+
168+
@Override
169+
public void request(long n) {
170+
// ignored
171+
}
172+
}
173+
}
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import java.lang.ref.WeakReference;
19+
import java.util.concurrent.atomic.AtomicReference;
20+
21+
import org.junit.*;
22+
23+
import rx.*;
24+
import rx.Observable.OnSubscribe;
25+
import rx.exceptions.TestException;
26+
import rx.observers.TestSubscriber;
27+
28+
public class OnSubscribeDetachTest {
29+
30+
Object o;
31+
32+
@Test
33+
public void just() throws Exception {
34+
o = new Object();
35+
36+
WeakReference<Object> wr = new WeakReference<Object>(o);
37+
38+
TestSubscriber<Object> ts = new TestSubscriber<Object>();
39+
40+
Observable.just(o).count().onTerminateDetach().subscribe(ts);
41+
42+
ts.assertValue(1);
43+
ts.assertCompleted();
44+
ts.assertNoErrors();
45+
46+
o = null;
47+
48+
System.gc();
49+
Thread.sleep(200);
50+
51+
Assert.assertNull("Object retained!", wr.get());
52+
53+
}
54+
55+
@Test
56+
public void error() {
57+
TestSubscriber<Object> ts = new TestSubscriber<Object>();
58+
59+
Observable.error(new TestException()).onTerminateDetach().subscribe(ts);
60+
61+
ts.assertNoValues();
62+
ts.assertError(TestException.class);
63+
ts.assertNotCompleted();
64+
}
65+
66+
@Test
67+
public void empty() {
68+
TestSubscriber<Object> ts = new TestSubscriber<Object>();
69+
70+
Observable.empty().onTerminateDetach().subscribe(ts);
71+
72+
ts.assertNoValues();
73+
ts.assertNoErrors();
74+
ts.assertCompleted();
75+
}
76+
77+
@Test
78+
public void range() {
79+
TestSubscriber<Object> ts = new TestSubscriber<Object>();
80+
81+
Observable.range(1, 1000).onTerminateDetach().subscribe(ts);
82+
83+
ts.assertValueCount(1000);
84+
ts.assertNoErrors();
85+
ts.assertCompleted();
86+
}
87+
88+
89+
@Test
90+
public void backpressured() throws Exception {
91+
o = new Object();
92+
93+
WeakReference<Object> wr = new WeakReference<Object>(o);
94+
95+
TestSubscriber<Object> ts = new TestSubscriber<Object>(0L);
96+
97+
Observable.just(o).count().onTerminateDetach().subscribe(ts);
98+
99+
ts.assertNoValues();
100+
101+
ts.requestMore(1);
102+
103+
ts.assertValue(1);
104+
ts.assertCompleted();
105+
ts.assertNoErrors();
106+
107+
o = null;
108+
109+
System.gc();
110+
Thread.sleep(200);
111+
112+
Assert.assertNull("Object retained!", wr.get());
113+
}
114+
115+
@Test
116+
public void justUnsubscribed() throws Exception {
117+
o = new Object();
118+
119+
WeakReference<Object> wr = new WeakReference<Object>(o);
120+
121+
TestSubscriber<Object> ts = new TestSubscriber<Object>(0);
122+
123+
Observable.just(o).count().onTerminateDetach().subscribe(ts);
124+
125+
ts.unsubscribe();
126+
o = null;
127+
128+
System.gc();
129+
Thread.sleep(200);
130+
131+
Assert.assertNull("Object retained!", wr.get());
132+
133+
}
134+
135+
@Test
136+
public void deferredUpstreamProducer() {
137+
final AtomicReference<Subscriber<? super Object>> subscriber = new AtomicReference<Subscriber<? super Object>>();
138+
139+
TestSubscriber<Object> ts = new TestSubscriber<Object>(0);
140+
141+
Observable.create(new OnSubscribe<Object>() {
142+
@Override
143+
public void call(Subscriber<? super Object> t) {
144+
subscriber.set(t);
145+
}
146+
}).onTerminateDetach().subscribe(ts);
147+
148+
ts.requestMore(2);
149+
150+
new OnSubscribeRange(1, 3).call(subscriber.get());
151+
152+
ts.assertValues(1, 2);
153+
154+
ts.requestMore(1);
155+
156+
ts.assertValues(1, 2, 3);
157+
ts.assertCompleted();
158+
ts.assertNoErrors();
159+
}
160+
}

0 commit comments

Comments
 (0)