Skip to content

Commit 3493d60

Browse files
add keycloak and keycloak kafka controller
1 parent 7afc203 commit 3493d60

File tree

14 files changed

+310
-93
lines changed

14 files changed

+310
-93
lines changed

docker/kafka/docker-compose.yml

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
version: "3"
22
services:
3+
### KAFKA
34
dev-kafka-zookeeper:
45
container_name: dev-kafka-zookeeper
56
hostname: dev-kafka-zookeeper
@@ -25,7 +26,45 @@ services:
2526
volumes:
2627
- kafka-broker-data:/kafka
2728

29+
### KEYCLOAK
30+
keycloak:
31+
image: keycloak-kafka:0.1.1
32+
ports:
33+
- "8090:8080"
34+
depends_on:
35+
- postgres-keycloak
36+
- dev-kafka-broker
37+
environment:
38+
KEYCLOAK_ADMIN: admin
39+
KEYCLOAK_ADMIN_PASSWORD: admin
40+
KAFKA_TOPIC: keycloak-events
41+
KAFKA_CLIENT_ID: keycloak
42+
KAFKA_BOOTSTRAP_SERVERS: dev-kafka-broker:9092
43+
DB_ADDR: postgres-keycloak
44+
DB_PORT: 5433
45+
DB_VENDOR: POSTGRES
46+
DB_DATABASE: keycloak
47+
DB_USER: keycloak
48+
DB_SCHEMA: public
49+
DB_PASSWORD: keycloak
50+
PROXY_ADDRESS_FORWARDING: "true"
51+
KAFKA_EVENTS: "REGISTER,LOGIN,LOGOUT"
52+
KAFKA_ADMIN_TOPIC: keycloak-admin-events
53+
command: start-dev
54+
55+
postgres-keycloak:
56+
image: postgres:latest
57+
volumes:
58+
- postgres-data:/var/lib/postgresql/data
59+
environment:
60+
POSTGRES_DB: keycloak
61+
POSTGRES_USER: keycloak
62+
POSTGRES_PASSWORD: keycloak
63+
ports:
64+
- "5433:5432"
65+
2866
volumes:
67+
### KAFKA
2968
kafka-zookeeper-data:
3069
driver: local
3170
driver_opts:
@@ -38,4 +77,12 @@ volumes:
3877
driver_opts:
3978
o: bind
4079
type: none
41-
device: ./volumes/kafka-broker
80+
device: ./volumes/kafka-broker
81+
82+
### KEYCLOAK
83+
postgres-data:
84+
driver: local
85+
driver_opts:
86+
o: bind
87+
type: none
88+
device: ./volumes/postgres

http/coroutine.http

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ GET http://localhost:8080/coroutine/users?page=0&size=10&sort=id&direction=ASC
3030
POST http://localhost:8080/coroutine/users
3131
Content-Type: application/json
3232

33-
{"name": "user test coroutine", "email": "4445test[email protected]", "role": []}
33+
{"name": "user test coroutine", "email": "test[email protected]", "role": []}
3434

