
Commit

Merge remote-tracking branch 'upstream/master'
arunvariyath committed Feb 21, 2025
2 parents 25c8e4d + d755812 commit 246b904
Showing 15 changed files with 433 additions and 3 deletions.
@@ -0,0 +1,14 @@
{
  "name": "avro-elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "avro_logs",
    "connection.url": "http://elasticsearch:9200",
    "key.ignore": "true",
    "schema.ignore": "false",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}
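
For context, a minimal producer sketch (not part of this commit) that writes Avro records to the avro_logs topic this connector reads. The Log schema and the localhost broker and registry addresses are assumptions for illustration; because schema.ignore is false, the connector derives the Elasticsearch mapping from the registered Avro schema.

import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AvroLogProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081"); // assumed registry address

        // Hypothetical value schema; the real avro_logs schema is not part of this commit
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Log\",\"fields\":["
            + "{\"name\":\"level\",\"type\":\"string\"},"
            + "{\"name\":\"message\",\"type\":\"string\"}]}");

        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            GenericRecord log = new GenericData.Record(schema);
            log.put("level", "INFO");
            log.put("message", "application started");
            producer.send(new ProducerRecord<>("avro_logs", log));
        }
    }
}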

@@ -0,0 +1,20 @@
{
  "name": "dlq-elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "failing-topic",
    "connection.url": "http://elasticsearch:9200",
    "key.ignore": "true",
    "schema.ignore": "true",

    "behavior.on.malformed.documents": "ignore",
    "errors.tolerance": "all",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",

    "errors.deadletterqueue.topic.name": "dlq-logs",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "errors.deadletterqueue.context.headers.enable": "true"
  }
}
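
Because errors.deadletterqueue.context.headers.enable is true, Connect attaches error context to each dead-lettered record as headers prefixed with __connect.errors. A minimal consumer sketch (not part of this commit; the broker address is an assumption) that prints those headers from the dlq-logs topic:

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;

public class DlqInspector {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "dlq-inspector");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("dlq-logs"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Failed record: " + record.value());
                // Error context headers added by Connect, e.g. __connect.errors.exception.message
                for (Header header : record.headers()) {
                    System.out.println("  " + header.key() + " = "
                        + new String(header.value(), StandardCharsets.UTF_8));
                }
            }
        }
    }
}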
@@ -0,0 +1,16 @@
{
  "name": "error-handling-elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "test-error-handling",
    "connection.url": "http://elasticsearch:9200",
    "key.ignore": "true",
    "schema.ignore": "true",
    "behavior.on.malformed.documents": "warn",
    "behavior.on.error": "LOG",
    "errors.tolerance": "all",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true"
  }
}
@@ -0,0 +1,23 @@
{
  "name": "flush-optimization-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "logs",
    "connection.url": "http://elasticsearch:9200",
    "key.ignore": "true",
    "schema.ignore": "true",

    "flush.size": "500",
    "linger.ms": "1000",
    "batch.size": "200",
    "max.buffered.records": "10000",
    "max.retries": "10",
    "retry.backoff.ms": "500",
    "max.in.flight.requests": "10",
    "behavior.on.malformed.documents": "warn",
    "write.method": "bulk",
    "request.timeout.ms": "30000",
    "connection.timeout.ms": "10000"
  }
}
@@ -0,0 +1,19 @@
{
  "name": "message-id-elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "message_id_logs",
    "connection.url": "http://elasticsearch:9200",
    "key.ignore": "false",
    "schema.ignore": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "transforms": "InsertKey,ExtractId",
    "transforms.InsertKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.InsertKey.fields": "user_id",
    "transforms.ExtractId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.ExtractId.field": "user_id"
  }
}
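
The transform chain above copies the user_id field from the value into the record key (ValueToKey) and then unwraps that single field (ExtractField$Key), so with key.ignore set to false the connector uses user_id as the Elasticsearch document _id. A rough standalone sketch of the chain on a schemaless record (topic name and field values are illustrative only):

import java.util.Map;

import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.ExtractField;
import org.apache.kafka.connect.transforms.ValueToKey;

public class MessageIdTransformDemo {

    public static void main(String[] args) {
        ValueToKey<SinkRecord> insertKey = new ValueToKey<>();
        insertKey.configure(Map.of("fields", "user_id"));

        ExtractField.Key<SinkRecord> extractId = new ExtractField.Key<>();
        extractId.configure(Map.of("field", "user_id"));

        // Schemaless JSON value, as produced with value.converter.schemas.enable=false
        SinkRecord original = new SinkRecord("message_id_logs", 0, null, null, null,
            Map.of("user_id", "42", "action", "login"), 0L);

        SinkRecord keyed = extractId.apply(insertKey.apply(original));
        System.out.println(keyed.key()); // prints 42 -> used as the document _id

        insertKey.close();
        extractId.close();
    }
}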

@@ -0,0 +1,12 @@
{
  "name": "partition-key-elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "partition_key_logs",
    "connection.url": "http://elasticsearch:9200",
    "key.ignore": "false",
    "schema.ignore": "true",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}
@@ -0,0 +1,12 @@
{
  "name": "elasticsearch-sink-connector-test",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "logs",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "_doc",
    "key.ignore": "true",
    "schema.ignore": "true"
  }
}
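
Any of these connector definitions can be registered by POSTing the JSON to the Kafka Connect REST API, conventionally on port 8083. A minimal sketch using the JDK HTTP client; the host name and the config file path are assumptions:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

public class RegisterConnector {

    public static void main(String[] args) throws Exception {
        // Hypothetical path to one of the connector config files in this commit
        String config = Files.readString(Path.of("elasticsearch-sink.json"));

        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8083/connectors")) // assumed Connect REST endpoint
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(config))
            .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}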
@@ -0,0 +1,20 @@
{
  "name": "timestamp-index-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "test",
    "connection.url": "http://elasticsearch:9200",
    "key.ignore": "true",
    "schema.ignore": "true",
    "transforms": "TimestampConverter,TimestampRouter",
    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.TimestampConverter.field": "timestamp",
    "transforms.TimestampConverter.target.type": "string",
    "transforms.TimestampConverter.format": "yyyy-MM-dd'T'HH:mm:ssX",
    "transforms.TimestampRouter.type": "org.apache.kafka.connect.transforms.TimestampRouter",
    "transforms.TimestampRouter.topic.format": "${topic}-${timestamp}",
    "transforms.TimestampRouter.timestamp.format": "yyyy-MM-dd",
    "flush.synchronously": "true"
  }
}
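
TimestampRouter rewrites the topic name the sink sees, so each record is indexed into a date-suffixed Elasticsearch index such as test-2025-02-21. A rough standalone sketch of the routing (not part of this commit), with an arbitrary record timestamp:

import java.util.Map;

import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.TimestampRouter;

public class TimestampRouterDemo {

    public static void main(String[] args) {
        TimestampRouter<SinkRecord> router = new TimestampRouter<>();
        router.configure(Map.of(
            "topic.format", "${topic}-${timestamp}",
            "timestamp.format", "yyyy-MM-dd"));

        // Record timestamp of 2025-02-21T00:00:00Z in epoch millis, for illustration
        SinkRecord record = new SinkRecord("test", 0, null, null, null,
            Map.of("message", "hello"), 0L, 1740096000000L, TimestampType.CREATE_TIME);

        System.out.println(router.apply(record).topic()); // test-2025-02-21
        router.close();
    }
}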
@@ -0,0 +1,17 @@
{
  "name": "timestamp-transform-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "epoch_logs",
    "connection.url": "http://elasticsearch:9200",
    "key.ignore": "true",
    "schema.ignore": "true",
    "transforms": "TimestampConverter",
    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.TimestampConverter.field": "timestamp",
    "transforms.TimestampConverter.target.type": "string",
    "transforms.TimestampConverter.format": "yyyy-MM-dd'T'HH:mm:ssZ"
  }
}
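This converter turns the numeric epoch-millis timestamp field of each schemaless record into a formatted string before indexing, so Elasticsearch can map it as a date. A rough standalone sketch of the conversion (not part of this commit; field values are illustrative):

import java.util.Map;

import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.TimestampConverter;

public class EpochToStringDemo {

    public static void main(String[] args) {
        TimestampConverter.Value<SinkRecord> converter = new TimestampConverter.Value<>();
        converter.configure(Map.of(
            "field", "timestamp",
            "target.type", "string",
            "format", "yyyy-MM-dd'T'HH:mm:ssZ"));

        SinkRecord record = new SinkRecord("epoch_logs", 0, null, null, null,
            Map.of("timestamp", 1740096000000L, "message", "hello"), 0L);

        // The epoch-millis field is replaced by its formatted UTC string representation
        Map<?, ?> converted = (Map<?, ?>) converter.apply(record).value();
        System.out.println(converted.get("timestamp")); // 2025-02-21T00:00:00+0000
        converter.close();
    }
}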

23 changes: 22 additions & 1 deletion spring-batch-2/pom.xml
@@ -17,6 +17,20 @@
     </parent>
 
     <dependencies>
+        <dependency>
+            <groupId>org.springframework.batch</groupId>
+            <artifactId>spring-batch-core</artifactId>
+            <version>${spring.batch.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.batch</groupId>
+            <artifactId>spring-batch-infrastructure</artifactId>
+            <version>${spring.batch.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-data-jpa</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-batch</artifactId>
@@ -69,6 +83,12 @@
             <version>${jettison.version}</version>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>${assertj.version}</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-quartz</artifactId>
@@ -100,8 +120,9 @@
         <http-client.version>4.5.14</http-client.version>
         <jaxb.version>4.0.2</jaxb.version>
         <jettison.version>1.5.4</jettison.version>
-        <spring.batch.version>5.1.2</spring.batch.version>
+        <spring.batch.version>5.2.0</spring.batch.version>
         <awaitility.version>4.2.1</awaitility.version>
+        <assertj.version>3.24.2</assertj.version>
         <start-class>com.baeldung.batch.SpringBootBatchProcessingApplication</start-class>
     </properties>
 
@@ -0,0 +1,100 @@
package com.baeldung.batch;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.support.CompositeItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;
import java.util.Arrays;

import com.baeldung.batch.model.Product;

@Configuration
@EnableBatchProcessing
public class CompositeItemReaderConfig {

    // In-memory H2 database shared by the JPA entity and the JDBC cursor reader
    @Bean
    public DataSource dataSource() {
        return DataSourceBuilder.create()
            .driverClassName("org.h2.Driver")
            .url("jdbc:h2:mem:batchdb;DB_CLOSE_DELAY=-1;")
            .username("sa")
            .password("")
            .build();
    }

    // Reads products from the CSV file on the classpath
    @Bean
    public FlatFileItemReader<Product> fileReader() {
        return new FlatFileItemReaderBuilder<Product>()
            .name("fileReader")
            .resource(new ClassPathResource("products.csv"))
            .delimited()
            .names("productId", "productName", "stock", "price")
            .linesToSkip(1)
            .targetType(Product.class)
            .build();
    }

    // Reads products from the products table
    @Bean
    public JdbcCursorItemReader<Product> dbReader() {
        return new JdbcCursorItemReaderBuilder<Product>()
            .name("dbReader")
            .dataSource(dataSource())
            .sql("SELECT productid, productname, stock, price FROM products")
            .rowMapper((rs, rowNum) -> new Product(
                rs.getLong("productid"),
                rs.getString("productname"),
                rs.getInt("stock"),
                rs.getBigDecimal("price")
            ))
            .build();
    }

    // CompositeItemReader (introduced in Spring Batch 5.2) drains its delegates in order:
    // first the file reader, then the database reader
    @Bean
    public CompositeItemReader<Product> compositeReader() {
        return new CompositeItemReader<>(Arrays.asList(fileReader(), dbReader()));
    }

    @Bean
    public ItemWriter<Product> productWriter() {
        return items -> {
            for (Product product : items) {
                System.out.println("Writing product: " + product);
            }
        };
    }

    @Bean
    public Step step(JobRepository jobRepository, PlatformTransactionManager transactionManager,
        @Qualifier("compositeReader") ItemReader<Product> compositeReader, ItemWriter<Product> productWriter) {
        return new StepBuilder("productStep", jobRepository)
            .<Product, Product>chunk(10, transactionManager)
            .reader(compositeReader)
            .writer(productWriter)
            .build();
    }

    @Bean
    public Job productJob(JobRepository jobRepository, Step step) {
        return new JobBuilder("productJob", jobRepository)
            .start(step)
            .build();
    }
}
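
No test for this job appears in the diff; a minimal sketch of how the composite-reader job could be launched from a test (class and method names are assumptions; the assertj-core dependency added to the POM above supports the assertion):

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.Test;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.batch.test.context.SpringBatchTest;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBatchTest
@SpringBootTest
class CompositeItemReaderJobIntegrationTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Test
    void givenFileAndDatabaseReaders_whenJobRuns_thenItCompletes() throws Exception {
        // Launches the productJob defined in CompositeItemReaderConfig
        JobExecution execution = jobLauncherTestUtils.launchJob();
        assertThat(execution.getExitStatus()).isEqualTo(ExitStatus.COMPLETED);
    }
}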
63 changes: 63 additions & 0 deletions spring-batch-2/src/main/java/com/baeldung/batch/model/Product.java
@@ -0,0 +1,63 @@
package com.baeldung.batch.model;

import java.math.BigDecimal;
import jakarta.persistence.*;

@Entity
@Table(name = "products")
public class Product {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name = "productid")
    private Long productId;

    @Column(name = "productname")
    private String productName;

    @Column(name = "stock")
    private Integer stock;

    @Column(name = "price")
    private BigDecimal price;

    public Product() {
    }

    public Product(Long productId, String productName, Integer stock, BigDecimal price) {
        this.productId = productId;
        this.productName = productName;
        this.stock = stock;
        this.price = price;
    }

    public Long getProductId() {
        return productId;
    }

    public void setProductId(Long productId) {
        this.productId = productId;
    }

    public String getProductName() {
        return productName;
    }

    public void setProductName(String productName) {
        this.productName = productName;
    }

    public Integer getStock() {
        return stock;
    }

    public void setStock(Integer stock) {
        this.stock = stock;
    }

    public BigDecimal getPrice() {
        return price;
    }

    public void setPrice(BigDecimal price) {
        this.price = price;
    }
}
2 changes: 2 additions & 0 deletions spring-batch-2/src/main/resources/products.csv
@@ -0,0 +1,2 @@
productId,productName,stock,price
101,Apple,50,1.99
