Skip to content

Commit 5106a20

Browse files
authored
2.x: Fix bounded replay() memory leak due to bad node retention (#6371)
1 parent a85ddd1 commit 5106a20

File tree

6 files changed

+260
-8
lines changed

6 files changed

+260
-8
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -566,6 +566,8 @@ public void dispose() {
566566
// the others had non-zero. By removing this 'blocking' child, the others
567567
// are now free to receive events
568568
parent.manageRequests();
569+
// make sure the last known node is not retained
570+
index = null;
569571
}
570572
}
571573
/**
@@ -824,6 +826,7 @@ public final void replay(InnerSubscription<T> output) {
824826
}
825827
for (;;) {
826828
if (output.isDisposed()) {
829+
output.index = null;
827830
return;
828831
}
829832

@@ -864,6 +867,7 @@ public final void replay(InnerSubscription<T> output) {
864867
break;
865868
}
866869
if (output.isDisposed()) {
870+
output.index = null;
867871
return;
868872
}
869873
}

src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,8 @@ public void dispose() {
453453
cancelled = true;
454454
// remove this from the parent
455455
parent.remove(this);
456+
// make sure the last known node is not retained
457+
index = null;
456458
}
457459
}
458460
/**
@@ -686,6 +688,7 @@ public final void replay(InnerDisposable<T> output) {
686688

687689
for (;;) {
688690
if (output.isDisposed()) {
691+
output.index = null;
689692
return;
690693
}
691694

src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import static org.mockito.ArgumentMatchers.*;
1818
import static org.mockito.Mockito.*;
1919

20+
import java.lang.management.*;
2021
import java.util.*;
2122
import java.util.concurrent.*;
2223
import java.util.concurrent.atomic.*;
@@ -1976,4 +1977,67 @@ public void currentDisposedWhenConnecting() {
19761977

19771978
assertFalse(fr.current.get().isDisposed());
19781979
}
1980+
1981+
@Test
1982+
public void noBoundedRetentionViaThreadLocal() throws Exception {
1983+
Flowable<byte[]> source = Flowable.range(1, 200)
1984+
.map(new Function<Integer, byte[]>() {
1985+
@Override
1986+
public byte[] apply(Integer v) throws Exception {
1987+
return new byte[1024 * 1024];
1988+
}
1989+
})
1990+
.replay(new Function<Flowable<byte[]>, Publisher<byte[]>>() {
1991+
@Override
1992+
public Publisher<byte[]> apply(final Flowable<byte[]> f) throws Exception {
1993+
return f.take(1)
1994+
.concatMap(new Function<byte[], Publisher<byte[]>>() {
1995+
@Override
1996+
public Publisher<byte[]> apply(byte[] v) throws Exception {
1997+
return f;
1998+
}
1999+
});
2000+
}
2001+
}, 1)
2002+
.takeLast(1)
2003+
;
2004+
2005+
System.out.println("Bounded Replay Leak check: Wait before GC");
2006+
Thread.sleep(1000);
2007+
2008+
System.out.println("Bounded Replay Leak check: GC");
2009+
System.gc();
2010+
2011+
Thread.sleep(500);
2012+
2013+
final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
2014+
MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage();
2015+
long initial = memHeap.getUsed();
2016+
2017+
System.out.printf("Bounded Replay Leak check: Starting: %.3f MB%n", initial / 1024.0 / 1024.0);
2018+
2019+
final AtomicLong after = new AtomicLong();
2020+
2021+
source.subscribe(new Consumer<byte[]>() {
2022+
@Override
2023+
public void accept(byte[] v) throws Exception {
2024+
System.out.println("Bounded Replay Leak check: Wait before GC 2");
2025+
Thread.sleep(1000);
2026+
2027+
System.out.println("Bounded Replay Leak check: GC 2");
2028+
System.gc();
2029+
2030+
Thread.sleep(500);
2031+
2032+
after.set(memoryMXBean.getHeapMemoryUsage().getUsed());
2033+
}
2034+
});
2035+
2036+
System.out.printf("Bounded Replay Leak check: After: %.3f MB%n", after.get() / 1024.0 / 1024.0);
2037+
2038+
if (initial + 100 * 1024 * 1024 < after.get()) {
2039+
Assert.fail("Bounded Replay Leak check: Memory leak detected: " + (initial / 1024.0 / 1024.0)
2040+
+ " -> " + after.get() / 1024.0 / 1024.0);
2041+
}
2042+
}
19792043
}

src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@
1717
import static org.mockito.ArgumentMatchers.*;
1818
import static org.mockito.Mockito.*;
1919

20+
import java.lang.management.*;
2021
import java.util.*;
2122
import java.util.concurrent.*;
22-
import java.util.concurrent.atomic.AtomicInteger;
23+
import java.util.concurrent.atomic.*;
2324

2425
import org.junit.*;
2526
import org.mockito.InOrder;
@@ -1713,4 +1714,66 @@ public void noHeadRetentionTime() {
17131714

17141715
assertSame(o, buf.get());
17151716
}
1716-
}
1717+
1718+
@Test
1719+
public void noBoundedRetentionViaThreadLocal() throws Exception {
1720+
Observable<byte[]> source = Observable.range(1, 200)
1721+
.map(new Function<Integer, byte[]>() {
1722+
@Override
1723+
public byte[] apply(Integer v) throws Exception {
1724+
return new byte[1024 * 1024];
1725+
}
1726+
})
1727+
.replay(new Function<Observable<byte[]>, Observable<byte[]>>() {
1728+
@Override
1729+
public Observable<byte[]> apply(final Observable<byte[]> o) throws Exception {
1730+
return o.take(1)
1731+
.concatMap(new Function<byte[], Observable<byte[]>>() {
1732+
@Override
1733+
public Observable<byte[]> apply(byte[] v) throws Exception {
1734+
return o;
1735+
}
1736+
});
1737+
}
1738+
}, 1)
1739+
.takeLast(1)
1740+
;
1741+
1742+
System.out.println("Bounded Replay Leak check: Wait before GC");
1743+
Thread.sleep(1000);
1744+
1745+
System.out.println("Bounded Replay Leak check: GC");
1746+
System.gc();
1747+
1748+
Thread.sleep(500);
1749+
1750+
final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
1751+
MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage();
1752+
long initial = memHeap.getUsed();
1753+
1754+
System.out.printf("Bounded Replay Leak check: Starting: %.3f MB%n", initial / 1024.0 / 1024.0);
1755+
1756+
final AtomicLong after = new AtomicLong();
1757+
1758+
source.subscribe(new Consumer<byte[]>() {
1759+
@Override
1760+
public void accept(byte[] v) throws Exception {
1761+
System.out.println("Bounded Replay Leak check: Wait before GC 2");
1762+
Thread.sleep(1000);
1763+
1764+
System.out.println("Bounded Replay Leak check: GC 2");
1765+
System.gc();
1766+
1767+
Thread.sleep(500);
1768+
1769+
after.set(memoryMXBean.getHeapMemoryUsage().getUsed());
1770+
}
1771+
});
1772+
1773+
System.out.printf("Bounded Replay Leak check: After: %.3f MB%n", after.get() / 1024.0 / 1024.0);
1774+
1775+
if (initial + 100 * 1024 * 1024 < after.get()) {
1776+
Assert.fail("Bounded Replay Leak check: Memory leak detected: " + (initial / 1024.0 / 1024.0)
1777+
+ " -> " + after.get() / 1024.0 / 1024.0);
1778+
}
1779+
}}

src/test/java/io/reactivex/processors/ReplayProcessorTest.java

Lines changed: 62 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,19 @@
1717
import static org.mockito.ArgumentMatchers.*;
1818
import static org.mockito.Mockito.*;
1919

20+
import java.lang.management.*;
2021
import java.util.Arrays;
2122
import java.util.concurrent.*;
22-
import java.util.concurrent.atomic.AtomicReference;
23+
import java.util.concurrent.atomic.*;
2324

24-
import org.junit.Test;
25+
import org.junit.*;
2526
import org.mockito.*;
2627
import org.reactivestreams.*;
2728

2829
import io.reactivex.*;
2930
import io.reactivex.disposables.Disposable;
3031
import io.reactivex.exceptions.TestException;
31-
import io.reactivex.functions.Function;
32+
import io.reactivex.functions.*;
3233
import io.reactivex.internal.subscriptions.BooleanSubscription;
3334
import io.reactivex.processors.ReplayProcessor.*;
3435
import io.reactivex.schedulers.*;
@@ -1692,4 +1693,62 @@ public void noHeadRetentionTime() {
16921693
public void invalidRequest() {
16931694
TestHelper.assertBadRequestReported(ReplayProcessor.create());
16941695
}
1696+
1697+
@Test
1698+
public void noBoundedRetentionViaThreadLocal() throws Exception {
1699+
final ReplayProcessor<byte[]> rp = ReplayProcessor.createWithSize(1);
1700+
1701+
Flowable<byte[]> source = rp.take(1)
1702+
.concatMap(new Function<byte[], Publisher<byte[]>>() {
1703+
@Override
1704+
public Publisher<byte[]> apply(byte[] v) throws Exception {
1705+
return rp;
1706+
}
1707+
})
1708+
.takeLast(1)
1709+
;
1710+
1711+
System.out.println("Bounded Replay Leak check: Wait before GC");
1712+
Thread.sleep(1000);
1713+
1714+
System.out.println("Bounded Replay Leak check: GC");
1715+
System.gc();
1716+
1717+
Thread.sleep(500);
1718+
1719+
final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
1720+
MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage();
1721+
long initial = memHeap.getUsed();
1722+
1723+
System.out.printf("Bounded Replay Leak check: Starting: %.3f MB%n", initial / 1024.0 / 1024.0);
1724+
1725+
final AtomicLong after = new AtomicLong();
1726+
1727+
source.subscribe(new Consumer<byte[]>() {
1728+
@Override
1729+
public void accept(byte[] v) throws Exception {
1730+
System.out.println("Bounded Replay Leak check: Wait before GC 2");
1731+
Thread.sleep(1000);
1732+
1733+
System.out.println("Bounded Replay Leak check: GC 2");
1734+
System.gc();
1735+
1736+
Thread.sleep(500);
1737+
1738+
after.set(memoryMXBean.getHeapMemoryUsage().getUsed());
1739+
}
1740+
});
1741+
1742+
for (int i = 0; i < 200; i++) {
1743+
rp.onNext(new byte[1024 * 1024]);
1744+
}
1745+
rp.onComplete();
1746+
1747+
System.out.printf("Bounded Replay Leak check: After: %.3f MB%n", after.get() / 1024.0 / 1024.0);
1748+
1749+
if (initial + 100 * 1024 * 1024 < after.get()) {
1750+
Assert.fail("Bounded Replay Leak check: Memory leak detected: " + (initial / 1024.0 / 1024.0)
1751+
+ " -> " + after.get() / 1024.0 / 1024.0);
1752+
}
1753+
}
16951754
}

src/test/java/io/reactivex/subjects/ReplaySubjectTest.java

Lines changed: 62 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,18 @@
1717
import static org.mockito.ArgumentMatchers.*;
1818
import static org.mockito.Mockito.*;
1919

20+
import java.lang.management.*;
2021
import java.util.Arrays;
2122
import java.util.concurrent.*;
22-
import java.util.concurrent.atomic.AtomicReference;
23+
import java.util.concurrent.atomic.*;
2324

24-
import org.junit.Test;
25+
import org.junit.*;
2526
import org.mockito.*;
2627

2728
import io.reactivex.*;
2829
import io.reactivex.disposables.*;
2930
import io.reactivex.exceptions.TestException;
30-
import io.reactivex.functions.Function;
31+
import io.reactivex.functions.*;
3132
import io.reactivex.observers.*;
3233
import io.reactivex.schedulers.*;
3334
import io.reactivex.subjects.ReplaySubject.*;
@@ -1284,4 +1285,62 @@ public void noHeadRetentionTime() {
12841285

12851286
assertSame(o, buf.head);
12861287
}
1288+
1289+
@Test
1290+
public void noBoundedRetentionViaThreadLocal() throws Exception {
1291+
final ReplaySubject<byte[]> rs = ReplaySubject.createWithSize(1);
1292+
1293+
Observable<byte[]> source = rs.take(1)
1294+
.concatMap(new Function<byte[], Observable<byte[]>>() {
1295+
@Override
1296+
public Observable<byte[]> apply(byte[] v) throws Exception {
1297+
return rs;
1298+
}
1299+
})
1300+
.takeLast(1)
1301+
;
1302+
1303+
System.out.println("Bounded Replay Leak check: Wait before GC");
1304+
Thread.sleep(1000);
1305+
1306+
System.out.println("Bounded Replay Leak check: GC");
1307+
System.gc();
1308+
1309+
Thread.sleep(500);
1310+
1311+
final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
1312+
MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage();
1313+
long initial = memHeap.getUsed();
1314+
1315+
System.out.printf("Bounded Replay Leak check: Starting: %.3f MB%n", initial / 1024.0 / 1024.0);
1316+
1317+
final AtomicLong after = new AtomicLong();
1318+
1319+
source.subscribe(new Consumer<byte[]>() {
1320+
@Override
1321+
public void accept(byte[] v) throws Exception {
1322+
System.out.println("Bounded Replay Leak check: Wait before GC 2");
1323+
Thread.sleep(1000);
1324+
1325+
System.out.println("Bounded Replay Leak check: GC 2");
1326+
System.gc();
1327+
1328+
Thread.sleep(500);
1329+
1330+
after.set(memoryMXBean.getHeapMemoryUsage().getUsed());
1331+
}
1332+
});
1333+
1334+
for (int i = 0; i < 200; i++) {
1335+
rs.onNext(new byte[1024 * 1024]);
1336+
}
1337+
rs.onComplete();
1338+
1339+
System.out.printf("Bounded Replay Leak check: After: %.3f MB%n", after.get() / 1024.0 / 1024.0);
1340+
1341+
if (initial + 100 * 1024 * 1024 < after.get()) {
1342+
Assert.fail("Bounded Replay Leak check: Memory leak detected: " + (initial / 1024.0 / 1024.0)
1343+
+ " -> " + after.get() / 1024.0 / 1024.0);
1344+
}
1345+
}
12871346
}

0 commit comments

Comments
 (0)