Skip to content

Commit 0dc4ba3

Browse files
authored
Added Scan and Query functionality (#68)
* Added Scan and Query functionality
1 parent 118842f commit 0dc4ba3

File tree

6 files changed

+476
-3
lines changed

6 files changed

+476
-3
lines changed

README.md

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ The documentation for this project can be found on [javadoc.io](https://www.java
3838
+ 10.1.3. [Embed Structure](#Embed-Structure)
3939
+ 10.1.4. [Reference Structure](#Reference-Structure)
4040
11. [Virtual Lists](#Virtual-Lists)
41+
12. [Scans](#Scans)
42+
13. [Queries](#Queries)
4143

4244
# Compatibility with Aerospike Clients
4345

@@ -2168,3 +2170,98 @@ public <T> T convertToObject(Class<T> clazz, Record record);
21682170

21692171
Note: At the moment not all CDT operations are supported, and if the underlying CDTs are of the wrong type, a different API call may be used. For example, if you invoke `getByKeyRange` on items represented in the database as a list, `getByValueRange` is invoked instead as a list has no key.
21702172

2173+
## Scans
2174+
Scans can be used to process every record in a set. The scan iterates through every item in the set and invokes a callback for every item in the set. For example:
2175+
2176+
```java
2177+
mapper.scan(Pseron.class, (person) -> {
2178+
// ... process person
2179+
return true;
2180+
});
2181+
```
2182+
2183+
If the processing method returns true, the scan continues. However, if the processing method returns false the scan will abort. Note that if the scan policy calls for multi-threading of the scans, the callback method may be invoked by multiple threads at once and hence must be thread safe. If one thread aborts the scan, other threads already in the processing method will finish processing their records.
2184+
2185+
Note that if you want to process only some records in a set you can attach an Expression on the optional policy passed to the scan. For example, if there is a `Person` class:
2186+
2187+
```java
2188+
@AerospikeRecord(namespace = "test", set = "testScan")
2189+
public class Person {
2190+
@AerospikeKey
2191+
private int id;
2192+
private String name;
2193+
private int age;
2194+
2195+
public Person(@ParamFrom("id") int id, @ParamFrom("name") String name, @ParamFrom("age") int age) {
2196+
super();
2197+
this.id = id;
2198+
this.name = name;
2199+
this.age = age;
2200+
}
2201+
2202+
public int getId() {
2203+
return id;
2204+
}
2205+
2206+
public String getName() {
2207+
return name;
2208+
}
2209+
2210+
public int getAge() {
2211+
return age;
2212+
}
2213+
}
2214+
```
2215+
2216+
and then several people are inserted:
2217+
2218+
```java
2219+
mapper.save(new Person(1, "Tim", 312),
2220+
new Person(2, "Bob", 44),
2221+
new Person(3, "Sue", 56),
2222+
new Person(4, "Rob", 23),
2223+
new Person(5, "Jim", 32),
2224+
new Person(6, "Bob", 78));
2225+
```
2226+
2227+
As a contrived example, let's say we want to count the number of people in the set called "Bob". We can simply do:
2228+
2229+
```java
2230+
AtomicInteger counter = new AtomicInteger(0);
2231+
ScanPolicy scanPolicy = new ScanPolicy(mapper.getScanPolicy(Person.class));
2232+
scanPolicy.filterExp = Exp.build(Exp.eq(Exp.stringBin("name"), Exp.val("Bob")));
2233+
mapper.scan(scanPolicy, Person.class, (person) -> {
2234+
counter.incrementAndGet();
2235+
return true;
2236+
});
2237+
```
2238+
2239+
Note that when we altered the ScanPolicy, we had to make a copy of it first. If we fail to do this, the ScanPolicy will be altered for all subsequent calls. To clarify, the **wrong** way to set the scan policy is
2240+
2241+
```java
2242+
ScanPolicy scanPolicy = mapper.getScanPolicy(Person.class);
2243+
scanPolicy.filterExp = Exp.build(Exp.eq(Exp.stringBin("name"), Exp.val("Bob")));
2244+
```
2245+
2246+
and the **right** way to set an expression is
2247+
2248+
```java
2249+
ScanPolicy scanPolicy = new ScanPolicy(mapper.getScanPolicy(Person.class));
2250+
scanPolicy.filterExp = Exp.build(Exp.eq(Exp.stringBin("name"), Exp.val("Bob")));
2251+
```
2252+
2253+
## Queries
2254+
2255+
Similar to Scans, Queries can processed using the AeroMapper. Syntactically, the only difference between a query and a scan is the addition of a `Filter` on the Query which dictates the criteria of the query. A secondary index must be defined on the Bin referenced in the Filter or an error will be thrown. If no filter is passed, the query will be turned into a scan.
2256+
2257+
Similar to Scans, returning `false` on the processing method will abort the Query and process no further records, and additional filter criteria can be added using Expressions on the QueryPolicy.
2258+
2259+
```java
2260+
mapper.query(A.class, (a) -> {
2261+
System.out.println(a);
2262+
counter.incrementAndGet();
2263+
return true;
2264+
}, Filter.range("age", 30, 54));
2265+
2266+
```
2267+

src/main/java/com/aerospike/mapper/tools/AeroMapper.java

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@
66
import java.lang.reflect.Array;
77
import java.util.ArrayList;
88
import java.util.List;
9+
import java.util.concurrent.atomic.AtomicBoolean;
910
import java.util.function.Function;
1011

1112
import javax.validation.constraints.NotNull;
1213

1314
import org.apache.commons.lang3.StringUtils;
1415

1516
import com.aerospike.client.AerospikeException;
17+
import com.aerospike.client.AerospikeException.ScanTerminated;
1618
import com.aerospike.client.Bin;
1719
import com.aerospike.client.IAerospikeClient;
1820
import com.aerospike.client.Key;
@@ -24,6 +26,7 @@
2426
import com.aerospike.client.policy.RecordExistsAction;
2527
import com.aerospike.client.policy.ScanPolicy;
2628
import com.aerospike.client.policy.WritePolicy;
29+
import com.aerospike.client.query.Filter;
2730
import com.aerospike.client.query.RecordSet;
2831
import com.aerospike.client.query.Statement;
2932
import com.aerospike.mapper.tools.ClassCache.PolicyType;
@@ -613,6 +616,153 @@ public <T> void find(@NotNull Class<T> clazz, Function<T, Boolean> function) thr
613616
}
614617
}
615618

619+
/**
620+
* Scan every record in the set associated with the passed class. Each record will be converted to the appropriate class then passed to the
621+
* processor. If the processor returns true, more records will be processed and if the processor returns false, the scan is aborted.
622+
* <p/>
623+
* Depending on the ScanPolicy set up for this class, it is possible for the processor to be called by multiple different
624+
* threads concurrently, so the processor should be thread-safe
625+
*
626+
* @param clazz - the class used to determine which set to scan and to convert the returned records to.
627+
* @param processor - the Processor used to process each record
628+
*/
629+
@Override
630+
public <T> void scan(@NotNull Class<T> clazz, @NotNull Processor<T> processor) {
631+
scan(null, clazz, processor);
632+
}
633+
634+
/**
635+
* Scan every record in the set associated with the passed class. Each record will be converted to the appropriate class then passed to the
636+
* processor. If the processor returns true, more records will be processed and if the processor returns false, the scan is aborted.
637+
* <p/>
638+
* Depending on the policy passed or set as the ScanPolicy for this class, it is possible for the processor to be called by multiple different
639+
* threads concurrently, so the processor should be thread-safe. Note that as a consequence of this, if the processor returns false to abort the
640+
* scan there is a chance that records are being concurrently processed in other threads and this processing will not be interrupted.
641+
* <p/>
642+
*
643+
* @param policy - the scan policy to use. If this is null, the default scan policy of the passed class will be used.
644+
* @param clazz - the class used to determine which set to scan and to convert the returned records to.
645+
* @param processor - the Processor used to process each record
646+
*/
647+
@Override
648+
public <T> void scan(ScanPolicy policy, @NotNull Class<T> clazz, @NotNull Processor<T> processor) {
649+
scan(policy, clazz, processor, -1);
650+
}
651+
652+
/**
653+
* Scan every record in the set associated with the passed class, limiting the throughput to the specified recordsPerSecond. Each record will be converted
654+
* to the appropriate class then passed to the
655+
* processor. If the processor returns true, more records will be processed and if the processor returns false, the scan is aborted.
656+
* <p/>
657+
* Depending on the ScanPolicy set up for this class, it is possible for the processor to be called by multiple different
658+
* threads concurrently, so the processor should be thread-safe
659+
*
660+
* @param clazz - the class used to determine which set to scan and to convert the returned records to.
661+
* @param processor - the Processor used to process each record
662+
* @param recordsPerSecond - the maximum number of records to be processed every second.
663+
*/
664+
@Override
665+
public <T> void scan(@NotNull Class<T> clazz, @NotNull Processor<T> processor, int recordsPerSecond) {
666+
scan(null, clazz, processor, recordsPerSecond);
667+
}
668+
669+
/**
670+
* Scan every record in the set associated with the passed class. Each record will be converted to the appropriate class then passed to the
671+
* processor. If the processor returns true, more records will be processed and if the processor returns false, the scan is aborted.
672+
* <p/>
673+
* Depending on the policy passed or set as the ScanPolicy for this class, it is possible for the processor to be called by multiple different
674+
* threads concurrently, so the processor should be thread-safe. Note that as a consequence of this, if the processor returns false to abort the
675+
* scan there is a chance that records are being concurrently processed in other threads and this processing will not be interrupted.
676+
* <p/>
677+
*
678+
* @param policy - the scan policy to use. If this is null, the default scan policy of the passed class will be used.
679+
* @param clazz - the class used to determine which set to scan and to convert the returned records to.
680+
* @param processor - the Processor used to process each record
681+
* @param recordsPerSecond - the number of records to process per second. Set to 0 for unlimited, &gt; 0 for a finite rate, &lt; 0 for no change
682+
* (use the value from the passed policy)
683+
*/
684+
@Override
685+
public <T> void scan(ScanPolicy policy, @NotNull Class<T> clazz, @NotNull Processor<T> processor, int recordsPerSecond) {
686+
ClassCacheEntry<T> entry = MapperUtils.getEntryAndValidateNamespace(clazz, this);
687+
if (policy == null) {
688+
policy = entry.getScanPolicy();
689+
}
690+
if (recordsPerSecond >= 0) {
691+
// Ensure the underlying rate on the policy does not change
692+
policy = new ScanPolicy(policy);
693+
policy.recordsPerSecond = recordsPerSecond;
694+
}
695+
String namespace = entry.getNamespace();
696+
String setName = entry.getSetName();
697+
698+
AtomicBoolean userTerminated = new AtomicBoolean(false);
699+
try {
700+
mClient.scanAll(policy, namespace, setName, (key, record) -> {
701+
T object = this.getMappingConverter().convertToObject(clazz, record);
702+
if (!processor.process(object)) {
703+
userTerminated.set(true);
704+
throw new AerospikeException.ScanTerminated();
705+
}
706+
});
707+
} catch (ScanTerminated st) {
708+
if (!userTerminated.get()) {
709+
throw st;
710+
}
711+
}
712+
}
713+
714+
/**
715+
* Perform a secondary index query with the specified query policy. Each record will be converted
716+
* to the appropriate class then passed to the processor. If the processor returns false the query is aborted
717+
* whereas if the processor returns true subsequent records (if any) are processed.
718+
* <p/>
719+
* The query policy used will be the one associated with the passed classtype.
720+
*
721+
* @param clazz - the class used to determine which set to scan and to convert the returned records to.
722+
* @param processor - the Processor used to process each record
723+
* @param filter - the filter used to determine which secondary index to use. If this filter is null, every record in the set
724+
* associated with the passed classtype will be scanned, effectively turning the query into a scan
725+
*/
726+
@Override
727+
public <T> void query(@NotNull Class<T> clazz, @NotNull Processor<T> processor, Filter filter) {
728+
query(null, clazz, processor, filter);
729+
}
730+
731+
/**
732+
* Perform a secondary index query with the specified query policy. Each record will be converted
733+
* to the appropriate class then passed to the processor. If the processor returns false the query is aborted
734+
* whereas if the processor returns true subsequent records (if any) are processed.
735+
*
736+
* @param policy - The query policy to use. If this parameter is not passed, the query policy associated with the passed classtype will be used
737+
* @param clazz - the class used to determine which set to scan and to convert the returned records to.
738+
* @param processor - the Processor used to process each record
739+
* @param filter - the filter used to determine which secondary index to use. If this filter is null, every record in the set
740+
* associated with the passed classtype will be scanned, effectively turning the query into a scan
741+
*/
742+
@Override
743+
public <T> void query(QueryPolicy policy, @NotNull Class<T> clazz, @NotNull Processor<T> processor, Filter filter) {
744+
ClassCacheEntry<T> entry = MapperUtils.getEntryAndValidateNamespace(clazz, this);
745+
if (policy == null) {
746+
policy = entry.getQueryPolicy();
747+
}
748+
Statement statement = new Statement();
749+
statement.setFilter(filter);
750+
statement.setNamespace(entry.getNamespace());
751+
statement.setSetName(entry.getSetName());
752+
753+
RecordSet recordSet = mClient.query(policy, statement);
754+
try {
755+
while (recordSet.next()) {
756+
T object = this.getMappingConverter().convertToObject(clazz, recordSet.getRecord());
757+
if (!processor.process(object)) {
758+
break;
759+
}
760+
}
761+
} finally {
762+
recordSet.close();
763+
}
764+
}
765+
616766
@Override
617767
public IAerospikeClient getClient() {
618768
return this.mClient;

src/main/java/com/aerospike/mapper/tools/IAeroMapper.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
package com.aerospike.mapper.tools;
22

3+
import java.util.function.Function;
4+
5+
import javax.validation.constraints.NotNull;
6+
37
import com.aerospike.client.IAerospikeClient;
48
import com.aerospike.client.policy.BatchPolicy;
59
import com.aerospike.client.policy.Policy;
10+
import com.aerospike.client.policy.QueryPolicy;
11+
import com.aerospike.client.policy.ScanPolicy;
612
import com.aerospike.client.policy.WritePolicy;
13+
import com.aerospike.client.query.Filter;
714
import com.aerospike.mapper.tools.virtuallist.VirtualList;
815

9-
import javax.validation.constraints.NotNull;
10-
import java.util.function.Function;
11-
1216
public interface IAeroMapper extends IBaseAeroMapper {
1317

1418
void save(@NotNull Object... objects);
@@ -54,4 +58,16 @@ public interface IAeroMapper extends IBaseAeroMapper {
5458
<T> void find(@NotNull Class<T> clazz, Function<T, Boolean> function);
5559

5660
IAerospikeClient getClient();
61+
62+
<T> void query(@NotNull Class<T> clazz, @NotNull Processor<T> processor, Filter filter);
63+
64+
<T> void query(QueryPolicy policy, @NotNull Class<T> clazz, @NotNull Processor<T> processor, Filter filter);
65+
66+
<T> void scan(@NotNull Class<T> clazz, @NotNull Processor<T> processor);
67+
68+
<T> void scan(ScanPolicy policy, @NotNull Class<T> clazz, @NotNull Processor<T> processor);
69+
70+
<T> void scan(@NotNull Class<T> clazz, @NotNull Processor<T> processor, int recordsPerSecond);
71+
72+
<T> void scan(ScanPolicy policy, @NotNull Class<T> clazz, @NotNull Processor<T> processor, int recordsPerSecond);
5773
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package com.aerospike.mapper.tools;
2+
3+
public interface Processor<T> {
4+
/**
5+
* Process the given record.
6+
*
7+
* @param data - the record to be processed
8+
* @return true if further records should be processed, false if the processing loop should abort.
9+
*/
10+
boolean process(T data);
11+
}

0 commit comments

Comments
 (0)