@@ -34,23 +34,27 @@ tags:
34
34
* 复杂性增加: 一致性,可靠性
35
35
36
36
## 基本设置:
37
- 队列:
38
- 同步队列:
39
- 多次消费:
40
- 如何保证有序性:
41
- 数据持久化:
42
- 属性结构:
43
- connfactory->conn->session->producer/consumer
44
- 数据结构:
45
- 1. topic
46
- 2. queue
47
- 配置结构:
48
- 异步:
49
- 持久化: 生产者需要的关键字: DeliveryMode.PERSISTENT ->当
50
- 确认字:AUTO_ACKNOWLEDGE/CLIENT_ACKNOWLEDGE/DUPS_OK_ACKNOWLEDGE/SESSION_TRANSACTED ->当消息确认后队列才会把消息移出
51
- 消费者一次拉的数据量:consumer.prefetchSize=50
52
- 开启事务:
53
- 和jms规范进行对接:
37
+ ``` markdown
38
+ 属性结构:
39
+ connfactory->conn->session->producer/consumer
40
+
41
+ 配置结构:
42
+ 异步:设置conn
43
+ 持久化: 生产者需要的关键字: DeliveryMode.PERSISTENT ->当
44
+ 确认字:AUTO_ACKNOWLEDGE/CLIENT_ACKNOWLEDGE/DUPS_OK_ACKNOWLEDGE/SESSION_TRANSACTED ->当消息确认后队列才会把消息移出
45
+ 消费者一次拉的数据量:consumer.prefetchSize=50
46
+ 开启事务:
47
+ //Boolean.TRUE 表示开启事务
48
+ Session session =
49
+ connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
50
+
51
+ 和jms规范进行对接:
52
+
53
+ public void sendMessageToAMQ(Destination destination, final String msg) {
54
+ jmsTemplate.send(destination, session -> session.createTextMessage(msg));
55
+ }
56
+
57
+ ```
54
58
55
59
56
60
## 生产者:
@@ -191,23 +195,25 @@ public void sendMessage(ActiveMQMessage msg, final String msgid) throws JMSExcep
191
195
但是维护容易,消息不易丢失,同时采用CLIENT_ACKNOWLEDGE来保证消息完好消费
192
196
#### 代码示例:
193
197
194
- Session session = aliyunAmqConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
195
- Queue workQueue = session.createQueue(queueName+"?consumer.prefetchSize=50");
196
- MessageConsumer consumer = session.createConsumer(workQueue);
197
- while(true){
198
- // 停止信号
199
- if(stopFlag == 1){
200
- break;
201
- }
202
- message = consumer.receiveNoWait();
203
- if (message == null) {
204
- //MQ中没有消息的时候,调用会直接返回,返回值为NULL,不能频繁一直调用,所以要睡眠1秒
205
- SystemUtils.sleep(1);
206
- continue;
207
- }
208
- handle(message);
198
+ ``` java
199
+ Session session = aliyunAmqConnection. createSession(false , Session . AUTO_ACKNOWLEDGE );
200
+ Queue workQueue = session. createQueue(queueName+ " ?consumer.prefetchSize=50" );
201
+ MessageConsumer consumer = session. createConsumer(workQueue);
202
+ while (true ){
203
+ // 停止信号
204
+ if (stopFlag == 1 ){
205
+ break ;
209
206
}
210
-
207
+ message = consumer. receiveNoWait();
208
+ if (message == null ) {
209
+ // MQ中没有消息的时候,调用会直接返回,返回值为NULL,不能频繁一直调用,所以要睡眠1秒
210
+ SystemUtils . sleep(1 );
211
+ continue ;
212
+ }
213
+ handle(message);
214
+ }
215
+ ```
216
+
211
217
212
218
### 异步接收:
213
219
1 . 自动确认接受
0 commit comments