Skip to content

Commit b88f9a3

Browse files
authored
#188601341 v1.2.0 (#17)
* v1.2.0 Improved concurrent batching with new configurable send-queue max size and max wait time * Log update
1 parent a46a384 commit b88f9a3

File tree

7 files changed

+190
-113
lines changed

7 files changed

+190
-113
lines changed

pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>com.moesif</groupId>
88
<artifactId>moesif-okhttp-interceptor</artifactId>
9-
<version>1.1.7</version>
9+
<version>1.2.0</version>
1010
<packaging>jar</packaging>
1111
<name>moesif-okhttp-interceptor</name>
1212
<url>https://www.moesif.com</url>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package com.moesif.sdk.okhttp3client;
2+
3+
import com.moesif.api.MoesifAPIClient;
4+
import com.moesif.api.controllers.APIController;
5+
import com.moesif.api.http.client.APICallBack;
6+
import com.moesif.api.http.client.HttpContext;
7+
import com.moesif.api.http.response.HttpResponse;
8+
import com.moesif.api.models.EventModel;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
import java.util.ArrayList;
13+
import java.util.List;
14+
import java.util.concurrent.BlockingQueue;
15+
import java.util.concurrent.TimeUnit;
16+
import java.util.concurrent.atomic.AtomicBoolean;
17+
18+
public class BatchEventLogger implements Runnable {
19+
private static final Logger logger = LoggerFactory.getLogger(BatchEventLogger.class);
20+
21+
private final BlockingQueue<EventModel> queue;
22+
private final int batchSize;
23+
private final long maxWaitTimeMillis;
24+
private final APIController apiController;
25+
private final AtomicBoolean running = new AtomicBoolean(true);
26+
27+
public BatchEventLogger(BlockingQueue<EventModel> queue, int batchSize, long maxWaitTimeMillis, String applicationId) {
28+
this.queue = queue;
29+
this.batchSize = batchSize;
30+
this.maxWaitTimeMillis = maxWaitTimeMillis;
31+
MoesifAPIClient client = new MoesifAPIClient(applicationId);
32+
this.apiController = client.getAPI();
33+
}
34+
35+
@Override
36+
public void run() {
37+
try {
38+
List<EventModel> batch = new ArrayList<>();
39+
long batchStartTime = 0;
40+
41+
while (running.get() || !queue.isEmpty()) {
42+
long timeout = maxWaitTimeMillis;
43+
44+
if (!batch.isEmpty()) {
45+
long elapsedTime = System.currentTimeMillis() - batchStartTime;
46+
timeout = maxWaitTimeMillis - elapsedTime;
47+
if (timeout <= 0) {
48+
// Time limit reached, send the batch
49+
sendBatch(new ArrayList<>(batch));
50+
batch.clear();
51+
batchStartTime = 0;
52+
timeout = maxWaitTimeMillis;
53+
}
54+
}
55+
56+
// Poll for the next event with the calculated timeout
57+
EventModel event = queue.poll(timeout, TimeUnit.MILLISECONDS);
58+
59+
if (event != null) {
60+
if (batch.isEmpty()) {
61+
// Start the batch timer
62+
batchStartTime = System.currentTimeMillis();
63+
}
64+
batch.add(event);
65+
66+
if (batch.size() >= batchSize) {
67+
logger.debug("Seding batch of {} events after reaching batch size limit", batch.size());
68+
// Batch size limit reached, send the batch
69+
sendBatch(new ArrayList<>(batch));
70+
batch.clear();
71+
batchStartTime = 0;
72+
}
73+
} else {
74+
// No event received within timeout
75+
if (!batch.isEmpty()) {
76+
logger.debug("Seding batch of {} events after max wait timeout", batch.size());
77+
// Send any accumulated events
78+
sendBatch(new ArrayList<>(batch));
79+
batch.clear();
80+
batchStartTime = 0;
81+
}
82+
if (!running.get()) {
83+
// Exit if the running flag is false
84+
break;
85+
}
86+
}
87+
}
88+
} catch (InterruptedException e) {
89+
Thread.currentThread().interrupt();
90+
logger.info("EventConsumer interrupted, shutting down");
91+
} catch (Exception e) {
92+
logger.error("Error in EventConsumer", e);
93+
} finally {
94+
// Process any remaining events before exiting
95+
processRemainingEvents();
96+
}
97+
}
98+
99+
private void sendBatch(List<EventModel> batch) {
100+
if (!batch.isEmpty()) {
101+
try {
102+
apiController.createEventsBatchAsync(batch, new MoesifApiCallBack());
103+
} catch (Exception e) {
104+
// Handle exception during sending
105+
logger.error("Exception while sending event batch", e);
106+
}
107+
}
108+
}
109+
110+
private void processRemainingEvents() {
111+
List<EventModel> remainingEvents = new ArrayList<>();
112+
queue.drainTo(remainingEvents);
113+
if (!remainingEvents.isEmpty()) {
114+
logger.info("Processing remaining events before shutdown");
115+
sendBatch(remainingEvents);
116+
}
117+
}
118+
119+
public void shutdown() {
120+
running.set(false);
121+
}
122+
123+
public static class MoesifApiCallBack implements APICallBack<HttpResponse> {
124+
125+
public void onSuccess(HttpContext context, HttpResponse response) {
126+
int respStatusCode = response.getStatusCode();
127+
if (201 != respStatusCode)
128+
logger.debug("Received status code {}", respStatusCode);
129+
else
130+
logger.debug("Events submitted to Moesif");
131+
}
132+
133+
public void onFailure(HttpContext context, Throwable error) {
134+
logger.debug("onFailure {} {}", context.getResponse(), error.getMessage());
135+
}
136+
}
137+
}

