Skip to content

Commit

Permalink
RocketMQTemplate方法重载(加入keys)
Browse files Browse the repository at this point in the history
  • Loading branch information
javahongxi committed Dec 27, 2018
1 parent 560e33a commit a8b8800
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

Expand Down Expand Up @@ -121,12 +122,60 @@ public SendResult syncSend(String destination, Object payload) {
return syncSend(destination, payload, producer.getSendMsgTimeout());
}

/**
* Same to {@link #syncSend(String, Object)} with send timeout specified in addition.
*
* @param destination formats: `topicName:tags`
* @param payload the Object to use as payload
* @param timeout send timeout with millis
* @return {@link SendResult}
*/
public SendResult syncSend(String destination, Object payload, long timeout) {
Message<?> message = this.doConvert(payload, null, null);
return syncSend(destination, message, timeout);
}

/**
* Same to {@link #syncSend(String, Object)}
*/
public SendResult syncSend(String destination, Object payload, String keys) {
return syncSend(destination, payload, producer.getSendMsgTimeout(), keys);
}

/**
* Same to {@link #syncSend(String, Object, long)}
* keys 用于消息hash索引,方便查询,尽可能唯一
*/
public SendResult syncSend(String destination, Object payload, long timeout, String keys) {
Message<?> message = this.doConvert(payload, keys);
return syncSend(destination, message, timeout);
}

/**
* Same to {@link #sendDelayed(String, Object, MessageDelayLevel)}
*/
public SendResult sendDelayed(String destination, Object payload, String keys, MessageDelayLevel delayLevel) {
Message<?> message = this.doConvert(payload, keys);
return sendDelayed(destination, message, producer.getSendMsgTimeout(), delayLevel);
}

/**
* Same to {@link #sendDelayed(String, Message, MessageDelayLevel)}
*/
public SendResult sendDelayed(String destination, Object payload, MessageDelayLevel delayLevel) {
Message<?> message = this.doConvert(payload, null, null);
return sendDelayed(destination, message, producer.getSendMsgTimeout(), delayLevel);
}

/**
* Same to {@link #sendDelayed(String, Message, long, MessageDelayLevel)}
*/
public SendResult sendDelayed(String destination, Message<?> message, MessageDelayLevel delayLevel) {
return sendDelayed(destination, message, producer.getSendMsgTimeout(), delayLevel);
}

/**
* Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
* Send delayed message.
*
* @param destination formats: `topicName:tags`
* @param message {@link org.springframework.messaging.Message}
Expand Down Expand Up @@ -156,19 +205,6 @@ public SendResult sendDelayed(String destination, Message<?> message, long timeo
}
}

/**
* Same to {@link #syncSend(String, Object)} with send timeout specified in addition.
*
* @param destination formats: `topicName:tags`
* @param payload the Object to use as payload
* @param timeout send timeout with millis
* @return {@link SendResult}
*/
public SendResult syncSend(String destination, Object payload, long timeout) {
Message<?> message = this.doConvert(payload, null, null);
return syncSend(destination, message, timeout);
}

/**
* Same to {@link #syncSend(String, Message)} with send orderly with hashKey by specified.
*
Expand Down Expand Up @@ -209,6 +245,14 @@ public SendResult syncSendOrderly(String destination, Message<?> message, String
}
}

/**
* Same to {@link #syncSendOrderly(String, Object, String)}
*/
public SendResult syncSendOrderly(String destination, Object payload, String keys, String hashKey) {
Message<?> message = this.doConvert(payload, keys);
return syncSendOrderly(destination, message, hashKey);
}

/**
* Same to {@link #syncSend(String, Object)} with send orderly with hashKey by specified.
*
Expand Down Expand Up @@ -510,6 +554,13 @@ private org.apache.rocketmq.common.message.Message convertToRocketMsg(String des
return rocketMsg;
}

private Message<?> doConvert(Object payload, String keys) {
Map<String, Object> headers = new HashMap<>();
headers.put(MessageConst.PROPERTY_KEYS, keys);
Message<?> message = this.doConvert(payload, headers, null);
return message;
}

@Override
protected Message<?> doConvert(Object payload, Map<String, Object> headers, MessagePostProcessor postProcessor) {
String content;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ public void run(String... args) throws Exception {
}
}
rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
rocketMQTemplate.syncSend("test-topic-1", "Hello, World! I'm from simple message");
rocketMQTemplate.convertAndSend("test-topic-2", new OrderPaidEvent("T_001", new BigDecimal("88.00")));
rocketMQTemplate.sendDelayed("test-topic-1", MessageBuilder.withPayload("I'm delayed message").build(), MessageDelayLevel.TIME_1M);
rocketMQTemplate.sendDelayed("test-topic-1", "I'm delayed message", MessageDelayLevel.TIME_1M);
rocketMQTemplate.sendOneWay("test-topic-1", MessageBuilder.withPayload("I'm one way message").build());
rocketMQTemplate.syncSendOrderly("test-topic-4", MessageBuilder.withPayload("I'm order message").build(), "1234");
rocketMQTemplate.syncSendOrderly("test-topic-4", "I'm order message", "1234");
rocketMQTemplate.asyncSend("test-topic-1", MessageBuilder.withPayload("I'm async message").build(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
Expand Down

0 comments on commit a8b8800

Please sign in to comment.