Skip to content

Commit c2c84c1

Browse files
OnSubscribeRefCount UnitTests
Additional tests on top of those started by @davidmoten
1 parent 3cecb90 commit c2c84c1

File tree

1 file changed

+328
-0
lines changed

1 file changed

+328
-0
lines changed
Lines changed: 328 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,328 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertTrue;
20+
import static org.junit.Assert.fail;
21+
22+
import java.util.concurrent.CountDownLatch;
23+
import java.util.concurrent.TimeUnit;
24+
import java.util.concurrent.atomic.AtomicInteger;
25+
26+
import org.junit.Test;
27+
28+
import rx.Observable;
29+
import rx.Observable.OnSubscribe;
30+
import rx.Subscriber;
31+
import rx.Subscription;
32+
import rx.functions.Action0;
33+
import rx.functions.Action1;
34+
import rx.observers.TestSubscriber;
35+
import rx.schedulers.Schedulers;
36+
37+
public class OnSubscribeRefCountTest {
38+
39+
@Test
40+
public void testRefCountAsync() {
41+
final AtomicInteger subscribeCount = new AtomicInteger();
42+
final AtomicInteger nextCount = new AtomicInteger();
43+
Observable<Long> r = Observable.timer(0, 1, TimeUnit.MILLISECONDS)
44+
.doOnSubscribe(new Action0() {
45+
46+
@Override
47+
public void call() {
48+
subscribeCount.incrementAndGet();
49+
}
50+
51+
})
52+
.doOnNext(new Action1<Long>() {
53+
54+
@Override
55+
public void call(Long l) {
56+
nextCount.incrementAndGet();
57+
}
58+
59+
})
60+
.publish().refCount();
61+
62+
final AtomicInteger receivedCount = new AtomicInteger();
63+
Subscription s1 = r.subscribe(new Action1<Long>() {
64+
65+
@Override
66+
public void call(Long l) {
67+
receivedCount.incrementAndGet();
68+
}
69+
70+
});
71+
Subscription s2 = r.subscribe();
72+
73+
// give time to emit
74+
try {
75+
Thread.sleep(50);
76+
} catch (InterruptedException e) {
77+
}
78+
79+
// now unsubscribe
80+
s2.unsubscribe(); // unsubscribe s2 first as we're counting in 1 and there can be a race between unsubscribe and one subscriber getting a value but not the other
81+
s1.unsubscribe();
82+
83+
System.out.println("onNext: " + nextCount.get());
84+
85+
// should emit once for both subscribers
86+
assertEquals(nextCount.get(), receivedCount.get());
87+
// only 1 subscribe
88+
assertEquals(1, subscribeCount.get());
89+
}
90+
91+
@Test
92+
public void testRefCountSynchronous() {
93+
final AtomicInteger subscribeCount = new AtomicInteger();
94+
final AtomicInteger nextCount = new AtomicInteger();
95+
Observable<Integer> r = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
96+
.doOnSubscribe(new Action0() {
97+
98+
@Override
99+
public void call() {
100+
subscribeCount.incrementAndGet();
101+
}
102+
103+
})
104+
.doOnNext(new Action1<Integer>() {
105+
106+
@Override
107+
public void call(Integer l) {
108+
nextCount.incrementAndGet();
109+
}
110+
111+
})
112+
.publish().refCount();
113+
114+
final AtomicInteger receivedCount = new AtomicInteger();
115+
Subscription s1 = r.subscribe(new Action1<Integer>() {
116+
117+
@Override
118+
public void call(Integer l) {
119+
receivedCount.incrementAndGet();
120+
}
121+
122+
});
123+
Subscription s2 = r.subscribe();
124+
125+
// give time to emit
126+
try {
127+
Thread.sleep(50);
128+
} catch (InterruptedException e) {
129+
}
130+
131+
// now unsubscribe
132+
s2.unsubscribe(); // unsubscribe s2 first as we're counting in 1 and there can be a race between unsubscribe and one subscriber getting a value but not the other
133+
s1.unsubscribe();
134+
135+
System.out.println("onNext: " + nextCount.get());
136+
137+
// it will emit twice because it is synchronous
138+
assertEquals(nextCount.get(), receivedCount.get() * 2);
139+
// it will subscribe twice because it is synchronous
140+
assertEquals(2, subscribeCount.get());
141+
}
142+
143+
@Test
144+
public void testRefCountSynchronousTake() {
145+
final AtomicInteger nextCount = new AtomicInteger();
146+
Observable<Integer> r = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
147+
.doOnNext(new Action1<Integer>() {
148+
149+
@Override
150+
public void call(Integer l) {
151+
System.out.println("onNext --------> " + l);
152+
nextCount.incrementAndGet();
153+
}
154+
155+
})
156+
.take(4)
157+
.publish().refCount();
158+
159+
final AtomicInteger receivedCount = new AtomicInteger();
160+
r.subscribe(new Action1<Integer>() {
161+
162+
@Override
163+
public void call(Integer l) {
164+
receivedCount.incrementAndGet();
165+
}
166+
167+
});
168+
169+
System.out.println("onNext: " + nextCount.get());
170+
171+
assertEquals(4, receivedCount.get());
172+
assertEquals(4, receivedCount.get());
173+
}
174+
175+
@Test
176+
public void testRepeat() {
177+
final AtomicInteger subscribeCount = new AtomicInteger();
178+
final AtomicInteger unsubscribeCount = new AtomicInteger();
179+
Observable<Long> r = Observable.timer(0, 1, TimeUnit.MILLISECONDS)
180+
.doOnSubscribe(new Action0() {
181+
182+
@Override
183+
public void call() {
184+
System.out.println("******************************* Subscribe received");
185+
// when we are subscribed
186+
subscribeCount.incrementAndGet();
187+
}
188+
189+
})
190+
.doOnUnsubscribe(new Action0() {
191+
192+
@Override
193+
public void call() {
194+
System.out.println("******************************* Unsubscribe received");
195+
// when we are unsubscribed
196+
unsubscribeCount.incrementAndGet();
197+
}
198+
199+
})
200+
.publish().refCount();
201+
202+
for (int i = 0; i < 10; i++) {
203+
TestSubscriber<Long> ts1 = new TestSubscriber<Long>();
204+
TestSubscriber<Long> ts2 = new TestSubscriber<Long>();
205+
r.subscribe(ts1);
206+
r.subscribe(ts2);
207+
try {
208+
Thread.sleep(50);
209+
} catch (InterruptedException e) {
210+
}
211+
ts1.unsubscribe();
212+
ts2.unsubscribe();
213+
ts1.assertNoErrors();
214+
ts2.assertNoErrors();
215+
assertTrue(ts1.getOnNextEvents().size() > 0);
216+
assertTrue(ts2.getOnNextEvents().size() > 0);
217+
}
218+
219+
assertEquals(10, subscribeCount.get());
220+
assertEquals(10, unsubscribeCount.get());
221+
}
222+
223+
@Test
224+
public void testConnectUnsubscribe() throws InterruptedException {
225+
final CountDownLatch unsubscribeLatch = new CountDownLatch(1);
226+
final CountDownLatch subscribeLatch = new CountDownLatch(1);
227+
Observable<Long> o = synchronousInterval()
228+
.doOnSubscribe(new Action0() {
229+
230+
@Override
231+
public void call() {
232+
System.out.println("******************************* Subscribe received");
233+
// when we are subscribed
234+
subscribeLatch.countDown();
235+
}
236+
237+
})
238+
.doOnUnsubscribe(new Action0() {
239+
240+
@Override
241+
public void call() {
242+
System.out.println("******************************* Unsubscribe received");
243+
// when we are unsubscribed
244+
unsubscribeLatch.countDown();
245+
}
246+
247+
});
248+
TestSubscriber<Long> s = new TestSubscriber<Long>();
249+
o.publish().refCount().subscribeOn(Schedulers.newThread()).subscribe(s);
250+
System.out.println("send unsubscribe");
251+
// wait until connected
252+
subscribeLatch.await();
253+
// now unsubscribe
254+
s.unsubscribe();
255+
System.out.println("DONE sending unsubscribe ... now waiting");
256+
if (!unsubscribeLatch.await(3000, TimeUnit.MILLISECONDS)) {
257+
System.out.println("Errors: " + s.getOnErrorEvents());
258+
if (s.getOnErrorEvents().size() > 0) {
259+
s.getOnErrorEvents().get(0).printStackTrace();
260+
}
261+
fail("timed out waiting for unsubscribe");
262+
}
263+
s.assertNoErrors();
264+
}
265+
266+
@Test
267+
public void testConnectUnsubscribeRaceCondition() throws InterruptedException {
268+
final AtomicInteger subUnsubCount = new AtomicInteger();
269+
Observable<Long> o = synchronousInterval()
270+
.doOnUnsubscribe(new Action0() {
271+
272+
@Override
273+
public void call() {
274+
System.out.println("******************************* Unsubscribe received");
275+
// when we are unsubscribed
276+
subUnsubCount.decrementAndGet();
277+
}
278+
279+
})
280+
.doOnSubscribe(new Action0() {
281+
282+
@Override
283+
public void call() {
284+
System.out.println("******************************* SUBSCRIBE received");
285+
subUnsubCount.incrementAndGet();
286+
}
287+
288+
});
289+
290+
TestSubscriber<Long> s = new TestSubscriber<Long>();
291+
o.publish().refCount().subscribeOn(Schedulers.newThread()).subscribe(s);
292+
System.out.println("send unsubscribe");
293+
// now immediately unsubscribe while subscribeOn is racing to subscribe
294+
s.unsubscribe();
295+
// this generally will mean it won't even subscribe as it is already unsubscribed by the time connect() gets scheduled
296+
297+
// either we subscribed and then unsubscribed, or we didn't ever even subscribe
298+
assertEquals(0, subUnsubCount.get());
299+
300+
System.out.println("DONE sending unsubscribe ... now waiting");
301+
System.out.println("Errors: " + s.getOnErrorEvents());
302+
if (s.getOnErrorEvents().size() > 0) {
303+
s.getOnErrorEvents().get(0).printStackTrace();
304+
}
305+
s.assertNoErrors();
306+
}
307+
308+
private Observable<Long> synchronousInterval() {
309+
return Observable.create(new OnSubscribe<Long>() {
310+
311+
@Override
312+
public void call(Subscriber<? super Long> subscriber) {
313+
while (!subscriber.isUnsubscribed()) {
314+
try {
315+
Thread.sleep(100);
316+
} catch (InterruptedException e) {
317+
}
318+
subscriber.onNext(1L);
319+
}
320+
}
321+
});
322+
}
323+
324+
@Test
325+
public void testConcurrency() {
326+
327+
}
328+
}

0 commit comments

Comments
 (0)