Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
arunvariyath committed Nov 4, 2024
2 parents a7dd238 + 2e853bb commit 7219cb8
Show file tree
Hide file tree
Showing 53 changed files with 610 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,10 @@ public String getBySocketConnection(String urlString) throws IOException {
.getHostAddress();
}
}
}

/**
 * Validates that the given string is a well-formed dotted-quad IPv4 address.
 * Each octet must be 0-255 with no leading zeros (e.g. "01" is rejected).
 *
 * @param ip the candidate IPv4 address; may be null
 * @return true if {@code ip} matches the IPv4 pattern, false otherwise (including for null)
 */
public boolean validate(final String ip) {
    // Octet alternatives: 0 | 1,10-19,100-199 | 2,20-24,200-249 | 25,250-255 | 3-9,30-99.
    // Renamed from PATTERN: UPPER_SNAKE_CASE is reserved for constants, this is a local.
    String ipv4Pattern = "^((0|1\\d?\\d?|2[0-4]?\\d?|25[0-5]?|[3-9]\\d?)\\.){3}(0|1\\d?\\d?|2[0-4]?\\d?|25[0-5]?|[3-9]\\d?)$";

    // Null-guard: return false instead of propagating a NullPointerException to callers.
    return ip != null && ip.matches(ipv4Pattern);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class GetURLIPAddressUnitTest {
@Test
public void givenValidURL_whenGetByInetAddress_thenReturnAValidIPAddress() throws UnknownHostException {
    URLIPAddress urlipAddress = new URLIPAddress();
    // Resolve the host name and confirm the result is a well-formed IPv4 address.
    // (The stale pre-commit call to the removed static validate() helper is dropped.)
    assertTrue(urlipAddress.validate(urlipAddress.getByInetAddress("www.example.com")));
}

@Test
Expand All @@ -25,20 +25,12 @@ public void givenInvalidURL_whenGetByInetAddress_thenThrowUnknownHostException()
@Test
public void givenValidURL_whenGetBySocketConnection_thenReturnAValidIPAddress() throws IOException {
    URLIPAddress urlipAddress = new URLIPAddress();
    // Resolve via a socket connection and confirm the result is a well-formed IPv4 address.
    // (The stale pre-commit call to the removed static validate() helper is dropped.)
    assertTrue(urlipAddress.validate(urlipAddress.getBySocketConnection("google.com")));
}

@Test
public void givenInvalidURL_whenGetBySocketConnection_thenThrowUnknownHostException() {
    // A full URL string (with scheme) is not a resolvable host name, so resolution must fail.
    URLIPAddress addressResolver = new URLIPAddress();
    assertThrows(UnknownHostException.class, () -> addressResolver.getBySocketConnection("https://www.example.com"));
}

public static boolean validate(final String ip) {
System.out.println("ip = " + ip);
String PATTERN = "^((0|1\\d?\\d?|2[0-4]?\\d?|25[0-5]?|[3-9]\\d?)\\.){3}(0|1\\d?\\d?|2[0-4]?\\d?|25[0-5]?|[3-9]\\d?)$";

return ip.matches(PATTERN);
}

}
3 changes: 0 additions & 3 deletions parent-spring-4/README.md

This file was deleted.

53 changes: 0 additions & 53 deletions parent-spring-4/pom.xml

This file was deleted.

5 changes: 0 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,6 @@
<module>parent-boot-1</module>
<module>parent-boot-2</module>
<module>parent-boot-3</module>
<module>parent-spring-4</module>
<module>parent-spring-5</module>
<module>parent-spring-6</module>
<module>apache-kafka</module>
Expand Down Expand Up @@ -644,7 +643,6 @@
<module>parent-boot-1</module>
<module>parent-boot-2</module>
<module>parent-boot-3</module>
<module>parent-spring-4</module>
<module>parent-spring-5</module>
<module>parent-spring-6</module>
<module>akka-modules</module>
Expand Down Expand Up @@ -966,7 +964,6 @@
<module>parent-boot-1</module>
<module>parent-boot-2</module>
<module>parent-boot-3</module>
<module>parent-spring-4</module>
<module>parent-spring-5</module>
<module>parent-spring-6</module>
<module>apache-kafka</module>
Expand Down Expand Up @@ -1030,7 +1027,6 @@
<module>parent-boot-1</module>
<module>parent-boot-2</module>
<module>parent-boot-3</module>
<module>parent-spring-4</module>
<module>parent-spring-5</module>
<module>parent-spring-6</module>
<module>akka-modules</module>
Expand Down Expand Up @@ -1311,7 +1307,6 @@
<modules>
<module>parent-boot-1</module>
<module>parent-boot-2</module>
<module>parent-spring-4</module>
<module>parent-spring-5</module>
<module>parent-spring-6</module>
</modules>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.baeldung.kafka.batch;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class DataLakeService {

    // SLF4J loggers are thread-safe and stateless; declare static final per convention
    // instead of one instance field per bean.
    private static final Logger logger = LoggerFactory.getLogger(DataLakeService.class);

    /**
     * Persists the given batch of messages into the data lake.
     * Currently a tutorial stub that only logs the action.
     *
     * @param messages the batch of raw message payloads to store
     */
    public void save(List<String> messages) {
        logger.info("Transform and save the data into the data lake");
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.baeldung.kafka.batch;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.baeldung.countingmessages.Application;

@SpringBootApplication
public class KafkaBatchApplication {

    /**
     * Bootstraps the Kafka batch-processing application.
     * Bug fix: the original ran com.baeldung.countingmessages.Application, booting an
     * unrelated application context instead of this module's own configuration.
     */
    public static void main(String[] args) {
        SpringApplication.run(KafkaBatchApplication.class, args);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.baeldung.kafka.batch;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KpiBatchConsumer {
private final Logger logger = LoggerFactory.getLogger(KpiBatchConsumer.class);

// Gate used by tests: listen() blocks here until the test calls getLatch().countDown().
private CountDownLatch latch = new CountDownLatch(1);
@Autowired
private DataLakeService dataLakeService;
// Accumulates every record value seen across all batches; not cleared between polls.
// NOTE(review): plain ArrayList read by the test thread while the listener thread appends —
// assumes the latch hand-off provides enough ordering; confirm if stronger safety is needed.
private List<String> receivedMessages = new ArrayList<>();

// Batch listener: receives up to max.poll.records messages per invocation as one
// ConsumerRecords collection (batch = "true" on the container factory).
@KafkaListener(id = "kpi-batch-listener", topics = "kpi_batch_topic", batch = "true", containerFactory = "kafkaKpiListenerContainerFactory")
public void listen(ConsumerRecords<String, String> records) throws InterruptedException {
logger.info("Number of elements in the records: {}", records.count());
records.forEach(record -> receivedMessages.add(record.value()));

// Park the listener thread until the test inspects state and counts the latch down.
latch.await();

dataLakeService.save(receivedMessages);
// Re-arm the gate so the next batch blocks again.
latch = new CountDownLatch(1);
}

// Current latch instance; tests count it down to release the blocked listener.
public CountDownLatch getLatch() {
return latch;
}

// All message values received so far (cumulative, never reset).
public List<String> getReceivedMessages() {
return receivedMessages;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.baeldung.kafka.batch;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Profile("no-batch")
public class KpiConsumer {
private final Logger logger = LoggerFactory.getLogger(KpiConsumer.class);
// Gate used by tests: listen() blocks here until the test calls getLatch().countDown().
private CountDownLatch latch = new CountDownLatch(1);

// Most recently received record, exposed to tests via getMessage().
private ConsumerRecord<String, String> message;
@Autowired
private DataLakeService dataLakeService;

// Single-record listener (non-batch); active only under the "no-batch" profile.
@KafkaListener(id = "kpi-listener", topics = "kpi_topic", containerFactory = "kafkaKpiListenerContainerFactory")
public void listen(ConsumerRecord<String, String> record) throws InterruptedException {

logger.info("messages received: {}", record.value());

this.message = record;
//pause the current thread and resume it when the count-down latch is reset to 0
latch.await();

List<String> messages = new ArrayList<>();
messages.add(record.value());
dataLakeService.save(messages);
//reset the latch
latch = new CountDownLatch(1);
}

// Last record handed to listen(); null until the first message arrives.
public ConsumerRecord<String, String> getMessage() {
return message;
}

// Current latch instance; tests count it down to release the blocked listener.
public CountDownLatch getLatch() {
return latch;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.baeldung.kafka.batch;

import java.util.concurrent.ExecutionException;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KpiProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KpiProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Publishes a single message to the given topic, blocking until the broker
     * acknowledges the send, then flushes any buffered records.
     *
     * @param topic   destination Kafka topic
     * @param message payload to publish
     * @throws ExecutionException   if the send fails on the broker side
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void sendMessage(String topic, String message) throws ExecutionException, InterruptedException {
        var pendingSend = kafkaTemplate.send(topic, message);
        // Block until the broker acknowledges, making the publish synchronous.
        pendingSend.get();
        this.kafkaTemplate.flush();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.baeldung.kafka.batch;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.concurrent.ExecutionException;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.TestInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.ActiveProfiles;

@SpringBootTest
@Import(KafkaKpiConsumerWithBatchConfig.class)
@ActiveProfiles("batch")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@EnableKafka
@EmbeddedKafka(partitions = 1, topics = { "kpi_batch_topic" }, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092" })
public class KafkaBatchProcessingLiveTest {

    private final Logger logger = LoggerFactory.getLogger(KafkaBatchProcessingLiveTest.class);

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Autowired
    private KpiProducer kpiProducer;

    @Autowired
    private KpiBatchConsumer kpiBatchConsumer;

    /** Publishes the full fixture data set once before any repetition runs (PER_CLASS lifecycle). */
    @BeforeAll
    void setup() throws ExecutionException, InterruptedException {
        assertThat(embeddedKafka).isNotNull();
        publishMessages();
    }

    /** Sends 100 sequentially numbered messages to the batch topic. */
    private void publishMessages() throws ExecutionException, InterruptedException {
        String messageTemplate = "Test KPI Message-";
        for (int count = 1; count <= 100; count++) {
            logger.info("publishing message number {}", count);
            // Plain concatenation replaces concat(Integer.valueOf(count).toString()):
            // same output, no explicit boxing detour.
            kpiProducer.sendMessage("kpi_batch_topic", messageTemplate + count);
        }
    }

    /**
     * Each repetition releases the consumer's latch and checks the cumulative count;
     * batches arrive in multiples of 20 (max.poll.records set by the batch config).
     */
    @RepeatedTest(5)
    void givenKafka_whenMessagesOnTopic_thenListenerConsumesMessages() {
        int messageSize = kpiBatchConsumer.getReceivedMessages().size();
        logger.info("The message received by test {}", messageSize);
        assertThat(messageSize % 20).isEqualTo(0);
        kpiBatchConsumer.getLatch().countDown();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.baeldung.kafka.batch;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Profile;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@TestConfiguration
@Profile("batch")
public class KafkaKpiConsumerWithBatchConfig {

    /**
     * Batch-enabled listener container factory: caps each poll at 20 records
     * (MAX_POLL_RECORDS_CONFIG) and delivers them to the listener as a single batch.
     *
     * @param consumerFactory the auto-configured consumer factory to adapt
     * @return a single-threaded, batch-mode listener container factory
     */
    @Bean(name = "kafkaKpiListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaKpiBatchListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {

        // Diamond operator fixes the raw-type construction, which produced an
        // unchecked-conversion warning in the original.
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "20");
        consumerFactory.updateConfigs(configProps);
        factory.setConcurrency(1);
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setPollTimeout(3000);
        factory.setBatchListener(true);

        return factory;
    }
}
Loading

0 comments on commit 7219cb8

Please sign in to comment.