Skip to content

Commit eb7d33c

Browse files
authored
Support reactive scan/query operations. (#70)
* Support reactive scan/query operations.
1 parent b2be892 commit eb7d33c

File tree

4 files changed

+280
-4
lines changed

4 files changed

+280
-4
lines changed

src/main/java/com/aerospike/mapper/tools/IReactiveAeroMapper.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
package com.aerospike.mapper.tools;
22

3-
import com.aerospike.client.policy.BatchPolicy;
4-
import com.aerospike.client.policy.Policy;
5-
import com.aerospike.client.policy.WritePolicy;
3+
import com.aerospike.client.policy.*;
4+
import com.aerospike.client.query.Filter;
65
import com.aerospike.client.reactor.IAerospikeReactorClient;
76
import com.aerospike.mapper.tools.virtuallist.ReactiveVirtualList;
87
import reactor.core.publisher.Flux;
@@ -56,4 +55,16 @@ public interface IReactiveAeroMapper extends IBaseAeroMapper {
5655
<T> Mono<Void> find(@NotNull Class<T> clazz, Function<T, Boolean> function);
5756

5857
IAerospikeReactorClient getReactorClient();
58+
59+
<T> Flux<T> query(@NotNull Class<T> clazz, Filter filter);
60+
61+
<T> Flux<T> query(QueryPolicy policy, @NotNull Class<T> clazz, Filter filter);
62+
63+
<T> Flux<T> scan(@NotNull Class<T> clazz);
64+
65+
<T> Flux<T> scan(ScanPolicy policy, @NotNull Class<T> clazz);
66+
67+
<T> Flux<T> scan(@NotNull Class<T> clazz, int recordsPerSecond);
68+
69+
<T> Flux<T> scan(ScanPolicy policy, @NotNull Class<T> clazz, int recordsPerSecond);
5970
}

src/main/java/com/aerospike/mapper/tools/ReactiveAeroMapper.java

Lines changed: 104 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import com.aerospike.client.*;
44
import com.aerospike.client.policy.*;
5+
import com.aerospike.client.query.Filter;
6+
import com.aerospike.client.query.Statement;
57
import com.aerospike.client.reactor.IAerospikeReactorClient;
68
import com.aerospike.mapper.tools.configuration.ClassConfig;
79
import com.aerospike.mapper.tools.configuration.Configuration;
@@ -23,6 +25,7 @@
2325
import java.util.Arrays;
2426
import java.util.List;
2527
import java.util.Objects;
28+
import java.util.concurrent.atomic.AtomicBoolean;
2629
import java.util.function.Function;
2730

2831
public class ReactiveAeroMapper implements IReactiveAeroMapper {
@@ -489,6 +492,106 @@ public <T> Mono<Void> find(@NotNull Class<T> clazz, Function<T, Boolean> functio
489492
});
490493
}
491494

