35
35
import java .util .ArrayList ;
36
36
import java .util .Collections ;
37
37
import java .util .List ;
38
- import java .util .concurrent .CompletionStage ;
39
38
import java .util .concurrent .Executors ;
40
39
import java .util .concurrent .ScheduledExecutorService ;
41
40
import java .util .concurrent .ScheduledFuture ;
42
41
import java .util .concurrent .TimeUnit ;
42
+ import java .util .concurrent .CompletableFuture ;
43
43
import java .util .concurrent .atomic .AtomicInteger ;
44
44
import java .util .concurrent .locks .ReentrantLock ;
45
45
import java .util .function .Function ;
@@ -80,9 +80,9 @@ private static class RequestExecution<Context> {
80
80
public final long id ;
81
81
public final BulkRequest request ;
82
82
public final List <Context > contexts ;
83
- public final CompletionStage <BulkResponse > futureResponse ;
83
+ public final CompletableFuture <BulkResponse > futureResponse ;
84
84
85
- RequestExecution (long id , BulkRequest request , List <Context > contexts , CompletionStage <BulkResponse > futureResponse ) {
85
+ RequestExecution (long id , BulkRequest request , List <Context > contexts , CompletableFuture <BulkResponse > futureResponse ) {
86
86
this .id = id ;
87
87
this .request = request ;
88
88
this .contexts = contexts ;
@@ -99,7 +99,7 @@ private BulkIngester(Builder<Context> builder) {
99
99
this .maxOperations = builder .bulkOperations < 0 ? Integer .MAX_VALUE : builder .bulkOperations ;
100
100
this .listener = builder .listener ;
101
101
this .flushIntervalMillis = builder .flushIntervalMillis ;
102
-
102
+
103
103
if (flushIntervalMillis != null ) {
104
104
long flushInterval = flushIntervalMillis ;
105
105
@@ -119,7 +119,7 @@ private BulkIngester(Builder<Context> builder) {
119
119
// It's not ours, we will not close it.
120
120
scheduler = builder .scheduler ;
121
121
}
122
-
122
+
123
123
this .flushTask = scheduler .scheduleWithFixedDelay (
124
124
this ::failsafeFlush ,
125
125
flushInterval , flushInterval ,
@@ -271,7 +271,11 @@ private void failsafeFlush() {
271
271
}
272
272
}
273
273
274
- public void flush () {
274
+ /**
275
+ * @return A future of the response, or null if there was no operations to execute.
276
+ */
277
+ @ Nullable
278
+ public CompletableFuture <BulkResponse > flush () {
275
279
RequestExecution <Context > exec = sendRequestCondition .whenReadyIf (
276
280
() -> {
277
281
// May happen on manual and periodic flushes
@@ -294,7 +298,7 @@ public void flush() {
294
298
listener .beforeBulk (id , request , requestContexts );
295
299
}
296
300
297
- CompletionStage <BulkResponse > result = client .bulk (request );
301
+ CompletableFuture <BulkResponse > result = client .bulk (request );
298
302
requestsInFlightCount ++;
299
303
300
304
if (listener == null ) {
@@ -327,7 +331,11 @@ public void flush() {
327
331
}
328
332
return null ;
329
333
});
334
+
335
+ return exec .futureResponse ;
330
336
}
337
+
338
+ return null ;
331
339
}
332
340
333
341
public void add (BulkOperation operation , Context context ) {
@@ -483,7 +491,7 @@ public Builder<Context> flushInterval(long value, TimeUnit unit) {
483
491
/**
484
492
* Sets an interval flushing any bulk actions pending if the interval passes. Defaults to not set.
485
493
* <p>
486
- * Flushing is still subject to the maximum number of requests set with {@link #maxConcurrentRequests}.
494
+ * Flushing is still subject to the maximum number of requests set with {@link #maxConcurrentRequests}.
487
495
*/
488
496
public Builder <Context > flushInterval (long value , TimeUnit unit , ScheduledExecutorService scheduler ) {
489
497
this .scheduler = scheduler ;
@@ -528,4 +536,3 @@ public BulkIngester<Context> build() {
528
536
}
529
537
}
530
538
}
531
-
0 commit comments