diff --git a/whatsmars-mq/rocketmq-spring-boot-starter/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQTemplate.java b/whatsmars-mq/rocketmq-spring-boot-starter/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQTemplate.java index 0752b760..2129cfc7 100644 --- a/whatsmars-mq/rocketmq-spring-boot-starter/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQTemplate.java +++ b/whatsmars-mq/rocketmq-spring-boot-starter/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQTemplate.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import java.nio.charset.Charset; +import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -121,12 +122,60 @@ public SendResult syncSend(String destination, Object payload) { return syncSend(destination, payload, producer.getSendMsgTimeout()); } + /** + * Same to {@link #syncSend(String, Object)} with send timeout specified in addition. + * + * @param destination formats: `topicName:tags` + * @param payload the Object to use as payload + * @param timeout send timeout with millis + * @return {@link SendResult} + */ + public SendResult syncSend(String destination, Object payload, long timeout) { + Message message = this.doConvert(payload, null, null); + return syncSend(destination, message, timeout); + } + + /** + * Same to {@link #syncSend(String, Object)} + */ + public SendResult syncSend(String destination, Object payload, String keys) { + return syncSend(destination, payload, producer.getSendMsgTimeout(), keys); + } + + /** + * Same to {@link #syncSend(String, Object, long)} + * keys 用于消息hash索引,方便查询,尽可能唯一 + */ + public SendResult syncSend(String destination, Object payload, long timeout, String keys) { + Message message = this.doConvert(payload, keys); + return syncSend(destination, message, timeout); + } + + /** + * Same to {@link #sendDelayed(String, Object, MessageDelayLevel)} + */ + public SendResult sendDelayed(String destination, Object payload, String keys, MessageDelayLevel delayLevel) { + Message message = this.doConvert(payload, keys); + return sendDelayed(destination, message, producer.getSendMsgTimeout(), delayLevel); + } + + /** + * Same to {@link #sendDelayed(String, Message, MessageDelayLevel)} + */ + public SendResult sendDelayed(String destination, Object payload, MessageDelayLevel delayLevel) { + Message message = this.doConvert(payload, null, null); + return sendDelayed(destination, message, producer.getSendMsgTimeout(), delayLevel); + } + + /** + * Same to {@link #sendDelayed(String, Message, long, MessageDelayLevel)} + */ public SendResult sendDelayed(String destination, Message message, MessageDelayLevel delayLevel) { return sendDelayed(destination, message, producer.getSendMsgTimeout(), delayLevel); } /** - * Same to {@link #syncSend(String, Message)} with send timeout specified in addition. + * Send delayed message. * * @param destination formats: `topicName:tags` * @param message {@link org.springframework.messaging.Message} @@ -156,19 +205,6 @@ public SendResult sendDelayed(String destination, Message message, long timeo } } - /** - * Same to {@link #syncSend(String, Object)} with send timeout specified in addition. - * - * @param destination formats: `topicName:tags` - * @param payload the Object to use as payload - * @param timeout send timeout with millis - * @return {@link SendResult} - */ - public SendResult syncSend(String destination, Object payload, long timeout) { - Message message = this.doConvert(payload, null, null); - return syncSend(destination, message, timeout); - } - /** * Same to {@link #syncSend(String, Message)} with send orderly with hashKey by specified. * @@ -209,6 +245,14 @@ public SendResult syncSendOrderly(String destination, Message message, String } } + /** + * Same to {@link #syncSendOrderly(String, Object, String)} + */ + public SendResult syncSendOrderly(String destination, Object payload, String keys, String hashKey) { + Message message = this.doConvert(payload, keys); + return syncSendOrderly(destination, message, hashKey); + } + /** * Same to {@link #syncSend(String, Object)} with send orderly with hashKey by specified. * @@ -510,6 +554,13 @@ private org.apache.rocketmq.common.message.Message convertToRocketMsg(String des return rocketMsg; } + private Message doConvert(Object payload, String keys) { + Map headers = new HashMap<>(); + headers.put(MessageConst.PROPERTY_KEYS, keys); + Message message = this.doConvert(payload, headers, null); + return message; + } + @Override protected Message doConvert(Object payload, Map headers, MessagePostProcessor postProcessor) { String content; diff --git a/whatsmars-mq/whatsmars-mq-rocketmq/src/main/java/org/hongxi/whatsmars/rocketmq/boot/producer/ProducerApplication.java b/whatsmars-mq/whatsmars-mq-rocketmq/src/main/java/org/hongxi/whatsmars/rocketmq/boot/producer/ProducerApplication.java index a4bb4530..fa296916 100644 --- a/whatsmars-mq/whatsmars-mq-rocketmq/src/main/java/org/hongxi/whatsmars/rocketmq/boot/producer/ProducerApplication.java +++ b/whatsmars-mq/whatsmars-mq-rocketmq/src/main/java/org/hongxi/whatsmars/rocketmq/boot/producer/ProducerApplication.java @@ -33,10 +33,11 @@ public void run(String... args) throws Exception { } } rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring message").build()); + rocketMQTemplate.syncSend("test-topic-1", "Hello, World! I'm from simple message"); rocketMQTemplate.convertAndSend("test-topic-2", new OrderPaidEvent("T_001", new BigDecimal("88.00"))); - rocketMQTemplate.sendDelayed("test-topic-1", MessageBuilder.withPayload("I'm delayed message").build(), MessageDelayLevel.TIME_1M); + rocketMQTemplate.sendDelayed("test-topic-1", "I'm delayed message", MessageDelayLevel.TIME_1M); rocketMQTemplate.sendOneWay("test-topic-1", MessageBuilder.withPayload("I'm one way message").build()); - rocketMQTemplate.syncSendOrderly("test-topic-4", MessageBuilder.withPayload("I'm order message").build(), "1234"); + rocketMQTemplate.syncSendOrderly("test-topic-4", "I'm order message", "1234"); rocketMQTemplate.asyncSend("test-topic-1", MessageBuilder.withPayload("I'm async message").build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) {