Skip to content

Commit b535d14

Browse files
Adds avro and schema registry
1 parent 9342c0a commit b535d14

10 files changed

+75
-19
lines changed

build.gradle.kts

+9
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ plugins {
55
id("io.spring.dependency-management") version "1.1.0"
66
kotlin("jvm") version "1.8.21"
77
kotlin("plugin.spring") version "1.8.21"
8+
kotlin("plugin.serialization") version "1.8.21"
89
}
910

1011
group = "com.omprakash"
@@ -13,6 +14,10 @@ java.sourceCompatibility = JavaVersion.VERSION_17
1314

1415
repositories {
1516
mavenCentral()
17+
18+
maven {
19+
url= uri("https://packages.confluent.io/maven/")
20+
}
1621
}
1722

1823
dependencies {
@@ -34,6 +39,10 @@ dependencies {
3439
// Kafka
3540
implementation("org.springframework.kafka:spring-kafka")
3641
implementation("com.fasterxml.jackson.core:jackson-databind")
42+
43+
// Avro
44+
implementation("io.confluent:kafka-avro-serializer:7.4.0")
45+
3746
}
3847

3948
tasks.withType<KotlinCompile> {

docker-compose.yaml

+14
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,20 @@ services:
4343
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
4444
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
4545
KAFKA_AUTO_CREATE_TOPICS_ENABLE: true
46+
KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
47+
48+
schema-registry:
49+
image: confluentinc/cp-schema-registry:7.3.2
50+
hostname: schema-registry
51+
container_name: schema-registry
52+
depends_on:
53+
- kafka
54+
ports:
55+
- "8081:8081"
56+
environment:
57+
SCHEMA_REGISTRY_HOST_NAME: schema-registry
58+
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:29092'
59+
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
4660

4761
volumes:
4862
postgres_dev:

src/main/kotlin/com/omprakash/springbootplayground/SpringBootPlaygroundApplication.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,4 @@ class SpringBootPlaygroundApplication
1010

1111
fun main(args: Array<String>) {
1212
runApplication<SpringBootPlaygroundApplication>(*args)
13-
}
13+
}

src/main/kotlin/com/omprakash/springbootplayground/controllers/BookController.kt

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package com.omprakash.springbootplayground.controllers
22

33
import com.omprakash.springbootplayground.kafka.BookCreatedKafkaProducer
4-
import com.omprakash.springbootplayground.kafka.message.BookCreated
54
import com.omprakash.springbootplayground.models.Book
65
import com.omprakash.springbootplayground.services.BookService
76
import kotlinx.coroutines.flow.Flow
@@ -21,7 +20,7 @@ class BookController(private val bookService: BookService, private val bookKafka
2120
@GetMapping("/publish")
2221
suspend fun publishBook(): String {
2322
return try {
24-
bookKafkaProducer.publishBook(BookCreated(1, "Test", "1234")).toString()
23+
bookKafkaProducer.publishBook(1L, "Test", "1234").toString()
2524
} catch (e: Exception) {
2625
println("Exception while publishing books ${e.message}")
2726
e.message ?: "Exception"
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
package com.omprakash.springbootplayground.kafka
22

3-
import com.omprakash.springbootplayground.kafka.message.BookCreated
3+
import org.apache.avro.generic.GenericRecord
44
import org.springframework.kafka.annotation.KafkaListener
55
import org.springframework.stereotype.Service
66

77
@Service
88
class BookCreatedKafkaConsumer {
99

1010
@KafkaListener(topics = [TOPICS.BOOKS_CREATED], groupId = GROUP_ID.BOOKS_CREATED_CONSUMER, containerFactory = "bookCreatedKafkaListenerContainerFactory")
11-
fun onBookCreated(bookCreated: BookCreated) {
11+
fun onBookCreated(bookCreated: GenericRecord) {
1212
println("Consumed message $bookCreated")
1313
}
1414
}
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,27 @@
11
package com.omprakash.springbootplayground.kafka
22

33
import com.omprakash.springbootplayground.kafka.message.BookCreated
4+
import org.apache.avro.Schema
5+
import org.apache.avro.generic.GenericRecord
6+
import org.apache.avro.generic.GenericRecordBuilder
47
import org.apache.kafka.clients.producer.ProducerRecord
58
import org.apache.kafka.clients.producer.RecordMetadata
69
import org.springframework.kafka.core.KafkaTemplate
710
import org.springframework.stereotype.Service
11+
import java.io.File
812

913
@Service
10-
class BookCreatedKafkaProducer(private val kafkaTemplate: KafkaTemplate<String, BookCreated>) {
14+
class BookCreatedKafkaProducer(private val kafkaTemplate: KafkaTemplate<String, GenericRecord>) {
1115

12-
suspend fun publishBook(bookCreated: BookCreated): RecordMetadata {
13-
val sendResult = kafkaTemplate.sendAsync(ProducerRecord(TOPICS.BOOKS_CREATED, "${bookCreated.id}", bookCreated))
16+
suspend fun publishBook(id: Long = 1L, title: String = "", isbn: String = ""): RecordMetadata {
17+
val bookCreated = BookCreated(id, title, isbn)
18+
val schema = Schema.Parser().parse(File("src/main/resources/book_created.avsc"))
19+
val avroRecord = GenericRecordBuilder(schema).apply {
20+
set("id", bookCreated.id)
21+
set("title", bookCreated.title)
22+
set("isbn", bookCreated.isbn)
23+
}.build()
24+
val sendResult = kafkaTemplate.sendAsync(ProducerRecord(TOPICS.BOOKS_CREATED, "${bookCreated.id}", avroRecord))
1425
return sendResult.recordMetadata
1526
}
1627
}

src/main/kotlin/com/omprakash/springbootplayground/kafka/ConsumerConfig.kt

+8-6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.omprakash.springbootplayground.kafka
22

3-
import com.omprakash.springbootplayground.kafka.message.BookCreated
3+
import io.confluent.kafka.serializers.KafkaAvroDeserializer
4+
import org.apache.avro.generic.GenericRecord
45
import org.apache.kafka.clients.consumer.ConsumerConfig.*
56
import org.apache.kafka.common.serialization.StringDeserializer
67
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
@@ -18,19 +19,20 @@ object GROUP_ID {
1819
@Configuration
1920
class ConsumerConfig(val kafkaProperties: KafkaProperties) {
2021
@Bean
21-
fun consumerFactory(): ConsumerFactory<String, BookCreated> {
22-
val props = mapOf<String, Any>(
22+
fun consumerFactory(): ConsumerFactory<String, GenericRecord> {
23+
val props = mapOf<String, Any>(
2324
BOOTSTRAP_SERVERS_CONFIG to kafkaProperties.bootstrapServers,
2425
KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
25-
VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
26+
VALUE_DESERIALIZER_CLASS_CONFIG to KafkaAvroDeserializer::class.java,
27+
"schema.registry.url" to "http://localhost:8081",
2628
JsonDeserializer.TRUSTED_PACKAGES to "*"
2729
)
2830
return DefaultKafkaConsumerFactory(props)
2931
}
3032

3133
@Bean
32-
fun bookCreatedKafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, BookCreated> {
33-
return ConcurrentKafkaListenerContainerFactory<String, BookCreated>().apply {
34+
fun bookCreatedKafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, GenericRecord> {
35+
return ConcurrentKafkaListenerContainerFactory<String, GenericRecord>().apply {
3436
consumerFactory = consumerFactory()
3537
}
3638
}

src/main/kotlin/com/omprakash/springbootplayground/kafka/ProducerConfig.kt

+5-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.omprakash.springbootplayground.kafka
22

3-
import com.omprakash.springbootplayground.kafka.message.BookCreated
3+
import io.confluent.kafka.serializers.KafkaAvroSerializer
4+
import org.apache.avro.generic.GenericRecord
45
import org.apache.kafka.clients.producer.ProducerConfig.*
56
import org.apache.kafka.common.serialization.StringSerializer
67
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
@@ -9,17 +10,17 @@ import org.springframework.context.annotation.Configuration
910
import org.springframework.kafka.core.DefaultKafkaProducerFactory
1011
import org.springframework.kafka.core.KafkaTemplate
1112
import org.springframework.kafka.core.ProducerFactory
12-
import org.springframework.kafka.support.serializer.JsonSerializer
1313

1414
@Configuration
1515
class ProducerConfig(val kafkaProperties: KafkaProperties) {
1616

1717
@Bean
18-
fun producerFactory(): ProducerFactory<String, BookCreated> {
18+
fun producerFactory(): ProducerFactory<String, GenericRecord> {
1919
return mapOf<String, Any>(
2020
BOOTSTRAP_SERVERS_CONFIG to kafkaProperties.bootstrapServers,
2121
KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
22-
VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java
22+
VALUE_SERIALIZER_CLASS_CONFIG to KafkaAvroSerializer::class.java,
23+
"schema.registry.url" to "http://localhost:8081",
2324
).let(::DefaultKafkaProducerFactory)
2425
}
2526

Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
package com.omprakash.springbootplayground.kafka.message
22

3-
data class BookCreated(val id: Long = 1L, val title: String , val isbn: String)
3+
4+
data class BookCreated(val id: Long = 1L, val title: String = "", val isbn: String = "")

src/main/resources/book_created.avsc

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
{
2+
"type": "record",
3+
"name": "BookCreated",
4+
"namespace": "com.omprakash.avro",
5+
"fields": [
6+
{
7+
"name": "id",
8+
"type": "long"
9+
},
10+
{
11+
"name": "title",
12+
"type": "string"
13+
},
14+
{
15+
"name": "isbn",
16+
"type": "string"
17+
}
18+
]
19+
}

0 commit comments

Comments
 (0)