-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathactivemq.txt
1358 lines (964 loc) · 45.2 KB
/
activemq.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
消息中间件是值利用高效可靠的消息传递机制进行平台无
关的数据交流,并基于数据通信来进行分布式系统的集成。
通过提供消息传递和消息排队模型,可以在分布式架构下
扩展进程之间的通信。
消息中间件主要解决的就是分布式系统之间消息传递的问
题,它能够屏蔽各种平台以及协议之间的特性,实现应用
程序之间的协同。举个非常简单的例子,就拿一个电商平
台的注册功能来简单分析下,用户注册这一个服务,不单
做技术人的指路明灯,做职场生涯的精神导师
单只是 insert 一条数据到数据库里面就完事了,还需要发
送激活邮件、发送新人红包或者积分、发送营销短信等一
系列操作。假如说这里面的每一个操作,都需要消耗 1s,
那么整个注册过程就需要耗时 4s 才能响应给用户。
但是我们从注册这个服务可以看到,每一个子操作都是相
对独立的,同时,基于领域划分以后,发送激活邮件、发送
营销短信、赠送积分及红包都属于不同的子域。所以我们
可以对这些子操作进行来实现异步化执行,类似于多线程
并行处理的概念。
如何实现异步化呢?用多线程能实现吗?多线程当然可以
实现,只是,消息的持久化、消息的重发这些条件,多线程
并不能满足。所以需要借助一些开源中间件来解决。而分
布式消息队列就是一个非常好的解决办法,引入分布式消
息队列以后,架构图就变成这样了(下图是异步消息队列
的场景)。通过引入分布式队列,就能够大大提升程序的处
理效率,并且还解决了各个模块之间的耦合问题
➢ 这个是分布式消息队列的第一个解决场景【异步处理】
做技术人的指路明灯,做职场生涯的精神导师
我们再来展开一种场景,通过分布式消息队列来实现流量
整形,比如在电商平台的秒杀场景下,流量会非常大。通
过消息队列的方式可以很好的缓解高流量的问题
➢ 用户提交过来的请求,先写入到消息队列。消息队列是
有长度的,如果消息队列长度超过指定长度,直接抛弃
➢ 秒杀的具体核心处理业务,接收消息队列中消息进行处
理,这里的消息处理能力取决于消费端本身的吞吐量
当然,消息中间件还有更多应用场景,比如在弱一致性事
务模型中,可以采用分布式消息队列的实现最大能力通知
做技术人的指路明灯,做职场生涯的精神导师
方式来实现数据的最终一致性等等
ActiveMQ 是完全基于 JMS 规范实现的一个消息中间件产
品。是 Apache 开源基金会研发的消息中间件。ActiveMQ
主要应用在分布式系统架构中,帮助构建高可用、高性能、
可伸缩的企业级面向消息服务的系统
ActiveMQ 特性
1. 多语言和协议编写客户端
语言:java/C/C++/C#/Ruby/Perl/Python/PHP
应 用 协 议 :
openwire/stomp/REST/ws/notification/XMPP/AMQP
2. 完全支持 jms1.1 和 J2ee1.4 规范
3. 对 spring 的支持,ActiveMQ 可以很容易内嵌到 spring
模块中
什么是MOM
面向消息的中间件,使用消息传送提供者来协调消息传输操作。 MOM需要提供API和管理工具。 客户端调用api。 把消息发送到消息传送提供者指定的目的地
在消息发送之后,客户端会技术执行其他的工作。并且在接收方收到这个消息确认之前。提供者一直保留该消息
=========================
消息同步发送和异步发送
ActiveMQ支持同步、异步两种发送模式将消息发送到broker上。
同步发送过程中,发送者发送一条消息会阻塞直到broker反馈一个确认消息,表示消息已经被broker处理。这个机
制提供了消息的安全性保障,但是由于是阻塞的操作,会影响到客户端消息发送的性能
异步发送的过程中,发送者不需要等待broker提供反馈,所以性能相对较高。但是可能会出现消息丢失的情况。所
以使用异步发送的前提是在某些情况下允许出现数据丢失的情况。
默认情况下,非持久化消息是异步发送的,持久化消息并且是在非事务模式下是同步发送的。
但是在开启事务的情况下,消息都是异步发送。由于异步发送的效率会比同步发送性能更高。所以在发送持久化消
息的时候,尽量去开启事务会话。
除了持久化消息和非持久化消息的同步和异步特性以外,我们还可以通过以下几种方式来设置异步发送
消息的发送原理分析图解
1.ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.11.153:61616?
jms.useAsyncSend=true");
2.((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(true);
3.((ActiveMQConnection)connection).setUseAsyncSend(true);
ProducerWindowSize的含义
producer每发送一个消息,统计一下发送的字节数,当字节数达到ProducerWindowSize值时,需要等待broker的
确认,才能继续发送。
代码在:ActiveMQSession的1957行
主要用来约束在异步发送时producer端允许积压的(尚未ACK)的消息的大小,且只对异步发送有意义。每次发送消
息之后,都将会导致memoryUsage大小增加(+message.size),当broker返回producerAck时,memoryUsage尺
寸减少(producerAck.size,此size表示先前发送消息的大小)。
可以通过如下2种方式设置:
Ø 在brokerUrl中设置: "tcp://localhost:61616?jms.producerWindowSize=1048576",这种设置将会对所有的
producer生效。
Ø 在destinationUri中设置: "test-queue?producer.windowSize=1048576",此参数只会对使用此Destination实例
的producer失效,将会覆盖brokerUrl中的producerWindowSize值。
注意:此值越大,意味着消耗Client端的内存就越大。
消息发送的源码分析
以producer.send为入口
ActiveMQSession的send方法
public void send(Destination destination, Message message, int deliveryMode, int priority, long
timeToLive, AsyncCallback onComplete) throws JMSException {
checkClosed(); //检查session的状态,如果session以关闭则抛异常
if (destination == null) {
if (info.getDestination() == null) {
throw new UnsupportedOperationException("A destination must be specified.");
}
throw new InvalidDestinationException("Don't understand null destinations");
}
ActiveMQDestination dest;
if (destination.equals(info.getDestination())) {//检查destination的类型,如果符合要求,就转变为
ActiveMQDestination
dest = (ActiveMQDestination)destination;
} else if (info.getDestination() == null) {
dest = ActiveMQDestination.transform(destination);
} else {
throw new UnsupportedOperationException("This producer can only send messages to: " +
this.info.getDestination().getPhysicalName());
}
if (dest == null) {
throw new JMSException("No destination specified");
}
if (transformer != null) {
Message transformedMessage = transformer.producerTransform(session, this, message);
if (transformedMessage != null) {
message = transformedMessage;
}
}
if (producerWindow != null) {//如果发送窗口大小不为空,则判断发送窗口的大小决定是否阻塞
try {
producerWindow.waitForSpace();
} catch (InterruptedException e) {
throw new JMSException("Send aborted due to thread interrupt.");
}
}
//发送消息到broker的topic
this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow,
sendTimeout, onComplete);
stats.onMessage();
}
protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message
message, int deliveryMode, int priority, long timeToLive,
MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete)
throws JMSException {
checkClosed();
if (destination.isTemporary() && connection.isDeleted(destination)) {
throw new InvalidDestinationException("Cannot publish to a deleted Destination: " +
destination);
}
synchronized (sendMutex) { //互斥锁,如果一个session的多个producer发送消息到这里,会保证消息发送
的有序性
// tell the Broker we are about to start a new transaction
doStartTransaction();//告诉broker开始一个新事务,只有事务型会话中才会开启
TransactionId txid = transactionContext.getTransactionId();//从事务上下文中获取事务id
long sequenceNumber = producer.getMessageSequence();
//Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
message.setJMSDeliveryMode(deliveryMode); //在JMS协议头中设置是否持久化标识
long expiration = 0L;//计算消息过期时间
if (!producer.getDisableMessageTimestamp()) {
long timeStamp = System.currentTimeMillis();
message.setJMSTimestamp(timeStamp);
if (timeToLive > 0) {
expiration = timeToLive + timeStamp;
}
}
message.setJMSExpiration(expiration);//设置消息过期时间
message.setJMSPriority(priority);//设置消息的优先级
message.setJMSRedelivered(false);//设置消息为非重发
// transform to our own message format here
//将不通的消息格式统一转化为ActiveMQMessage
ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message,
connection);
msg.setDestination(destination);//设置目的地
//生成并设置消息id
msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(),
sequenceNumber));
// Set the message id.
if (msg != message) {//如果消息是经过转化的,则更新原来的消息id和目的地
message.setJMSMessageID(msg.getMessageId().toString());
// Make sure the JMS destination is set on the foreign messages too.
message.setJMSDestination(destination);
}
//clear the brokerPath in case we are re-sending this message
msg.setBrokerPath(null);
msg.setTransactionId(txid);
if (connection.isCopyMessageOnSend()) {
msg = (ActiveMQMessage)msg.copy();
}
msg.setConnection(connection);
msg.onSend();//把消息属性和消息体都设置为只读,防止被修改
msg.setProducerId(msg.getMessageId().getProducerId());
if (LOG.isTraceEnabled()) {
LOG.trace(getSessionId() + " sending message: " + msg);
}
ActiveMQConnection. doAsyncSendPacket
这个地方问题来了,this.transport是什么东西?在哪里实例化的?按照以前看源码的惯例来看,它肯定不是一个
单纯的对象。
按照以往我们看源码的经验来看,一定是在创建连接的过程中初始化的。所以我们定位到代码
transport的实例化过程
从connection=connectionFactory.createConnection();这行代码作为入口,一直跟踪到
ActiveMQConnectionFactory. createActiveMQConnection这个方法中。代码如下
//如果onComplete没有设置,且发送超时时间小于0,且消息不需要反馈,且连接器不是同步发送模式,且消息非持久
化或者连接器是异步发送模式
//或者存在事务id的情况下,走异步发送,否则走同步发送
if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() &&
!connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid !=
null)) {
this.connection.asyncSendPacket(msg);
if (producerWindow != null) {
// Since we defer lots of the marshaling till we hit the
// wire, this might not
// provide and accurate size. We may change over to doing
// more aggressive marshaling,
// to get more accurate sizes.. this is more important once
// users start using producer window
// flow control.
int size = msg.getSize(); //异步发送的情况下,需要设置producerWindow的大小
producerWindow.increaseUsage(size);
}
} else {
if (sendTimeout > 0 && onComplete==null) {
this.connection.syncSendPacket(msg,sendTimeout); //带超时时间的同步发送
}else {
this.connection.syncSendPacket(msg, onComplete); //带回调的同步发送
}
}
}
}
private void doAsyncSendPacket(Command command) throws JMSException {
try {
this.transport.oneway(command);
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
}
createTransport
调用ActiveMQConnectionFactory.createTransport方法,去创建一个transport对象。
1. 构建一个URI
2. 根据URL去创建一个连接TransportFactory.connect
Ø 默认使用的是tcp的协议
protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws
JMSException {
if (brokerURL == null) {
throw new ConfigurationException("brokerURL not set.");
}
ActiveMQConnection connection = null;
try {
Transport transport = createTransport();
connection = createActiveMQConnection(transport, factoryStats);
connection.setUserName(userName);
connection.setPassword(password);
//省略后面的代码
}
protected Transport createTransport() throws JMSException {
try {
URI connectBrokerUL = brokerURL;
String scheme = brokerURL.getScheme();
if (scheme == null) {
throw new IOException("Transport not scheme specified: [" + brokerURL + "]");
}
if (scheme.equals("auto")) {
connectBrokerUL = new URI(brokerURL.toString().replace("auto", "tcp"));
} else if (scheme.equals("auto+ssl")) {
connectBrokerUL = new URI(brokerURL.toString().replace("auto+ssl", "ssl"));
} else if (scheme.equals("auto+nio")) {
connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio", "nio"));
} else if (scheme.equals("auto+nio+ssl")) {
connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio+ssl", "nio+ssl"));
}
return TransportFactory.connect(connectBrokerUL);
} catch (Exception e) {
throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
}
}
TransportFactory. findTransportFactory
1. 从TRANSPORT_FACTORYS这个Map集合中,根据scheme去获得一个TransportFactory指定的实例对象
2. 如果Map集合中不存在,则通过TRANSPORT_FACTORY_FINDER去找一个并且构建实例
Ø 这个地方又有点类似于我们之前所学过的SPI的思想吧?他会从META-
INF/services/org/apache/activemq/transport/ 这个路径下,根据URI组装的scheme去找到匹配的class对象并且
实例化,所以根据tcp为key去对应的路径下可以找到TcpTransportFactory
调用TransportFactory.doConnect去构建一个连接
public static TransportFactory findTransportFactory(URI location) throws IOException {
String scheme = location.getScheme();
if (scheme == null) {
throw new IOException("Transport not scheme specified: [" + location + "]");
}
TransportFactory tf = TRANSPORT_FACTORYS.get(scheme);
if (tf == null) {
// Try to load if from a META-INF property.
try {
tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme);
TRANSPORT_FACTORYS.put(scheme, tf);
} catch (Throwable e) {
throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]",
e);
}
}
return tf;
}
public Transport doConnect(URI location) throws Exception {
try {
Map<String, String> options = new HashMap<String, String>
(URISupport.parseParameters(location));
if( !options.containsKey("wireFormat.host") ) {
options.put("wireFormat.host", location.getHost());
}
WireFormat wf = createWireFormat(options);
Transport transport = createTransport(location, wf); //创建一个Transport,创建一个socket连
接 -> 终于找到真相了
Transport rc = configure(transport, wf, options);//配置configure,这个里面是对Transport做
链路包装
//remove auto
IntrospectionSupport.extractProperties(options, "auto.");
if (!options.isEmpty()) {
throw new IllegalArgumentException("Invalid connect parameters: " + options);
}
configure
到目前为止,这个transport实际上就是一个调用链了,他的链结构为
ResponseCorrelator(MutexTransport(WireFormatNegotiator(IactivityMonitor(TcpTransport()))
每一层包装表示什么意思呢?
ResponseCorrelator 用于实现异步请求。
MutexTransport 实现写锁,表示同一时间只允许发送一个请求
WireFormatNegotiator 实现了客户端连接broker的时候先发送数据解析相关的协议信息,比如解析版本号,是否
使用缓存等
InactivityMonitor 用于实现连接成功成功后的心跳检查机制,客户端每10s发送一次心跳信息。服务端每30s读取
一次心跳信息。
同步发送和异步发送的区别
在ResponseCorrelator的request方法中,需要通过response.getResult去获得broker的反馈,否则会阻塞
持久化消息和非持久化消息的存储原理
return rc;
} catch (URISyntaxException e) {
throw IOExceptionSupport.create(e);
}
}
public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
//组装一个复合的transport,这里会包装两层,一个是IactivityMonitor.另一个是WireFormatNegotiator
transport = compositeConfigure(transport, wf, options);
transport = new MutexTransport(transport); //再做一层包装,MutexTransport
transport = new ResponseCorrelator(transport); //包装ResponseCorrelator
return transport;
}
public Object request(Object command, int timeout) throws IOException {
FutureResponse response = asyncRequest(command, null);
return response.getResult(timeout); // 从future方法阻塞等待返回
}
正常情况下,非持久化消息是存储在内存中的,持久化消息是存储在文件中的。能够存储的最大消息数据在
${ActiveMQ_HOME}/conf/activemq.xml文件中的systemUsage节点
SystemUsage配置设置了一些系统内存和硬盘容量
Ø 从上面的配置我们需要get到一个结论,当非持久化消息堆积到一定程度的时候,也就是内存超过指定的设置阀
值时,ActiveMQ会将内存中的非持久化消息写入到临时文件,以便腾出内存。但是它和持久化消息的区别是,重
启之后,持久化消息会从文件中恢复,非持久化的临时文件会直接删除
消息的持久化策略分析
消息持久性对于可靠消息传递来说是一种比较好的方法,即时发送者和接受者不是同时在线或者消息中心在发送者
发送消息后宕机了,在消息中心重启后仍然可以将消息发送出去。消息持久性的原理很简单,就是在发送消息出去
后,消息中心首先将消息存储在本地文件、内存或者远程数据库,然后把消息发送给接受者,发送成功后再把消息
从存储中删除,失败则继续尝试。接下来我们来了解一下消息在broker上的持久化存储实现方式
持久化存储支持类型
ActiveMQ支持多种不同的持久化方式,主要有以下几种,不过,无论使用哪种持久化方式,消息的存储逻辑都是
一致的。
Ø KahaDB存储(默认存储方式)
Ø JDBC存储
Ø Memory存储
Ø LevelDB存储
<systemUsage>
<systemUsage>
<memoryUsage>
//该子标记设置整个ActiveMQ节点的“可用内存限制”。这个值不能超过ActiveMQ本身设置的最大内存大小。其中的
percentOfJvmHeap属性表示百分比。占用70%的堆内存
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
//该标记设置整个ActiveMQ节点,用于存储“持久化消息”的“可用磁盘空间”。该子标记的limit属性必须要进行设置
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
//一旦ActiveMQ服务节点存储的消息达到了memoryUsage的限制,非持久化消息就会被转储到 temp store区域,虽然
我们说过非持久化消息不进行持久化存储,但是ActiveMQ为了防止“数据洪峰”出现时非持久化消息大量堆积致使内存耗
尽的情况出现,还是会将非持久化消息写入到磁盘的临时区域——temp store。这个子标记就是为了设置这个temp
store区域的“可用磁盘空间限制”
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
Ø JDBC With ActiveMQ Journal
KahaDB存储
KahaDB是目前默认的存储方式,可用于任何场景,提高了性能和恢复能力。消息存储使用一个事务日志和仅仅用一个
索引文件来存储它所有的地址。
KahaDB是一个专门针对消息持久化的解决方案,它对典型的消息使用模式进行了优化。在Kaha中,数据被追加到
data logs中。当不再需要log文件中的数据的时候,log文件会被丢弃。
KahaDB的配置方式
KahaDB的存储原理
在data/kahadb这个目录下,会生成四个文件
Ø db.data 它是消息的索引文件,本质上是B-Tree(B树),使用B-Tree作为索引指向db-*.log里面存储的消息
Ø db.redo 用来进行消息恢复
Ø db-*.log 存储消息内容。新的数据以APPEND的方式追加到日志文件末尾。属于顺序写入,因此消息存储是比较
快的。默认是32M,达到阀值会自动递增
Ø lock文件 锁,表示当前获得kahadb读写权限的broker
JDBC存储
使用JDBC持久化方式,数据库会创建3个表:activemq_msgs,activemq_acks和activemq_lock。
ACTIVEMQ_MSGS 消息表,queue和topic都存在这个表中
ACTIVEMQ_ACKS 存储持久订阅的信息和最后一个持久订阅接收的消息ID
ACTIVEMQ_LOCKS 锁表,用来确保某一时刻,只能有一个ActiveMQ broker实例来访问数据库
JDBC存储实践
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="# MySQL-DS " createTablesOnStartup="true" />
</persistenceAdapter>
dataSource指定持久化数据库的bean,createTablesOnStartup是否在启动的时候创建数据表,默认值是true,这
样每次启动都会去创建数据表了,一般是第一次启动的时候设置为true,之后改成false
Mysql持久化Bean配置
添加Jar包依赖
LevelDB存储
LevelDB持久化性能高于KahaDB,虽然目前默认的持久化方式仍然是KahaDB。并且,在ActiveMQ 5.9版本提供
了基于LevelDB和Zookeeper的数据复制方式,用于Master-slave方式的首选数据复制方案。
不过,据ActiveMQ官网对LevelDB的表述:LevelDB官方建议使用以及不再支持,推荐使用的是KahaDB
Memory 消息存储
基于内存的消息存储,内存消息存储主要是存储所有的持久化的消息在内存中。persistent=”false”,表示不设置持
久化存储,直接存储到内存中
<bean id="Mysql-DS" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://192.168.11.156:3306/activemq?
relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="root"/>
</bean>
<persistenceAdapter>
<levelDBdirectory="activemq-data"/>
</persistenceAdapter>
<beans>
<broker brokerName="test-broker" persistent="false"
xmlns="http://activemq.apache.org/schema/core">
<transportConnectors>
<transportConnector uri="tcp://localhost:61635"/>
</transportConnectors> </broker>
</beans>
JDBC Message store with ActiveMQ Journal
这种方式克服了JDBC Store的不足,JDBC每次消息过来,都需要去写库和读库。
ActiveMQ Journal,使用高速缓存写入技术,大大提高了性能。
当消费者的消费速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB中的消息。
举个例子,生产者生产了1000条消息,这1000条消息会保存到journal文件,如果消费者的消费速度很快的情况
下,在journal文件还没有同步到DB之前,消费者已经消费了90%的以上的消息,那么这个时候只需要同步剩余的
10%的消息到DB。
如果消费者的消费速度很慢,这个时候journal文件可以使消息以批量方式写到DB。
Ø 将原来的标签注释掉
Ø 添加如下标签
Ø 在服务端循环发送消息。可以看到数据是延迟同步到数据库的
消费端消费消息的原理
我们通过上一节课的讲解,知道有两种方法可以接收消息,一种是使用同步阻塞的MessageConsumer#receive方
法。另一种是使用消息监听器MessageListener。这里需要注意的是,在同一个session下,这两者不能同时工
作,也就是说不能针对不同消息采用不同的接收方式。否则会抛出异常。
至于为什么这么做,最大的原因还是在事务性会话中,两种消费模式的事务不好管控
消息的发送策略
持久化消息
默认情况下,生产者发送的消息是持久化的。消息发送到broker以后,producer会等待broker对这条消息的处理情况的反馈
可以设置消息发送端发送持久化消息的异步方式
connectionFactory.setUseAsyncSend(true);
回执窗口大小设置
connectionFactory.setProducerWindowSize();
非持久化消息
textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENCE);
非持久化消息模式下,默认就是异步发送过程,如果需要对非持久化消息的每次发送的消息都获得broker的回执的话
connectionFactory.setAlwaysSyncSend();
consumer获取消息是pull还是(broker的主动 push)
默认情况下,mq服务器(broker)采用异步方式向客户端主动推送消息(push)。也就是说broker在向某个消费者会话推送消息后,不会
等待消费者响应消息,直到消费者处理完消息以后,主动向broker返回处理结果
prefetchsize
“预取消息数量“
broker端一旦有消息,就主动按照默认设置的规则推送给当前活动的消费者。 每次推送都有一定的数量限制,而这个数量就是prefetchSize
Queue
持久化消息 prefetchSize=1000
非持久化消息 1000
topic
持久化消息 100
非持久化消息 32766
假如prefetchSize=0 . 此时对于consumer来说,就是一个pull模式
关于acknowledge为什么能够在第5次主动执行ack以后,把前面的消息都确认掉
消表示已经被consumer接收但未确认的消息。
消息确认
ACK_TYPE,消费端和broker交换ack指令的时候,还需要告知broker ACK_TYPE。
ACK_TYPE表示确认指令的类型,broker可以根据不同的ACK_TYPE去针对当前消息做不同的应对策略
REDELIVERED_ACK_TYPE (broker会重新发送该消息) 重发侧策略
DELIVERED_ACK_TYPE 消息已经接收,但是尚未处理结束
STANDARD_ACK_TYPE 表示消息处理成功
ActiveMQ结合spring开发
Spring提供了对JMS的支持,需要添加Spring 支持JMS的包
添加jar依赖
配置spring文件
编写发送端代码
配置接收端spring文件
直接copy发送端的文件
编写接收端代码
spring的发布订阅模式配置
以事件通知方式来配置消费者
更改消费端的配置
增加FirstMessageListener监听类
启动spring容器
ActiveMQ支持的传输协议
client端和broker端的通讯协议
TCP、UDP 、NIO、SSL、Http(s)、vm
ActiveMQ持久化存储
1.kahaDB 默认的存储方式
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
2.AMQ 基于文件的存储方式
写入速度很快,容易恢复。
文件默认大小是32M
3.JDBC 基于数据库的存储
ACTIVEMQ_ACKS : 存储持久订阅的信息
ACTIVEMQ_LOCK : 锁表(用来做集群的时候,实现master选举的表)
ACTIVEMQ_MSGS : 消息表
第一步
第二步
第三步
添加jar包依赖
JDBC Message store with activeMQ journal
1.引入了快速缓存机制,缓存到Log文件中
2.性能会比jdbc store要好
3.JDBC Message store with activeMQ journal 不能应用于master/slave模式
4.Memory 基于内存的存储
LevelDB
5.8以后引入的持久化策略。通常用于集群配置
ActiveMQ的网络连接
activeMQ如果要实现扩展性和高可用性的要求的话,就需要用用到网络连接模式
NetworkConnector
主要用来配置broker与broker之间的通信连接
如上图所示,服务器S1和S2通过NewworkConnector相连,则生产者P1发送消息,消费者C3和C4都可以接收到,而生产者P3发送的消息,消费者C1和C2同样也可以接收到
静态网络连接
修改activemq.xml,增加如下内容
两个Brokers通过一个staic的协议来进行网络连接。一个Consumer连接到BrokerB的一个地址上,当Producer在BrokerA上以相同的地址发送消息是,此时消息会被转移到BrokerB上,也就是说BrokerA会转发消息到BrokerB上
丢失的消息
一些consumer连接到broker1、消费broker2上的消息。消息先被broker1从broker2消费掉,然后转发给这些consumers。假设,转发消息的时候broker1重启了,这些consumers发现brokers1连接失败,通过failover连接到broker2.但是因为有一部分没有消费的消息被broker2已经分发到broker1上去了,这些消息就好像消失了。除非有消费者重新连接到broker1上来消费
从5.6版本开始,在destinationPolicy上新增了一个选项replayWhenNoConsumers属性,这个属性可以用来解决当broker1上有需要转发的消息但是没有消费者时,把消息回流到它原始的broker。同时把enableAudit设置为false,为了防止消息回流后被当作重复消息而不被分发
通过如下配置,在activeMQ.xml中。 分别在两台服务器都配置。即可完成消息回流处理
动态网络连接
======================
vmware+ Centos 7 jdk8
kafka -> redis -> nginx -> mongoDB
网络连接
静态网络连接
动态网络连接
multicast
networkConnector是一个高性能方案,并不是一个高可用方案
通过zookeeper+activemq实现高可用方案
(master/slave模型)
1.修改activeMQ
2. 启动zookeeper服务器
3. 启动activeMQ
参数的意思
directory: levelDB数据文件存储的位置
replicas:计算公式(replicas/2)+1 , 当replicas的值为2的时候, 最终的结果是2. 表示集群中至少有2台是启动的
bind: 用来负责slave和master的数据同步的端口和ip
zkAddress: 表示zk的服务端地址
hostname:本机ip
jdbc存储的主从方案
基于LOCK锁表的操作来实现master/slave
基于共享文件系统的主从方案
挂载网络磁盘,将数据文件保存到指定磁盘上即可完成master/slave模式
高可用+高性能方案
容错的链接
课后的作业1: ActiveMQ的重发机制?什么情况下会重发消息
课后作业2: 完善注册流程(发邮件)
ActiveMQ监控
只能查看ActiveMQ当前的Queue和Topics等简单信息,不能监控ActiveMQ自身运行的JMX信息等
hawtio
HawtIO 是一个新的可插入式 HTML5 面板,设计用来监控 ActiveMQ, Camel等系统;
ActiveMQ在5.9.0版本曾将hawtio嵌入自身的管理界面,但是由于对hawtio的引入产生了争议,在5.9.1版本中又将其移除,但是开发者可以通过配置,使用hawtio对ActiveMQ进行监控。
本文介绍了通过两种配置方式,使用hawtio对ActiveMQ进行监控。
1.从http://hawt.io/getstarted/index.html 下载hawtio的应用程序
2.下载好后拷贝到ActiveMQ安装目录的webapps目录下,改名为hawtio.war并解压到到hawtio目录下
3.编辑ActiveMQ安装目录下conf/jetty.xml文件,在第75行添加以下代码
<bean class="org.eclipse.jetty.webapp.WebAppContext">
<property name="contextPath" value="/hawtio" />
<property name="war" value="${activemq.home}/webapps/hawtio" />
<property name="logUrlOnStart" value="true" />
</bean>
4.修改bin/env文件
-Dhawtio.realm=activemq -Dhawtio.role=admins
-Dhawtio.rolePrincipalClasses=org.apache.activemq.jaas.GroupPrincipal
需要注意的是-Dhawtio的三个设定必须放在ACTIVEMQ_OPTS设置的最前面(在内存参数设置之后),否则会出现验证无法通过的错误(另外,ACTIVEMQ_OPTS的设置语句不要回车换行)
5.启动activeMQ服务。访问http://ip:8161/hawtio.
==========================================
MOM
面向消息的中间件,使用消息传送提供者协调消息传输操作
需要提供API和管理工具
客户端调用api 把消息发送提供者指定的目的地
消息发送之后 客户端会继续执行其他的工作
并且在接收方收到这个消息确认之前 提供者一直保留该消息
------------------------------------------------------
java message service 服务 面向消息中间件的api
java 平台 面向消息中间件的api 只提供api规范
由其他厂商 实现
用于两个应用程序 发送消息
mom 面向消息中间件的 提供商
active
jboss mq
open jm service
发送消息 异步
使用消息传送提供者
管理工具
客户端 消息中间件
不直接通信 消息传送提供者 协调
api
消息 异步接收的
反馈 接收成功
通过消息传送者 2
最大特点
MOM
-----------------------------------------------------------------
JMS 基本概念盒模型
MOM 什么是
与厂商无关的api
创建 接收
定义的应用程序接口
JMS Provider 接口提供的管理控制 ActiveMQ
JMS message 消息头 属性 体
producer
consumer
domains
connection factory
JMS Connection
JMS Session
Destination
Session
Acknowleage
Transaction
JMS Client
============================================
JMS 规范的通信
user
工程模块 JMS 引进 添加
org.apache.activeMQ
active-all
5.15.0
public static void main(String[] args){
ConnectionFactory connectionFactory = new ActiveMQConnectFactory(brokerURL+"tcp://192.168.11.140:61616");
Connection connection - null;
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE,SESSION.AUTO_ACKNOW_REWER);
//创建队列 如果队列存在 不会创建 first-queue是队列名称
Destination destination = seesion.createQueue("first-queue");
//创建消息发送者
MessageProducer producer = session.createProduer(destionation);
TextMessage textMessage = session.createTextMessage("hello,菲菲,我是迈克");
producer.send({textMessage);
session.commit();
session.close();
if(connection != null){
connection.close();
}
}
=================================================
JMSRecervier
ActionMQConnectionFactory
Boolean.TRUE Session.AUTO_ACTION
Destination destination = session.createQueue("first-queue");
MessionConsumer consumer = session.createConsumer(destination);
TextMessage textMessage = (TextMessage)Consumer.receive((;
System.out.println(textMessage.getText());
session.commit();
session.close();
hello feifei
JMS 规范
应用程序的接口 消息传递的功能
====================
消息传递域
点对点 p2p
client1 queue client2
1.每个消息 只能一个消费者
2.消息的生产者和消费者 之间没有时间上的相关性
无论 消费者在生产者发送消息的时候是否处于运行状态 都可以提取消息
被阻塞的消息条数
消费者
多少条消息被消费掉了
多少条消息在队列中
总共发了两条消息
消费完
接收端
====================================
发布订阅 pub/sub
1.每个消息 可以有多个消费者
2.生产者和消费者存在时间相关性
订阅一个主题的消费者 只能消费自它订阅之后发布的消息
但是JMS规范允许提供客户端创建持久订阅
jms provider 提供虚拟的连接
session 生产和消费消息的单线程的上下文
Destination 发送消息的目的地
JMS API
ConnectionFactory
Connection 封装客户端与JMS Provider之间的一个虚拟的连接
Session 生产和消费消息的一个单线程 上下文 用于创建producer consumner message queue
Destination 消息发送或者消息接收的目的地
消息组成
消息头
包含消息的识别信息和路由信息
delete mode priorty messageId
消息体
传递的数据类型
TextMessage
MapMessage
ByteMessage
StreamMessage 输入输出流
ObjectMessage 可序列化对象
属性
textMessage.setStringProperty(Key,value);
text.getStringProperty("key")
===============================
JMS的可靠性机制
JMS消息之后被确认后 才会认为是被成功消费 客户端接收消息 处理 消息被确认
1.事务性 会话
连接是不是一个事务性的会话
消息提交以后 事务自动确认
消息一直存在队列 签收
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);