Skip to content

Commit 0da08a4

Browse files
tim-aeroroimenashe
andauthored
FMWK-466 Add Insert Only Method (#158)
* Fixed issue with CREATE_ONLY RecordExistsAction If a policy specified either by defaultPolicy or using the policy hierarchy has a RecordExistsAction of CREATE_ONLY, the save method was overwriting this with REPLACE. Changed this to only replace the policy with REPLACE if the passed RecordExistsAction is UPDATE. * 1. The fix is wrong, no need to allow global policy changes to affect records exists action 2. Refactor and cleanup (use generics instead of Objects) 3. Add insert() method for both reactive and none-reactive flows and add integration tests --------- Co-authored-by: roimenashe <[email protected]>
1 parent 47b9952 commit 0da08a4

File tree

6 files changed

+227
-57
lines changed

6 files changed

+227
-57
lines changed

src/main/java/com/aerospike/mapper/tools/AeroMapper.java

+44-31
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.aerospike.mapper.tools.converters.MappingConverter;
3030
import com.aerospike.mapper.tools.utils.MapperUtils;
3131
import com.aerospike.mapper.tools.virtuallist.VirtualList;
32+
import reactor.core.publisher.Mono;
3233

3334
public class AeroMapper implements IAeroMapper {
3435

@@ -52,48 +53,30 @@ public Builder(IAerospikeClient client) {
5253

5354
}
5455
}
55-
56+
5657
@Override
57-
public void save(@NotNull Object... objects) throws AerospikeException {
58-
for (Object thisObject : objects) {
58+
public <T> void save(@NotNull T... objects) throws AerospikeException {
59+
for (T thisObject : objects) {
5960
this.save(thisObject);
6061
}
6162
}
6263

6364
@Override
64-
public void save(@NotNull Object object, String... binNames) throws AerospikeException {
65-
save(null, object, RecordExistsAction.REPLACE, binNames);
65+
public <T> void save(@NotNull T object, String... binNames) throws AerospikeException {
66+
WritePolicy writePolicy = generateWritePolicyFromObject(object);
67+
writePolicy.recordExistsAction = RecordExistsAction.REPLACE;
68+
save(writePolicy, object, binNames);
6669
}
6770

71+
@SuppressWarnings("unchecked")
6872
@Override
69-
public void save(@NotNull WritePolicy writePolicy, @NotNull Object object, String... binNames)
73+
public <T> void save(@NotNull WritePolicy writePolicy, @NotNull T object, String... binNames)
7074
throws AerospikeException {
71-
save(writePolicy, object, null, binNames);
72-
}
73-
74-
@SuppressWarnings("unchecked")
75-
private <T> void save(WritePolicy writePolicy, @NotNull T object, RecordExistsAction recordExistsAction,
76-
String[] binNames) {
7775
Class<T> clazz = (Class<T>) object.getClass();
7876
ClassCacheEntry<T> entry = MapperUtils.getEntryAndValidateNamespace(clazz, this);
79-
if (writePolicy == null) {
80-
writePolicy = new WritePolicy(entry.getWritePolicy());
81-
if (recordExistsAction != null) {
82-
writePolicy.recordExistsAction = recordExistsAction;
83-
}
84-
85-
// #132 -- Ensure that if an overriding TTL / sendkey is passed in the policy it
86-
// is NOT overwritten. Hence
87-
// only if the policy is null do we override these settings.
88-
Integer ttl = entry.getTtl();
89-
Boolean sendKey = entry.getSendKey();
9077

91-
if (ttl != null) {
92-
writePolicy.expiration = ttl;
93-
}
94-
if (sendKey != null) {
95-
writePolicy.sendKey = sendKey;
96-
}
78+
if (writePolicy == null) {
79+
writePolicy = generateWritePolicyFromObject(object);
9780
}
9881

9982
String set = entry.getSetName();
@@ -109,8 +92,38 @@ private <T> void save(WritePolicy writePolicy, @NotNull T object, RecordExistsAc
10992
}
11093

11194
@Override
112-
public void update(@NotNull Object object, String... binNames) throws AerospikeException {
113-
save(null, object, RecordExistsAction.UPDATE, binNames);
95+
public <T> void insert(@NotNull T object, String... binNames) {
96+
WritePolicy writePolicy = generateWritePolicyFromObject(object);
97+
writePolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY;
98+
save(writePolicy, object, binNames);
99+
}
100+
101+
@Override
102+
public <T> void update(@NotNull T object, String... binNames) throws AerospikeException {
103+
WritePolicy writePolicy = generateWritePolicyFromObject(object);
104+
writePolicy.recordExistsAction = RecordExistsAction.UPDATE;
105+
save(writePolicy, object, binNames);
106+
}
107+
108+
@SuppressWarnings("unchecked")
109+
private <T> WritePolicy generateWritePolicyFromObject(T object) {
110+
Class<T> clazz = (Class<T>) object.getClass();
111+
ClassCacheEntry<T> entry = MapperUtils.getEntryAndValidateNamespace(clazz, this);
112+
113+
WritePolicy writePolicy = new WritePolicy(entry.getWritePolicy());
114+
115+
// #132 -- Ensure that if an overriding TTL / sendKey is passed in the policy it
116+
// is NOT overwritten. Hence, only if the policy is null do we override these settings.
117+
Integer ttl = entry.getTtl();
118+
Boolean sendKey = entry.getSendKey();
119+
120+
if (ttl != null) {
121+
writePolicy.expiration = ttl;
122+
}
123+
if (sendKey != null) {
124+
writePolicy.sendKey = sendKey;
125+
}
126+
return writePolicy;
114127
}
115128

116129
@Override

src/main/java/com/aerospike/mapper/tools/IAeroMapper.java

+13-4
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public interface IAeroMapper extends IBaseAeroMapper {
3333
* @param objects One or two objects to save.
3434
* @throws AerospikeException an AerospikeException will be thrown in case of an error.
3535
*/
36-
void save(@NotNull Object... objects);
36+
<T> void save(@NotNull T... objects);
3737

3838
/**
3939
* Save an object in the database. This method will perform a REPLACE on the existing record so any existing
@@ -42,7 +42,7 @@ public interface IAeroMapper extends IBaseAeroMapper {
4242
* @param object The object to save.
4343
* @throws AerospikeException an AerospikeException will be thrown in case of an error.
4444
*/
45-
void save(@NotNull Object object, String... binNames);
45+
<T> void save(@NotNull T object, String... binNames);
4646

4747
/**
4848
* Save an object in the database with the given WritePolicy. This write policy will override any other set writePolicy so
@@ -52,7 +52,16 @@ public interface IAeroMapper extends IBaseAeroMapper {
5252
* @param object The object to save.
5353
* @throws AerospikeException an AerospikeException will be thrown in case of an error.
5454
*/
55-
void save(@NotNull WritePolicy writePolicy, @NotNull Object object, String... binNames);
55+
<T> void save(@NotNull WritePolicy writePolicy, @NotNull T object, String... binNames);
56+
57+
/**
58+
* Insert an object to the database This uses the RecordExistsAction
59+
* of CREATE_ONLY. If bins are specified, only bins with the passed names will be inserted (or all of them if null is passed)
60+
*
61+
* @param object The object to insert.
62+
* @throws AerospikeException an AerospikeException will be thrown in case of an error.
63+
*/
64+
<T> void insert(@NotNull T object, String... binNames);
5665

5766
/**
5867
* Updates the object in the database, merging the record with the existing record. This uses the RecordExistsAction
@@ -61,7 +70,7 @@ public interface IAeroMapper extends IBaseAeroMapper {
6170
* @param object The object to update.
6271
* @throws AerospikeException an AerospikeException will be thrown in case of an error.
6372
*/
64-
void update(@NotNull Object object, String... binNames);
73+
<T> void update(@NotNull T object, String... binNames);
6574

6675
/**
6776
* Read a record from the repository and map it to an instance of the passed class, by providing a digest.

src/main/java/com/aerospike/mapper/tools/IReactiveAeroMapper.java

+9
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,15 @@ public interface IReactiveAeroMapper extends IBaseAeroMapper {
5656
*/
5757
<T> Mono<T> save(@NotNull WritePolicy writePolicy, @NotNull T object, String... binNames);
5858

59+
/**
60+
* Insert an object to the database This uses the RecordExistsAction
61+
* of CREATE_ONLY. If bins are specified, only bins with the passed names will be inserted (or all of them if null is passed)
62+
*
63+
* @param object The object to insert.
64+
* @throws AerospikeException an AerospikeException will be thrown in case of an error.
65+
*/
66+
<T> Mono<T> insert(@NotNull T object, String... binNames);
67+
5968
/**
6069
* Updates the object in the database, merging the record with the existing record. This uses the RecordExistsAction
6170
* of UPDATE. If bins are specified, only bins with the passed names will be updated (or all of them if null is passed)

src/main/java/com/aerospike/mapper/tools/ReactiveAeroMapper.java

+37-22
Original file line numberDiff line numberDiff line change
@@ -60,34 +60,19 @@ public <T> Flux<T> save(@NotNull T... objects) {
6060

6161
@Override
6262
public <T> Mono<T> save(@NotNull T object, String... binNames) {
63-
return save(null, object, RecordExistsAction.REPLACE, binNames);
63+
WritePolicy writePolicy = generateWritePolicyFromObject(object);
64+
writePolicy.recordExistsAction = RecordExistsAction.REPLACE;
65+
return save(writePolicy, object, binNames);
6466
}
6567

68+
@SuppressWarnings("unchecked")
6669
@Override
6770
public <T> Mono<T> save(@NotNull WritePolicy writePolicy, @NotNull T object, String... binNames) {
68-
return save(writePolicy, object, null, binNames);
69-
}
70-
71-
@SuppressWarnings("unchecked")
72-
private <T> Mono<T> save(WritePolicy writePolicy, @NotNull T object, RecordExistsAction recordExistsAction, String[] binNames) {
7371
Class<T> clazz = (Class<T>) object.getClass();
7472
ClassCacheEntry<T> entry = MapperUtils.getEntryAndValidateNamespace(clazz, this);
75-
if (writePolicy == null) {
76-
writePolicy = new WritePolicy(entry.getWritePolicy());
77-
if (recordExistsAction != null) {
78-
writePolicy.recordExistsAction = recordExistsAction;
79-
}
80-
81-
// #132 -- Only override the TTL / send key if the policy was not passed in.
82-
Integer ttl = entry.getTtl();
83-
Boolean sendKey = entry.getSendKey();
8473

85-
if (ttl != null) {
86-
writePolicy.expiration = ttl;
87-
}
88-
if (sendKey != null) {
89-
writePolicy.sendKey = sendKey;
90-
}
74+
if (writePolicy == null) {
75+
writePolicy = generateWritePolicyFromObject(object);
9176
}
9277

9378
String set = entry.getSetName();
@@ -104,9 +89,39 @@ private <T> Mono<T> save(WritePolicy writePolicy, @NotNull T object, RecordExist
10489
.map(docKey -> object);
10590
}
10691

92+
@Override
93+
public <T> Mono<T> insert(@NotNull T object, String... binNames) {
94+
WritePolicy writePolicy = generateWritePolicyFromObject(object);
95+
writePolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY;
96+
return save(writePolicy, object, binNames);
97+
}
98+
10799
@Override
108100
public <T> Mono<T> update(@NotNull T object, String... binNames) {
109-
return save(null, object, RecordExistsAction.UPDATE, binNames);
101+
WritePolicy writePolicy = generateWritePolicyFromObject(object);
102+
writePolicy.recordExistsAction = RecordExistsAction.UPDATE;
103+
return save(writePolicy, object, binNames);
104+
}
105+
106+
@SuppressWarnings("unchecked")
107+
private <T> WritePolicy generateWritePolicyFromObject(T object) {
108+
Class<T> clazz = (Class<T>) object.getClass();
109+
ClassCacheEntry<T> entry = MapperUtils.getEntryAndValidateNamespace(clazz, this);
110+
111+
WritePolicy writePolicy = new WritePolicy(entry.getWritePolicy());
112+
113+
// #132 -- Ensure that if an overriding TTL / sendKey is passed in the policy it
114+
// is NOT overwritten. Hence, only if the policy is null do we override these settings.
115+
Integer ttl = entry.getTtl();
116+
Boolean sendKey = entry.getSendKey();
117+
118+
if (ttl != null) {
119+
writePolicy.expiration = ttl;
120+
}
121+
if (sendKey != null) {
122+
writePolicy.sendKey = sendKey;
123+
}
124+
return writePolicy;
110125
}
111126

112127
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package com.aerospike.mapper;
2+
3+
import com.aerospike.client.AerospikeException;
4+
import com.aerospike.client.Key;
5+
import com.aerospike.client.Record;
6+
import com.aerospike.client.policy.WritePolicy;
7+
import com.aerospike.mapper.annotations.AerospikeKey;
8+
import com.aerospike.mapper.annotations.AerospikeRecord;
9+
import com.aerospike.mapper.tools.AeroMapper;
10+
import org.junit.jupiter.api.BeforeEach;
11+
import org.junit.jupiter.api.Test;
12+
13+
import static org.junit.jupiter.api.Assertions.assertEquals;
14+
import static org.junit.jupiter.api.Assertions.assertThrows;
15+
16+
public class InsertOnlyTest extends AeroMapperBaseTest {
17+
18+
@AerospikeRecord(namespace = "test", set = "testSet")
19+
public static class DataClass {
20+
@AerospikeKey
21+
int a;
22+
int b;
23+
int c;
24+
int d;
25+
int e;
26+
}
27+
28+
@BeforeEach
29+
public void setup() {
30+
client.delete(null, new Key("test", "testSet", 1));
31+
}
32+
33+
@Test
34+
public void testInsertOnly() {
35+
WritePolicy writePolicy = new WritePolicy(client.getWritePolicyDefault());
36+
writePolicy.totalTimeout = 2000;
37+
writePolicy.socketTimeout = 100;
38+
AeroMapper mapper = new AeroMapper.Builder(client)
39+
.withWritePolicy(writePolicy).forClasses(DataClass.class)
40+
.build();
41+
42+
DataClass dataClass = new DataClass();
43+
dataClass.a = 1;
44+
dataClass.b = 2;
45+
dataClass.c = 3;
46+
dataClass.d = 4;
47+
dataClass.e = 5;
48+
49+
// Insert
50+
mapper.insert(dataClass);
51+
52+
Key key = new Key("test", "testSet", 1);
53+
Record record = client.get(null, key);
54+
assertEquals(5, record.bins.size());
55+
assertEquals(3, record.getInt("c"));
56+
57+
// Try to insert again and get an exception
58+
dataClass.c = 9;
59+
dataClass.e = 11;
60+
assertThrows(AerospikeException.class, () -> mapper.insert(dataClass, "a", "c", "e"));
61+
}
62+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package com.aerospike.mapper.reactive;
2+
3+
import com.aerospike.client.AerospikeException;
4+
import com.aerospike.client.Key;
5+
import com.aerospike.client.Record;
6+
import com.aerospike.client.policy.WritePolicy;
7+
import com.aerospike.mapper.annotations.AerospikeKey;
8+
import com.aerospike.mapper.annotations.AerospikeRecord;
9+
import com.aerospike.mapper.tools.ReactiveAeroMapper;
10+
import org.junit.jupiter.api.BeforeEach;
11+
import org.junit.jupiter.api.Test;
12+
13+
import static org.junit.jupiter.api.Assertions.assertEquals;
14+
import static org.junit.jupiter.api.Assertions.assertThrows;
15+
16+
public class ReactiveInsertOnlyTest extends ReactiveAeroMapperBaseTest {
17+
18+
@AerospikeRecord(namespace = "test", set = "testSet")
19+
public static class DataClass {
20+
@AerospikeKey
21+
int a;
22+
int b;
23+
int c;
24+
int d;
25+
int e;
26+
}
27+
28+
@BeforeEach
29+
public void setup() {
30+
client.delete(null, new Key("test", "testSet", 1));
31+
}
32+
33+
@Test
34+
public void testInsertOnly() {
35+
WritePolicy writePolicy = new WritePolicy(client.getWritePolicyDefault());
36+
writePolicy.totalTimeout = 2000;
37+
writePolicy.socketTimeout = 100;
38+
ReactiveAeroMapper mapper = new ReactiveAeroMapper.Builder(reactorClient)
39+
.withWritePolicy(writePolicy).forClasses(DataClass.class)
40+
.build();
41+
42+
DataClass dataClass = new DataClass();
43+
dataClass.a = 1;
44+
dataClass.b = 2;
45+
dataClass.c = 3;
46+
dataClass.d = 4;
47+
dataClass.e = 5;
48+
49+
// Insert
50+
mapper.insert(dataClass).block();
51+
52+
Key key = new Key("test", "testSet", 1);
53+
Record record = client.get(null, key);
54+
assertEquals(5, record.bins.size());
55+
assertEquals(3, record.getInt("c"));
56+
57+
// Try to insert again and get an exception
58+
dataClass.c = 9;
59+
dataClass.e = 11;
60+
assertThrows(AerospikeException.class, () -> mapper.insert(dataClass, "a", "c", "e").block());
61+
}
62+
}

0 commit comments

Comments
 (0)