diff --git a/apache-kafka-3/src/main/resources/kafka-connector-elasticsearch/avro-sink-config.json b/apache-kafka-3/src/main/resources/kafka-connector-elasticsearch/avro-sink-config.json
new file mode 100644
index 000000000000..90277f6d5bd2
--- /dev/null
+++ b/apache-kafka-3/src/main/resources/kafka-connector-elasticsearch/avro-sink-config.json
@@ -0,0 +1,14 @@
+{
+  "name": "avro-elasticsearch-sink",
+  "config": {
+    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
+    "tasks.max": "1",
+    "topics": "avro_logs",
+    "connection.url": "http://elasticsearch:9200",
+    "key.ignore": "true",
+    "schema.ignore": "false",
+    "value.converter": "io.confluent.connect.avro.AvroConverter",
+    "value.converter.schema.registry.url": "http://schema-registry:8081"
+  }
+}
+ 
\ No newline at end of file
diff --git a/apache-kafka-3/src/main/resources/kafka-connector-elasticsearch/dlq-sink-config.json b/apache-kafka-3/src/main/resources/kafka-connector-elasticsearch/dlq-sink-config.json
new file mode 100644
index 000000000000..a473c16e5421
--- /dev/null
+++ b/apache-kafka-3/src/main/resources/kafka-connector-elasticsearch/dlq-sink-config.json
@@ -0,0 +1,20 @@
+{
+  "name": "dlq-elasticsearch-sink",
+  "config": {
+    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
+    "tasks.max": "1",
+    "topics": "failing-topic",
+    "connection.url": "http://elasticsearch:9200",
+    "key.ignore": "true",
+    "schema.ignore": "true",
+
+    "behavior.on.malformed.documents": "ignore",
+    "errors.tolerance": "all",
+    "errors.log.enable": "true",
+    "errors.log.include.messages": "true",
+
+    "errors.deadletterqueue.topic.name": "dlq-logs",
+    "errors.deadletterqueue.topic.replication.factor": "1",
+    "errors.deadletterqueue.context.headers.enable": "true"
+  }
+}
diff --git a/apache-kafka-3/src/main/resources/kafka-connector-elasticsearch/error-handling-sink-config.json b/apache-kafka-3/src/main/resources/kafka-connector-elasticsearch/error-handling-sink-config.json
new file mode 100644
index 000000000000..ace275d8d4de
--- /dev/null
+++ b/apache-kafka-3/src/main/resources/kafka-connector-elasticsearch/error-handling-sink-config.json
@@ -0,0 +1,16 @@
+{
+  "name": "error-handling-elasticsearch-sink",
+  "config": {
+    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
+    "tasks.max": "1",
+    "topics": "test-error-handling",
+    "connection.url": "http://elasticsearch:9200",
+    "key.ignore": "true",
+    "schema.ignore": "true",
+    "behavior.on.malformed.documents": "warn",
+    "behavior.on.error": "LOG",
+    "errors.tolerance": "all",
+    "errors.log.enable": "true",
+    "errors.log.include.messages": "true"
+  }
+}
\ No newline at end of file
diff --git a/apache-kafka-3/src/main/resources/kafka-connector-elasticsearch/flush-optimization-sink.json b/apache-kafka-3/src/main/resources/kafka-connector-elasticsearch/flush-optimization-sink.json
new file mode 100644
index 000000000000..bf33f926e495
--- /dev/null
+++ b/apache-kafka-3/src/main/resources/kafka-connector-elasticsearch/flush-optimization-sink.json
@@ -0,0 +1,23 @@
+{
+  "name": "flush-optimization-sink",
+  "config": {
+    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
+    "tasks.max": "1",
+    "topics": "logs",
+    "connection.url": "http://elasticsearch:9200",
+    "key.ignore": "true",
+    "schema.ignore": "true",
+
+    "flush.size": "500",
+    "linger.ms": "1000",
+    "batch.size": "200",
+    "max.buffered.records": "10000",
+    "max.retries": "10",
+    "retry.backoff.ms": "500",
"max.in.flight.requests": "10", + "behavior.on.malformed.documents": "warn", + "write.method": "bulk", + "request.timeout.ms": "30000", + "connection.timeout.ms": "10000" + } +} diff --git a/apache-kafka-3/src/main/resources/kafka-connector-elasticsearch/message-id-sink-config.json b/apache-kafka-3/src/main/resources/kafka-connector-elasticsearch/message-id-sink-config.json new file mode 100644 index 000000000000..9f92e00a96bd --- /dev/null +++ b/apache-kafka-3/src/main/resources/kafka-connector-elasticsearch/message-id-sink-config.json @@ -0,0 +1,19 @@ +{ + "name": "message-id-elasticsearch-sink", + "config": { + "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", + "tasks.max": "1", + "topics": "message_id_logs", + "connection.url": "http://elasticsearch:9200", + "key.ignore": "false", + "schema.ignore": "true", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "transforms": "InsertKey,ExtractId", + "transforms.InsertKey.type": "org.apache.kafka.connect.transforms.ValueToKey", + "transforms.InsertKey.fields": "user_id", + "transforms.ExtractId.type": "org.apache.kafka.connect.transforms.ExtractField$Key", + "transforms.ExtractId.field": "user_id" + } +} + \ No newline at end of file diff --git a/apache-kafka-3/src/main/resources/kafka-connector-elasticsearch/partition-key-sink-config.json b/apache-kafka-3/src/main/resources/kafka-connector-elasticsearch/partition-key-sink-config.json new file mode 100644 index 000000000000..4bb4a4b30d34 --- /dev/null +++ b/apache-kafka-3/src/main/resources/kafka-connector-elasticsearch/partition-key-sink-config.json @@ -0,0 +1,12 @@ +{ + "name": "partition-key-elasticsearch-sink", + "config": { + "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", + "tasks.max": "1", + "topics": "partition_key_logs", + "connection.url": "http://elasticsearch:9200", + "key.ignore": "false", + "schema.ignore": "true", + "key.converter": "org.apache.kafka.connect.storage.StringConverter" + } +} diff --git a/apache-kafka-3/src/main/resources/kafka-connector-elasticsearch/test-connector.json b/apache-kafka-3/src/main/resources/kafka-connector-elasticsearch/test-connector.json new file mode 100644 index 000000000000..611c44e25be1 --- /dev/null +++ b/apache-kafka-3/src/main/resources/kafka-connector-elasticsearch/test-connector.json @@ -0,0 +1,12 @@ +{ + "name": "elasticsearch-sink-connector-test", + "config": { + "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", + "tasks.max": "1", + "topics": "logs", + "connection.url": "http://elasticsearch:9200", + "type.name": "_doc", + "key.ignore": "true", + "schema.ignore": "true" + } +} \ No newline at end of file diff --git a/apache-kafka-3/src/main/resources/kafka-connector-elasticsearch/timestamp-index-sink.json b/apache-kafka-3/src/main/resources/kafka-connector-elasticsearch/timestamp-index-sink.json new file mode 100644 index 000000000000..f6ac3bc5a75b --- /dev/null +++ b/apache-kafka-3/src/main/resources/kafka-connector-elasticsearch/timestamp-index-sink.json @@ -0,0 +1,20 @@ +{ + "name": "timestamp-index-sink", + "config": { + "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", + "tasks.max": "1", + "topics": "test", + "connection.url": "http://elasticsearch:9200", + "key.ignore": "true", + "schema.ignore": "true", + "transforms": "TimestampConverter, TimestampRouter", + "transforms.TimestampConverter.type": 
"org.apache.kafka.connect.transforms.TimestampConverter$Value", + "transforms.TimestampConverter.field": "timestamp", + "transforms.TimestampConverter.target.type": "string", + "transforms.TimestampConverter.format": "yyyy-MM-dd'T'HH:mm:ssX", + "transforms.TimestampRouter.type": "org.apache.kafka.connect.transforms.TimestampRouter", + "transforms.TimestampRouter.topic.format": "${topic}-${timestamp}", + "transforms.TimestampRouter.timestamp.format": "yyyy-MM-dd", + "flush.synchronously": "true" + } +} \ No newline at end of file diff --git a/apache-kafka-3/src/main/resources/kafka-connector-elasticsearch/timestamp-transform-sink.json b/apache-kafka-3/src/main/resources/kafka-connector-elasticsearch/timestamp-transform-sink.json new file mode 100644 index 000000000000..38aa7bf3a0e6 --- /dev/null +++ b/apache-kafka-3/src/main/resources/kafka-connector-elasticsearch/timestamp-transform-sink.json @@ -0,0 +1,17 @@ +{ + "name": "timestamp-transform-sink", + "config": { + "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", + "tasks.max": "1", + "topics": "epoch_logs", + "connection.url": "http://elasticsearch:9200", + "key.ignore": "true", + "schema.ignore": "true", + "transforms": "TimestampConverter", + "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value", + "transforms.TimestampConverter.field": "timestamp", + "transforms.TimestampConverter.target.type": "string", + "transforms.TimestampConverter.format": "yyyy-MM-dd'T'HH:mm:ssZ" + } +} + \ No newline at end of file diff --git a/spring-batch-2/pom.xml b/spring-batch-2/pom.xml index 8cbe4ab99f22..8205853108e3 100644 --- a/spring-batch-2/pom.xml +++ b/spring-batch-2/pom.xml @@ -17,6 +17,20 @@ + + org.springframework.batch + spring-batch-core + ${spring.batch.version} + + + org.springframework.batch + spring-batch-infrastructure + ${spring.batch.version} + + + org.springframework.boot + spring-boot-starter-data-jpa + org.springframework.boot spring-boot-starter-batch @@ -69,6 +83,12 @@ ${jettison.version} compile + + org.assertj + assertj-core + ${assertj.version} + test + org.springframework.boot spring-boot-starter-quartz @@ -100,8 +120,9 @@ 4.5.14 4.0.2 1.5.4 - 5.1.2 + 5.2.0 4.2.1 + 3.24.2 com.baeldung.batch.SpringBootBatchProcessingApplication diff --git a/spring-batch-2/src/main/java/com/baeldung/batch/CompositeItemReaderConfig.java b/spring-batch-2/src/main/java/com/baeldung/batch/CompositeItemReaderConfig.java new file mode 100644 index 000000000000..7c39a1d792b1 --- /dev/null +++ b/spring-batch-2/src/main/java/com/baeldung/batch/CompositeItemReaderConfig.java @@ -0,0 +1,100 @@ +package com.baeldung.batch; + +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.item.database.JdbcCursorItemReader; +import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder; +import org.springframework.batch.item.file.FlatFileItemReader; +import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder; +import org.springframework.batch.item.support.CompositeItemReader; +import 
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.jdbc.DataSourceBuilder;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.transaction.PlatformTransactionManager;
+
+import javax.sql.DataSource;
+import java.util.Arrays;
+
+import com.baeldung.batch.model.Product;
+
+@Configuration
+@EnableBatchProcessing
+public class CompositeItemReaderConfig {
+
+    @Bean
+    public DataSource dataSource() {
+        return DataSourceBuilder.create()
+            .driverClassName("org.h2.Driver")
+            .url("jdbc:h2:mem:batchdb;DB_CLOSE_DELAY=-1;")
+            .username("sa")
+            .password("")
+            .build();
+    }
+
+
+    @Bean
+    public FlatFileItemReader<Product> fileReader() {
+        return new FlatFileItemReaderBuilder<Product>()
+            .name("fileReader")
+            .resource(new ClassPathResource("products.csv"))
+            .delimited()
+            .names("productId", "productName", "stock", "price")
+            .linesToSkip(1)
+            .targetType(Product.class)
+            .build();
+    }
+
+    @Bean
+    public JdbcCursorItemReader<Product> dbReader() {
+        return new JdbcCursorItemReaderBuilder<Product>()
+            .name("dbReader")
+            .dataSource(dataSource())
+            .sql("SELECT productid, productname, stock, price FROM products")
+            .rowMapper((rs, rowNum) -> new Product(
+                rs.getLong("productid"),
+                rs.getString("productname"),
+                rs.getInt("stock"),
+                rs.getBigDecimal("price")
+            ))
+            .build();
+    }
+
+    @Bean
+    public CompositeItemReader<Product> compositeReader() {
+        return new CompositeItemReader<>(Arrays.asList(fileReader(), dbReader()));
+    }
+
+    @Bean
+    public ItemWriter<Product> productWriter() {
+        return items -> {
+            for (Product product : items) {
+                System.out.println("Writing product: " + product);
+            }
+        };
+    }
+
+    @Bean
+    public Step step(JobRepository jobRepository, PlatformTransactionManager transactionManager, @Qualifier("compositeReader") ItemReader<Product> compositeReader, ItemWriter<Product> productWriter) {
+        return new StepBuilder("productStep", jobRepository)
+            .<Product, Product>chunk(10, transactionManager)
+            .reader(compositeReader)
+            .writer(productWriter)
+            .build();
+    }
+
+    @Bean
+    public Job productJob(JobRepository jobRepository, Step step) {
+        return new JobBuilder("productJob", jobRepository)
+            .start(step)
+            .build();
+    }
+}
\ No newline at end of file
diff --git a/spring-batch-2/src/main/java/com/baeldung/batch/model/Product.java b/spring-batch-2/src/main/java/com/baeldung/batch/model/Product.java
new file mode 100644
index 000000000000..c4586dceb10d
--- /dev/null
+++ b/spring-batch-2/src/main/java/com/baeldung/batch/model/Product.java
@@ -0,0 +1,63 @@
+package com.baeldung.batch.model;
+
+import java.math.BigDecimal;
+import jakarta.persistence.*;
+
+@Entity
+@Table(name = "products")
+public class Product {
+
+    @Id
+    @GeneratedValue(strategy = GenerationType.IDENTITY)
+    @Column(name = "productid")
+    private Long productId;
+    @Column(name = "productname")
+    private String productName;
+    @Column(name = "stock")
+    private Integer stock;
+    @Column(name = "price")
+    private BigDecimal price;
+
+    public Product() {
+
+    }
+
+    public Product(Long productId, String productName, Integer stock, BigDecimal price) {
+        this.productId = productId;
+        this.productName = productName;
+        this.stock = stock;
+        this.price = price;
+    }
+
+    public Long getProductId() {
+        return productId;
+    }
+
+    public void setProductId(Long productId) {
+        this.productId = productId;
+    }
+
+    public String getProductName() {
+        return productName;
+    }
+
+    public void setProductName(String productName) {
+        this.productName = productName;
+    }
+
+    public Integer getStock() {
+        return stock;
+    }
+
+    public void setStock(Integer stock) {
+        this.stock = stock;
+    }
+
+    public BigDecimal getPrice() {
+        return price;
+    }
+
+    public void setPrice(BigDecimal price) {
+        this.price = price;
+    }
+}
diff --git a/spring-batch-2/src/main/resources/products.csv b/spring-batch-2/src/main/resources/products.csv
new file mode 100644
index 000000000000..d9f2d5d5ca0a
--- /dev/null
+++ b/spring-batch-2/src/main/resources/products.csv
@@ -0,0 +1,2 @@
+productId,productName,stock,price
+101,Apple,50,1.99
\ No newline at end of file
diff --git a/spring-batch-2/src/test/java/com/baeldung/batch/CompositeItemReaderUnitTest.java b/spring-batch-2/src/test/java/com/baeldung/batch/CompositeItemReaderUnitTest.java
new file mode 100644
index 000000000000..d716b3fd4617
--- /dev/null
+++ b/spring-batch-2/src/test/java/com/baeldung/batch/CompositeItemReaderUnitTest.java
@@ -0,0 +1,91 @@
+package com.baeldung.batch;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.batch.core.JobExecution;
+import org.springframework.batch.core.JobParameters;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.item.ExecutionContext;
+import org.springframework.batch.item.ItemReader;
+import org.springframework.batch.item.ItemStreamReader;
+import org.springframework.batch.item.support.CompositeItemReader;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.test.context.ContextConfiguration;
+
+import com.baeldung.batch.model.Product;
+
+@SpringBootTest
+@EnableAutoConfiguration
+@ContextConfiguration(classes = { CompositeItemReaderConfig.class })
+public class CompositeItemReaderUnitTest {
+    @Autowired
+    private CompositeItemReader<Product> compositeReader;
+
+    @Autowired
+    private JdbcTemplate jdbcTemplate;
+
+    @BeforeEach
+    public void setUp() {
+        jdbcTemplate.update("DELETE FROM products");
+        jdbcTemplate.update("INSERT INTO products (productid, productname, stock, price) VALUES (?, ?, ?, ?)",
+            102, "Banana", 30, 1.49);
+    }
+
+    @Test
+    public void givenTwoReaders_whenRead_thenProcessProductsInOrder() throws Exception {
+        StepExecution stepExecution = new StepExecution("testStep", new JobExecution(1L, new JobParameters()), 1L);
+        ExecutionContext executionContext = stepExecution.getExecutionContext();
+        compositeReader.open(executionContext);
+
+        Product product1 = compositeReader.read();
+        assertNotNull(product1);
+        assertEquals(101, product1.getProductId());
+        assertEquals("Apple", product1.getProductName());
+
+        Product product2 = compositeReader.read();
+        assertNotNull(product2);
+        assertEquals(102, product2.getProductId());
+        assertEquals("Banana", product2.getProductName());
+    }
+
+    @Test
+    public void givenMultipleReader_whenOneReaderReturnNull_thenProcessDataFromNextReader() throws Exception {
+        ItemStreamReader<Product> emptyReader = mock(ItemStreamReader.class);
+        when(emptyReader.read()).thenReturn(null);
+
+        ItemStreamReader<Product> validReader = mock(ItemStreamReader.class);
+        when(validReader.read()).thenReturn(new Product(103L, "Cherry", 20, BigDecimal.valueOf(2.99)), null);
+
+        CompositeItemReader<Product> compositeReader = new CompositeItemReader<>(Arrays.asList(emptyReader, validReader));
+
+        Product product = compositeReader.read();
+        assertNotNull(product);
+        assertEquals(103, product.getProductId());
+        assertEquals("Cherry", product.getProductName());
+    }
+
+
+    @Test
+    public void givenEmptyReaders_whenRead_thenReturnNull() throws Exception {
+        ItemStreamReader<Product> emptyReader = mock(ItemStreamReader.class);
+        when(emptyReader.read()).thenReturn(null);
+
+        CompositeItemReader<Product> compositeReader = new CompositeItemReader<>(Arrays.asList(emptyReader, emptyReader));
+
+        Product product = compositeReader.read();
+        assertNull(product);
+    }
+}
diff --git a/spring-batch-2/src/test/java/com/baeldung/batch/SpringBatchRetryIntegrationTest.java b/spring-batch-2/src/test/java/com/baeldung/batch/SpringBatchRetryIntegrationTest.java
index c912e46599b8..d58e560bc21a 100644
--- a/spring-batch-2/src/test/java/com/baeldung/batch/SpringBatchRetryIntegrationTest.java
+++ b/spring-batch-2/src/test/java/com/baeldung/batch/SpringBatchRetryIntegrationTest.java
@@ -5,12 +5,12 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.when;
-import static org.springframework.batch.test.AssertFile.assertFileEquals;
 
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.conn.ConnectTimeoutException;
 import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mock;
 import org.springframework.batch.core.ExitStatus;
@@ -76,7 +76,7 @@ public void whenEndpointFailsTwicePasses3rdTime_thenSuccess() throws Exception {
 
         assertEquals("retryBatchJob", actualJobInstance.getJobName());
         assertEquals("COMPLETED", actualJobExitStatus.getExitCode());
-        assertFileEquals(expectedResult, actualResult);
+        org.assertj.core.api.Assertions.assertThat(actualResult.getFile()).hasSameTextualContentAs(expectedResult.getFile());
     }
 
     private JobParameters defaultJobParameters() {