Skip to content

Commit 83fdfc7

Browse files
authored
Reactive object mapper (#27)
Add reactive support for Java Object Mapper.
1 parent 9230b2e commit 83fdfc7

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+4273
-402
lines changed

pom.xml

+37-1
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,12 @@
2929
<junit.version>4.13.2</junit.version>
3030
<javax.validation-api.version>2.0.1.Final</javax.validation-api.version>
3131
<aerospike-client.version>5.1.2</aerospike-client.version>
32+
<aerospike-reactor.version>5.0.7</aerospike-reactor.version>
3233
<commons-lang3.version>3.12.0</commons-lang3.version>
3334
<jackson-dataformat-yaml.version>2.12.3</jackson-dataformat-yaml.version>
35+
<lombok.version>1.18.18</lombok.version>
36+
<reactor-test.version>3.4.5</reactor-test.version>
37+
<blockhound.version>1.0.6.RELEASE</blockhound.version>
3438
</properties>
3539

3640
<licenses>
@@ -89,7 +93,12 @@
8993
<artifactId>aerospike-client</artifactId>
9094
<version>${aerospike-client.version}</version>
9195
</dependency>
92-
96+
97+
<dependency>
98+
<groupId>com.aerospike</groupId>
99+
<artifactId>aerospike-reactor-client</artifactId>
100+
<version>${aerospike-reactor.version}</version>
101+
</dependency>
93102

94103
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
95104
<dependency>
@@ -105,6 +114,33 @@
105114
<artifactId>jackson-dataformat-yaml</artifactId>
106115
<version>${jackson-dataformat-yaml.version}</version>
107116
</dependency>
117+
118+
<dependency>
119+
<groupId>org.projectlombok</groupId>
120+
<artifactId>lombok</artifactId>
121+
<version>${lombok.version}</version>
122+
<scope>provided</scope>
123+
</dependency>
124+
125+
<dependency>
126+
<groupId>io.projectreactor</groupId>
127+
<artifactId>reactor-test</artifactId>
128+
<version>${reactor-test.version}</version>
129+
<scope>test</scope>
130+
</dependency>
131+
132+
<dependency>
133+
<groupId>io.projectreactor.tools</groupId>
134+
<artifactId>blockhound</artifactId>
135+
<version>${blockhound.version}</version>
136+
<scope>test</scope>
137+
</dependency>
138+
<dependency>
139+
<groupId>org.junit.jupiter</groupId>
140+
<artifactId>junit-jupiter</artifactId>
141+
<version>RELEASE</version>
142+
<scope>test</scope>
143+
</dependency>
108144
</dependencies>
109145
<build>
110146
<plugins>

src/main/java/com/aerospike/mapper/tools/AeroMapper.java

+99-266
Large diffs are not rendered by default.

src/main/java/com/aerospike/mapper/tools/ClassCache.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import com.aerospike.client.policy.QueryPolicy;
1313
import com.aerospike.client.policy.ScanPolicy;
1414
import com.aerospike.client.policy.WritePolicy;
15+
import com.aerospike.client.reactor.IAerospikeReactorClient;
1516
import com.aerospike.mapper.tools.configuration.ClassConfig;
1617
import com.aerospike.mapper.tools.configuration.Configuration;
1718

@@ -44,7 +45,7 @@ private ClassCache() {
4445
}
4546
}
4647

