|
32 | 32 | import java.util.ArrayList;
|
33 | 33 | import java.util.Arrays;
|
34 | 34 | import java.util.Collection;
|
| 35 | +import java.util.Collections; |
35 | 36 | import java.util.HashMap;
|
36 | 37 | import java.util.List;
|
37 | 38 | import java.util.Map;
|
@@ -1401,7 +1402,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
|
1401 | 1402 | public void testAckModeCount() throws Exception {
|
1402 | 1403 | ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
|
1403 | 1404 | Consumer<Integer, String> consumer = mock(Consumer.class);
|
1404 |
| - given(cf.createConsumer(isNull(), eq("clientId"), isNull())).willReturn(consumer); |
| 1405 | + given(cf.createConsumer(isNull(), eq("clientId"))).willReturn(consumer); |
1405 | 1406 | TopicPartition topicPartition = new TopicPartition("foo", 0);
|
1406 | 1407 | final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records1 = new HashMap<>();
|
1407 | 1408 | records1.put(topicPartition, Arrays.asList(
|
@@ -1446,13 +1447,13 @@ public void testAckModeCount() throws Exception {
|
1446 | 1447 | ContainerProperties containerProps = new ContainerProperties(topicPartitionOffset);
|
1447 | 1448 | containerProps.setAckMode(AckMode.COUNT);
|
1448 | 1449 | containerProps.setAckCount(3);
|
1449 |
| - containerProps.setClientId("clientId"); |
1450 | 1450 | AtomicInteger recordCount = new AtomicInteger();
|
1451 | 1451 | containerProps.setMessageListener((MessageListener) r -> {
|
1452 | 1452 | recordCount.incrementAndGet();
|
1453 | 1453 | });
|
1454 | 1454 | KafkaMessageListenerContainer<Integer, String> container =
|
1455 | 1455 | new KafkaMessageListenerContainer<>(cf, containerProps);
|
| 1456 | + container.setClientIdSuffix("clientId"); |
1456 | 1457 | container.start();
|
1457 | 1458 | assertThat(commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
|
1458 | 1459 | assertThat(recordCount.get()).isEqualTo(7);
|
|
0 commit comments