|
7 | 7 | import org.springframework.beans.factory.annotation.Value;
|
8 | 8 | import org.springframework.kafka.annotation.KafkaListener;
|
9 | 9 | import org.springframework.kafka.core.KafkaTemplate;
|
| 10 | +import org.springframework.messaging.handler.annotation.Payload; |
10 | 11 | import org.springframework.web.bind.annotation.GetMapping;
|
11 | 12 | import org.springframework.web.bind.annotation.RestController;
|
12 | 13 |
|
@@ -49,25 +50,28 @@ public String hello() throws Exception {
|
49 | 50 |
|
50 | 51 | @KafkaListener(topics = "advice-topic", clientIdPrefix = "json",
|
51 | 52 | containerFactory = "kafkaListenerContainerFactory")
|
52 |
| - public void listenAsObject(ConsumerRecord<String, PracticalAdvice> cr) { |
53 |
| - logger.info("Logger 1 [JSON] received key {}: Type [{}] | {}", cr.key(), |
54 |
| - typeIdHeader(cr.headers()), cr.toString()); |
| 53 | + public void listenAsObject(ConsumerRecord<String, PracticalAdvice> cr, |
| 54 | + @Payload PracticalAdvice payload) { |
| 55 | + logger.info("Logger 1 [JSON] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(), |
| 56 | + typeIdHeader(cr.headers()), payload, cr.toString()); |
55 | 57 | latch.countDown();
|
56 | 58 | }
|
57 | 59 |
|
58 | 60 | @KafkaListener(topics = "advice-topic", clientIdPrefix = "string",
|
59 | 61 | containerFactory = "kafkaListenerStringContainerFactory")
|
60 |
| - public void listenasString(ConsumerRecord<String, String> cr) { |
61 |
| - logger.info("Logger 2 [String] received key {}: Type [{}] | {}", cr.key(), |
62 |
| - typeIdHeader(cr.headers()), cr.toString()); |
| 62 | + public void listenasString(ConsumerRecord<String, String> cr, |
| 63 | + @Payload String payload) { |
| 64 | + logger.info("Logger 2 [String] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(), |
| 65 | + typeIdHeader(cr.headers()), payload, cr.toString()); |
63 | 66 | latch.countDown();
|
64 | 67 | }
|
65 | 68 |
|
66 | 69 | @KafkaListener(topics = "advice-topic", clientIdPrefix = "bytearray",
|
67 | 70 | containerFactory = "kafkaListenerByteArrayContainerFactory")
|
68 |
| - public void listenAsByteArray(ConsumerRecord<String, byte[]> cr) { |
69 |
| - logger.info("Logger 3 [ByteArray] received key {}: Type [{}] | {}", cr.key(), |
70 |
| - typeIdHeader(cr.headers()), cr.toString()); |
| 71 | + public void listenAsByteArray(ConsumerRecord<String, byte[]> cr, |
| 72 | + @Payload byte[] payload) { |
| 73 | + logger.info("Logger 3 [ByteArray] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(), |
| 74 | + typeIdHeader(cr.headers()), payload, cr.toString()); |
71 | 75 | latch.countDown();
|
72 | 76 | }
|
73 | 77 |
|
|
0 commit comments