src/main/java/com/moesif/sdk/okhttp3client/EventModelBuffer.java

-36
This file was deleted.

src/main/java/com/moesif/sdk/okhttp3client/MoesifApiLogEvent.java

-48
This file was deleted.

src/main/java/com/moesif/sdk/okhttp3client/MoesifOkHttp3Interceptor.java

+40-15
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
package com.moesif.sdk.okhttp3client;
22

3+
import com.moesif.api.models.EventModel;
34
import com.moesif.api.models.EventRequestModel;
45
import com.moesif.api.models.EventResponseModel;
6+
import com.moesif.external.facebook.stetho.inspector.network.NetworkEventReporterMoesif;
7+
import com.moesif.external.facebook.stetho.inspector.network.NetworkEventReporterMoesifImpl;
58
import com.moesif.sdk.okhttp3client.config.MoesifApiConnConfig;
69
import com.moesif.sdk.okhttp3client.models.OkHttp3RequestMapper;
710
import com.moesif.sdk.okhttp3client.models.OkHttp3ResponseMapper;
811
import com.moesif.sdk.okhttp3client.models.filter.IInterceptEventFilter;
912
import com.moesif.sdk.okhttp3client.util.ResponseWrap;
10-
import com.moesif.external.facebook.stetho.inspector.network.NetworkEventReporterMoesifImpl;
11-
import com.moesif.external.facebook.stetho.inspector.network.NetworkEventReporterMoesif;
1213
import okhttp3.*;
1314
import okio.BufferedSource;
1415
import okio.Okio;
@@ -20,12 +21,15 @@
2021
import java.io.ByteArrayOutputStream;
2122
import java.io.IOException;
2223
import java.io.InputStream;
24+
import java.time.Instant;
2325
import java.util.Collection;
2426
import java.util.Date;
27+
import java.util.concurrent.BlockingQueue;
28+
import java.util.concurrent.ExecutorService;
29+
import java.util.concurrent.Executors;
30+
import java.util.concurrent.LinkedBlockingQueue;
2531
import java.util.concurrent.atomic.AtomicInteger;
2632

