Skip to content

Commit 60d5182

Browse files
Adds distributed tracing but with bug in coroutines context: micrometer-metrics/tracing#174
1 parent b535d14 commit 60d5182

12 files changed

+128
-12
lines changed

build.gradle.kts

+9
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,15 @@ dependencies {
4343
// Avro
4444
implementation("io.confluent:kafka-avro-serializer:7.4.0")
4545

46+
// Tracing
47+
implementation("org.springframework.boot:spring-boot-starter-actuator")
48+
implementation("io.micrometer:micrometer-tracing")
49+
implementation("io.micrometer:micrometer-tracing-bridge-brave")
50+
implementation("io.zipkin.reporter2:zipkin-reporter-brave")
51+
implementation("net.logstash.logback:logstash-logback-encoder:7.3")
52+
implementation("io.zipkin.brave:brave-context-slf4j")
53+
implementation("io.zipkin.brave:brave-instrumentation-kafka-clients")
54+
4655
}
4756

4857
tasks.withType<KotlinCompile> {

docker-compose.yaml

+5
Original file line numberDiff line numberDiff line change
@@ -58,5 +58,10 @@ services:
5858
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:29092'
5959
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
6060

61+
zipkin-server:
62+
image: openzipkin/zipkin
63+
ports:
64+
- "9411:9411"
65+
6166
volumes:
6267
postgres_dev:

src/main/kotlin/com/omprakash/springbootplayground/SpringBootPlaygroundApplication.kt

+3
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
package com.omprakash.springbootplayground
22

3+
import com.omprakash.springbootplayground.config.TracingConfig
34
import org.springframework.boot.autoconfigure.SpringBootApplication
45
import org.springframework.boot.runApplication
6+
import org.springframework.context.annotation.Import
57
import org.springframework.kafka.annotation.EnableKafka
68

79
@SpringBootApplication
810
@EnableKafka
11+
@Import(TracingConfig::class)
912
class SpringBootPlaygroundApplication
1013

1114
fun main(args: Array<String>) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.omprakash.springbootplayground.config
2+
3+
import brave.baggage.BaggageField
4+
import brave.baggage.CorrelationScopeConfig
5+
import brave.context.slf4j.MDCScopeDecorator
6+
import brave.propagation.CurrentTraceContext
7+
import org.springframework.beans.factory.annotation.Qualifier
8+
import org.springframework.context.annotation.Bean
9+
10+
class TracingConfig {
11+
@Bean(name = ["bookId"])
12+
fun bookId(): BaggageField = BaggageField.create("bookId")
13+
14+
@Bean
15+
fun mdcScopeDecorator(@Qualifier("bookId") bookId: BaggageField): CurrentTraceContext.ScopeDecorator {
16+
return MDCScopeDecorator.newBuilder().clear()
17+
.add(CorrelationScopeConfig.SingleCorrelationField.newBuilder(bookId).flushOnUpdate().build())
18+
.build()
19+
}
20+
}
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,51 @@
11
package com.omprakash.springbootplayground.controllers
22

3+
import brave.baggage.BaggageField
34
import com.omprakash.springbootplayground.kafka.BookCreatedKafkaProducer
45
import com.omprakash.springbootplayground.models.Book
6+
import com.omprakash.springbootplayground.models.request.CreateBook
57
import com.omprakash.springbootplayground.services.BookService
8+
import io.micrometer.core.instrument.kotlin.asContextElement
9+
import io.micrometer.observation.ObservationRegistry
610
import kotlinx.coroutines.flow.Flow
11+
import kotlinx.coroutines.reactor.mono
12+
import org.slf4j.LoggerFactory
13+
import org.springframework.beans.factory.annotation.Qualifier
714
import org.springframework.web.bind.annotation.GetMapping
15+
import org.springframework.web.bind.annotation.PostMapping
16+
import org.springframework.web.bind.annotation.RequestBody
817
import org.springframework.web.bind.annotation.RequestMapping
918
import org.springframework.web.bind.annotation.RestController
19+
import reactor.core.publisher.Mono
1020

1121
@RestController
1222
@RequestMapping("/books")
13-
class BookController(private val bookService: BookService, private val bookKafkaProducer: BookCreatedKafkaProducer) {
23+
class BookController(
24+
private val bookService: BookService,
25+
private val bookKafkaProducer: BookCreatedKafkaProducer,
26+
@Qualifier("bookId") var bookIdBaggageField: BaggageField,
27+
private val observationRegistry: ObservationRegistry
28+
) {
29+
30+
var logger = LoggerFactory.getLogger(this::class.java)
1431

1532
@GetMapping
1633
fun findAll(): Flow<Book> {
1734
return bookService.findAll()
1835
}
1936

20-
@GetMapping("/publish")
21-
suspend fun publishBook(): String {
22-
return try {
23-
bookKafkaProducer.publishBook(1L, "Test", "1234").toString()
24-
} catch (e: Exception) {
25-
println("Exception while publishing books ${e.message}")
26-
e.message ?: "Exception"
37+
@PostMapping("/publish")
38+
fun publishBook(@RequestBody createBook: CreateBook): Mono<String> {
39+
return mono(observationRegistry.asContextElement()) {
40+
try {
41+
bookIdBaggageField.updateValue(createBook.id.toString())
42+
bookKafkaProducer.publishBook(createBook.id, createBook.title, createBook.isbn).toString()
43+
logger.info("Book published")
44+
} catch (e: Exception) {
45+
println("Exception while publishing books ${e.message}")
46+
e.message ?: "Exception"
47+
}
48+
"Completed"
2749
}
2850
}
2951
}
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
package com.omprakash.springbootplayground.kafka
22

33
import org.apache.avro.generic.GenericRecord
4+
import org.slf4j.LoggerFactory
45
import org.springframework.kafka.annotation.KafkaListener
56
import org.springframework.stereotype.Service
67

78
@Service
89
class BookCreatedKafkaConsumer {
910

11+
var logger = LoggerFactory.getLogger(this::class.java)
12+
1013
@KafkaListener(topics = [TOPICS.BOOKS_CREATED], groupId = GROUP_ID.BOOKS_CREATED_CONSUMER, containerFactory = "bookCreatedKafkaListenerContainerFactory")
1114
fun onBookCreated(bookCreated: GenericRecord) {
12-
println("Consumed message $bookCreated")
15+
logger.info("Consumed message $bookCreated")
1316
}
1417
}

src/main/kotlin/com/omprakash/springbootplayground/kafka/ConsumerConfig.kt

+12-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.omprakash.springbootplayground.kafka
22

3+
import brave.Tracing
4+
import brave.kafka.clients.KafkaTracing
35
import io.confluent.kafka.serializers.KafkaAvroDeserializer
46
import org.apache.avro.generic.GenericRecord
57
import org.apache.kafka.clients.consumer.ConsumerConfig.*
@@ -31,9 +33,17 @@ class ConsumerConfig(val kafkaProperties: KafkaProperties) {
3133
}
3234

3335
@Bean
34-
fun bookCreatedKafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, GenericRecord> {
36+
fun kafkaTracing(tracing: Tracing?): KafkaTracing? {
37+
return KafkaTracing.create(tracing)
38+
}
39+
40+
@Bean
41+
fun bookCreatedKafkaListenerContainerFactory(kafkaTracing: KafkaTracing): ConcurrentKafkaListenerContainerFactory<String, GenericRecord> {
3542
return ConcurrentKafkaListenerContainerFactory<String, GenericRecord>().apply {
36-
consumerFactory = consumerFactory()
43+
val cf = consumerFactory()
44+
cf.addPostProcessor(kafkaTracing::consumer)
45+
consumerFactory = cf
46+
this.containerProperties.isObservationEnabled = true
3747
}
3848
}
3949
}

src/main/kotlin/com/omprakash/springbootplayground/kafka/Extensions.kt

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import kotlin.coroutines.resumeWithException
1212
suspend inline fun <reified K : Any, reified V : Any> KafkaTemplate<K, V>.sendAsync(record: ProducerRecord<K, V>): SendResult<K, V> {
1313
return suspendCancellableCoroutine { cancellableContinuation ->
1414
val future = this.send(record)
15+
this.setObservationEnabled(true)
1516
future.whenComplete { metadata, exception ->
1617
if (metadata != null) {
1718
cancellableContinuation.resume(metadata)

src/main/kotlin/com/omprakash/springbootplayground/kafka/ProducerConfig.kt

+5-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ class ProducerConfig(val kafkaProperties: KafkaProperties) {
2525
}
2626

2727
@Bean
28-
fun booksCreatedKafkaTemplate() = KafkaTemplate(producerFactory())
28+
fun booksCreatedKafkaTemplate(): KafkaTemplate<String, GenericRecord> {
29+
val kafkaTemplate: KafkaTemplate<String, GenericRecord> = KafkaTemplate(producerFactory())
30+
kafkaTemplate.setObservationEnabled(true)
31+
return kafkaTemplate
32+
}
2933

3034
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package com.omprakash.springbootplayground.models.request
2+
3+
data class CreateBook(val id: Long, val title: String, val isbn: String)

src/main/resources/application.yaml

+20
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,25 @@
11
spring:
2+
application:
3+
name: spring-boot-playground
24
sql:
35
init:
46
schema-locations: classpath:schema.sql
57
mode: always
8+
9+
logging:
10+
config: classpath:logback-spring.xml
11+
pattern:
12+
level: "%5p [${spring.application.name:},%X{traceId:-},%X{spanId:-}]"
13+
14+
management:
15+
tracing:
16+
enabled: true
17+
propagation:
18+
type: b3
19+
sampling:
20+
probability: 1.0
21+
baggage:
22+
correlation:
23+
enabled: true
24+
fields: bookId
25+
remote-fields: bookId

src/main/resources/logback-spring.xml

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
<configuration>
2+
<springProperty scope="context" name="service" source="spring.application.name"/>
3+
<property scope="context" name="hostname" value="${HOSTNAME}"/>
4+
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
5+
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
6+
<timeZone>UTC</timeZone>
7+
<includeMdcKeyName>traceId</includeMdcKeyName>
8+
<includeMdcKeyName>bookId</includeMdcKeyName>
9+
<includeContext>false</includeContext>
10+
<customFields>{"hostname":"${HOSTNAME}","service":"${service}"}</customFields>
11+
</encoder>
12+
</appender>
13+
<root level="info">
14+
<appender-ref ref="console"/>
15+
</root>
16+
</configuration>

0 commit comments

Comments
 (0)