-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathcanal.txt
935 lines (516 loc) · 51.2 KB
/
canal.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。
不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,
不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,
由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。
ps. 目前内部版本已经支持mysql和oracle部分版本的日志解析,
当前的canal开源版本支持5.7及以下的版本(阿里内部mysql 5.7.13, 5.6.10, mysql 5.5.18和5.1.40/48)
---------------------------------------------
基于日志增量订阅&消费支持的业务:
数据库镜像
数据库实时备份
多级索引 (卖家和买家各自分库索引)
search build
业务cache刷新
价格变化等重要业务消息
定位: 基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql
关键词: mysql binlog parser / real-time / queue&topic
mysql主备复制实现
从上层来看,复制分成三步:
master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
slave将master的binary log events拷贝到它的中继日志(relay log);
slave重做中继日志中的事件,将改变反映它自己的数据。
canal的工作原理:
原理相对比较简单:
canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
mysql master收到dump请求,开始推送binary log给slave(也就是canal)
canal解析binary log对象(原始为byte流)
重要版本更新说明
canal 1.1.x系列,参考release文档:版本发布信息
整体性能测试&优化,提升了150%. #726 参考: 【Performance】
原生支持prometheus监控 #765 【Prometheus QuickStart】
原生支持kafka消息投递 #695 【Canal Kafka/RocketMQ QuickStart】
原生支持aliyun rds的binlog订阅 (解决自动主备切换/oss binlog离线解析) 参考: 【Aliyun RDS QuickStart】
原生支持docker镜像 #801 参考: 【Docker QuickStart】
See the wiki page for : wiki文档
wiki文档列表
Home
Introduction
QuickStart
Docker QuickStart
Canal Kafka/RocketMQ QuickStart
Aliyun RDS QuickStart
Prometheus QuickStart
AdminGuide
ClientExample
ClientAPI
Performance
DevGuide
BinlogChange(Mysql5.6)
BinlogChange(MariaDB)
TableMetaTSDB
ReleaseNotes
Download
FAQ
多语言业务
canal整体交互协议设计上使用了protobuf3.0,理论上可以支持绝大部分的多语言场景,欢迎大家提交多客户端的PR
canal java客户端: https://github.com/alibaba/canal/wiki/ClientExample
canal c#客户端开源项目地址: https://github.com/dotnetcore/CanalSharp
canal go客户端开源项目地址: https://github.com/CanalClient/canal-go
canal作为MySQL binlog的增量获取工具,可以将数据投递到MQ系统中,比如Kafka/RocketMQ,可以借助于MQ的多语言能力
参考文档: Canal Kafka/RocketMQ QuickStart
相关资料
ADC阿里技术嘉年华分享ppt (放在google docs上,可能需要翻墙): ppt下载
与阿里巴巴的RocketMQ配合使用
相关开源
阿里巴巴分布式数据库同步系统(解决中美异地机房):http://github.com/alibaba/otter
阿里巴巴去Oracle数据迁移同步工具(目标支持MySQL/DRDS):http://github.com/alibaba/yugong
相关产品
阿里云分布式数据库DRDS
阿里云数据传输服务DTS
阿里云数据库备份服务DBS
阿里云数据管理服务DMS
问题反馈
qq交流群: 161559791
邮件交流: [email protected]
新浪微博: agapple0002
报告issue:issues
最新更新
canal发布重大版本更新1.1.0,具体releaseNode参考:https://github.com/alibaba/canal/releases/tag/canal-1.1.0
canal c#客户端开源项目地址: https://github.com/dotnetcore/CanalSharp ,推荐!
canal QQ讨论群已经建立,群号:161559791 ,欢迎加入进行技术讨论。
canal消费端项目开源: Otter(分布式数据库同步系统),地址:https://github.com/alibaba/otter
Canal已在阿里云推出商业化版本 数据传输服务DTS, 开通即用,免去部署维护的昂贵使用成本。DTS针对阿里云RDS、DRDS等产品进行了适配,解决了Binlog日志回收,主备切换、VPC网络切换等场景下的订阅高可用问题。同时,针对RDS进行了针对性的性能优化。出于稳定性、性能及成本的考虑,强烈推荐阿里云用户使用DTS产品。DTS产品使用文档
DTS支持阿里云RDS&DRDS的Binlog日志实时订阅,现推出首月免费体验,限时限量,立即体验>>>
server代表一个canal运行实例,对应于一个jvm
instance对应于一个数据队列 (1个server对应1..n个instance)
instance模块:
eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
eventStore (数据存储)
metaManager (增量订阅&消费信息管理器)
二、各模块架构
2.1 Parser
Connection获取上一次解析成功的位置(如果第一次启动,则获取初始制定的位置或者是当前数据库的binlog位点)
Connection建立连接,发生BINLOG_DUMP命令
Mysql开始推送Binary Log
接收到的Binary Log通过Binlog parser进行协议解析,补充一些特定信息
传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储成功
存储成功后,定时记录Binary Log位置
2.2 Sink
数据过滤:支持通配符的过滤模式,表名,字段内容等
数据路由/分发:解决1:n (1个parser对应多个store的模式)
数据归并:解决n:1 (多个parser对应1个store)
数据加工:在进入store之前进行额外的处理,比如join
1 数据1:n业务 :
为了合理的利用数据库资源, 一般常见的业务都是按照schema进行隔离,然后在mysql上层或者dao这一层面上,进行一个数据源路由,屏蔽数据库物理位置对开发的影响,阿里系主要是通过cobar/tddl来解决数据源路由问题。 所以,一般一个数据库实例上,会部署多个schema,每个schema会有由1个或者多个业务方关注。
2 数据n:1业务:
同样,当一个业务的数据规模达到一定的量级后,必然会涉及到水平拆分和垂直拆分的问题,针对这些拆分的数据需要处理时,就需要链接多个store进行处理,消费的位点就会变成多份,而且数据消费的进度无法得到尽可能有序的保证。 所以,在一定业务场景下,需要将拆分后的增量数据进行归并处理,比如按照时间戳/全局id进行排序归并.
2.3 Store
目前实现了Memory内存、本地file存储以及持久化到zookeeper以保障数据集群共享。
Memory内存的RingBuffer设计:
定义了3个cursor
Put : Sink模块进行数据存储的最后一次写入位置
Get : 数据订阅获取的最后一次提取位置
Ack : 数据消费成功的最后一次消费位置
借鉴Disruptor的RingBuffer的实现,将RingBuffer拉直来看:
实现说明:
Put/Get/Ack cursor用于递增,采用long型存储
buffer的get操作,通过取余或者与操作。(与操作: cusor & (size – 1) , size需要为2的指数,效率比较高)
---------------------
canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。
起源:早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。
基于日志增量订阅&消费支持的业务:
数据库镜像
数据库实时备份
多级索引 (卖家和买家各自分库索引)
search build
业务cache刷新
价格变化等重要业务消息
工作原理
mysql主备复制实现:
从上层来看,复制分成三步:
master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
slave将master的binary log events拷贝到它的中继日志(relay log);
slave重做中继日志中的事件,将改变反映它自己的数据。
canal的工作原理
原理相对比较简单:
canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
mysql master收到dump请求,开始推送binary log给slave(也就是canal)
canal解析binary log对象(原始为byte流)
架构设计
个人理解,数据增量订阅与消费应当有如下几个点:
增量订阅和消费模块应当包括binlog日志抓取,binlog日志解析,事件分发过滤(EventSink),存储(EventStore)等主要模块。
如果需要确保HA可以采用Zookeeper保存各个子模块的状态,让整个增量订阅和消费模块实现无状态化,当然作为consumer(客户端)的状态也可以保存在zk之中。
整体上通过一个Manager System进行集中管理,分配资源。
可以参考下图:
canal架构设计
说明:
server代表一个canal运行实例,对应于一个jvm
instance对应于一个数据队列 (1个server对应1..n个instance)
instance模块:
eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
eventStore (数据存储)
metaManager (增量订阅&消费信息管理器)
EventParser
整个parser过程大致可分为几部:
Connection获取上一次解析成功的位置(如果第一次启动,则获取初始制定的位置或者是当前数据库的binlog位点)
Connection建立连接,发生BINLOG_DUMP命令
Mysql开始推送Binary Log
接收到的Binary Log通过Binlog parser进行协议解析,补充一些特定信息
传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储成功
存储成功后,定时记录Binary Log位置
EventSink设计
说明:
数据过滤:支持通配符的过滤模式,表名,字段内容等
数据路由/分发:解决1:n (1个parser对应多个store的模式)
数据归并:解决n:1 (多个parser对应1个store)
数据加工:在进入store之前进行额外的处理,比如join
1 数据1:n业务 :
为了合理的利用数据库资源, 一般常见的业务都是按照schema进行隔离,然后在mysql上层或者dao这一层面上,进行一个数据源路由,屏蔽数据库物理位置对开发的影响,阿里系主要是通过cobar/tddl来解决数据源路由问题。 所以,一般一个数据库实例上,会部署多个schema,每个schema会有由1个或者多个业务方关注。
2 数据n:1业务:
同样,当一个业务的数据规模达到一定的量级后,必然会涉及到水平拆分和垂直拆分的问题,针对这些拆分的数据需要处理时,就需要链接多个store进行处理,消费的位点就会变成多份,而且数据消费的进度无法得到尽可能有序的保证。 所以,在一定业务场景下,需要将拆分后的增量数据进行归并处理,比如按照时间戳/全局id进行排序归并.
EventStore设计
目前实现了Memory内存、本地file存储以及持久化到zookeeper以保障数据集群共享。
Memory内存的RingBuffer设计:
定义了3个cursor
Put : Sink模块进行数据存储的最后一次写入位置
Get : 数据订阅获取的最后一次提取位置
Ack : 数据消费成功的最后一次消费位置
借鉴Disruptor的RingBuffer的实现,将RingBuffer拉直来看:
实现说明:
Put/Get/Ack cursor用于递增,采用long型存储
buffer的get操作,通过取余或者与操作。(与操作: cusor & (size – 1) , size需要为2的指数,效率比较高)
Instance设计
instance代表了一个实际运行的数据队列,包括了EventPaser,EventSink,EventStore等组件。
抽象了CanalInstanceGenerator,主要是考虑配置的管理方式:
1. manager方式: 和你自己的内部web console/manager系统进行对接。(alibaba内部使用方式)
2. spring方式:基于spring xml + properties进行定义,构建spring配置.
spring/memory-instance.xml 所有的组件(parser , sink , store)都选择了内存版模式,记录位点的都选择了memory模式,重启后又会回到初始位点进行解析。特点:速度最快,依赖最少
spring/file-instance.xml 所有的组件(parser , sink , store)都选择了基于file持久化模式,注意,不支持HA机制.支持单机持久化
spring/default-instance.xml 所有的组件(parser , sink , store)都选择了持久化模式,目前持久化的方式主要是写入zookeeper,保证数据集群共享. 支持HA
spring/group-instance.xml 主要针对需要进行多库合并时,可以将多个物理instance合并为一个逻辑instance,提供客户端访问。场景:分库业务。 比如产品数据拆分了4个库,每个库会有一个instance,如果不用group,业务上要消费数据时,需要启动4个客户端,分别链接4个instance实例。使用group后,可以在canal server上合并为一个逻辑instance,只需要启动1个客户端,链接这个逻辑instance即可.
Server设计
server代表了一个canal的运行实例,为了方便组件化使用,特意抽象了Embeded(嵌入式) / Netty(网络访问)的两种实现:
Embeded : 对latency和可用性都有比较高的要求,自己又能hold住分布式的相关技术(比如failover)
Netty : 基于netty封装了一层网络协议,由canal server保证其可用性,采用的pull模型,当然latency会稍微打点折扣,不过这个也视情况而定。
增量订阅/消费设计
具体的协议格式,可参见:CanalProtocol.proto
get/ack/rollback协议介绍:
Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:
a. batch id 唯一标识
b. entries 具体的数据对象,对应的数据对象格式:EntryProtocol.proto
void rollback(long batchId),顾命思议,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作
void ack(long batchId),顾命思议,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作
canal的get/ack/rollback协议和常规的jms协议有所不同,允许get/ack异步处理,比如可以连续调用get多次,后续异步按顺序提交ack/rollback,项目中称之为流式api.
流式api设计的好处:
get/ack异步化,减少因ack带来的网络延迟和操作成本 (99%的状态都是处于正常状态,异常的rollback属于个别情况,没必要为个别的case牺牲整个性能)
get获取数据后,业务消费存在瓶颈或者需要多进程/多线程消费时,可以不停的轮询get数据,不停的往后发送任务,提高并行化. (作者在实际业务中的一个case:业务数据消费需要跨中美网络,所以一次操作基本在200ms以上,为了减少延迟,所以需要实施并行化)
流式api设计:
每次get操作都会在meta中产生一个mark,mark标记会递增,保证运行过程中mark的唯一性
每次的get操作,都会在上一次的mark操作记录的cursor继续往后取,如果mark不存在,则在last ack cursor继续往后取
进行ack时,需要按照mark的顺序进行数序ack,不能跳跃ack. ack会删除当前的mark标记,并将对应的mark位置更新为last ack cusor
一旦出现异常情况,客户端可发起rollback情况,重新置位:删除所有的mark, 清理get请求位置,下次请求会从last ack cursor继续往后取
数据格式
canal采用protobuff:
Entry
Header
logfileName [binlog文件名]
logfileOffset [binlog position]
executeTime [发生的变更]
schemaName
tableName
eventType [insert/update/delete类型]
entryType [事务头BEGIN/事务尾END/数据ROWDATA]
storeValue [byte数据,可展开,对应的类型为RowChange]
RowChange
isDdl [是否是ddl变更操作,比如create table/drop table]
sql [具体的ddl sql]
rowDatas [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]
beforeColumns [Column类型的数组]
afterColumns [Column类型的数组]
Column
index
sqlType [jdbc type]
name [column name]
isKey [是否为主键]
updated [是否发生过变更]
isNull [值是否为null]
value [具体的内容,注意为文本]
canal-message example:
比如数据库中的表:
mysql> select * from person;
+----+------+------+------+
| id | name | age | sex |
+----+------+------+------+
| 1 | zzh | 10 | m |
| 3 | zzh3 | 12 | f |
| 4 | zzh4 | 5 | m |
+----+------+------+------+
3 rows in set (0.00 sec)
更新一条数据(update person set age=15 where id=4):
****************************************************
* Batch Id: [2] ,count : [3] , memsize : [165] , Time : 2016-09-07 15:54:18
* Start : [mysql-bin.000003:6354:1473234846000(2016-09-07 15:54:06)]
* End : [mysql-bin.000003:6550:1473234846000(2016-09-07 15:54:06)]
****************************************************
================> binlog[mysql-bin.000003:6354] , executeTime : 1473234846000 , delay : 12225ms
BEGIN ----> Thread id: 67
----------------> binlog[mysql-bin.000003:6486] , name[canal_test,person] , eventType : UPDATE , executeTime : 1473234846000 , delay : 12225ms
id : 4 type=int(11)
name : zzh4 type=varchar(100)
age : 15 type=int(11) update=true
sex : m type=char(1)
----------------
END ----> transaction id: 308
================> binlog[mysql-bin.000003:6550] , executeTime : 1473234846000 , delay : 12240ms
HA机制设计
canal的HA分为两部分,canal server和canal client分别有对应的ha实现:
canal server: 为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态.
canal client: 为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。
整个HA机制的控制主要是依赖了zookeeper的几个特性,watcher和EPHEMERAL节点(和session生命周期绑定),可以看下我之前zookeeper的相关文章。
Canal Server:
大致步骤:
canal server要启动某个canal instance时都先向zookeeper进行一次尝试启动判断 (实现:创建EPHEMERAL节点,谁创建成功就允许谁启动)
创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状态
一旦zookeeper发现canal server A创建的节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance.
canal client每次进行connect时,会首先向zookeeper询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect.
Canal Client的方式和canal server方式类似,也是利用zokeeper的抢占EPHEMERAL节点的方式进行控制.
HA配置架构图(举例)如下所示:
canal其他链接方式
canal还有几种连接方式:
1. 单连
2. 两个client+两个instance+1个mysql
当mysql变动时,两个client都能获取到变动
3. 一个server+两个instance+两个mysql+两个client
4. instance的standby配置
整体架构
从整体架构上来说canal是这种架构的(canal中没有包含一个运维的console web来对接,但要运用于分布式环境中肯定需要一个Manager来管理):
一个总体的manager system对应于n个Canal Server(物理上来说是一台服务器), 那么一个Canal Server对应于n个Canal Instance(destinations). 大体上是三层结构,第二层也需要Manager统筹运维管理。
那么随着Docker技术的兴起,是否可以试一下下面的架构呢?
一个docker中跑一个instance服务,相当于略去server这一层的概念。
Manager System中配置一个instance,直接调取一个docker发布这个instance,其中包括向这个instance发送配置信息,启动instance服务.
instance在运行过程中,定时刷新binlog filename+ binlog position的信息至zk。
如果一个instance出现故障,instance本身报错或者zk感知此node消失,则根据相应的信息,比如上一步保存的binlog filename+binlog position重新开启一个docker服务,当然这里可以适当的加一些重试机制。
当要更新时,类似AB test, 先关闭一个docker,然后开启新的已更新的替换,循序渐进的进行。
当涉及到分表分库时,多个物理表对应于一个逻辑表,可以将结果存于一个公共的模块(比如MQ),或者单独存取也可以,具体情况具体分析
存储可以参考canal的多样化:内存,文件,zk,或者加入至MQ中
docker由此之外的工具管理,比如kubernetes
也可以进一步添加HA的功能,两个docker对应一个mysql,互为主备,类似Canal的HA架构。如果时效性不是贴别强的场景,考虑到成本,此功能可以不采用。
总结
这里总结了一下Canal的一些点,仅供参考:
原理:模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议;mysql master收到dump请求,开始推送binary log给slave(也就是canal);解析binary log对象(原始为byte流)
重复消费问题:在消费端解决。
采用开源的open-replicator来解析binlog
canal需要维护EventStore,可以存取在Memory, File, zk
canal需要维护客户端的状态,同一时刻一个instance只能有一个消费端消费
数据传输格式:protobuff
支持binlog format 类型:statement, row, mixed. 多次附加功能只能在row下使用,比如otter
binlog position可以支持保存在内存,文件,zk中
instance启动方式:rpc/http; 内嵌
有ACK机制
无告警,无监控,这两个功能都需要对接外部系统
方便快速部署。
本文主要描述Alibaba Canal中间件,官方文档请参考:
1)gitlab:https://github.com/alibaba/canal
2)主要原理介绍:https://github.com/alibaba/canal/wiki/canal%E4%BB%8B%E7%BB%8D
2)运维操作文档:https://github.com/alibaba/canal/wiki/AdminGuide
下文的介绍,基于大家对上述文档的基本了解!
1)Canal版本为:1.0.24
2)通过Canal同步数据库数据变更事件,并由下游的消费者消费,将数据转存到ES或者跨机房的DB中。
一、设计目标
1、监控canal组件以及客户端消费者
2、通过平台,能够实时查看监控数据。canal问题的定位应该快速,且运行状态数据可见。
3、按需提供报警策略。
4、平台支持添加canal集群的监控。
5、canal组件的部署和使用遵守约定,canal的实施应该快速。
我们希望构建一个canal服务:根据用户需求,能够快速构建canal集群,包括环境隔离;此外canal组件、上游的MySQL、下游的consumer等数据链路的整体状态都在监控之中,且数据可见。我们希望任何利益相关者,都可以参与到数据决策中,并按需提供报警、预警机制。
二、基于Canal架构设计
1、整体架构
1)、每个Canal 集群应该至少有2个Canal实例,软硬件配置应该对等。我们不应该在同一个Cluster的多个节点上,配置有任何差异。
2)、一个Canal可以多个“instances”,每个instance对应一个“MySQL实例”的一个database(专业来说,一个instance对应一个MySQL实例,支持其上的多个databases);简单而言,我们认为一个instance相当于一个逻辑Slave。
3)、由2、可以得出,每个Canal Instance的全局处理的数据总量与一个正常的MySQL Slave相同,如果保持同等SLA,从Canal instance角度考虑,它的硬件能力应该与MySQL Slave保持相同。(同为单线程处理)。
4)、原则上,每个Canal可以支持“数十个instance”,但是instance的个数最终会影响instance同步数据的效能。我们建议,一个Canal尽量保持一个instance;除非Slave数据变更极小,我们才会考虑合并instances,以提高Canal组件的利用效率。
5)、每个instance,一个单独的处理线程,用于负责“binlog dump”、“解析”、“入队和存储”。
6)、Canal集群模式,必须依赖Zookeeper,但是对Zookeeper的数据交互并不频繁。
7)、Canal集群运行态,为“M-S”模式。但是“M-S”的粒度为“instance”级别。如果当前Canal的instance,与MySQL建立连接并进行binlog解析时,发生一定次数的“网络异常”等,将会判定为当前instance失效,并stop(备注:此时会删除注册在ZK的相关临时节点)。同时,集群中的每个Canal都会注册所有“destination”(每个destination将有一个instance服务)的状态变更事件,如果“临时节点”被删除(或者不存在),则会出发抢占,抢占成功,则成为此instance的Master。
(源码:CanalController.initGlobalConfig(),
ServerRunningMonitor.start(),
HeartBeatHAController.onFailed()
)
8)、根据7、,我们得知,如果Canal组件中有多个instances,有可能这些instances的Master会分布在不同的Canal节点上。
9)、在运维层面,我们基于“default-instance.xml”配置,基于“spring”模式;每个instance的配置,放置在各自的文件夹下。(${canal.root}/conf/${destination}/instance.properties)
10)、每个Canal节点,在启动时会初始化一个“嵌入式server”(NettyServer),此server主要目的是向Consumer提供服务。server的“ip:port”信息会注册在ZK中,此后Consumer通过ZK来感知。
(源码:
ServerRunningMonitor.initRunning(),
ClusterNodeAccessStrategy构造方法,
ZookeeperPathUtils.getDestinationServerRunning(destination)
)
11)、在Canal运行期间,可以动态的增加instances配置、修改instances配置。
2、Canal内部组件解析
1)Canal节点,可以有多个instances,每个instance在运行时为一个单独的Spring Context,对象实例为“CanalInstanceWithSpring”。
2)每个instances有一个单独的线程处理整个数据流过程。
3)instance内部有EventParser、EventSink、EventStore、metaManager主要四个组件构成,当然还有其他的守护组件比如monitor、HA心跳检测、ZK事件监听等。对象实例初始化和依赖关系,可以参见“default-instance.xml”,其配置模式为普通的Spring。
(源码参见:SpringCanalInstanceGenerator)
4)Parser主要用于解析指定"数据库"的binlog,内部基于JAVA实现的“binlog dump”、“show master status”等。Parser会与ZK交互,并获取当前instance所有消费者的cursor,并获其最小值,作为此instance解析binlog的起始position。目前的实现,一个instance同时只能有一个consumer处于active消费状态,ClientId为定值“1001”,“cursor”中包含consumer消费binlog的position,数字类型。由此可见,Canal instance本身并没有保存binlog的position,Parser中继操作是根据consumer的消费cursor位置来决定;对于信息缺失时,比如Canal集群初次online,且在“default-instance.xml”中也没有指定“masterPositiion”信息(每个instance.properties是可以指定起始position的),那么将根据“show master status”指令获取当前binlog的最后位置。
(源码:MysqlEventParser.findStartPosition())
5)Parser每次、批量获取一定条数的binlog,将binlog数据封装成event,并经由EventSink将消息转发给EventStore,Sink的作用就是“协调Parser和Store”,确保binglog的解析速率与Store队列容量相容。
(参见源码:AbstractEventParser.start(),
EntryEventSink.sink()
)
6)EventStore,用于暂存“尚未消费”的events的存储队列,默认基于内存的阻塞队列实现。Store中的数据由Sink组件提交入队,有NettyServer服务的消费者消费确认后出队,队列的容量和容量模式由“canal.properties”中的“memory”相关配置决定。当Store中容量溢满时,将会阻塞Sink操作(间接阻塞Parser),所以消费者的效能会直接影响instance的同步效率。
7)metaManager:主要用于保存Parser组件、CanalServer(即本文中提到的NettyServer)、Canal Instances的meta数据,其中Parser组件涉及到的binlog position、CanalServer与消费者交互时ACK的Cursor信息、instance的集群运行时信息等。根据官方解释,我们在production级别、高可靠业务要求场景下,metaManager建议基于Zookeeper实现。
其中有关Position信息由CanalLogPositionManager类负责,其实现类有多个,在Cluster模式下,建议基于FailbackLogPositionManager,其内部有“primary”、“failback”两级组合,优先基于primary来存取Position,只有当primary异常时会“降级”使用failback;其配置模式,建议与“default-instance.xml”保持一致。
(参看源码:CanalMetaManager,PeriodMixedMetaManager)
3、Consumer端
1)Consumer允许分布式部署,多个对等节点互备。但是任何时候,同一个destination的消费者只能有一个(client实例),这种排他、协调操作由zookeeper承担。在Cluster模式下,指定zkServer的地址,那么Consumer将会从meta信息中获取指定destination所对应的instance运行在哪个Canal节点上,且CanalServer(即NettyServer)的ip:port信息,那么此时Consumer将根据“ip:port”与NettyServer建立连接,并进行数据交互。
(参见源码:SimpleCanalConnector.connect(),
ClientRunningMonitor.start()
)
2)Consumer有序消费消息,严格意义上说,我们强烈建议Consumer为单线程逐条处理。尽管研发同学,有很多策略可以让消息的处理过程使用多线程,但是对于消息的ACK将需要特殊的关注,而且非有序情境下,或许会对你的数据一致性有一定的影响。
3)消费者的消费效率,取决于“业务本身”,我们建议业务处理尽可能“短平快”。如果你的业务处理相对耗时,也不建议大家再使用“比如MQ、kafka”等其他异步存储做桥接,因为这本质上对提高endpoint端效能没有太大帮助,反而增加了架构的复杂性。
4)我们严格限制:消费者在处理业务时,必须捕获所有异常,并将异常的event和处理过程的exception打印到业务日志,以备将来进行数据补偿;捕获异常,有助于Consumer可以继续处理后续的event,那么整个canal链路不会因为一条消息而导致全部阻塞或者rollback。
5)Consumer单线程运行,阻塞、流式处理消息,获取event的方式为pull + batch;每个batch的size由配置决定,一个batch获取结束后,将会逐个调用业务的process方法,并在整个batch处理结束后,按需进行ack或者rollback。
6)需要注意:rollback操作是根据batchId进行,即回滚操作将会导致一个batch的消息会被重发;后续有重复消费的可能,这意味着业务需要有兼容数据幂等的能力。
7)消费者的ClientId为定值:1001,不可修改。
三、部署与最佳实践(建议)
1、Canal集群部署
1)Production场景,节点个数至少为2,考虑到Canal自身健壮性,也不建议Canal单组集群的节点数量过多。
2)Canal节点为“网络IO高耗”、“CPU高耗”(并发要求较高,体现在instance处理、consumer交互频繁)型应用,对磁盘IO、内存消耗很低。
3)不建议Canal与其他应用混合部署,我们认定Canal为核心组件,其可用性应该被保障在99.99%+。
4)每个Canal集群的instances个数,并没有严格限制,但其所能承载的数据量(TPS,包括consumer + binlog parser)是评估instances个数的主要条件。考虑到Production级别数据变更的场景不可控,我们建议每个Canal集群的instance个数,应该在1~3个。
5)对于核心数据库、TPS操作较高的数据库,应该使用单独的Canal。
6)Canal集群的个数多,或者分散,或者利用率低,并不是我们特别关注的事情,不要因为过度考虑“资源利用率”、“Consumer的集中化”而让Canal负重。
7)Canal的配置,绝大部分可以使用“默认”,但是要求在Production场景,instance模式必须使用Spring,配置方式采用“default-instance.xml”。“default-instance.xml”默认配置已满足我们HA环境下的所有设计要求。(版本:1.0.24)
8)Canal机器的配置要求(最低):4Core、8G;建议:8Core、16G。
9)Canal的上游,即MySQL实例,可以是“Master”或者任意level的Slave,但是无论如何,其binlog_format必须为ROW,通过使用“show variables like 'binlog_format"”来确定。目前已经验证,使用mixed模式可能导致某些UPDATE操作事件无法被消费者解析的问题。
2、Zookeeper集群
1)Zookeeper集群,要求至少3个节点。网络联通性应该尽可能的良好。
2)多个Canal Cluster可以共享一个ZK集群,而且建议共享。那么只需要在canal.properties文件中“zkServers”配置项增加“rootPath”后缀即可,比如“10.0.1.21:2181,10.0.1.22:2181/canal/g1”。但是不同的Canal cluster,其rootPath应该不同。我们约定所有的Canal集群,rootpath的都以“/canal/”开头。(这对我们后续的ZK监控比较有利,我们只需要遍历"/canal"的子节点即可知道集群信息)
3)业界也有一种通用的部署方式,zookeeper集群与canal共生部署,三个节点,每个节点上都部署一个ZK和canal;这种部署模式的出发点也是比较简单,分析canal问题时只需要通过本地zk即可。(仅为建议)
4)需要非常注意,rootpath必须首先创建,否则canal启动时将会抛出异常!
3、Consumer集群
1)Consumer实例为普通application,JAVA项目,Spring环境。
2)Consumer集群至少2个节点,分布式部署。运行态为M-S。
3)每个Consumer实例为单线程,Consumer本身对CPU、内存消耗较低,但是对磁盘有一定的要求,因为我们将会打印大量的日志。建议磁盘为200G + ,logback的日志格式应该遵守我司规范,后续承接ELK基础数据平台。
4)一个Application中,允许有多个Consumer实例。
5)Consumer的业务处理部分,必须捕获全部异常,否则异常逃逸将可能导致整个链路的阻塞;对于异常情况下,建议进行日志记录,稍后按需进行数据补偿。
6)Consumer的业务处理部分,我们要求尽可能的快,业务处理简单;最重要的是千万不要在业务处理部分使用比如“Thread.sleep”、“Lock”等阻塞线程的操作,这可能导致主线程无法继续;如果必须,建议使用分支线程。
7)如果你对消息的顺序、事务不敏感,也允许你在业务处理部分使用多线程,这一部分有一定的歧义,所以需要开发者自己评估。从原理上说,多线程可以提高消息消费的效率,但是对数据一致性可能会有影响。但是Consumer的Client框架,仍然坚守单线程、有序交付。
8)在CanalServer和Consumer端,都能指定“filter”,即“过滤不关注的schema消息”;在CanalServer启动时将会首先加载“instance.properties”中的filter配置并生效,此后如果instance的消费者上线且也指定了filter,那么此filter信息将会被注册ZK中,那么CanalServer将会基于ZK获取此信息,并将Consumer端的filter作为最终决策;由此可见,我们在Consumer端指定filter的灵活性更高(当然隐蔽性也增加,这对排查问题需要一些提前沟通),无论如何,CanalServer不会传送“不符合filter”的消息给Consumer。
4、Filter规则描述:适用于instance.properties和Consumer端的subscribe()方法
1) 所有表:.* or .*\\..*
2) canal schema下所有表: canal\\..*
3) canal下的以canal打头的表:canal\\.canal.*
4) canal schema下的一张表:canal.test1
5) 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
5、运行状态监控
非常遗憾的是,Canal监控能力相当的弱,内部程序中几乎没有JMX的任何export机制,所以如果需要监控比如“slave延迟”、“消费速率”、“position”等,需要开发代码。思路如下:
1)开发一个JAVA WEB项目。
2)读取ZK中的相关META信息,解析出每个destination对于的slave地址,并创建JDBC连接,发送“show master status”等指令,查看此slave binlog的位置,用于判断Canal延迟。
3)读取ZK中相关META信息,解析出每个destination对应的consumer cursor,与2)进行对比,用于判定consumer的消费延迟。
四、Canal核心配置样例
建议保持默认
五、Consumer代码样例(抽象类)
六、META数据整理(zookeeper,省略了chrootpath)
1、/otter/canal/cluster:子节点列表,临时节点
含义:表示当前集群中处于Active状态的,CanalServer的列表和TCP端口。
创建:当CanalServer节点启动时创建。
删除:CanalServer失效时删除对应的临时节点。
2、/otter/canal/destinations/:子节点列表
含义:表示当前集群中,正在提供服务的instances对应的destination列表。根据原理,每个destination对应一个instance实例。
创建:Canal节点初始化instance时。
删除:当instance配置被删除时。
3、/otter/canal/destinations/${destination}/running:节点值,临时节点
含义:表示此destination对应的instances由哪个CanalServer提供服务。
创建:当次instance初始化时,通过ZK抢占成功后,写入Server信息。集群中多个Canal节点会同时初始化instance,但是只有一个Canal处于服务状态。
获取:Consumer会根据destination从ZK中获取此信息,并根据”address“与CanalServer建立连接。
删除:instance无法与MySQL Server正常通讯、且重试N次后,将会触发HA切换,此时会删除此节点。同时,其他Canal节点会检测ZK事件,并重新抢占,成功者将会成为此destination的服务者。
4、/otter/canal/destinations/${destination}/cluster:子节点列表,临时节点
含义:集群中可以提供此destination服务的CanalServer列表(不表示正在提供服务的CannalServer)。
创建:instance初始化时
获取:Consumer侦听此列表的变更事件。
5、/otter/canal/destinations/${destination}/1001:节点存在与否,历史节点
含义:“1001”为定值,如果存在此子节点,表示此destination上有Active状态的消费者。
创建:Consumer启动时。
获取:Consumer集群的每个实例都会探测其变更事件。
6、/otter/canal/destinations/${destination}/1001/filter:节点值
含义:表示此destination使用的过滤器。
创建:1)instance初始化时,会读取instance.properties文件中的filter配置,并保存在此节点值中。 2)Consumer初始化时,由subscribe()方法传入,并通过RPC同学发送给CanalServer并由其修改此值。
获取:instance中的Parser组件会侦听此值的变更,并根据此值作为binlog过滤的条件。
7、/otter/canal/destinations/${destination}/1001/cursor:节点值
含义:其“position”字段表示Consumer已经消费、ACK的binglog位置。“timestamp”表示最后位置的binlog事件发生的时间。可以通过此节点值,判断当前Canal的延迟。
创建:CanalServer收到Consumer ACK之后,有定时器间歇性的写入ZK。(metaManager,参见default-instance.xml)
8、/otter/canal/destinations/${destination}/1001/running:节点值,临时节点
含义:当前处于running状态的Consumer实例所在的位置。
创建:Consumer实例初始化,且抢占成功时。
获取:其他Consumer会检测节点的变更事件。
删除:Consumer实例关闭时。
七、Canal Instance位点查找过程(spring模式)
1、从ZK中获取消费者ACK的位点LogPosition,包括binlog名称和position、时间戳等。
2、如果1、存在,则比较LogPosition与当前配置中的"master.address"地址是否一致:
1)如果一致,且dump正常,则使用此位点。但是如果dump错误次数达到阈值(dumpErrorCountThreshold),则使用LogPosition中的timestamp回退一分钟,重新获取位点(如果获取失败,则启动失败)。(根据timestamp获取Position的方式,就是将binlog文件列表中查找此时间戳所在的binlog文件,并重新构建LogPosition,如果找不到则返回null,后续会中断instance服务)
2)如果不一致,表示切换了数据库地址,则直接将timestamp回退一分钟,从新库(当前配置的“master.address”)获取新的LogPosition并使用。
3、如果1、不存在,表示没有历史消费记录,依次从“master.address” 、“standby.address”配置中选择一个不为空的作为LogPosition。
1)如果LogPosition中,没有指定journalName(即binlog文件名),则使用timestamp获取位点(方式同上);如果也没有指定timestamp,则直接使用数据库最新位点。
2)如果指定了journalName,此时也明确指定了“position”配置参数,则直接使用;如果没有指定“position”但是指定了timestamp,则尝试获取实际的position并使用,如果在此timestamp没有找到位点,则从此journalName中首条开始。
无论是ZK获取position、还是通过instance.properties获取postition,信息都应该正确与数据库实际状态匹配;否则将无法构建LogPosition对象,直接导致此instance停止服务。
八、答疑
1、Canal会不会丢失数据?
答:Canal正常情况下不会丢失数据,比如集群节点失效、重启、Consumer关闭等;但是,存在丢数据的风险可能存在如下几种可能:
1)ZK的数据可靠性或者安全性被破坏,比如ZK数据丢失,ZK的数据被人为串改,特别是有关Position的值。
2)MySQL binlog非正常运维,比如binglog迁移、重命名、丢失等。
3)切换MySQL源,比如原来基于M1实例,后来M1因为某种原因失效,那么Canal将数据源切换为M2,而且M1和M2可能binlog数据存在不一致(非常有可能)。
4)Consumer端ACK的时机不佳,比如调用get()方法,而不是getWithoutAck(),那么消息有可能尚未完全消费,就已经ACK,那么此时由异常或者Consumer实例失效,则可能导致消息丢失。我们需要在ACK时机上保障“at lease once”。
2、Canal的延迟很大是什么原因?
答:根据数据流的pipeline,“Master” > "Slave" > "Canal" > "Consumer",每个环节都需要耗时,而且整个管道中都是单线程、串行、阻塞式。(假如网络层面都是良好的)
1)如果批量insert、update、delete,都可能导致大量的binlog产生,也会加剧Master与slave之间数据同步的延迟。(写入频繁)
2)“Consumer”消费的效能较低,比如每条event执行耗时很长。这会导致数据变更的消息ACK较慢,那么对于Canal而言也将阻塞,直到Canal内部的store有足够的空间存储新消息、才会继续与Slave进行数据同步。
3)如果Canal节点ZK的网络联通性不畅,将会导致Canal集群处于动荡状态,大量的时间消耗在ZK状态监测和维护上,而无法对外提供正常服务,包括不能顺畅的dump数据库数据。
3、Canal会导致消息重复吗?
答:会,这从两个大的方面谈起。
1)Canal instance初始化时,根据“消费者的Cursor”来确定binlog的起始位置,但是Cursor在ZK中的保存是滞后的(间歇性刷新),所以Canal instance获得的起始position一定不会大于消费者真实已见的position。
2)Consumer端,因为某种原因的rollback,也可能导致一个batch内的所有消息重发,此时可能导致重复消费。
我们建议,Consumer端需要保持幂等,对于重复数据可以进行校验或者replace。对于非幂等操作,比如累加、计费,需要慎重。
4、Canal性能如何?
答:Canal本身非常轻量级,主要性能开支就是在binlog解析,其转发、存储、提供消费者服务等都很简单。它本身不负责数据存储。原则上,canal解析效率几乎没有负载,canal的本身的延迟,取决于其与slave之间的网络IO。
5、Canal数据的集散问题,一个destination的消息能否被多个Consumer集群并行消费?
答:比如有两个Consumer集群,C1/C2,你希望C1和C2中的消费者都能够订阅到相同的消息,就像Kafka或者JMS Topic一样...但是非常遗憾,似乎Canal无法做到,这取决于Canal内部的存储模式,Canal内部是一个“即发即失”的内存队列,无法权衡、追溯不同Consumer之间的消息,所以无法支持。
如果希望达到这种结果,有2个办法:第一,消费者收到消息以后转发到kafka或者MQ中,后继的其他Consumer只与kafka或者MQ接入;第二:一个Canal中使用多个destination,但是它们对应相同的MySQL源。
6、我的Consumer从canal消费数据,但是我的业务有反查数据库的操作,那么数据一致性怎么做?
答:从基本原理,我们得知canal就像一个“二级Slave”一样,所以canal接收到的数据总是相对滞后,如果消费者消费效率较低,那么从consumer的角度来说,它接收的数据更加滞后;如果consumer中反查数据库,无论它查找master还是其他任意level的从库,都会获得比当前视图更新(fresh)的数据,无论如何,我们总是无法做到完全意义上的“数据一致性”视图。
比如,canal消费者收到的数据为db.t1.row1.column1 = A,那么此时master上column1值已经更改为B,但是Slave可能因为与master同步延迟问题,此时Slave上column1值可能为C。所以无论你怎么操作,都无法得到一致性的数据。(数据发生的时间点,A < C < B)。
我们需要接受这种问题,为了避免更多干扰,consumer反查数据时使用canal所对应的slave可以在一定程度上缓解数据一致性的风险,但是这仍然无法解决问题。但是这种策略仍然有风险,会知道canal所对应的slave性能消耗加剧,进而增加数据同步的延迟。
理想的解决办法:canal的消费者,消费数据以后,写入到一个数据库或者ES,那么在消费者内部的数据反查操作,全部基于这个数据库或者ES。
7、Consumer端无法进行消费的问题?
答: 1)Consumer会与ZK集群保持联通性,用于检测消费者集群、CanalServer集群的变化,如果Consumer与ZK集群的联通性失效,将会导致消费者无法正常工作。
2)Consumer会与CanalServer保持TCP长连接,此长连接用于传输消息、心跳检测等,如果Consumer与CanalServer联通性故障,将有可能导致Consumer不断重试,此期间消息无法正常消费。
3)如果CanalServer与ZK联通性失效,将会导致此CanalServer释放资源,进行HA切换,切换时间取决于ZK的session活性检测,大概为30S,此期间消费者无法消费。
4)CanalServer中某个instance与slave联通性失效,将会触发HA切换,切换时间取决于HA心跳探测时间,大概为30S,此期间消费者无法消费。
8、如果Canal更换上游的master(或者slave),该怎么办?(比如迁库、迁表等)
答:背景要求,我们建议“新的数据库最好是旧的数据库的slave”或者“新、旧数据库为同源master”,平滑迁移;
1)创建一个新的instance,使用新的destination,并与新的Slave创建连接。
2)在此期间,Consumer仍然与旧的destination消费。
3)通过“timestamp”确认,新的slave的最近binlog至少已经超过此值。
4)Consumer切换,使用新的destination消费,可能会消费到重复数据,但是不会导致数据丢失。
当然,更简单的办法就是直接将原destination中的数据库地址跟新即可,前提是新、旧两个数据库同源master,新库最好已经同步执行了一段时间。
9、Canal如何重置消费的position?
答:比如当消费者在消费binlog时,数据异常,需要回溯到旧的position重新消费,是这个场景!
1)我们首先确保,你需要回溯的position所对应的binlog文件仍然存在,可以通过需要回溯的时间点来确定position和binlog文件名,这一点可以通过DBA来确认。
2)关闭消费者,否则重置位点操作无法生效。(你可以在关闭消费者之前执行unsubscribe,来删除ZK中历史位点的信息)
3)关闭Canal集群,修改对应的destination下的配置文件中的“canal.instance.master.journal.name = <此position对应的binlog名称>”、“canal.instance.master.position = <此position>”;可以只需要修改一台。
4)删除zk中此destination的消费者meta信息,“${destination}/1001"此path下所有的子节点,以及“1001”节点。(可以通过消费者执行unsubscribe来实现)
5)重启2)中的此canal节点,观察日志。
6)重启消费者。
当然应该还有其他更优的操作方式,请大家补充。上述操作过程,是被验证可行的。
查看图片附件
分享到:
Tomcat-jdbc + Spring启动错误“Layered po ... | MySQL Router架构实践
2017-11-13 20:05浏览 20833评论(3)分类:企业架构查看更多
评论
3 楼 theone5288 2018-09-14
你好,如果我设置canal从很早之前的一个log的position点开始同步,性能会很慢么?我这里测试,从很早一个点位开始同步,相当于mysql日志里面积压了很多数据,很多日志,此时打开canal,发现canal在fetcher.fetch()dmup日志event这个地方就会越来越慢呢。刚开始运行会1秒钟写获取到5-6000条event,但跑了几秒后,就越来越慢,最后好几秒才1000条。
2 楼 QING____ 2018-03-14
世界杯2009 写道
canal支持前缀filter吗?比如schema是actionlog+四位数字,filter能匹配这种吗?
支持的,就是一般的JAVA表达式能支持的都可以。