File tree Expand file tree Collapse file tree 2 files changed +63
-0
lines changed
parallel-consumer-core/src
main/java/io/confluent/parallelconsumer
test/java/io/confluent/parallelconsumer/state Expand file tree Collapse file tree 2 files changed +63
-0
lines changed Original file line number Diff line number Diff line change @@ -419,6 +419,23 @@ public void validate() {
419
419
Objects .requireNonNull (consumer , "A consumer must be supplied" );
420
420
421
421
transactionsValidation ();
422
+ validateMinBatchParameters ();
423
+ }
424
+
425
+ private void validateMinBatchParameters () {
426
+ if (isEnforceMinBatch ()){
427
+ if (minBatchSize > batchSize )
428
+ throw new IllegalArgumentException (
429
+ msg ("minBatchSize can not by bigger than batchSize: {} > {}" ,
430
+ minBatchSize ,
431
+ batchSize ));
432
+ }
433
+ if (minBatchTimeoutInMillis < 0 ){
434
+ throw new IllegalArgumentException (
435
+ msg ("minBatchTimeoutInMillis should be non negative: {}" ,
436
+ minBatchTimeoutInMillis
437
+ ));
438
+ }
422
439
}
423
440
424
441
private void transactionsValidation () {
Original file line number Diff line number Diff line change
1
+ package io .confluent .parallelconsumer .state ;
2
+
3
+ /*-
4
+ * Copyright (C) 2020-2023 Confluent, Inc.
5
+ */
6
+
7
+ import io .confluent .parallelconsumer .ParallelConsumerOptions ;
8
+ import lombok .extern .slf4j .Slf4j ;
9
+ import org .apache .kafka .clients .consumer .Consumer ;
10
+ import org .junit .jupiter .api .Test ;
11
+ import org .mockito .Mockito ;
12
+
13
+ import static org .junit .jupiter .api .Assertions .assertThrows ;
14
+
15
+
16
+ @ Slf4j
17
+ public class ParallelConsumerOptionsTest {
18
+
19
+ private final Consumer <Object ,Object > mockConsumer = Mockito .mock (Consumer .class );
20
+
21
+
22
+ @ Test
23
+ void validateMinBatchParameters (){
24
+ assertThrows (
25
+ IllegalArgumentException .class ,
26
+ () -> ParallelConsumerOptions .builder ()
27
+ .minBatchSize (10 )
28
+ .minBatchTimeoutInMillis (100 )
29
+ .consumer (mockConsumer )
30
+ .batchSize (5 )
31
+ .build ()
32
+ .validate ()
33
+ );
34
+
35
+ assertThrows (
36
+ IllegalArgumentException .class ,
37
+ () -> ParallelConsumerOptions .builder ()
38
+ .minBatchSize (3 )
39
+ .minBatchTimeoutInMillis (-1 )
40
+ .consumer (mockConsumer )
41
+ .batchSize (5 )
42
+ .build ()
43
+ .validate ()
44
+ );
45
+ }
46
+ }
You can’t perform that action at this time.
0 commit comments