Skip to content

Commit 75ffcbb

Browse files
authored
1.x: fix replay() retaining reference to the child Subscriber (#4229)
1 parent 479df31 commit 75ffcbb

File tree

2 files changed

+103
-5
lines changed

2 files changed

+103
-5
lines changed

src/main/java/rx/internal/operators/OperatorReplay.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,7 @@ boolean add(InnerProducer<T> producer) {
410410
* Atomically removes the given producer from the producers array.
411411
* @param producer the producer to remove
412412
*/
413+
@SuppressWarnings("unchecked")
413414
void remove(InnerProducer<T> producer) {
414415
if (terminated) {
415416
return;
@@ -419,6 +420,9 @@ void remove(InnerProducer<T> producer) {
419420
return;
420421
}
421422
producers.remove(producer);
423+
if (producers.isEmpty()) {
424+
producersCache = EMPTY;
425+
}
422426
producersVersion++;
423427
}
424428
}
@@ -643,7 +647,7 @@ static final class InnerProducer<T> extends AtomicLong implements Producer, Subs
643647
*/
644648
final ReplaySubscriber<T> parent;
645649
/** The actual child subscriber. */
646-
final Subscriber<? super T> child;
650+
Subscriber<? super T> child;
647651
/**
648652
* Holds an object that represents the current location in the buffer.
649653
* Guarded by the emitter loop.
@@ -784,6 +788,8 @@ public void unsubscribe() {
784788
// the others had non-zero. By removing this 'blocking' child, the others
785789
// are now free to receive events
786790
parent.manageRequests(this);
791+
// break the reference
792+
child = null;
787793
}
788794
}
789795
}
@@ -878,20 +884,25 @@ public void replay(InnerProducer<T> output) {
878884
Integer destIndexObject = output.index();
879885
int destIndex = destIndexObject != null ? destIndexObject : 0;
880886

887+
Subscriber<? super T> child = output.child;
888+
if (child == null) {
889+
return;
890+
}
891+
881892
long r = output.get();
882893
long e = 0L;
883894

884895
while (e != r && destIndex < sourceIndex) {
885896
Object o = get(destIndex);
886897
try {
887-
if (nl.accept(output.child, o)) {
898+
if (nl.accept(child, o)) {
888899
return;
889900
}
890901
} catch (Throwable err) {
891902
Exceptions.throwIfFatal(err);
892903
output.unsubscribe();
893904
if (!nl.isError(o) && !nl.isCompleted(o)) {
894-
output.child.onError(OnErrorThrowable.addValueAsLastCause(err, nl.getValue(o)));
905+
child.onError(OnErrorThrowable.addValueAsLastCause(err, nl.getValue(o)));
895906
}
896907
return;
897908
}
@@ -1066,6 +1077,11 @@ public final void replay(InnerProducer<T> output) {
10661077
return;
10671078
}
10681079

1080+
Subscriber<? super T> child = output.child;
1081+
if (child == null) {
1082+
return;
1083+
}
1084+
10691085
long r = output.get();
10701086
long e = 0L;
10711087

@@ -1074,7 +1090,7 @@ public final void replay(InnerProducer<T> output) {
10741090
if (v != null) {
10751091
Object o = leaveTransform(v.value);
10761092
try {
1077-
if (nl.accept(output.child, o)) {
1093+
if (nl.accept(child, o)) {
10781094
output.index = null;
10791095
return;
10801096
}
@@ -1083,7 +1099,7 @@ public final void replay(InnerProducer<T> output) {
10831099
Exceptions.throwIfFatal(err);
10841100
output.unsubscribe();
10851101
if (!nl.isError(o) && !nl.isCompleted(o)) {
1086-
output.child.onError(OnErrorThrowable.addValueAsLastCause(err, nl.getValue(o)));
1102+
child.onError(OnErrorThrowable.addValueAsLastCause(err, nl.getValue(o)));
10871103
}
10881104
return;
10891105
}

src/test/java/rx/internal/operators/OperatorReplayTest.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.mockito.Matchers.*;
2020
import static org.mockito.Mockito.*;
2121

22+
import java.lang.management.*;
2223
import java.util.*;
2324
import java.util.concurrent.*;
2425
import java.util.concurrent.atomic.*;
@@ -1495,4 +1496,85 @@ public void timeSizeDefaultScheduler() {
14951496
ts.assertNoErrors();
14961497
ts.assertCompleted();
14971498
}
1499+
1500+
void replayNoRetention(Func1<Observable<Integer>, ConnectableObservable<Integer>> replayOp) throws InterruptedException {
1501+
System.gc();
1502+
1503+
Thread.sleep(500);
1504+
1505+
MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
1506+
MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage();
1507+
long initial = memHeap.getUsed();
1508+
1509+
System.out.printf("Starting: %.3f MB%n", initial / 1024.0 / 1024.0);
1510+
1511+
PublishSubject<Integer> ps = PublishSubject.create();
1512+
1513+
ConnectableObservable<Integer> co = replayOp.call(ps);
1514+
1515+
Subscription s = co.subscribe(new Action1<Integer>() {
1516+
int[] array = new int[1024 * 1024 * 32];
1517+
1518+
@Override
1519+
public void call(Integer t) {
1520+
System.out.println(array.length);
1521+
}
1522+
});
1523+
1524+
co.connect();
1525+
ps.onNext(1);
1526+
1527+
memHeap = memoryMXBean.getHeapMemoryUsage();
1528+
long middle = memHeap.getUsed();
1529+
1530+
System.out.printf("Starting: %.3f MB%n", middle / 1024.0 / 1024.0);
1531+
1532+
s.unsubscribe();
1533+
s = null;
1534+
1535+
System.gc();
1536+
1537+
Thread.sleep(500);
1538+
1539+
memHeap = memoryMXBean.getHeapMemoryUsage();
1540+
long finish = memHeap.getUsed();
1541+
1542+
System.out.printf("After: %.3f MB%n", finish / 1024.0 / 1024.0);
1543+
1544+
if (finish > initial * 5) {
1545+
fail(String.format("Leak: %.3f -> %.3f -> %.3f", initial / 1024 / 1024.0, middle / 1024 / 1024.0, finish / 1024 / 1024d));
1546+
}
1547+
1548+
}
1549+
1550+
@Test
1551+
public void replayNoRetentionUnbounded() throws Exception {
1552+
replayNoRetention(new Func1<Observable<Integer>, ConnectableObservable<Integer>>() {
1553+
@Override
1554+
public ConnectableObservable<Integer> call(Observable<Integer> o) {
1555+
return o.replay();
1556+
}
1557+
});
1558+
}
1559+
1560+
@Test
1561+
public void replayNoRetentionSizeBound() throws Exception {
1562+
replayNoRetention(new Func1<Observable<Integer>, ConnectableObservable<Integer>>() {
1563+
@Override
1564+
public ConnectableObservable<Integer> call(Observable<Integer> o) {
1565+
return o.replay(1);
1566+
}
1567+
});
1568+
}
1569+
1570+
@Test
1571+
public void replayNoRetentionTimebound() throws Exception {
1572+
replayNoRetention(new Func1<Observable<Integer>, ConnectableObservable<Integer>>() {
1573+
@Override
1574+
public ConnectableObservable<Integer> call(Observable<Integer> o) {
1575+
return o.replay(1, TimeUnit.DAYS);
1576+
}
1577+
});
1578+
}
1579+
14981580
}

0 commit comments

Comments
 (0)