495+
/**
496+
* Scan every record in the set associated with the passed class. Each record will be converted to the appropriate class.
497+
*
498+
* @param clazz - the class used to determine which set to scan and to convert the returned records to.
499+
*/
500+
@Override
501+
public <T> Flux<T> scan(@NotNull Class<T> clazz) {
502+
return scan(null, clazz);
503+
}
504+
505+
/**
506+
* Scan every record in the set associated with the passed class. Each record will be converted to the appropriate class.
507+
*
508+
* @param policy - the scan policy to use. If this is null, the default scan policy of the passed class will be used.
509+
* @param clazz - the class used to determine which set to scan and to convert the returned records to.
510+
*/
511+
@Override
512+
public <T> Flux<T> scan(ScanPolicy policy, @NotNull Class<T> clazz) {
513+
return scan(policy, clazz, -1);
514+
}
515+
516+
/**
517+
* Scan every record in the set associated with the passed class, limiting the throughput to the specified recordsPerSecond. Each record will be converted
518+
* to the appropriate class.
519+
*
520+
* @param clazz - the class used to determine which set to scan and to convert the returned records to.
521+
* @param recordsPerSecond - the maximum number of records to be processed every second.
522+
*/
523+
@Override
524+
public <T> Flux<T> scan(@NotNull Class<T> clazz, int recordsPerSecond) {
525+
return scan(null, clazz, recordsPerSecond);
526+
}
527+
528+
/**
529+
* Scan every record in the set associated with the passed class. Each record will be converted to the appropriate class.
530+
*
531+
* @param policy - the scan policy to use. If this is null, the default scan policy of the passed class will be used.
532+
* @param clazz - the class used to determine which set to scan and to convert the returned records to.
533+
* @param recordsPerSecond - the number of records to process per second. Set to 0 for unlimited, &gt; 0 for a finite rate, &lt; 0 for no change
534+
* (use the value from the passed policy)
535+
*/
536+
@Override
537+
public <T> Flux<T> scan(ScanPolicy policy, @NotNull Class<T> clazz, int recordsPerSecond) {
538+
ClassCacheEntry<T> entry = MapperUtils.getEntryAndValidateNamespace(clazz, this);
539+
if (policy == null) {
540+
policy = entry.getScanPolicy();
541+
}
542+
if (recordsPerSecond >= 0) {
543+
// Ensure the underlying rate on the policy does not change
544+
policy = new ScanPolicy(policy);
545+
policy.recordsPerSecond = recordsPerSecond;
546+
}
547+
String namespace = entry.getNamespace();
548+
String setName = entry.getSetName();
549+
550+
return reactorClient.scanAll(policy, namespace, setName)
551+
.map(keyRecord -> this.getMappingConverter().convertToObject(clazz, keyRecord.record));
552+
}
553+
554+
/**
555+
* Perform a secondary index query with the specified query policy. Each record will be converted
556+
* to the appropriate class then passed to the processor. If the processor returns false the query is aborted
557+
* whereas if the processor returns true subsequent records (if any) are processed.
558+
* <p/>
559+
* The query policy used will be the one associated with the passed classtype.
560+
*
561+
* @param clazz - the class used to determine which set to scan and to convert the returned records to.
562+
* @param filter - the filter used to determine which secondary index to use. If this filter is null, every record in the set
563+
* associated with the passed classtype will be scanned, effectively turning the query into a scan
564+
*/
565+
@Override
566+
public <T> Flux<T> query(@NotNull Class<T> clazz, Filter filter) {
567+
return query(null, clazz, filter);
568+
}
569+
570+
/**
571+
* Perform a secondary index query with the specified query policy. Each record will be converted
572+
* to the appropriate class then passed to the processor. If the processor returns false the query is aborted
573+
* whereas if the processor returns true subsequent records (if any) are processed.
574+
*
575+
* @param policy - The query policy to use. If this parameter is not passed, the query policy associated with the passed classtype will be used
576+
* @param clazz - the class used to determine which set to scan and to convert the returned records to.
577+
* @param filter - the filter used to determine which secondary index to use. If this filter is null, every record in the set
578+
* associated with the passed classtype will be scanned, effectively turning the query into a scan
579+
*/
580+
@Override
581+
public <T> Flux<T> query(QueryPolicy policy, @NotNull Class<T> clazz, Filter filter) {
582+
ClassCacheEntry<T> entry = MapperUtils.getEntryAndValidateNamespace(clazz, this);
583+
if (policy == null) {
584+
policy = entry.getQueryPolicy();
585+
}
586+
Statement statement = new Statement();
587+
statement.setFilter(filter);
588+
statement.setNamespace(entry.getNamespace());
589+
statement.setSetName(entry.getSetName());
590+
591+
return reactorClient.query(policy, statement)
592+
.map(keyRecord -> this.getMappingConverter().convertToObject(clazz, keyRecord.record));
593+
}
594+
492595
@Override
493596
public IAerospikeReactorClient getReactorClient() {
494597
return this.reactorClient;
@@ -646,7 +749,7 @@ private <T> Flux<T> readBatch(BatchPolicy batchPolicy, @NotNull Class<T> clazz,
646749

647750
private Throwable translateError(Throwable e) {
648751
if (e instanceof AerospikeException) {
649-
return translateError((AerospikeException) e);
752+
return translateError(e);
650753
}
651754
return e;
652755
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package com.aerospike.mapper.reactive;
2+
3+
import com.aerospike.client.AerospikeException;
4+
import com.aerospike.client.query.Filter;
5+
import com.aerospike.client.query.IndexType;
6+
import com.aerospike.mapper.annotations.AerospikeKey;
7+
import com.aerospike.mapper.annotations.AerospikeRecord;
8+
import com.aerospike.mapper.annotations.ParamFrom;
9+
import com.aerospike.mapper.tools.ReactiveAeroMapper;
10+
import org.junit.jupiter.api.Test;
11+
import reactor.core.scheduler.Schedulers;
12+
13+
import java.util.List;
14+
15+
import static org.junit.jupiter.api.Assertions.assertEquals;
16+
17+
public class ReactiveQueryTest extends ReactiveAeroMapperBaseTest {
18+
@AerospikeRecord(namespace = "test", set = "testScan")
19+
public static class A {
20+
@AerospikeKey
21+
private final int id;
22+
private final String name;
23+
private final int age;
24+
25+
public A(@ParamFrom("id") int id, @ParamFrom("name") String name, @ParamFrom("age") int age) {
26+
super();
27+
this.id = id;
28+
this.name = name;
29+
this.age = age;
30+
}
31+
32+
public int getId() {
33+
return id;
34+
}
35+
36+
public String getName() {
37+
return name;
38+
}
39+
40+
public int getAge() {
41+
return age;
42+
}
43+
44+
@Override
45+
public String toString() {
46+
return String.format("id:%d, name:%s, age:%d", id, name, age);
47+
}
48+
}
49+
50+
private ReactiveAeroMapper populate() {
51+
reactorClient.getAerospikeClient().truncate(null, "test", "testScan", null);
52+
ReactiveAeroMapper reactiveMapper = new ReactiveAeroMapper.Builder(reactorClient).build();
53+
reactiveMapper.save(new A(1, "Tim", 312),
54+
new A(2, "Bob", 44),
55+
new A(3, "Sue", 56),
56+
new A(4, "Rob", 23),
57+
new A(5, "Jim", 32),
58+
new A(6, "Bob", 78),
59+
new A(7, "Fred", 23),
60+
new A(8, "Wilma", 11),
61+
new A(9, "Barney", 54),
62+
new A(10, "Steve", 72),
63+
new A(11, "Bam Bam", 19),
64+
new A(12, "Betty", 34),
65+
new A(13, "Del", 7),
66+
new A(14, "Khon", 98),
67+
new A(15, "Dave", 21),
68+
new A(16, "Mike", 32),
69+
new A(17, "Darren", 14),
70+
new A(18, "Lucy", 45),
71+
new A(19, "Gertrude", 36),
72+
new A(20, "Lucinda", 63)).subscribeOn(Schedulers.parallel()).collectList().block();
73+
74+
try {
75+
reactorClient.getAerospikeClient().createIndex(null, "test", "testScan", "age_idx", "age", IndexType.NUMERIC).waitTillComplete();
76+
} catch (AerospikeException ae) {
77+
// swallow the exception
78+
}
79+
return reactiveMapper;
80+
}
81+
82+
@Test
83+
public void queryTest() {
84+
ReactiveAeroMapper reactiveMapper = populate();
85+
List<A> results = reactiveMapper.query(A.class, Filter.range("age", 30, 54)).subscribeOn(Schedulers.parallel()).collectList().block();
86+
assert results != null;
87+
assertEquals(7, results.size());
88+
}
89+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package com.aerospike.mapper.reactive;
2+
3+
import com.aerospike.client.exp.Exp;
4+
import com.aerospike.client.policy.ScanPolicy;
5+
import com.aerospike.mapper.annotations.AerospikeKey;
6+
import com.aerospike.mapper.annotations.AerospikeRecord;
7+
import com.aerospike.mapper.annotations.ParamFrom;
8+
import com.aerospike.mapper.tools.ReactiveAeroMapper;
9+
import org.junit.jupiter.api.Test;
10+
import reactor.core.scheduler.Schedulers;
11+
12+
import java.util.List;
13+
14+
import static org.junit.jupiter.api.Assertions.assertEquals;
15+
16+
public class ReactiveScanTest extends ReactiveAeroMapperBaseTest {
17+
@AerospikeRecord(namespace = "test", set = "testScan")
18+
public static class Person {
19+
@AerospikeKey
20+
private final int id;
21+
private final String name;
22+
private final int age;
23+
24+
public Person(@ParamFrom("id") int id, @ParamFrom("name") String name, @ParamFrom("age") int age) {
25+
super();
26+
this.id = id;
27+
this.name = name;
28+
this.age = age;
29+
}
30+
31+
public int getId() {
32+
return id;
33+
}
34+
35+
public String getName() {
36+
return name;
37+
}
38+
39+
public int getAge() {
40+
return age;
41+
}
42+
}
43+
44+
private ReactiveAeroMapper populate() {
45+
reactorClient.getAerospikeClient().truncate(null, "test", "testScan", null);
46+
ReactiveAeroMapper reactiveMapper = new ReactiveAeroMapper.Builder(reactorClient).build();
47+
reactiveMapper.save(new Person(1, "Tim", 312),
48+
new Person(2, "Bob", 44),
49+
new Person(3, "Sue", 56),
50+
new Person(4, "Rob", 23),
51+
new Person(5, "Jim", 32),
52+
new Person(6, "Bob", 78)).subscribeOn(Schedulers.parallel()).collectList().block();
53+
return reactiveMapper;
54+
}
55+
56+
@Test
57+
public void scanTest() {
58+
ReactiveAeroMapper reactiveMapper = populate();
59+
List<Person> results = reactiveMapper.scan(Person.class).subscribeOn(Schedulers.parallel()).collectList().block();
60+
assert results != null;
61+
assertEquals(6, results.size());
62+
}
63+
64+
@Test
65+
public void scanTestWithFilter() {
66+
ReactiveAeroMapper reactiveMapper = populate();
67+
ScanPolicy scanPolicy = new ScanPolicy(reactiveMapper.getScanPolicy(Person.class));
68+
scanPolicy.filterExp = Exp.build(Exp.eq(Exp.stringBin("name"), Exp.val("Bob")));
69+
List<Person> results = reactiveMapper.scan(scanPolicy, Person.class).subscribeOn(Schedulers.parallel()).collectList().block();
70+
assert results != null;
71+
assertEquals(2, results.size());
72+
}
73+
}

0 commit comments

Comments
 (0)