Skip to content

Commit f6ca5c3

Browse files
feat(sqs): add SNS headers when sending message from springwolf ui (#1324)
- standardizes plugin producer/controller header management
1 parent 5815aaf commit f6ca5c3

File tree

7 files changed

+102
-40
lines changed

7 files changed

+102
-40
lines changed

springwolf-examples/springwolf-sqs-example/src/test/java/io/github/springwolf/examples/sqs/SqsProducerSystemTest.java

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// SPDX-License-Identifier: Apache-2.0
22
package io.github.springwolf.examples.sqs;
33

4+
import io.github.springwolf.core.controller.dtos.MessageDto;
45
import io.github.springwolf.examples.sqs.consumers.ExampleConsumer;
56
import io.github.springwolf.examples.sqs.dtos.ExamplePayloadDto;
67
import io.github.springwolf.plugins.sqs.producer.SpringwolfSqsProducer;
@@ -10,9 +11,6 @@
1011
import org.mockito.Captor;
1112
import org.springframework.beans.factory.annotation.Autowired;
1213
import org.springframework.boot.test.context.SpringBootTest;
13-
import org.springframework.messaging.Message;
14-
import org.springframework.messaging.MessageHeaders;
15-
import org.springframework.messaging.support.MessageBuilder;
1614
import org.springframework.test.context.bean.override.mockito.MockitoSpyBean;
1715
import org.testcontainers.containers.DockerComposeContainer;
1816
import org.testcontainers.containers.wait.strategy.Wait;
@@ -21,7 +19,6 @@
2119

2220
import java.io.File;
2321
import java.util.Arrays;
24-
import java.util.List;
2522
import java.util.Map;
2623

2724
import static io.github.springwolf.examples.sqs.dtos.ExamplePayloadDto.ExampleEnum.FOO1;
@@ -69,22 +66,19 @@ void producerCanUseSpringwolfConfigurationToSendMessage() {
6966
payload.setSomeLong(5);
7067
payload.setSomeEnum(FOO1);
7168

72-
MessageHeaders headers =
73-
new MessageHeaders(Map.of("some-header", "some-header-value", "structured-header", List.of(42, 3.14)));
74-
75-
Message<ExamplePayloadDto> message = MessageBuilder.createMessage(payload, headers);
69+
Map<String, MessageDto.HeaderValue> headers = Map.of(
70+
"some-header", new MessageDto.HeaderValue("some-header-value"),
71+
"null-header", new MessageDto.HeaderValue(null));
7672

7773
// when
78-
springwolfSqsProducer.send("example-queue", message);
74+
springwolfSqsProducer.send("example-queue", headers, payload);
7975

8076
// then
8177
verify(exampleConsumer, timeout(10000)).receiveExamplePayload(eq(payload), headersCaptor.capture());
8278

8379
Map<String, Object> capturedHeaders = headersCaptor.getValue();
8480

85-
assertThat(capturedHeaders)
86-
.containsAllEntriesOf(Map.of(
87-
"some-header", "some-header-value",
88-
"structured-header", "[42, 3.14]"));
81+
assertThat(capturedHeaders).containsAllEntriesOf(Map.of("some-header", "some-header-value"));
82+
assertThat(capturedHeaders).doesNotContainKey("null-header");
8983
}
9084
}

springwolf-plugins/springwolf-sns-plugin/src/main/java/io/github/springwolf/plugins/sns/controller/SpringwolfSnsController.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import io.github.springwolf.core.controller.dtos.MessageDto;
77
import io.github.springwolf.plugins.sns.producer.SpringwolfSnsProducer;
88
import lombok.extern.slf4j.Slf4j;
9-
import org.springframework.messaging.support.MessageBuilder;
109
import org.springframework.web.bind.annotation.RequestMapping;
1110
import org.springframework.web.bind.annotation.RestController;
1211

@@ -30,6 +29,6 @@ protected boolean isEnabled() {
3029
@Override
3130
protected void publishMessage(String topic, MessageDto message, Object payload) {
3231
log.debug("Publishing to sns topic {}: {}", topic, message);
33-
producer.send(topic, MessageBuilder.withPayload(payload).build());
32+
producer.send(topic, message.getHeaders(), payload);
3433
}
3534
}

springwolf-plugins/springwolf-sns-plugin/src/main/java/io/github/springwolf/plugins/sns/producer/SpringwolfSnsProducer.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,14 @@
22
package io.github.springwolf.plugins.sns.producer;
33

44
import io.awspring.cloud.sns.core.SnsTemplate;
5+
import io.github.springwolf.core.controller.dtos.MessageDto;
56
import lombok.extern.slf4j.Slf4j;
67
import org.springframework.messaging.Message;
8+
import org.springframework.messaging.support.MessageBuilder;
79

10+
import java.util.HashMap;
811
import java.util.List;
12+
import java.util.Map;
913
import java.util.Optional;
1014

1115
@Slf4j
@@ -21,11 +25,24 @@ public boolean isEnabled() {
2125
return template.isPresent();
2226
}
2327

