Skip to content

Commit 10d46da

Browse files
authored
Feature/batch read operations (#84)
1. Fix reactive batch read by resolving dependencies for each fetched object after subscription. 2. Batch read with operations support.
1 parent f7d8119 commit 10d46da

File tree

7 files changed

+286
-70
lines changed

7 files changed

+286
-70
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
<maven-surefire-plugin.version>3.0.0-M5</maven-surefire-plugin.version>
3030
<javax.validation-api.version>2.0.1.Final</javax.validation-api.version>
3131
<aerospike-client.version>5.1.8</aerospike-client.version>
32-
<aerospike-reactor.version>5.0.7</aerospike-reactor.version>
32+
<aerospike-reactor.version>5.1.8</aerospike-reactor.version>
3333
<commons-lang3.version>3.12.0</commons-lang3.version>
3434
<jackson-dataformat-yaml.version>2.12.5</jackson-dataformat-yaml.version>
3535
<lombok.version>1.18.20</lombok.version>

src/main/java/com/aerospike/mapper/tools/AeroMapper.java

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,8 @@
11
package com.aerospike.mapper.tools;
22

3-
import com.aerospike.client.AerospikeException;
3+
import com.aerospike.client.*;
44
import com.aerospike.client.AerospikeException.ScanTerminated;
5-
import com.aerospike.client.Bin;
6-
import com.aerospike.client.IAerospikeClient;
7-
import com.aerospike.client.Key;
85
import com.aerospike.client.Record;
9-
import com.aerospike.client.Value;
106
import com.aerospike.client.policy.BatchPolicy;
117
import com.aerospike.client.policy.Policy;
128
import com.aerospike.client.policy.QueryPolicy;
@@ -299,12 +295,22 @@ public <T> T read(Policy readPolicy, @NotNull Class<T> clazz, @NotNull Object us
299295
}
300296

301297
@Override
302-
public <T> T[] read(@NotNull Class<T> clazz, @NotNull Object... userKeys) throws AerospikeException {
303-
return this.read(null, clazz, userKeys);
298+
public <T> T[] read(@NotNull Class<T> clazz, @NotNull Object[] userKeys) throws AerospikeException {
299+
return read(null, clazz, userKeys);
304300
}
305301

306302
@Override
307-
public <T> T[] read(BatchPolicy batchPolicy, @NotNull Class<T> clazz, @NotNull Object... userKeys) throws AerospikeException {
303+
public <T> T[] read(BatchPolicy batchPolicy, @NotNull Class<T> clazz, @NotNull Object[] userKeys) throws AerospikeException {
304+
return read(batchPolicy, clazz, userKeys, (Operation[]) null);
305+
}
306+
307+
@Override
308+
public <T> T[] read(@NotNull Class<T> clazz, @NotNull Object[] userKeys, Operation... operations) {
309+
return read(null, clazz, userKeys, operations);
310+
}
311+
312+
@Override
313+
public <T> T[] read(BatchPolicy batchPolicy, @NotNull Class<T> clazz, @NotNull Object[] userKeys, Operation... operations) {
308314
ClassCacheEntry<T> entry = MapperUtils.getEntryAndValidateNamespace(clazz, this);
309315
String set = entry.getSetName();
310316
Key[] keys = new Key[userKeys.length];
@@ -316,7 +322,7 @@ public <T> T[] read(BatchPolicy batchPolicy, @NotNull Class<T> clazz, @NotNull O
316322
}
317323
}
318324

319-
return readBatch(batchPolicy, clazz, keys, entry);
325+
return readBatch(batchPolicy, clazz, keys, entry, operations);
320326
}
321327

322328
@SuppressWarnings({"deprecation", "unchecked"})
@@ -349,11 +355,19 @@ private <T> T read(Policy readPolicy, @NotNull Class<T> clazz, @NotNull Key key,
349355
}
350356

351357
@SuppressWarnings("unchecked")
352-
private <T> T[] readBatch(BatchPolicy batchPolicy, @NotNull Class<T> clazz, @NotNull Key[] keys, @NotNull ClassCacheEntry<T> entry) {
358+
private <T> T[] readBatch(BatchPolicy batchPolicy, @NotNull Class<T> clazz, @NotNull Key[] keys,
359+
@NotNull ClassCacheEntry<T> entry, Operation... operations) {
353360
if (batchPolicy == null) {
354361
batchPolicy = entry.getBatchPolicy();
355362
}
356-
Record[] records = mClient.get(batchPolicy, keys);
363+
364+
Record[] records;
365+
if (operations != null && operations.length > 0) {
366+
records = mClient.get(batchPolicy, keys, operations);
367+
} else {
368+
records = mClient.get(batchPolicy, keys);
369+
}
370+
357371
T[] results = (T[]) Array.newInstance(clazz, records.length);
358372
for (int i = 0; i < records.length; i++) {
359373
if (records[i] == null) {

src/main/java/com/aerospike/mapper/tools/IAeroMapper.java

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import com.aerospike.client.AerospikeException;
99
import com.aerospike.client.IAerospikeClient;
10+
import com.aerospike.client.Operation;
1011
import com.aerospike.client.policy.BatchPolicy;
1112
import com.aerospike.client.policy.Policy;
1213
import com.aerospike.client.policy.QueryPolicy;
@@ -131,7 +132,7 @@ public interface IAeroMapper extends IBaseAeroMapper {
131132
* @return The returned mapped records.
132133
* @throws AerospikeException an AerospikeException will be thrown in case of an error.
133134
*/
134-
<T> T[] read(@NotNull Class<T> clazz, @NotNull Object... userKeys);
135+
<T> T[] read(@NotNull Class<T> clazz, @NotNull Object[] userKeys);
135136

136137
/**
137138
* Read a batch of records from the repository and map them to an instance of the passed class.
@@ -142,7 +143,30 @@ public interface IAeroMapper extends IBaseAeroMapper {
142143
* @return The returned mapped records.
143144
* @throws AerospikeException an AerospikeException will be thrown in case of an error.
144145
*/
145-
<T> T[] read(BatchPolicy batchPolicy, @NotNull Class<T> clazz, @NotNull Object... userKeys);
146+
<T> T[] read(BatchPolicy batchPolicy, @NotNull Class<T> clazz, @NotNull Object[] userKeys);
147+
148+
/**
149+
* Read a batch of records from the repository using read operations in one batch call and map them to an instance of the passed class.
150+
*
151+
* @param clazz - The type of be returned.
152+
* @param userKeys - The keys of the record. The namespace and set will be derived from the values specified on the passed class.
153+
* @param operations - array of read operations on record.
154+
* @return The returned mapped records.
155+
* @throws AerospikeException an AerospikeException will be thrown in case of an error.
156+
*/
157+
<T> T[] read(@NotNull Class<T> clazz, @NotNull Object[] userKeys, Operation... operations);
158+
159+
/**
160+
* Read a batch of records from the repository using read operations in one batch call and map them to an instance of the passed class.
161+
*
162+
* @param batchPolicy A given batch policy.
163+
* @param clazz - The type of be returned.
164+
* @param userKeys - The keys of the record. The namespace and set will be derived from the values specified on the passed class.
165+
* @param operations - array of read operations on record.
166+
* @return The returned mapped records.
167+
* @throws AerospikeException an AerospikeException will be thrown in case of an error.
168+
*/
169+
<T> T[] read(BatchPolicy batchPolicy, @NotNull Class<T> clazz, @NotNull Object[] userKeys, Operation... operations);
146170

147171
/**
148172
* Delete a record by specifying a class and a user key.
@@ -301,9 +325,9 @@ public interface IAeroMapper extends IBaseAeroMapper {
301325
* <p/>
302326
* The query policy used will be the one associated with the passed classtype.
303327
*
304-
* @param clazz - the class used to determine which set to scan and to convert the returned records to.
305-
* @param filter - the filter used to determine which secondary index to use. If this filter is null, every record in the set
306-
* associated with the passed classtype will be scanned, effectively turning the query into a scan
328+
* @param clazz - the class used to determine which set to scan and to convert the returned records to.
329+
* @param filter - the filter used to determine which secondary index to use. If this filter is null, every record in the set
330+
* associated with the passed classtype will be scanned, effectively turning the query into a scan
307331
* @return List of records converted to the appropriate class
308332
*/
309333
<T> List<T> query(@NotNull Class<T> clazz, Filter filter);
@@ -312,10 +336,10 @@ public interface IAeroMapper extends IBaseAeroMapper {
312336
* Perform a secondary index query with the specified query policy
313337
* and returns the list of records converted to the appropriate class.
314338
*
315-
* @param policy - The query policy to use. If this parameter is not passed, the query policy associated with the passed classtype will be used
316-
* @param clazz - the class used to determine which set to scan and to convert the returned records to.
317-
* @param filter - the filter used to determine which secondary index to use. If this filter is null, every record in the set
318-
* associated with the passed classtype will be scanned, effectively turning the query into a scan
339+
* @param policy - The query policy to use. If this parameter is not passed, the query policy associated with the passed classtype will be used
340+
* @param clazz - the class used to determine which set to scan and to convert the returned records to.
341+
* @param filter - the filter used to determine which secondary index to use. If this filter is null, every record in the set
342+
* associated with the passed classtype will be scanned, effectively turning the query into a scan
319343
* @return List of records converted to the appropriate class
320344
*/
321345
<T> List<T> query(QueryPolicy policy, @NotNull Class<T> clazz, Filter filter);

src/main/java/com/aerospike/mapper/tools/IReactiveAeroMapper.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.aerospike.mapper.tools;
22

33
import com.aerospike.client.AerospikeException;
4+
import com.aerospike.client.Operation;
45
import com.aerospike.client.policy.*;
56
import com.aerospike.client.query.Filter;
67
import com.aerospike.client.reactor.IAerospikeReactorClient;
@@ -127,7 +128,7 @@ public interface IReactiveAeroMapper extends IBaseAeroMapper {
127128
* @return The returned mapped records.
128129
* @throws AerospikeException an AerospikeException will be thrown in case of an error.
129130
*/
130-
<T> Flux<T> read(@NotNull Class<T> clazz, @NotNull Object... userKeys);
131+
<T> Flux<T> read(@NotNull Class<T> clazz, @NotNull Object[] userKeys);
131132

132133
/**
133134
* Read a batch of records from the repository and map them to an instance of the passed class.
@@ -138,7 +139,30 @@ public interface IReactiveAeroMapper extends IBaseAeroMapper {
138139
* @return The returned mapped records.
139140
* @throws AerospikeException an AerospikeException will be thrown in case of an error.
140141
*/
141-
<T> Flux<T> read(BatchPolicy batchPolicy, @NotNull Class<T> clazz, @NotNull Object... userKeys);
142+
<T> Flux<T> read(BatchPolicy batchPolicy, @NotNull Class<T> clazz, @NotNull Object[] userKeys);
143+
144+
/**
145+
* Read a batch of records from the repository using read operations in one batch call and map them to an instance of the passed class.
146+
*
147+
* @param clazz - The type of the record.
148+
* @param userKeys - The keys of the record. The namespace and set will be derived from the values specified on the passed class.
149+
* @param operations - array of read operations on record.
150+
* @return The returned mapped records.
151+
* @throws AerospikeException an AerospikeException will be thrown in case of an error.
152+
*/
153+
<T> Flux<T> read(@NotNull Class<T> clazz, @NotNull Object[] userKeys, Operation... operations);
154+
155+
/**
156+
* Read a batch of records from the repository using read operations in one batch call and map them to an instance of the passed class.
157+
*
158+
* @param batchPolicy A given batch policy.
159+
* @param clazz - The type of the record.
160+
* @param userKeys - The keys of the record. The namespace and set will be derived from the values specified on the passed class.
161+
* @param operations - array of read operations on record.
162+
* @return The returned mapped records.
163+
* @throws AerospikeException an AerospikeException will be thrown in case of an error.
164+
*/
165+
<T> Flux<T> read(BatchPolicy batchPolicy, @NotNull Class<T> clazz, @NotNull Object[] userKeys, Operation... operations);
142166

143167
/**
144168
* Delete a record by specifying a class and a user key.

src/main/java/com/aerospike/mapper/tools/ReactiveAeroMapper.java

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.aerospike.client.*;
44
import com.aerospike.client.policy.*;
55
import com.aerospike.client.query.Filter;
6+
import com.aerospike.client.query.KeyRecord;
67
import com.aerospike.client.query.Statement;
78
import com.aerospike.client.reactor.IAerospikeReactorClient;
89
import com.aerospike.mapper.tools.configuration.ClassConfig;
@@ -281,13 +282,22 @@ public <T> Mono<T> read(Policy readPolicy, @NotNull Class<T> clazz, @NotNull Obj
281282
}
282283

283284
@Override
284-
public <T> Flux<T> read(@NotNull Class<T> clazz, @NotNull Object... userKeys) {
285-
throw new UnsupportedOperationException("Batch reading is not supported in ReactiveAeroMapper yet.");
286-
//return this.read(null, clazz, userKeys);
285+
public <T> Flux<T> read(@NotNull Class<T> clazz, @NotNull Object[] userKeys) {
286+
return read(null, clazz, userKeys);
287287
}
288288

289289
@Override
290-
public <T> Flux<T> read(BatchPolicy batchPolicy, @NotNull Class<T> clazz, @NotNull Object... userKeys) {
290+
public <T> Flux<T> read(BatchPolicy batchPolicy, @NotNull Class<T> clazz, @NotNull Object[] userKeys) {
291+
return read(null, clazz, userKeys, (Operation[]) null);
292+
}
293+
294+
@Override
295+
public <T> Flux<T> read(@NotNull Class<T> clazz, @NotNull Object[] userKeys, Operation... operations) {
296+
return read(null, clazz, userKeys, operations);
297+
}
298+
299+
@Override
300+
public <T> Flux<T> read(BatchPolicy batchPolicy, @NotNull Class<T> clazz, @NotNull Object[] userKeys, Operation... operations) {
291301
ClassCacheEntry<T> entry = MapperUtils.getEntryAndValidateNamespace(clazz, this);
292302
String set = entry.getSetName();
293303
Key[] keys = new Key[userKeys.length];
@@ -299,7 +309,7 @@ public <T> Flux<T> read(BatchPolicy batchPolicy, @NotNull Class<T> clazz, @NotNu
299309
}
300310
}
301311

302-
return readBatch(batchPolicy, clazz, keys, entry);
312+
return readBatch(batchPolicy, clazz, keys, entry, operations);
303313
}
304314

305315
private <T> Mono<T> read(Policy readPolicy, @NotNull Class<T> clazz, @NotNull Key key, @NotNull ClassCacheEntry<T> entry, boolean resolveDependencies) {
@@ -322,32 +332,38 @@ private <T> Mono<T> read(Policy readPolicy, @NotNull Class<T> clazz, @NotNull Ke
322332
});
323333
}
324334

325-
private <T> Flux<T> readBatch(BatchPolicy batchPolicy, @NotNull Class<T> clazz, @NotNull Key[] keys, @NotNull ClassCacheEntry<T> entry) {
335+
private <T> Flux<T> readBatch(BatchPolicy batchPolicy, @NotNull Class<T> clazz, @NotNull Key[] keys,
336+
@NotNull ClassCacheEntry<T> entry, Operation... operations) {
326337
if (batchPolicy == null) {
327338
batchPolicy = entry.getBatchPolicy();
328339
}
329340

330-
Flux<T> results = reactorClient
331-
.getFlux(batchPolicy, keys)
332-
.filter(keyRecord -> Objects.nonNull(keyRecord.record))
341+
Flux<KeyRecord> keyRecordFlux;
342+
343+
if (operations != null && operations.length > 0) {
344+
keyRecordFlux = reactorClient
345+
.getFlux(batchPolicy, keys, operations);
346+
} else {
347+
keyRecordFlux = reactorClient
348+
.getFlux(batchPolicy, keys);
349+
}
350+
351+
return keyRecordFlux.filter(keyRecord -> Objects.nonNull(keyRecord.record))
333352
.map(keyRecord -> {
334353
try {
335354
ThreadLocalKeySaver.save(keyRecord.key);
336-
return mappingConverter.convertToObject(clazz, keyRecord.record, entry, false);
355+
return mappingConverter.convertToObject(clazz, keyRecord.record, entry, true);
337356
} catch (ReflectiveOperationException e) {
338357
throw new AerospikeException(e);
339358
} finally {
340359
ThreadLocalKeySaver.clear();
341360
}
342361
});
343-
344-
mappingConverter.resolveDependencies(entry);
345-
return results;
346362
}
347363

348364
@Override
349365
public <T> Mono<Boolean> delete(@NotNull Class<T> clazz, @NotNull Object userKey) {
350-
return this.delete(null, clazz, userKey);
366+
return delete(null, clazz, userKey);
351367
}
352368

353369
@Override
@@ -430,7 +446,7 @@ public <T> Flux<T> scan(ScanPolicy policy, @NotNull Class<T> clazz, int recordsP
430446
String setName = entry.getSetName();
431447

432448
return reactorClient.scanAll(policy, namespace, setName)
433-
.map(keyRecord -> this.getMappingConverter().convertToObject(clazz, keyRecord.record));
449+
.map(keyRecord -> getMappingConverter().convertToObject(clazz, keyRecord.record));
434450
}
435451

436452
@Override
@@ -450,7 +466,7 @@ public <T> Flux<T> query(QueryPolicy policy, @NotNull Class<T> clazz, Filter fil
450466
statement.setSetName(entry.getSetName());
451467

452468
return reactorClient.query(policy, statement)
453-
.map(keyRecord -> this.getMappingConverter().convertToObject(clazz, keyRecord.record));
469+
.map(keyRecord -> getMappingConverter().convertToObject(clazz, keyRecord.record));
454470
}
455471

456472
@Override
@@ -465,12 +481,12 @@ public <T> ReactiveVirtualList<T> asBackedList(@NotNull Class<?> owningClazz, @N
465481

466482
@Override
467483
public IAerospikeReactorClient getReactorClient() {
468-
return this.reactorClient;
484+
return reactorClient;
469485
}
470486

471487
@Override
472488
public MappingConverter getMappingConverter() {
473-
return this.mappingConverter;
489+
return mappingConverter;
474490
}
475491

476492
@Override

0 commit comments

Comments
 (0)