Skip to content

Commit 0ddad63

Browse files
authored
Merge pull request #592 from splitio/FME-8359-uniquekeys-chunks
Fme 8359 uniquekeys chunks
2 parents a686521 + a2a251e commit 0ddad63

File tree

3 files changed

+131
-6
lines changed

3 files changed

+131
-6
lines changed

client/src/main/java/io/split/client/impressions/UniqueKeysTrackerImp.java

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.split.client.impressions;
22

3+
import com.google.common.collect.Lists;
34
import io.split.client.dtos.UniqueKeys;
45
import io.split.client.impressions.filters.BloomFilterImp;
56
import io.split.client.impressions.filters.Filter;
@@ -21,12 +22,14 @@
2122
import java.util.concurrent.ThreadFactory;
2223
import java.util.concurrent.TimeUnit;
2324
import java.util.concurrent.atomic.AtomicBoolean;
25+
import java.util.concurrent.atomic.AtomicInteger;
2426

2527
public class UniqueKeysTrackerImp implements UniqueKeysTracker{
2628
private static final Logger _log = LoggerFactory.getLogger(UniqueKeysTrackerImp.class);
2729
private static final double MARGIN_ERROR = 0.01;
28-
private static final int MAX_AMOUNT_OF_TRACKED_UNIQUE_KEYS = 30000;
30+
private static final int MAX_UNIQUE_KEYS_POST_SIZE = 5000;
2931
private static final int MAX_AMOUNT_OF_KEYS = 10000000;
32+
private final AtomicInteger trackerKeysSize = new AtomicInteger(0);
3033
private FilterAdapter filterAdapter;
3134
private final TelemetrySynchronizer _telemetrySynchronizer;
3235
private final ScheduledExecutorService _uniqueKeysSyncScheduledExecutorService;
@@ -59,10 +62,11 @@ public boolean track(String featureFlagName, String key) {
5962
(feature, current) -> {
6063
HashSet<String> keysByFeature = Optional.ofNullable(current).orElse(new HashSet<>());
6164
keysByFeature.add(key);
65+
trackerKeysSize.incrementAndGet();
6266
return keysByFeature;
6367
});
6468
_logger.debug("The feature flag " + featureFlagName + " and key " + key + " was added");
65-
if (uniqueKeysTracker.size() >= MAX_AMOUNT_OF_TRACKED_UNIQUE_KEYS){
69+
if (trackerKeysSize.intValue() >= MAX_UNIQUE_KEYS_POST_SIZE){
6670
_logger.warn("The UniqueKeysTracker size reached the maximum limit");
6771
try {
6872
sendUniqueKeys();
@@ -107,6 +111,7 @@ public HashMap<String,HashSet<String>> popAll(){
107111
HashSet<String> value = uniqueKeysTracker.remove(key);
108112
toReturn.put(key, value);
109113
}
114+
trackerKeysSize.set(0);
110115
return toReturn;
111116
}
112117

@@ -115,26 +120,71 @@ private void sendUniqueKeys(){
115120
_log.debug("SendUniqueKeys already running");
116121
return;
117122
}
123+
118124
try {
119-
if (uniqueKeysTracker.size() == 0) {
125+
if (uniqueKeysTracker.isEmpty()) {
120126
_log.debug("The Unique Keys Tracker is empty");
121127
return;
122128
}
129+
123130
HashMap<String, HashSet<String>> uniqueKeysHashMap = popAll();
124131
List<UniqueKeys.UniqueKey> uniqueKeysFromPopAll = new ArrayList<>();
125132
for (Map.Entry<String, HashSet<String>> uniqueKeyEntry : uniqueKeysHashMap.entrySet()) {
126133
UniqueKeys.UniqueKey uniqueKey = new UniqueKeys.UniqueKey(uniqueKeyEntry.getKey(), new ArrayList<>(uniqueKeyEntry.getValue()));
127134
uniqueKeysFromPopAll.add(uniqueKey);
128135
}
129-
_telemetrySynchronizer.synchronizeUniqueKeys(new UniqueKeys(uniqueKeysFromPopAll));
136+
uniqueKeysFromPopAll = capChunksToMaxSize(uniqueKeysFromPopAll);
137+
138+
for (List<UniqueKeys.UniqueKey> chunk : getChunks(uniqueKeysFromPopAll)) {
139+
_telemetrySynchronizer.synchronizeUniqueKeys(new UniqueKeys(chunk));
140+
}
130141
} finally {
131142
sendGuard.set(false);
132143
}
133144
}
134145

146+
private List<UniqueKeys.UniqueKey> capChunksToMaxSize(List<UniqueKeys.UniqueKey> uniqueKeys) {
147+
List<UniqueKeys.UniqueKey> finalChunk = new ArrayList<>();
148+
for (UniqueKeys.UniqueKey uniqueKey : uniqueKeys) {
149+
if (uniqueKey.keysDto.size() > MAX_UNIQUE_KEYS_POST_SIZE) {
150+
for(List<String> subChunk : Lists.partition(uniqueKey.keysDto, MAX_UNIQUE_KEYS_POST_SIZE)) {
151+
finalChunk.add(new UniqueKeys.UniqueKey(uniqueKey.featureName, subChunk));
152+
}
153+
continue;
154+
}
155+
finalChunk.add(uniqueKey);
156+
}
157+
return finalChunk;
158+
}
159+
160+
private List<List<UniqueKeys.UniqueKey>> getChunks(List<UniqueKeys.UniqueKey> uniqueKeys) {
161+
List<List<UniqueKeys.UniqueKey>> chunks = new ArrayList<>();
162+
List<UniqueKeys.UniqueKey> intermediateChunk = new ArrayList<>();
163+
for (UniqueKeys.UniqueKey uniqueKey : uniqueKeys) {
164+
if ((getChunkSize(intermediateChunk) + uniqueKey.keysDto.size()) > MAX_UNIQUE_KEYS_POST_SIZE) {
165+
chunks.add(intermediateChunk);
166+
intermediateChunk = new ArrayList<>();
167+
}
168+
intermediateChunk.add(uniqueKey);
169+
}
170+
if (!intermediateChunk.isEmpty()) {
171+
chunks.add(intermediateChunk);
172+
}
173+
return chunks;
174+
}
175+
176+
private int getChunkSize(List<UniqueKeys.UniqueKey> uniqueKeysChunk) {
177+
int totalSize = 0;
178+
for (UniqueKeys.UniqueKey uniqueKey : uniqueKeysChunk) {
179+
totalSize += uniqueKey.keysDto.size();
180+
}
181+
return totalSize;
182+
}
183+
135184
private interface ExecuteUniqueKeysAction{
136185
void execute();
137186
}
187+
138188
private class ExecuteCleanFilter implements ExecuteUniqueKeysAction {
139189

140190
@Override

client/src/test/java/io/split/client/impressions/UniqueKeysTrackerImpTest.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,21 @@
11
package io.split.client.impressions;
22

3+
import io.split.client.dtos.UniqueKeys;
34
import io.split.telemetry.synchronizer.TelemetryInMemorySubmitter;
45
import io.split.telemetry.synchronizer.TelemetrySynchronizer;
56
import org.junit.Assert;
67
import org.junit.Test;
78
import org.mockito.Mockito;
89

10+
import java.lang.reflect.Field;
11+
import java.lang.reflect.InvocationTargetException;
12+
import java.lang.reflect.Method;
913
import java.util.HashMap;
1014
import java.util.HashSet;
15+
import java.util.List;
16+
import java.util.Map;
17+
import java.util.ArrayList;
18+
import java.util.concurrent.atomic.AtomicInteger;
1119

1220
public class UniqueKeysTrackerImpTest {
1321
private static TelemetrySynchronizer _telemetrySynchronizer = Mockito.mock(TelemetryInMemorySubmitter.class);
@@ -100,4 +108,71 @@ public void testStopSynchronization() throws Exception {
100108
uniqueKeysTrackerImp.stop();
101109
Mockito.verify(telemetrySynchronizer, Mockito.times(1)).synchronizeUniqueKeys(Mockito.anyObject());
102110
}
111+
112+
@Test
113+
public void testUniqueKeysChunks() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
114+
UniqueKeysTrackerImp uniqueKeysTrackerImp = new UniqueKeysTrackerImp(_telemetrySynchronizer, 10000, 10000, null);
115+
HashMap<String, HashSet<String>> uniqueKeysHashMap = new HashMap<>();
116+
HashSet<String> feature1 = new HashSet<>();
117+
HashSet<String> feature2 = new HashSet<>();
118+
HashSet<String> feature3 = new HashSet<>();
119+
HashSet<String> feature4 = new HashSet<>();
120+
HashSet<String> feature5 = new HashSet<>();
121+
for (Integer i=1; i<6000; i++) {
122+
if (i <= 1000) {
123+
feature1.add("key" + i);
124+
}
125+
if (i <= 2000) {
126+
feature2.add("key" + i);
127+
}
128+
if (i <= 3000) {
129+
feature3.add("key" + i);
130+
}
131+
if (i <= 4000) {
132+
feature4.add("key" + i);
133+
}
134+
feature5.add("key" + i);
135+
}
136+
uniqueKeysHashMap.put("feature1", feature1);
137+
uniqueKeysHashMap.put("feature2", feature2);
138+
uniqueKeysHashMap.put("feature3", feature3);
139+
uniqueKeysHashMap.put("feature4", feature4);
140+
uniqueKeysHashMap.put("feature5", feature5);
141+
142+
List<UniqueKeys.UniqueKey> uniqueKeysFromPopAll = new ArrayList<>();
143+
for (Map.Entry<String, HashSet<String>> uniqueKeyEntry : uniqueKeysHashMap.entrySet()) {
144+
UniqueKeys.UniqueKey uniqueKey = new UniqueKeys.UniqueKey(uniqueKeyEntry.getKey(), new ArrayList<>(uniqueKeyEntry.getValue()));
145+
uniqueKeysFromPopAll.add(uniqueKey);
146+
}
147+
Method methodCapChunks = uniqueKeysTrackerImp.getClass().getDeclaredMethod("capChunksToMaxSize", List.class);
148+
methodCapChunks.setAccessible(true);
149+
uniqueKeysFromPopAll = (List<UniqueKeys.UniqueKey>)methodCapChunks.invoke(uniqueKeysTrackerImp, uniqueKeysFromPopAll);
150+
151+
Method methodGetChunks = uniqueKeysTrackerImp.getClass().getDeclaredMethod("getChunks", List.class);
152+
methodGetChunks.setAccessible(true);
153+
List<List<UniqueKeys.UniqueKey>> keysChunks = (List<List<UniqueKeys.UniqueKey>>) methodGetChunks.invoke(uniqueKeysTrackerImp, uniqueKeysFromPopAll);
154+
for (List<UniqueKeys.UniqueKey> chunk : keysChunks) {
155+
int chunkSize = 0;
156+
for (UniqueKeys.UniqueKey keys : chunk) {
157+
chunkSize += keys.keysDto.size();
158+
}
159+
Assert.assertTrue(chunkSize <= 5000);
160+
}
161+
}
162+
163+
@Test
164+
public void testTrackReachMaxKeys() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, NoSuchFieldException {
165+
TelemetrySynchronizer telemetrySynchronizer = Mockito.mock(TelemetryInMemorySubmitter.class);
166+
UniqueKeysTrackerImp uniqueKeysTrackerImp = new UniqueKeysTrackerImp(telemetrySynchronizer, 10000, 10000, null);
167+
for (int i=1; i<6000; i++) {
168+
Assert.assertTrue(uniqueKeysTrackerImp.track("feature1", "key" + i));
169+
Assert.assertTrue(uniqueKeysTrackerImp.track("feature2", "key" + i));
170+
}
171+
Mockito.verify(telemetrySynchronizer, Mockito.times(2)).synchronizeUniqueKeys(Mockito.anyObject());
172+
173+
Field getTrackerSize = uniqueKeysTrackerImp.getClass().getDeclaredField("trackerKeysSize");
174+
getTrackerSize.setAccessible(true);
175+
AtomicInteger trackerSize = (AtomicInteger) getTrackerSize.get(uniqueKeysTrackerImp);
176+
Assert.assertTrue(trackerSize.intValue() == 1998);
177+
}
103178
}

testing/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
</parent>
1010
<artifactId>java-client-testing</artifactId>
1111
<packaging>jar</packaging>
12-
<version>4.16.0</version>
12+
<version>4.16.1</version>
1313
<name>Java Client For Testing</name>
1414
<description>Testing suite for Java SDK for Split</description>
1515
<dependencies>
@@ -39,7 +39,7 @@
3939
<publishingServerId>central</publishingServerId>
4040
<autoPublish>false</autoPublish>
4141
<waitUntil>published</waitUntil>
42-
<ignorePublishedComponents>true</ignorePublishedComponents>
42+
<ignorePublishedComponents>false</ignorePublishedComponents>
4343
</configuration>
4444
</plugin>
4545
</plugins>

0 commit comments

Comments
 (0)