24-
public void send(String channelName, Message<?> payload) {
28+
public void send(String channelName, Map<String, MessageDto.HeaderValue> headers, Object payload) {
2529
if (template.isPresent()) {
26-
template.get().send(channelName, payload);
30+
Message<?> message = MessageBuilder.withPayload(payload)
31+
.copyHeaders(mapHeaders(headers))
32+
.build();
33+
template.get().send(channelName, message);
2734
} else {
2835
log.warn("SNS producer is not configured");
2936
}
3037
}
38+
39+
private static Map<String, String> mapHeaders(Map<String, MessageDto.HeaderValue> headers) {
40+
return headers.entrySet().stream()
41+
.collect(
42+
HashMap::new,
43+
(m, e) -> m.put(
44+
e.getKey(),
45+
e.getValue() == null ? null : e.getValue().stringValue()),
46+
HashMap::putAll);
47+
}
3148
}

springwolf-plugins/springwolf-sns-plugin/src/test/java/io/github/springwolf/plugins/sns/producer/SpringwolfSnsProducerTest.java

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,39 +2,59 @@
22
package io.github.springwolf.plugins.sns.producer;
33

44
import io.awspring.cloud.sns.core.SnsTemplate;
5+
import io.github.springwolf.core.controller.dtos.MessageDto;
6+
import org.assertj.core.api.Assertions;
57
import org.junit.jupiter.api.BeforeEach;
68
import org.junit.jupiter.api.Test;
9+
import org.mockito.ArgumentCaptor;
710
import org.springframework.messaging.Message;
8-
import org.springframework.messaging.support.MessageBuilder;
11+
import org.springframework.messaging.MessageHeaders;
912

1013
import java.util.Collections;
1114
import java.util.HashMap;
1215
import java.util.Map;
1316

1417
import static org.mockito.ArgumentMatchers.eq;
15-
import static org.mockito.ArgumentMatchers.same;
1618
import static org.mockito.Mockito.mock;
1719
import static org.mockito.Mockito.verify;
1820

1921
class SpringwolfSnsProducerTest {
20-
private SpringwolfSnsProducer springwolfSqsProducer;
22+
private SpringwolfSnsProducer springwolfSnsProducer;
2123

2224
private SnsTemplate template;
25+
private final ArgumentCaptor<Message<Object>> messageCaptor = ArgumentCaptor.forClass(Message.class);
2326

2427
@BeforeEach
2528
void setUp() {
2629
template = mock(SnsTemplate.class);
2730

28-
springwolfSqsProducer = new SpringwolfSnsProducer(Collections.singletonList(template));
31+
springwolfSnsProducer = new SpringwolfSnsProducer(Collections.singletonList(template));
2932
}
3033

3134
@Test
3235
void send_defaultExchangeAndChannelNameAsRoutingKey() {
3336
Map<String, Object> payload = new HashMap<>();
34-
Message<Map<String, Object>> message =
35-
MessageBuilder.withPayload(payload).build();
36-
springwolfSqsProducer.send("channel-name", message);
37-
38-
verify(template).send(eq("channel-name"), same(message));
37+
Map<String, MessageDto.HeaderValue> headers = new HashMap<>() {
38+
{
39+
put("header1", new MessageDto.HeaderValue("value1"));
40+
put("header2", new MessageDto.HeaderValue("value2"));
41+
put("nullHeader1", new MessageDto.HeaderValue(null));
42+
put("nullHeader2", null);
43+
}
44+
};
45+
springwolfSnsProducer.send("channel-name", headers, payload);
46+
47+
verify(template).send(eq("channel-name"), messageCaptor.capture());
48+
49+
Object producedPayload = messageCaptor.getValue().getPayload();
50+
Assertions.assertThat(producedPayload).isSameAs(payload);
51+
MessageHeaders producedHeaders = messageCaptor.getValue().getHeaders();
52+
Assertions.assertThat(producedHeaders).containsAllEntriesOf(new HashMap<>() {
53+
{
54+
put("header1", "value1");
55+
put("header2", "value2");
56+
}
57+
});
58+
Assertions.assertThat(producedHeaders).doesNotContainKeys("nullHeader1", "nullHeader2");
3959
}
4060
}

springwolf-plugins/springwolf-sqs-plugin/src/main/java/io/github/springwolf/plugins/sqs/controller/SpringwolfSqsController.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@
66
import io.github.springwolf.core.controller.dtos.MessageDto;
77
import io.github.springwolf.plugins.sqs.producer.SpringwolfSqsProducer;
88
import lombok.extern.slf4j.Slf4j;
9-
import org.springframework.messaging.Message;
10-
import org.springframework.messaging.support.MessageBuilder;
119
import org.springframework.web.bind.annotation.RequestMapping;
1210
import org.springframework.web.bind.annotation.RestController;
1311

