Skip to content

Commit 850deea

Browse files
ekchangakarnokd
authored andcommitted
1.x: Add cache() to Single (#4757)
* Add cache to Single * Add Experimental annotation * Update javadoc
1 parent 5e3f6c4 commit 850deea

File tree

2 files changed

+104
-0
lines changed

2 files changed

+104
-0
lines changed

src/main/java/rx/Single.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import rx.functions.*;
2222
import rx.internal.operators.*;
2323
import rx.internal.util.*;
24+
import rx.observables.ConnectableObservable;
2425
import rx.observers.*;
2526
import rx.plugins.RxJavaHooks;
2627
import rx.schedulers.Schedulers;
@@ -1269,6 +1270,62 @@ public static <R> Single<R> zip(Iterable<? extends Single<?>> singles, FuncN<? e
12691270
return SingleOperatorZip.zip(iterableToArray, zipFunction);
12701271
}
12711272

1273+
/**
1274+
* Returns a Single that subscribes to this Single lazily, caches its success or error event
1275+
* and replays it to all the downstream subscribers.
1276+
* <p>
1277+
* <img width="640" height="410" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/cache.png" alt="">
1278+
* <p>
1279+
* This is useful when you want a Single to cache its response and you can't control the
1280+
* subscribe/unsubscribe behavior of all the {@link Subscriber}s.
1281+
* <p>
1282+
* The operator subscribes only when the first downstream subscriber subscribes and maintains
1283+
* a single subscription towards this Single. In contrast, the operator family of {@link Observable#replay()}
1284+
* that return a {@link ConnectableObservable} require an explicit call to {@link ConnectableObservable#connect()}.
1285+
* <p>
1286+
* <em>Note:</em> You sacrifice the ability to unsubscribe from the origin when you use the {@code cache}
1287+
* Observer so be careful not to use this Observer on Observables that emit an infinite or very large number
1288+
* of items that will use up memory.
1289+
* A possible workaround is to apply `takeUntil` with a predicate or
1290+
* another source before (and perhaps after) the application of cache().
1291+
* <pre><code>
1292+
* AtomicBoolean shouldStop = new AtomicBoolean();
1293+
*
1294+
* source.takeUntil(v -&gt; shouldStop.get())
1295+
* .cache()
1296+
* .takeUntil(v -&gt; shouldStop.get())
1297+
* .subscribe(...);
1298+
* </code></pre>
1299+
* Since the operator doesn't allow clearing the cached values either, the possible workaround is
1300+
* to forget all references to it via {@link Observable#onTerminateDetach()} applied along with the previous
1301+
* workaround:
1302+
* <pre><code>
1303+
* AtomicBoolean shouldStop = new AtomicBoolean();
1304+
*
1305+
* source.takeUntil(v -&gt; shouldStop.get())
1306+
* .onTerminateDetach()
1307+
* .cache()
1308+
* .takeUntil(v -&gt; shouldStop.get())
1309+
* .onTerminateDetach()
1310+
* .subscribe(...);
1311+
* </code></pre>
1312+
* <dl>
1313+
* <dt><b>Backpressure:</b></dt>
1314+
* <dd>The operator consumes this Single in an unbounded fashion but respects the backpressure
1315+
* of each downstream Subscriber individually.</dd>
1316+
* <dt><b>Scheduler:</b></dt>
1317+
* <dd>{@code cache} does not operate by default on a particular {@link Scheduler}.</dd>
1318+
* </dl>
1319+
*
1320+
* @return a Single that, when first subscribed to, caches its response for the
1321+
* benefit of subsequent subscribers
1322+
* @see <a href="http://reactivex.io/documentation/operators/replay.html">ReactiveX operators documentation: Replay</a>
1323+
*/
1324+
@Experimental
1325+
public final Single<T> cache() {
1326+
return toObservable().cacheWithInitialCapacity(1).toSingle();
1327+
}
1328+
12721329
/**
12731330
* Returns an Observable that emits the item emitted by the source Single, then the item emitted by the
12741331
* specified Single.

src/test/java/rx/SingleTest.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,53 @@ public void testReturnUnsubscribedWhenHookThrowsError() {
503503
assertTrue(subscription.isUnsubscribed());
504504
}
505505

506+
@Test
507+
public void testCache() throws InterruptedException {
508+
final AtomicInteger counter = new AtomicInteger();
509+
Single<String> s = Single.create(new OnSubscribe<String>() {
510+
511+
@Override
512+
public void call(final SingleSubscriber<? super String> observer) {
513+
new Thread(new Runnable() {
514+
515+
@Override
516+
public void run() {
517+
counter.incrementAndGet();
518+
observer.onSuccess("one");
519+
}
520+
}).start();
521+
}
522+
}).cache();
523+
524+
// we then expect the following 2 subscriptions to get that same value
525+
final CountDownLatch latch = new CountDownLatch(2);
526+
527+
// subscribe once
528+
s.subscribe(new Action1<String>() {
529+
530+
@Override
531+
public void call(String v) {
532+
assertEquals("one", v);
533+
latch.countDown();
534+
}
535+
});
536+
537+
// subscribe again
538+
s.subscribe(new Action1<String>() {
539+
540+
@Override
541+
public void call(String v) {
542+
assertEquals("one", v);
543+
latch.countDown();
544+
}
545+
});
546+
547+
if (!latch.await(1000, TimeUnit.MILLISECONDS)) {
548+
fail("subscriptions did not receive values");
549+
}
550+
assertEquals(1, counter.get());
551+
}
552+
506553
@Test
507554
public void testCreateSuccess() {
508555
TestSubscriber<String> ts = new TestSubscriber<String>();

0 commit comments

Comments
 (0)