47-
public <T> ClassCacheEntry<T> loadClass(@NotNull Class<T> clazz, AeroMapper mapper) {
48+
public <T> ClassCacheEntry<T> loadClass(@NotNull Class<T> clazz, IBaseAeroMapper mapper) {
4849
if (clazz.isPrimitive() || clazz.equals(Object.class) || clazz.equals(String.class) || clazz.equals(Character.class) || Number.class.isAssignableFrom(clazz)) {
4950
return null;
5051
}
@@ -90,6 +91,14 @@ void setDefaultPolicies(IAerospikeClient client) {
9091
this.defaultPolicies.put(PolicyType.QUERY, client.getQueryPolicyDefault());
9192
this.defaultPolicies.put(PolicyType.SCAN, client.getScanPolicyDefault());
9293
}
94+
95+
void setReactiveDefaultPolicies(IAerospikeReactorClient reactorClient) {
96+
this.defaultPolicies.put(PolicyType.READ, reactorClient.getReadPolicyDefault());
97+
this.defaultPolicies.put(PolicyType.WRITE, reactorClient.getWritePolicyDefault());
98+
this.defaultPolicies.put(PolicyType.BATCH, reactorClient.getBatchPolicyDefault());
99+
this.defaultPolicies.put(PolicyType.QUERY, reactorClient.getQueryPolicyDefault());
100+
this.defaultPolicies.put(PolicyType.SCAN, reactorClient.getScanPolicyDefault());
101+
}
93102

94103
void setDefaultPolicy(PolicyType policyType, Policy policy) {
95104
this.defaultPolicies.put(policyType, policy);
@@ -103,7 +112,6 @@ void setSpecificPolicy(PolicyType policyType, Class<?> parentClass, Policy polic
103112
this.childrenPolicies.get(policyType).put(parentClass, policy);
104113
}
105114

106-
107115
public boolean hasClass(Class<?> clazz) {
108116
return cacheMap.containsKey(clazz);
109117
}

src/main/java/com/aerospike/mapper/tools/ClassCacheEntry.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public class ClassCacheEntry<T> {
4040
private final TreeMap<String, ValueType> values = new TreeMap<>();
4141
private final ClassCacheEntry<?> superClazz;
4242
private final int binCount;
43-
private final AeroMapper mapper;
43+
private final IBaseAeroMapper mapper;
4444
private Map<Integer, String> ordinals = null;
4545
private Set<String> fieldsWithOrdinals = null;
4646
private final ClassConfig classConfig;
@@ -61,7 +61,7 @@ public class ClassCacheEntry<T> {
6161

6262

6363
// package visibility only.
64-
ClassCacheEntry(@NotNull Class<T> clazz, AeroMapper mapper, ClassConfig config,
64+
ClassCacheEntry(@NotNull Class<T> clazz, IBaseAeroMapper mapper, ClassConfig config,
6565
@NotNull Policy readPolicy, @NotNull WritePolicy writePolicy,
6666
@NotNull BatchPolicy batchPolicy, @NotNull QueryPolicy queryPolicy,
6767
@NotNull ScanPolicy scanPolicy) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package com.aerospike.mapper.tools;
2+
3+
import com.aerospike.client.IAerospikeClient;
4+
import com.aerospike.client.policy.BatchPolicy;
5+
import com.aerospike.client.policy.Policy;
6+
import com.aerospike.client.policy.WritePolicy;
7+
import com.aerospike.mapper.tools.converters.MappingConverter;
8+
9+
import javax.validation.constraints.NotNull;
10+
import java.util.function.Function;
11+
12+
public interface IAeroMapper extends IBaseAeroMapper {
13+
14+
void save(@NotNull Object ... objects);
15+
16+
void save(@NotNull Object object, String ...binNames);
17+
18+
void save(@NotNull WritePolicy writePolicy, @NotNull Object object, String ...binNames);
19+
20+
void update(@NotNull Object object, String ... binNames);
21+
22+
<T> T readFromDigest(Policy readPolicy, @NotNull Class<T> clazz, @NotNull byte[] digest);
23+
24+
<T> T readFromDigest(Policy readPolicy, @NotNull Class<T> clazz, @NotNull byte[] digest, boolean resolveDependencies);
25+
26+
<T> T readFromDigest(@NotNull Class<T> clazz, @NotNull byte[] digest, boolean resolveDependencies);
27+
28+
<T> T readFromDigest(@NotNull Class<T> clazz, @NotNull byte[] digest);
29+
30+
<T> T read(Policy readPolicy, @NotNull Class<T> clazz, @NotNull Object userKey);
31+
32+
<T> T read(Policy readPolicy, @NotNull Class<T> clazz, @NotNull Object userKey, boolean resolveDependencies);
33+
34+
<T> T read(@NotNull Class<T> clazz, @NotNull Object userKey);
35+
36+
<T> T read(@NotNull Class<T> clazz, @NotNull Object userKey, boolean resolveDependencies);
37+
38+
<T> T[] read(@NotNull Class<T> clazz, @NotNull Object ... userKeys);
39+
40+
<T> T[] read(BatchPolicy batchPolicy, @NotNull Class<T> clazz, @NotNull Object ... userKeys);
41+
42+
<T> boolean delete(@NotNull Class<T> clazz, @NotNull Object userKey);
43+
44+
<T> boolean delete(WritePolicy writePolicy, @NotNull Class<T> clazz, @NotNull Object userKey);
45+
46+
boolean delete(@NotNull Object object);
47+
48+
boolean delete(WritePolicy writePolicy, @NotNull Object object);
49+
50+
<T> VirtualList<T> asBackedList(@NotNull Object object, @NotNull String binName, Class<T> elementClazz);
51+
52+
<T> VirtualList<T> asBackedList(@NotNull Class<?> owningClazz, @NotNull Object key, @NotNull String binName, Class<T> elementClazz);
53+
54+
<T> void find(@NotNull Class<T> clazz, Function<T, Boolean> function);
55+
56+
IAerospikeClient getClient();
57+
58+
MappingConverter getMappingConverter();
59+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.aerospike.mapper.tools;
2+
3+
import com.aerospike.client.policy.BatchPolicy;
4+
import com.aerospike.client.policy.Policy;
5+
import com.aerospike.client.policy.ScanPolicy;
6+
import com.aerospike.client.policy.WritePolicy;
7+
8+
public interface IBaseAeroMapper {
9+
Policy getReadPolicy(Class<?> clazz);
10+
11+
WritePolicy getWritePolicy(Class<?> clazz);
12+
13+
BatchPolicy getBatchPolicy(Class<?> clazz);
14+
15+
ScanPolicy getScanPolicy(Class<?> clazz);
16+
17+
Policy getQueryPolicy(Class<?> clazz);
18+
19+
IAeroMapper asMapper();
20+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package com.aerospike.mapper.tools;
2+
3+
import com.aerospike.client.policy.BatchPolicy;
4+
import com.aerospike.client.policy.Policy;
5+
import com.aerospike.client.policy.WritePolicy;
6+
import com.aerospike.client.reactor.IAerospikeReactorClient;
7+
import com.aerospike.mapper.tools.converters.MappingConverter;
8+
import reactor.core.publisher.Flux;
9+
import reactor.core.publisher.Mono;
10+
11+
import javax.validation.constraints.NotNull;
12+
import java.util.function.Function;
13+
14+
public interface IReactiveAeroMapper extends IBaseAeroMapper {
15+
16+
<T> Flux<T> save(@NotNull T ... objects);
17+
18+
<T> Mono<T> save(@NotNull T object, String ...binNames);
19+
20+
<T> Mono<T> save(@NotNull WritePolicy writePolicy, @NotNull T object, String ...binNames);
21+
22+
<T> Mono<T> update(@NotNull T object, String ... binNames);
23+
24+
<T> Mono<T> readFromDigest(Policy readPolicy, @NotNull Class<T> clazz, @NotNull byte[] digest);
25+
26+
/**
27+
* This method should not be used except by mappers
28+
*/
29+
<T> Mono<T> readFromDigest(Policy readPolicy, @NotNull Class<T> clazz, @NotNull byte[] digest, boolean resolveDependencies);
30+
31+
<T> Mono<T> readFromDigest(@NotNull Class<T> clazz, @NotNull byte[] digest);
32+
33+
/**
34+
* This method should not be used except by mappers
35+
*/
36+
<T> Mono<T> readFromDigest(@NotNull Class<T> clazz, @NotNull byte[] digest, boolean resolveDependencies);
37+
38+
<T> Mono<T> read(Policy readPolicy, @NotNull Class<T> clazz, @NotNull Object userKey);
39+
40+
/**
41+
* This method should not be used except by mappers
42+
*/
43+
<T> Mono<T> read(Policy readPolicy, @NotNull Class<T> clazz, @NotNull Object userKey, boolean resolveDependencies);
44+
45+
<T> Mono<T> read(@NotNull Class<T> clazz, @NotNull Object userKey);
46+
47+
<T> Mono<T> read(@NotNull Class<T> clazz, @NotNull Object userKey, boolean resolveDependencies);
48+
49+
<T> Flux<T> read(@NotNull Class<T> clazz, @NotNull Object ... userKeys);
50+
51+
<T> Flux<T> read(BatchPolicy batchPolicy, @NotNull Class<T> clazz, @NotNull Object ... userKeys);
52+
53+
<T> Mono<Boolean> delete(@NotNull Class<T> clazz, @NotNull Object userKey);
54+
55+
<T> Mono<Boolean> delete(WritePolicy writePolicy, @NotNull Class<T> clazz, @NotNull Object userKey);
56+
57+
Mono<Boolean> delete(@NotNull Object object);
58+
59+
Mono<Boolean> delete(WritePolicy writePolicy, @NotNull Object object);
60+
61+
<T> ReactiveVirtualList<T> asBackedList(@NotNull Object object, @NotNull String binName, Class<T> elementClazz);
62+
63+
<T> ReactiveVirtualList<T> asBackedList(@NotNull Class<?> owningClazz, @NotNull Object key, @NotNull String binName, Class<T> elementClazz);
64+
65+
<T> Mono<Void> find(@NotNull Class<T> clazz, Function<T, Boolean> function);
66+
67+
IAerospikeReactorClient getReactorClient();
68+
69+
MappingConverter getMappingConverter();
70+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.aerospike.mapper.tools;
2+
3+
import com.aerospike.client.AerospikeException;
4+
import org.apache.commons.lang3.StringUtils;
5+
6+
public class MapperUtils {
7+
public static <T> ClassCacheEntry<T> getEntryAndValidateNamespace(Class<T> clazz, IBaseAeroMapper mapper) {
8+
ClassCacheEntry<T> entry = ClassCache.getInstance().loadClass(clazz, mapper);
9+
String namespace = null;
10+
if (entry != null) {
11+
namespace = entry.getNamespace();
12+
}
13+
if (StringUtils.isBlank(namespace)) {
14+
throw new AerospikeException("Namespace not specified to perform database operation on a record of type " + clazz.getName());
15+
}
16+
return entry;
17+
}
18+
}

src/main/java/com/aerospike/mapper/tools/PropertyDefinition.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@ public enum SetterParamType {
2424
private String name;
2525
private Class<?> clazz;
2626
private TypeMapper typeMapper;
27-
private final AeroMapper mapper;
27+
private final IBaseAeroMapper mapper;
2828
private SetterParamType setterParamType = SetterParamType.NONE;
2929

30-
public PropertyDefinition(String name, AeroMapper mapper) {
30+
public PropertyDefinition(String name, IBaseAeroMapper mapper) {
3131
this.name = name;
3232
this.mapper = mapper;
3333
}

0 commit comments

Comments
 (0)