Skip to content

修正插入和删除的format_data,增加获取的data字典解释 #22

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 31 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,16 @@ from canal.client import Client
from canal.protocol import EntryProtocol_pb2
from canal.protocol import CanalProtocol_pb2

# 建立与canal服务端的连接
client = Client()
client.connect(host='127.0.0.1', port=11111)
client.check_valid(username=b'', password=b'')
client.connect(host='127.0.0.1', port=11111) # canal服务端部署的主机IP与端口
client.check_valid(username=b'', password=b'') # 自行填写配置的数据库账户密码
# destination是canal服务端的服务名称, filter即获取数据的过滤规则,采用正则表达式
client.subscribe(client_id=b'1001', destination=b'example', filter=b'.*\\..*')

while True:
message = client.get(100)
# entries是每个循环周期内获取到的数据集
entries = message['entries']
for entry in entries:
entry_type = entry.entryType
Expand All @@ -92,27 +95,32 @@ while True:
row_change.MergeFromString(entry.storeValue)
event_type = row_change.eventType
header = entry.header
# 数据库名
database = header.schemaName
# 表名
table = header.tableName
event_type = header.eventType
# row是binlog解析出来的行变化记录,一般有三种格式,对应增删改
for row in row_change.rowDatas:
format_data = dict()
# 根据增删改的其中一种情况进行数据处理
if event_type == EntryProtocol_pb2.EventType.DELETE:
format_data['before'] = dict()
for column in row.beforeColumns:
format_data = {
column.name: column.value
}
format_data['before'][column.name] = column.value
elif event_type == EntryProtocol_pb2.EventType.INSERT:
format_data['after'] = dict()
for column in row.afterColumns:
format_data = {
column.name: column.value
}
format_data['after'][column.name] = column.value
else:
format_data['before'] = format_data['after'] = dict()
# format_data['before'] = format_data['after'] = dict() 采用下面的写法应该更好
format_data['before'] = dict()
format_data['after'] = dict()
for column in row.beforeColumns:
format_data['before'][column.name] = column.value
for column in row.afterColumns:
format_data['after'][column.name] = column.value
# data即最后获取的数据,包含库名,表名,事务类型,改动数据
data = dict(
db=database,
table=table,
Expand All @@ -124,6 +132,20 @@ while True:

client.disconnect()
````
这个demo间隔一秒获取一次服务端的增量数据,并作相应的解析,代码中已做了简单的注释帮助理解,最后获取的data就是某个sql语句改动某一行的完整记录,通常有三种情况:
````python
# 设库test中有表test1,分别有id(int)和name(varchar)字段
# insert操作:insert into test.test1 values (1,'a')
# 此时data中应是如下情况
data = {'db':'test', 'table':'test1', 'event_type':1, 'data':{'after':{'id':'1', 'name':'a'}}}
# update操作:update test.test1 set id=2, name='b' where id=1
# 此时的data
data = {'db':'test', 'table':'test1', 'event_type':2, 'data':{'before':{'id':'1', 'name':'a'}, 'after':{'id':'2', 'name':'b'}}}
# delete操作:delete from test.test1 where id=2
# 此时的data
data = {'db':'test', 'table':'test1', 'event_type':3, 'data':{'before':{'id':'2', 'name':'b'}}}
````


更多详情请查看 [Sample](https://github.com/haozi3156666/canal-python/blob/master/example.py)