3535
###
3636
PUT http://localhost:8080/coroutine/users/63c6a6c0686f32099dfebd88
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
{
2+
"id": "d567e184-963e-4836-80d5-fedcb07870e5",
3+
"time": 1674955961622,
4+
"type": "LOGIN",
5+
"realmId": "49b390fd-9e31-4931-a55f-34ff3e0776bb",
6+
"clientId": "frontend",
7+
"userId": "cd85a973-3f13-4219-827b-0c31044e241f",
8+
"sessionId": "c71a9003-5165-4370-b98e-4d295297ecc0",
9+
"ipAddress": "172.29.0.1",
10+
"error": null,
11+
"details": {
12+
"auth_method": "openid-connect",
13+
"auth_type": "code",
14+
"redirect_uri": "http://localhost:3000/login",
15+
"consent": "no_consent_required",
16+
"code_id": "c71a9003-5165-4370-b98e-4d295297ecc0",
17+
"username": "test"
18+
}
19+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
{
2+
"id": "2c908efd-858a-47e5-b899-e8e68559ab72",
3+
"time": 1674955930173,
4+
"type": "LOGOUT",
5+
"realmId": "49b390fd-9e31-4931-a55f-34ff3e0776bb",
6+
"clientId": null,
7+
"userId": "8693efc7-736d-45c2-ab93-759ed2dd27c0",
8+
"sessionId": "2cce54f3-28d5-4727-ac65-ab8f7c70a8c3",
9+
"ipAddress": "172.29.0.1",
10+
"error": null,
11+
"details": {
12+
"redirect_uri": "http://localhost:3000/"
13+
}
14+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
{
2+
"id": "4ae0d28e-a117-473a-b9ac-83b285874098",
3+
"time": 1674955961567,
4+
"type": "REGISTER",
5+
"realmId": "49b390fd-9e31-4931-a55f-34ff3e0776bb",
6+
"clientId": "frontend",
7+
"userId": "cd85a973-3f13-4219-827b-0c31044e241f",
8+
"sessionId": null,
9+
"ipAddress": "172.29.0.1",
10+
"error": null,
11+
"details": {
12+
"auth_method": "openid-connect",
13+
"auth_type": "code",
14+
"register_method": "form",
15+
"last_name": "test",
16+
"redirect_uri": "http://localhost:3000/login",
17+
"first_name": "test",
18+
"code_id": "c71a9003-5165-4370-b98e-4d295297ecc0",
19+
"email": "[email protected]",
20+
"username": "test"
21+
}
22+
}

src/main/kotlin/com/softeno/template/event/AppEvent.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.softeno.template.event
22

33
import com.softeno.template.kafka.KafkaMessage
4-
import com.softeno.template.kafka.ReactiveKafkaProducerService
4+
import com.softeno.template.kafka.ReactiveKafkaSampleProducer
55
import com.softeno.template.websocket.Message
66
import com.softeno.template.websocket.ReactiveMessageService
77
import org.apache.commons.logging.LogFactory
@@ -14,14 +14,14 @@ data class AppEvent(val source: String) : ApplicationEvent(source)
1414
@Component
1515
class SampleApplicationEventPublisher(
1616
private val reactiveMessageService: ReactiveMessageService,
17-
private val reactiveKafkaProducerService: ReactiveKafkaProducerService
17+
private val reactiveKafkaProducer: ReactiveKafkaSampleProducer
1818
) : ApplicationListener<AppEvent> {
1919
private val log = LogFactory.getLog(javaClass)
2020

2121
override fun onApplicationEvent(event: AppEvent) {
2222
log.info("[event handler]: Received event: $event")
2323
reactiveMessageService.broadcast(event.toMessage())
24-
reactiveKafkaProducerService.send(event.toKafkaMessage())
24+
reactiveKafkaProducer.send(event.toKafkaMessage())
2525
}
2626

2727
}

src/main/kotlin/com/softeno/template/kafka/Kafka.kt

Lines changed: 0 additions & 87 deletions
This file was deleted.
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package com.softeno.template.kafka
2+
3+
import com.fasterxml.jackson.databind.JsonNode
4+
import org.springframework.beans.factory.annotation.Qualifier
5+
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
6+
import org.springframework.boot.context.properties.ConfigurationProperties
7+
import org.springframework.boot.context.properties.ConstructorBinding
8+
import org.springframework.context.annotation.Bean
9+
import org.springframework.context.annotation.Configuration
10+
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate
11+
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate
12+
import org.springframework.kafka.support.converter.ByteArrayJsonMessageConverter
13+
import org.springframework.kafka.support.converter.JsonMessageConverter
14+
import reactor.kafka.receiver.ReceiverOptions
15+
import reactor.kafka.sender.SenderOptions
16+
import java.util.*
17+
18+
19+
@ConfigurationProperties(prefix = "com.softeno.kafka")
20+
@ConstructorBinding
21+
data class KafkaApplicationProperties(val tx: String, val rx: String, val keycloak: String)
22+
23+
@Configuration
24+
class JsonMessageConverterConfig {
25+
@Bean
26+
fun jsonMessageConverter(): JsonMessageConverter {
27+
return ByteArrayJsonMessageConverter()
28+
}
29+
}
30+
31+
@Configuration
32+
class ReactiveKafkaSampleConsumerConfig {
33+
@Bean(value = ["kafkaSampleOptions"])
34+
fun kafkaReceiverOptions(kafkaProperties: KafkaProperties, props: KafkaApplicationProperties): ReceiverOptions<String, JsonNode> {
35+
val basicReceiverOptions: ReceiverOptions<String, JsonNode> = ReceiverOptions.create(kafkaProperties.buildConsumerProperties())
36+
return basicReceiverOptions.subscription(Collections.singletonList(props.rx))
37+
}
38+
39+
@Bean(value = ["kafkaSampleConsumerTemplate"])
40+
fun reactiveKafkaConsumerTemplate(@Qualifier(value = "kafkaSampleOptions") kafkaReceiverOptions: ReceiverOptions<String, JsonNode>): ReactiveKafkaConsumerTemplate<String, JsonNode> {
41+
return ReactiveKafkaConsumerTemplate(kafkaReceiverOptions)
42+
}
43+
}
44+
45+
@Configuration
46+
class ReactiveKafkaSampleProducerConfig {
47+
@Bean(value = ["kafkaSampleProducerTemplate"])
48+
fun reactiveKafkaProducerTemplate(properties: KafkaProperties): ReactiveKafkaProducerTemplate<String, KafkaMessage> {
49+
val props = properties.buildProducerProperties()
50+
return ReactiveKafkaProducerTemplate<String, KafkaMessage>(SenderOptions.create(props))
51+
}
52+
}
53+
54+
@Configuration
55+
class ReactiveKafkaKeycloakConsumerConfig {
56+
@Bean(value = ["kafkaKeycloakOptions"])
57+
fun kafkaReceiverOptions(kafkaProperties: KafkaProperties, props: KafkaApplicationProperties): ReceiverOptions<String, JsonNode> {
58+
val basicReceiverOptions: ReceiverOptions<String, JsonNode> = ReceiverOptions.create(kafkaProperties.buildConsumerProperties())
59+
return basicReceiverOptions.subscription(Collections.singletonList(props.keycloak))
60+
}
61+
62+
@Bean(value = ["kafkaKeycloakConsumerTemplate"])
63+
fun reactiveKafkaConsumerTemplate(@Qualifier(value = "kafkaKeycloakOptions") kafkaReceiverOptions: ReceiverOptions<String, JsonNode>): ReactiveKafkaConsumerTemplate<String, JsonNode> {
64+
return ReactiveKafkaConsumerTemplate(kafkaReceiverOptions)
65+
}
66+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package com.softeno.template.kafka
2+
3+
import com.fasterxml.jackson.databind.JsonNode
4+
import com.fasterxml.jackson.databind.ObjectMapper
5+
import org.apache.commons.logging.LogFactory
6+
import org.apache.kafka.clients.consumer.ConsumerRecord
7+
import org.springframework.beans.factory.annotation.Qualifier
8+
import org.springframework.boot.CommandLineRunner
9+
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate
10+
import org.springframework.stereotype.Service
11+
import reactor.core.publisher.Flux
12+
13+
@Service
14+
class ReactiveKafkaKeycloakController(
15+
@Qualifier(value = "kafkaKeycloakConsumerTemplate") private val reactiveKafkaConsumerTemplate: ReactiveKafkaConsumerTemplate<String, JsonNode>,
16+
private val objectMapper: ObjectMapper
17+
): CommandLineRunner {
18+
private val log = LogFactory.getLog(javaClass)
19+
20+
private fun consumeKafkaMessage(): Flux<JsonNode> {
21+
return reactiveKafkaConsumerTemplate
22+
.receiveAutoAck()
23+
.doOnNext { consumerRecord: ConsumerRecord<String, JsonNode> ->
24+
log.debug("[kafka] rx keycloak: ConsumerRecord: key=${consumerRecord.key()}, value=${consumerRecord.value()} from topic=${consumerRecord.topic()}, offset=${consumerRecord.offset()}")
25+
}
26+
.map { obj: ConsumerRecord<String, JsonNode> -> obj.value() }
27+
.doOnNext { message: JsonNode ->
28+
val dto: KeycloakUserEvent = objectMapper.readValue(message.toString(), KeycloakUserEvent::class.java)
29+
log.info("[kafka] rx keycloak: $dto")
30+
}
31+
.doOnError { throwable: Throwable ->
32+
log.error("[kafka] keycloak: ${throwable.message}")
33+
}
34+
}
35+
36+
override fun run(vararg args: String) {
37+
log.info("[kafka]: keycloak consumer starts")
38+
consumeKafkaMessage().subscribe()
39+
}
40+
}

0 commit comments

Comments
 (0)