@@ -31,9 +29,6 @@ protected boolean isEnabled() {
3129
@Override
3230
protected void publishMessage(String topic, MessageDto messageDto, Object payload) {
3331
log.debug("Publishing to sqs queue {}: {}", topic, messageDto);
34-
Message<Object> message = MessageBuilder.withPayload(payload)
35-
.copyHeaders(messageDto.getHeaders())
36-
.build();
37-
producer.send(topic, message);
32+
producer.send(topic, messageDto.getHeaders(), payload);
3833
}
3934
}

springwolf-plugins/springwolf-sqs-plugin/src/main/java/io/github/springwolf/plugins/sqs/producer/SpringwolfSqsProducer.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,14 @@
22
package io.github.springwolf.plugins.sqs.producer;
33

44
import io.awspring.cloud.sqs.operations.SqsTemplate;
5+
import io.github.springwolf.core.controller.dtos.MessageDto;
56
import lombok.extern.slf4j.Slf4j;
67
import org.springframework.messaging.Message;
8+
import org.springframework.messaging.support.MessageBuilder;
79

10+
import java.util.HashMap;
811
import java.util.List;
12+
import java.util.Map;
913
import java.util.Optional;
1014

1115
@Slf4j
@@ -21,11 +25,24 @@ public boolean isEnabled() {
2125
return template.isPresent();
2226
}
2327

24-
public void send(String channelName, Message<?> message) {
28+
public void send(String channelName, Map<String, MessageDto.HeaderValue> headers, Object payload) {
2529
if (template.isPresent()) {
30+
Message<?> message = MessageBuilder.withPayload(payload)
31+
.copyHeaders(mapHeaders(headers))
32+
.build();
2633
template.get().send(channelName, message);
2734
} else {
2835
log.warn("SQS producer is not configured");
2936
}
3037
}
38+
39+
private static Map<String, String> mapHeaders(Map<String, MessageDto.HeaderValue> headers) {
40+
return headers.entrySet().stream()
41+
.collect(
42+
HashMap::new,
43+
(m, e) -> m.put(
44+
e.getKey(),
45+
e.getValue() == null ? null : e.getValue().stringValue()),
46+
HashMap::putAll);
47+
}
3148
}

springwolf-plugins/springwolf-sqs-plugin/src/test/java/io/github/springwolf/plugins/sqs/producer/SpringwolfSqsProducerTest.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,27 @@
22
package io.github.springwolf.plugins.sqs.producer;
33

44
import io.awspring.cloud.sqs.operations.SqsTemplate;
5+
import io.github.springwolf.core.controller.dtos.MessageDto;
6+
import org.assertj.core.api.Assertions;
57
import org.junit.jupiter.api.BeforeEach;
68
import org.junit.jupiter.api.Test;
9+
import org.mockito.ArgumentCaptor;
710
import org.springframework.messaging.Message;
8-
import org.springframework.messaging.support.MessageBuilder;
11+
import org.springframework.messaging.MessageHeaders;
912

1013
import java.util.Collections;
1114
import java.util.HashMap;
1215
import java.util.Map;
1316

1417
import static org.mockito.ArgumentMatchers.eq;
15-
import static org.mockito.ArgumentMatchers.same;
1618
import static org.mockito.Mockito.mock;
1719
import static org.mockito.Mockito.verify;
1820

1921
class SpringwolfSqsProducerTest {
2022
private SpringwolfSqsProducer springwolfSqsProducer;
2123

2224
private SqsTemplate template;
25+
private final ArgumentCaptor<Message<Object>> messageCaptor = ArgumentCaptor.forClass(Message.class);
2326

2427
@BeforeEach
2528
void setUp() {
@@ -31,10 +34,27 @@ void setUp() {
3134
@Test
3235
void send_defaultExchangeAndChannelNameAsRoutingKey() {
3336
Map<String, Object> payload = new HashMap<>();
34-
Message<?> message = MessageBuilder.withPayload(payload).build();
35-
36-
springwolfSqsProducer.send("channel-name", message);
37-
38-
verify(template).send(eq("channel-name"), same(message));
37+
Map<String, MessageDto.HeaderValue> headers = new HashMap<>() {
38+
{
39+
put("header1", new MessageDto.HeaderValue("value1"));
40+
put("header2", new MessageDto.HeaderValue("value2"));
41+
put("nullHeader1", new MessageDto.HeaderValue(null));
42+
put("nullHeader2", null);
43+
}
44+
};
45+
springwolfSqsProducer.send("channel-name", headers, payload);
46+
47+
verify(template).send(eq("channel-name"), messageCaptor.capture());
48+
49+
Object producedPayload = messageCaptor.getValue().getPayload();
50+
Assertions.assertThat(producedPayload).isSameAs(payload);
51+
MessageHeaders producedHeaders = messageCaptor.getValue().getHeaders();
52+
Assertions.assertThat(producedHeaders).containsAllEntriesOf(new HashMap<>() {
53+
{
54+
put("header1", "value1");
55+
put("header2", "value2");
56+
}
57+
});
58+
Assertions.assertThat(producedHeaders).doesNotContainKeys("nullHeader1", "nullHeader2");
3959
}
4060
}

0 commit comments

Comments
 (0)