Skip to content

Commit 1562be5

Browse files
authored
FMWK-496 Resolve potential memory leak related to object references (#165)
1 parent 9e89592 commit 1562be5

File tree

5 files changed

+121
-47
lines changed

5 files changed

+121
-47
lines changed

src/main/java/com/aerospike/mapper/tools/AeroMapper.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,5 @@
11
package com.aerospike.mapper.tools;
22

3-
import java.lang.reflect.Array;
4-
import java.util.ArrayList;
5-
import java.util.List;
6-
import java.util.concurrent.atomic.AtomicBoolean;
7-
import java.util.function.Function;
8-
9-
import javax.validation.constraints.NotNull;
10-
113
import com.aerospike.client.AerospikeException;
124
import com.aerospike.client.AerospikeException.ScanTerminated;
135
import com.aerospike.client.Bin;
@@ -29,7 +21,13 @@
2921
import com.aerospike.mapper.tools.converters.MappingConverter;
3022
import com.aerospike.mapper.tools.utils.MapperUtils;
3123
import com.aerospike.mapper.tools.virtuallist.VirtualList;
32-
import reactor.core.publisher.Mono;
24+
25+
import javax.validation.constraints.NotNull;
26+
import java.lang.reflect.Array;
27+
import java.util.ArrayList;
28+
import java.util.List;
29+
import java.util.concurrent.atomic.AtomicBoolean;
30+
import java.util.function.Function;
3331

3432
public class AeroMapper implements IAeroMapper {
3533

@@ -42,7 +40,7 @@ private AeroMapper(@NotNull IAerospikeClient client) {
4240
}
4341

4442
/**
45-
* Create a new Builder to instantiate the AeroMapper.
43+
* Create a new Builder to instantiate the AeroMapper.
4644
* @author tfaulkes
4745
*
4846
*/
@@ -233,12 +231,10 @@ private <T> T read(Policy readPolicy, @NotNull Class<T> clazz, @NotNull Key key,
233231
} else {
234232
try {
235233
ThreadLocalKeySaver.save(key);
236-
LoadedObjectResolver.begin();
237234
return mappingConverter.convertToObject(clazz, key, record, entry, resolveDependencies);
238235
} catch (ReflectiveOperationException e) {
239236
throw new AerospikeException(e);
240237
} finally {
241-
LoadedObjectResolver.end();
242238
ThreadLocalKeySaver.clear();
243239
}
244240
}

src/main/java/com/aerospike/mapper/tools/LoadedObjectResolver.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,17 @@
11
package com.aerospike.mapper.tools;
22

3+
import com.aerospike.client.Key;
4+
35
import java.util.HashMap;
46
import java.util.Map;
57

6-
import com.aerospike.client.Key;
7-
88
public class LoadedObjectResolver {
99

10-
private static class LoadedObjectMap {
11-
private int referenceCount = 0;
12-
private final Map<Key, Object> objectMap = new HashMap<>();
13-
}
14-
1510
private static final ThreadLocal<LoadedObjectMap> threadLocalObjects = ThreadLocal.withInitial(LoadedObjectMap::new);
1611

12+
private LoadedObjectResolver() {
13+
}
14+
1715
public static void begin() {
1816
LoadedObjectMap map = threadLocalObjects.get();
1917
map.referenceCount++;
@@ -23,7 +21,7 @@ public static void end() {
2321
LoadedObjectMap map = threadLocalObjects.get();
2422
map.referenceCount--;
2523
if (map.referenceCount == 0) {
26-
map.objectMap.clear();
24+
threadLocalObjects.remove();
2725
}
2826
}
2927

@@ -39,4 +37,9 @@ public static Object get(Key key) {
3937
LoadedObjectMap map = threadLocalObjects.get();
4038
return map.objectMap.get(key);
4139
}
42-
}
40+
41+
private static class LoadedObjectMap {
42+
private final Map<Key, Object> objectMap = new HashMap<>();
43+
private int referenceCount = 0;
44+
}
45+
}

src/main/java/com/aerospike/mapper/tools/ReactiveAeroMapper.java

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,5 @@
11
package com.aerospike.mapper.tools;
22

3-
import java.util.Arrays;
4-
import java.util.Objects;
5-
import java.util.function.Function;
6-
7-
import javax.validation.constraints.NotNull;
8-
93
import com.aerospike.client.AerospikeException;
104
import com.aerospike.client.Bin;
115
import com.aerospike.client.Key;
@@ -24,28 +18,20 @@
2418
import com.aerospike.mapper.tools.converters.MappingConverter;
2519
import com.aerospike.mapper.tools.utils.MapperUtils;
2620
import com.aerospike.mapper.tools.virtuallist.ReactiveVirtualList;
27-
2821
import reactor.core.publisher.Flux;
2922
import reactor.core.publisher.Mono;
3023

24+
import javax.validation.constraints.NotNull;
25+
import java.util.Arrays;
26+
import java.util.Objects;
27+
import java.util.function.Function;
28+
3129
public class ReactiveAeroMapper implements IReactiveAeroMapper {
3230

3331
private final IAerospikeReactorClient reactorClient;
3432
private final IAeroMapper aeroMapper;
3533
private final MappingConverter mappingConverter;
3634

37-
/**
38-
* Create a new Builder to instantiate the AeroMapper.
39-
* @author tfaulkes
40-
*
41-
*/
42-
public static class Builder extends AbstractBuilder<ReactiveAeroMapper> {
43-
public Builder(IAerospikeReactorClient reactorClient) {
44-
super(new ReactiveAeroMapper(reactorClient));
45-
ClassCache.getInstance().setReactiveDefaultPolicies(reactorClient);
46-
}
47-
}
48-
4935
private ReactiveAeroMapper(@NotNull IAerospikeReactorClient reactorClient) {
5036
this.reactorClient = reactorClient;
5137
this.aeroMapper = new AeroMapper.Builder(reactorClient.getAerospikeClient()).build();
@@ -205,7 +191,15 @@ public <T> Flux<T> read(BatchPolicy batchPolicy, @NotNull Class<T> clazz, @NotNu
205191
return readBatch(batchPolicy, clazz, keys, entry, operations);
206192
}
207193

208-
private <T> Mono<T> read(Policy readPolicy, @NotNull Class<T> clazz, @NotNull Key key, @NotNull ClassCacheEntry<T> entry, boolean resolveDependencies) {
194+
@SuppressWarnings("unchecked")
195+
private <T> Mono<T> read(Policy readPolicy, @NotNull Class<T> clazz, @NotNull Key key,
196+
@NotNull ClassCacheEntry<T> entry, boolean resolveDependencies) {
197+
if (readPolicy == null || readPolicy.filterExp == null) {
198+
Object objectForKey = LoadedObjectResolver.get(key);
199+
if (objectForKey != null) {
200+
return Mono.just((T) objectForKey);
201+
}
202+
}
209203
if (readPolicy == null) {
210204
readPolicy = entry.getReadPolicy();
211205
}
@@ -368,7 +362,8 @@ public <T> ReactiveVirtualList<T> asBackedList(@NotNull Object object, @NotNull
368362
}
369363

370364
@Override
371-
public <T> ReactiveVirtualList<T> asBackedList(@NotNull Class<?> owningClazz, @NotNull Object key, @NotNull String binName, Class<T> elementClazz) {
365+
public <T> ReactiveVirtualList<T> asBackedList(@NotNull Class<?> owningClazz, @NotNull Object key,
366+
@NotNull String binName, Class<T> elementClazz) {
372367
return new ReactiveVirtualList<>(this, owningClazz, key, binName, elementClazz);
373368
}
374369

@@ -454,4 +449,14 @@ public Mono<Key> getRecordKey(Object obj) {
454449
ClassCacheEntry<?> entry = ClassCache.getInstance().loadClass(obj.getClass(), this);
455450
return entry == null ? null : Mono.just(new Key(entry.getNamespace(), entry.getSetName(), Value.get(entry.getKey(obj))));
456451
}
452+
453+
/**
454+
* Create a new Builder to instantiate the AeroMapper.
455+
*/
456+
public static class Builder extends AbstractBuilder<ReactiveAeroMapper> {
457+
public Builder(IAerospikeReactorClient reactorClient) {
458+
super(new ReactiveAeroMapper(reactorClient));
459+
ClassCache.getInstance().setReactiveDefaultPolicies(reactorClient);
460+
}
461+
}
457462
}

src/main/java/com/aerospike/mapper/tools/ThreadLocalKeySaver.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,32 @@
11
package com.aerospike.mapper.tools;
22

3+
import com.aerospike.client.Key;
4+
35
import java.util.ArrayDeque;
46
import java.util.Deque;
57

6-
import com.aerospike.client.Key;
7-
88
/**
9-
* Save the keys. Note that this is effectively a stack of keys, as A can load B which can load C, and C needs B's key, not A's.
10-
*
11-
* @author timfaulkes
9+
* Save the keys. Note that this is effectively a stack of keys, as A can load B which can load C, and C needs B's key,
10+
* not A's.
1211
*/
1312
public class ThreadLocalKeySaver {
13+
1414
private static final ThreadLocal<Deque<Key>> threadLocalKeys = ThreadLocal.withInitial(ArrayDeque::new);
1515

16+
private ThreadLocalKeySaver() {
17+
}
18+
1619
public static void save(Key key) {
1720
threadLocalKeys.get().addLast(key);
21+
LoadedObjectResolver.begin();
1822
}
1923

2024
public static void clear() {
25+
LoadedObjectResolver.end();
2126
threadLocalKeys.get().removeLast();
27+
if (threadLocalKeys.get().isEmpty()) {
28+
threadLocalKeys.remove();
29+
}
2230
}
2331

2432
public static Key get() {
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package com.aerospike.mapper.reactive;
2+
3+
import com.aerospike.mapper.RecursiveObjectTest;
4+
import com.aerospike.mapper.tools.ReactiveAeroMapper;
5+
import org.junit.jupiter.api.Test;
6+
7+
import static org.junit.jupiter.api.Assertions.assertEquals;
8+
import static org.junit.jupiter.api.Assertions.assertNotNull;
9+
10+
public class ReactiveRecursiveObjectTest extends ReactiveAeroMapperBaseTest {
11+
12+
@Test
13+
public void runTest() {
14+
RecursiveObjectTest.A a1 = new RecursiveObjectTest.A("a", 10, 1);
15+
a1.a = a1;
16+
17+
ReactiveAeroMapper reactiveMapper = new ReactiveAeroMapper.Builder(reactorClient).build();
18+
reactiveMapper.save(a1).block();
19+
20+
RecursiveObjectTest.A a2 = reactiveMapper.read(RecursiveObjectTest.A.class, a1.id).block();
21+
assertNotNull(a2);
22+
assertEquals(a1.age, a2.age);
23+
assertEquals(a1.name, a2.name);
24+
assertEquals(a1.id, a2.id);
25+
}
26+
27+
@Test
28+
public void runMultipleObjectTest() {
29+
RecursiveObjectTest.A a1 = new RecursiveObjectTest.A("a", 10, 1);
30+
a1.a = a1;
31+
RecursiveObjectTest.B b = new RecursiveObjectTest.B();
32+
b.id = 10;
33+
b.a = a1;
34+
35+
ReactiveAeroMapper reactiveMapper = new ReactiveAeroMapper.Builder(reactorClient).build();
36+
reactiveMapper.save(a1).block();
37+
reactiveMapper.save(b).block();
38+
39+
RecursiveObjectTest.B b2 = reactiveMapper.read(RecursiveObjectTest.B.class, b.id).block();
40+
assertNotNull(b2);
41+
assertEquals(b.id, b2.id);
42+
assertEquals(b.a.age, b2.a.age);
43+
assertEquals(b.a.name, b2.a.name);
44+
assertEquals(b.a.id, b2.a.id);
45+
}
46+
47+
@Test
48+
public void runTest2() {
49+
RecursiveObjectTest.A a1 = new RecursiveObjectTest.A("a", 10, 1);
50+
a1.a = new RecursiveObjectTest.A("a2", 11, 11);
51+
52+
ReactiveAeroMapper reactiveMapper = new ReactiveAeroMapper.Builder(reactorClient).build();
53+
reactiveMapper.save(a1, a1.a).blockLast();
54+
55+
RecursiveObjectTest.A a2 = reactiveMapper.read(RecursiveObjectTest.A.class, a1.id).block();
56+
assertNotNull(a2);
57+
assertEquals(a1.age, a2.age);
58+
assertEquals(a1.name, a2.name);
59+
assertEquals(a1.id, a2.id);
60+
assertEquals(a1.a.id, a2.a.id);
61+
}
62+
}

0 commit comments

Comments
 (0)