
Commit 00d52c0

SCALA-330: introduction to Kafka with Scala (#776)
* SCALA-330: introduction to kafka with scala (simple produce-consume flow)
* SCALA-330: introduction to kafka with scala (refactor produce-consume flow)
* SCALA-330: introduction to kafka with scala (add avro value serde)
* SCALA-330: introduction to kafka with scala (add transaction and isolate serde config)
* SCALA-330: introduction to kafka with scala (fix jsr310 date serialization)
* SCALA-330: introduction to kafka with scala (pin image version)
1 parent d35785c commit 00d52c0

21 files changed: +589 −2 lines

build.sbt

Lines changed: 20 additions & 2 deletions
@@ -230,7 +230,6 @@ val fs2Version = "3.7.0"
 val AkkaVersion = "2.8.0"
 val AkkaHttpVersion = "10.5.0"
 val reactiveMongo = "1.0.10"
-val spireVersion = "0.18.0"

 lazy val scala_libraries = (project in file("scala-libraries"))
   .settings(
@@ -386,15 +385,34 @@ lazy val scala_libraries_4 = (project in file("scala-libraries-4"))
     IntegrationTest / fork := true
   )

+val spireVersion = "0.18.0"
+val kafkaVersion = "3.5.0"
+val pureconfigVersion = "0.17.4"
+val jackSonVersion = "2.15.1"
+val log4jApiScalaVersion = "12.0"
+val log4jVersion = "2.20.0"
+val avro4sVersion = "3.1.1"
+val kafkaAvroSerializer = "6.0.0"
+
 lazy val scala_libraries_5 = (project in file("scala-libraries-5"))
   .settings(
     name := "scala-libraries-5",
+    resolvers += "Kafka avro serializer" at "https://packages.confluent.io/maven",
     scalaVersion := scalaV,
     libraryDependencies ++= scalaTestDeps,
     libraryDependencies ++= Seq(
       sparkSqlDep,
       sparkCoreDep,
-      "org.typelevel" %% "spire" % spireVersion
+      "org.typelevel" %% "spire" % spireVersion,
+      "org.apache.kafka" % "kafka-clients" % kafkaVersion,
+      "com.github.pureconfig" %% "pureconfig" % pureconfigVersion,
+      "com.fasterxml.jackson.core" % "jackson-databind" % jackSonVersion,
+      "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % jackSonVersion,
+      "com.fasterxml.jackson.module" %% "jackson-module-scala" % jackSonVersion,
+      "com.sksamuel.avro4s" %% "avro4s-core" % avro4sVersion,
+      "io.confluent" % "kafka-avro-serializer" % kafkaAvroSerializer,
+      "org.apache.logging.log4j" %% "log4j-api-scala" % log4jApiScalaVersion,
+      "org.apache.logging.log4j" % "log4j-core" % log4jVersion % Runtime
     )
   )
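Two of the new dependencies, jackson-datatype-jsr310 and jackson-module-scala, exist so that Scala case classes with java.time fields (such as Article.created below) survive JSON round-trips. A minimal sketch of the mapper wiring this enables, assuming a plain ObjectMapper; this is illustrative, not the commit's actual serializer code:

import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Illustrative mapper setup: register the Scala and JSR-310 modules so that
// case classes and java.time.LocalDate (de)serialize cleanly.
val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)
mapper.registerModule(new JavaTimeModule)
// Write dates as ISO-8601 strings instead of numeric timestamp arrays.
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)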

Lines changed: 58 additions & 0 deletions
@@ -0,0 +1,58 @@
version: "2"

volumes:
  kafka_data:
    driver: local

networks:
  kafka_net:
    driver: bridge
  traccar-net:
    driver: bridge

services:
  kafka:
    image: docker.io/bitnami/kafka:3.4.0-debian-11-r23
    networks:
      - kafka_net
    ports:
      - "9094:9094"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      - BITNAMI_DEBUG=yes
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_KRAFT_CLUSTER_ID=9YoavaRpTCOitT3Dm2OQFQ
      # For more details, see https://rmoff.net/2018/08/02/kafka-listeners-explained/
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9094
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
  kafka-schema-registry:
    image: bitnami/schema-registry:6.0
    networks:
      - kafka_net
    ports:
      - "8081:8081"
    depends_on:
      kafka:
        condition: service_started
    environment:
      SCHEMA_REGISTRY_DEBUG: true
      SCHEMA_REGISTRY_KAFKA_BROKERS: PLAINTEXT://kafka:9092
  kafka-ui:
    image: provectuslabs/kafka-ui:v0.7.0
    networks:
      - kafka_net
    ports:
      - "8088:8080"
    depends_on:
      kafka:
        condition: service_started
      kafka-schema-registry:
        condition: service_started
    environment:
      KAFKA_CLUSTERS_0_NAME: baeldung
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://kafka-schema-registry:8081
      DYNAMIC_CONFIG_ENABLED: 'true'
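Assuming Docker Compose is available, the stack above can be started with `docker compose up -d`. The broker is then reachable from the host on localhost:9094 (the EXTERNAL listener), the schema registry on http://localhost:8081, and the Kafka UI on http://localhost:8088 — which matches the bootstrap-servers and schema-registry-url values in the configuration files below.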
Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
bootstrap-servers = "localhost:9094"
schema-registry-url = "http://localhost:8081"
topic = "scala-articles-avro"

producer {
  client.id = baeldung-scala-kafka-producer
  bootstrap.servers = ${bootstrap-servers}
  transactional.id = "baeldung-scala-kafka-producer"
}

serde {
  schema.registry.url = ${schema-registry-url}
}

consumer {
  group.id = baeldung-scala-kafka-consumer
  bootstrap.servers = ${bootstrap-servers}
}
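The ${bootstrap-servers} and ${schema-registry-url} references are standard HOCON substitutions, so the producer, serde, and consumer blocks all resolve against the single values defined at the top of the file.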
Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
bootstrap-servers = "localhost:9094"
topic = "scala-articles"

producer {
  client.id = baeldung-scala-kafka-producer
  bootstrap.servers = ${bootstrap-servers}
}

consumer {
  group.id = baeldung-scala-kafka-consumer
  bootstrap.servers = ${bootstrap-servers}
}
Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
    </Appenders>
    <Loggers>
        <Root level="info">
            <AppenderRef ref="Console"/>
        </Root>
    </Loggers>
</Configuration>
Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
package com.baeldung.scala.kafka.intro.common

import java.time.LocalDate

case class Article(
  id: String,
  title: String,
  content: String,
  created: LocalDate,
  author: Author
)

case class Author(id: Int, name: String)
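The created: LocalDate field is the reason build.sbt pulls in jackson-datatype-jsr310: Jackson cannot handle java.time types without the JSR-310 module registered, which is presumably what the commit's "fix jsr310 date serialization" step addresses.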
Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
package com.baeldung.scala.kafka.intro.common

import com.typesafe.config.Config

import scala.jdk.CollectionConverters._

trait ClientConfig {
  implicit class configMapperOps(config: Config) {
    def asJavaMap: java.util.Map[String, AnyRef] = config.toMap.asJava

    def toMap: Map[String, AnyRef] = config
      .entrySet()
      .asScala
      .map(pair => (pair.getKey, config.getAnyRef(pair.getKey)))
      .toMap
  }
}
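The configMapperOps extension converts a Typesafe Config node into the java.util.Map[String, AnyRef] that the Kafka client constructors expect. A hypothetical usage sketch — the object name and the parsed string below are illustrative, not from the commit:

import com.typesafe.config.ConfigFactory

object ClientConfigDemo extends App with ClientConfig {
  // Illustrative config fragment; real values come from the .conf resources above.
  val config = ConfigFactory.parseString("""client.id = demo-client""")
  val javaMap: java.util.Map[String, AnyRef] = config.asJavaMap
  println(javaMap) // {client.id=demo-client}
}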
Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
package com.baeldung.scala.kafka.intro.common

import com.typesafe.config.Config
import pureconfig.ConfigSource
import pureconfig.generic.auto.exportReader

import java.util

case class SerdeConfig(serde: Config)

object SerdeConfig extends ClientConfig {
  def getConfig(resource: String): util.Map[String, AnyRef] = {
    val source =
      ConfigSource.resources(resource).loadOrThrow[SerdeConfig]
    val serde = source.serde.asJavaMap
    serde
  }
}
Lines changed: 58 additions & 0 deletions
@@ -0,0 +1,58 @@
package com.baeldung.scala.kafka.intro.consumer

import com.baeldung.scala.kafka.intro.common.{Article, SerdeConfig}
import com.baeldung.scala.kafka.intro.consumer.common.{
  AvroDeSerializer,
  ConsumerConfig,
  ConsumerUtils
}
import com.sksamuel.avro4s.RecordFormat
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}

import scala.concurrent.duration.{FiniteDuration, MILLISECONDS}
import scala.jdk.CollectionConverters._
import scala.jdk.javaapi.CollectionConverters.asJavaCollection
import scala.util.Try

object ArticleAvroConsumer
  extends App
  with ConsumerUtils[Article]
  with AvroDeSerializer {

  private val (config, topic) =
    ConsumerConfig.getConfig("kafka-intro-avro.conf")
  private val serde = SerdeConfig.getConfig("kafka-intro-avro.conf")

  val keyDeSerializer: StringDeserializer = new StringDeserializer()

  implicit lazy val Valueformat: RecordFormat[Article] = RecordFormat[Article]
  val valueDeSerializer: Deserializer[Article] = deserializer[Article]
  valueDeSerializer.configure(serde, false)

  private val consumer =
    new KafkaConsumer(config, keyDeSerializer, valueDeSerializer)

  consumer.subscribe(asJavaCollection(List(topic)))

  consumer.seekToBeginning(Nil.asJava)

  Try {
    while (true) {
      val messages = pool(consumer, FiniteDuration(1, MILLISECONDS))

      for ((_, article) <- messages) {
        logger.info(
          s"New article received. Title: ${article.title}. Author: ${article.author.name}, Date: ${article.created}"
        )
      }
    }
  }.recover { case error =>
    logger.error(error)
    logger.error(
      "Something went wrong when seeking messages from the beginning. Unsubscribing."
    )
    consumer.unsubscribe()
  }
  consumer.close()
}
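The commit message mentions a produce side with transactions, but this section of the diff only shows the consumers. For orientation, a minimal sketch of what a transactional producer against the same topic could look like, using plain String serializers; the object name, record payload, and wiring here are assumptions, not the commit's actual producer:

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object ArticleProducerSketch extends App {
  // Hypothetical configuration mirroring the producer block of the .conf files above.
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9094")
  props.put("client.id", "baeldung-scala-kafka-producer")
  props.put("transactional.id", "baeldung-scala-kafka-producer")

  val producer =
    new KafkaProducer(props, new StringSerializer(), new StringSerializer())

  // Transactions: initialize once, then wrap sends in begin/commit,
  // aborting on failure so consumers with read_committed never see partial writes.
  producer.initTransactions()
  try {
    producer.beginTransaction()
    producer.send(new ProducerRecord("scala-articles", "key-1", """{"title":"Intro to Kafka"}"""))
    producer.commitTransaction()
  } catch {
    case e: Exception =>
      producer.abortTransaction()
      throw e
  } finally {
    producer.close()
  }
}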
@@ -0,0 +1,38 @@
package com.baeldung.scala.kafka.intro.consumer

import com.baeldung.scala.kafka.intro.common.Article
import com.baeldung.scala.kafka.intro.consumer.common.{
  ConsumerConfig,
  ConsumerUtils,
  JsonStringDeSerializer
}
import org.apache.kafka.clients.consumer.KafkaConsumer

import java.util.concurrent.TimeUnit.SECONDS
import scala.concurrent.duration.FiniteDuration
import scala.jdk.javaapi.CollectionConverters.asJavaCollection

object ArticleJsonStringConsumer
  extends App
  with ConsumerUtils[Article]
  with JsonStringDeSerializer[Article] {

  private val (config, topic) = ConsumerConfig.getConfig("kafka-intro.conf")

  private val consumer =
    new KafkaConsumer(config, keyDeSerializer, valueDeSerializer)

  consumer.subscribe(asJavaCollection(List(topic)))

  while (true) {
    val messages = pool(consumer, FiniteDuration(1, SECONDS))
    for ((_, value) <- messages) {
      val article = fromJsonString(value)
      logger.info(
        s"New article received. Title: ${article.title}. Author: ${article.author.name}"
      )
    }
    consumer.commitAsync()
  }

}
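One design note: this consumer commits offsets explicitly via commitAsync() after processing each batch rather than relying on auto-commit. If the process dies between processing a batch and committing its offsets, that batch is redelivered on restart, so the flow is at-least-once.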