27-
import java.time.Instant;
28-
2933
/**
3034
* MoesifOkHttp3Interceptor
3135
* This intrceptor can be used both as Application and Network interceptor
@@ -39,12 +43,14 @@
3943
*/
4044
public class MoesifOkHttp3Interceptor implements Interceptor {
4145
private static final Logger logger = LoggerFactory.getLogger(
42-
MoesifOkHttp3Interceptor.class);
43-
46+
MoesifOkHttp3Interceptor.class);
47+
private static MoesifApiConnConfig connConfig;
4448
private final NetworkEventReporterMoesif mEventReporter =
45-
NetworkEventReporterMoesifImpl.get();
49+
NetworkEventReporterMoesifImpl.get();
4650
private final AtomicInteger mNextRequestId = new AtomicInteger(0);
47-
private static MoesifApiConnConfig connConfig;
51+
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
52+
private BlockingQueue<EventModel> eventQueue;
53+
private BatchEventLogger batchLogger;
4854

4955
/**
5056
* Initialize the Interceptor
@@ -70,7 +76,7 @@ public MoesifOkHttp3Interceptor(String moesifApplicationId) {
7076
* to collector
7177
*/
7278
public MoesifOkHttp3Interceptor(String moesifApplicationId, Integer eventsBufferSize) {
73-
MoesifApiConnConfig c = new MoesifApiConnConfig(moesifApplicationId);
79+
MoesifApiConnConfig c = new MoesifApiConnConfig(moesifApplicationId);
7480
c.setEventsBufferSize(eventsBufferSize);
7581
init(c);
7682
}
@@ -82,10 +88,11 @@ public MoesifOkHttp3Interceptor(String moesifApplicationId, Integer eventsBuffer
8288
* to collector
8389
*/
8490
public MoesifOkHttp3Interceptor(Integer eventsBufferSize) {
85-
MoesifApiConnConfig c = new MoesifApiConnConfig(null);
91+
MoesifApiConnConfig c = new MoesifApiConnConfig(null);
8692
c.setEventsBufferSize(eventsBufferSize);
8793
init(c);
8894
}
95+
8996
/**
9097
* Initialize the Interceptor
9198
* @param connConfig MoesifApiConnConfig object
@@ -96,7 +103,27 @@ public MoesifOkHttp3Interceptor(MoesifApiConnConfig connConfig) {
96103

97104
public void init(MoesifApiConnConfig connConfig) {
98105
MoesifOkHttp3Interceptor.connConfig = (null == connConfig)
99-
? new MoesifApiConnConfig() : connConfig;
106+
? new MoesifApiConnConfig() : connConfig;
107+
eventQueue = new LinkedBlockingQueue<>(MoesifOkHttp3Interceptor.connConfig.maxQueueSize);
108+
batchLogger = new BatchEventLogger(
109+
eventQueue,
110+
MoesifOkHttp3Interceptor.connConfig.eventsBufferSize,
111+
MoesifOkHttp3Interceptor.connConfig.eventTimeoutMillis,
112+
MoesifOkHttp3Interceptor.connConfig.getApplicationId()
113+
);
114+
executorService.submit(batchLogger);
115+
// Add shutdown hook to clean up and gracefully exit
116+
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
117+
batchLogger.shutdown();
118+
executorService.shutdown();
119+
try {
120+
if (!executorService.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)) {
121+
executorService.shutdownNow();
122+
}
123+
} catch (InterruptedException e) {
124+
executorService.shutdownNow();
125+
}
126+
}));
100127
if (getConnConfig().isDebug()) {
101128
logger.debug("MoesifOkHttp3Interceptor initialized with config: {}", getConnConfig());
102129
}
@@ -173,9 +200,8 @@ public Response intercept(Chain chain) throws IOException {
173200
loggedResponse,
174201
outputStream,
175202
respw.isJsonHeader(),
176-
connConfig.getApplicationId(),
203+
eventQueue,
177204
connConfig.getMaxAllowedBodyBytesResponse(),
178-
connConfig.getEventsBufferSize(),
179205
filter.identifyUser(request, response).orElse(null),
180206
filter.identifyCompany(request, response).orElse(null),
181207
filter.sessionToken(request, response).orElse(null),
@@ -201,8 +227,7 @@ public Response intercept(Chain chain) throws IOException {
201227
} catch (Exception e) {
202228
logger.warn("Error parsing response body", e);
203229
}
204-
}
205-
else {
230+
} else {
206231
logger.warn("Body is null");
207232
}
208233
return response;

0 commit comments

Comments
 (0)