Skip to content

Commit b18ce73

Browse files
committed
[Enhancement] add support for restore to ccr
1 parent 8955600 commit b18ce73

File tree

14 files changed

+1069
-8
lines changed

14 files changed

+1069
-8
lines changed

pkg/ccr/job.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2609,6 +2609,46 @@ func (j *Job) handleRecoverInfoRecord(commitSeq int64, recoverInfo *record.Recov
26092609
return j.newPartialSnapshot(recoverInfo.TableId, recoverInfo.TableName, nil, true)
26102610
}
26112611

2612+
func (j *Job) handleRestoreInfo(binlog *festruct.TBinlog) error {
2613+
log.Infof("handle restore info binlog, prevCommitSeq: %d, commitSeq: %d",
2614+
j.progress.PrevCommitSeq, j.progress.CommitSeq)
2615+
2616+
data := binlog.GetData()
2617+
restoreInfo, err := record.NewRestoreInfoFromJson(data)
2618+
if err != nil {
2619+
return err
2620+
}
2621+
return j.handleRestoreInfoRecord(binlog.GetCommitSeq(), restoreInfo)
2622+
}
2623+
2624+
func (j *Job) handleRestoreInfoRecord(commitSeq int64, restoreInfo *record.RestoreInfo) error {
2625+
if len(restoreInfo.TableInfo) != 1 {
2626+
// for both table and db sync take a full snapshot.
2627+
log.Warnf("Lets do new snapshot")
2628+
return j.newSnapshot(commitSeq)
2629+
}
2630+
2631+
if len(restoreInfo.TableInfo) == 1 {
2632+
for tableId, tableName := range restoreInfo.TableInfo {
2633+
switch j.SyncType {
2634+
case TableSync:
2635+
log.Warnf("full snapshot, table:%d and name:%s",
2636+
tableId, tableName)
2637+
return j.newSnapshot(commitSeq)
2638+
case DBSync:
2639+
log.Warnf("new partial snapshot, table:%d and name:%s",
2640+
tableId, tableName)
2641+
replace := true // replace the old data to avoid blocking reading
2642+
return j.newPartialSnapshot(tableId, tableName, nil, replace)
2643+
default:
2644+
break
2645+
}
2646+
}
2647+
}
2648+
//This is unreachable.
2649+
return nil
2650+
}
2651+
26122652
func (j *Job) handleBarrier(binlog *festruct.TBinlog) error {
26132653
data := binlog.GetData()
26142654
barrierLog, err := record.NewBarrierLogFromJson(data)
@@ -2693,6 +2733,12 @@ func (j *Job) handleBarrier(binlog *festruct.TBinlog) error {
26932733
return err
26942734
}
26952735
return j.handleRecoverInfoRecord(commitSeq, recoverInfo)
2736+
case festruct.TBinlogType_RESTORE_INFO:
2737+
restoreInfo, err := record.NewRestoreInfoFromJson(barrierLog.Binlog)
2738+
if err != nil {
2739+
return err
2740+
}
2741+
return j.handleRestoreInfoRecord(commitSeq, restoreInfo)
26962742
case festruct.TBinlogType_BARRIER:
26972743
log.Info("handle barrier binlog, ignore it")
26982744
default:
@@ -2807,6 +2853,8 @@ func (j *Job) handleBinlog(binlog *festruct.TBinlog) error {
28072853
return j.handleDropRollup(binlog)
28082854
case festruct.TBinlogType_RECOVER_INFO:
28092855
return j.handleRecoverInfo(binlog)
2856+
case festruct.TBinlogType_RESTORE_INFO:
2857+
return j.handleRestoreInfo(binlog)
28102858
default:
28112859
return xerror.Errorf(xerror.Normal, "unknown binlog type: %v", binlog.GetType())
28122860
}

pkg/ccr/record/restore_info.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package record
2+
3+
import (
4+
"encoding/json"
5+
6+
"github.com/selectdb/ccr_syncer/pkg/xerror"
7+
)
8+
9+
type RestoreInfo struct {
10+
DbId int64 `json:"dbId"`
11+
DbName string `json:"dbName"`
12+
TableInfo map[int64]string `json:"tableInfo"`
13+
}
14+
15+
func NewRestoreInfoFromJson(data string) (*RestoreInfo, error) {
16+
var restoreInfo RestoreInfo
17+
err := json.Unmarshal([]byte(data), &restoreInfo)
18+
if err != nil {
19+
return nil, xerror.Wrap(err, xerror.Normal, "unmarshal create table error")
20+
}
21+
22+
if restoreInfo.DbId == 0 {
23+
return nil, xerror.Errorf(xerror.Normal, "db id not found")
24+
}
25+
return &restoreInfo, nil
26+
}

pkg/rpc/kitex_gen/frontendservice/FrontendService.go

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/rpc/thrift/FrontendService.thrift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1198,6 +1198,7 @@ enum TBinlogType {
11981198
RENAME_PARTITION = 22,
11991199
DROP_ROLLUP = 23,
12001200
RECOVER_INFO = 24,
1201+
RESTORE_INFO = 25,
12011202
// Keep some IDs for allocation so that when new binlog types are added in the
12021203
// future, the changes can be picked back to the old versions without breaking
12031204
// compatibility.
@@ -1213,8 +1214,7 @@ enum TBinlogType {
12131214
// MODIFY_XXX = 17,
12141215
// MIN_UNKNOWN = 18,
12151216
// UNKNOWN_3 = 19,
1216-
MIN_UNKNOWN = 25,
1217-
UNKNOWN_10 = 26,
1217+
MIN_UNKNOWN = 26,
12181218
UNKNOWN_11 = 27,
12191219
UNKNOWN_12 = 28,
12201220
UNKNOWN_13 = 29,
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
-- This file is automatically generated. You should know what you did if you want to edit this
2+
-- !sql_source_content --
3+
0 0
4+
0 1
5+
0 2
6+
0 3
7+
0 4
8+
0 5
9+
0 6
10+
0 7
11+
0 8
12+
0 9
13+
14+
-- !target_sql_content --
15+
0 0
16+
0 1
17+
0 2
18+
0 3
19+
0 4
20+
0 5
21+
0 6
22+
0 7
23+
0 8
24+
0 9
25+
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
-- This file is automatically generated. You should know what you did if you want to edit this
2+
-- !sql_source_content_backup --
3+
0 0
4+
0 1
5+
0 2
6+
7+
-- !target_sql_content_backup --
8+
0 0
9+
0 1
10+
0 2
11+
12+
-- !sql_source_content_new --
13+
0 0
14+
0 1
15+
0 2
16+
9 0
17+
9 1
18+
9 2
19+
20+
-- !target_sql_content_new --
21+
0 0
22+
0 1
23+
0 2
24+
9 0
25+
9 1
26+
9 2
27+
28+
-- !sql_source_content_restore --
29+
0 0
30+
0 1
31+
0 2
32+
33+
-- !target_sql_content_restore --
34+
0 0
35+
0 1
36+
0 2
37+
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
-- This file is automatically generated. You should know what you did if you want to edit this
2+
-- !sql_source_content --
3+
0 0
4+
0 1
5+
0 2
6+
0 3
7+
0 4
8+
0 5
9+
0 6
10+
0 7
11+
0 8
12+
0 9
13+
14+
-- !target_sql_content --
15+
0 0
16+
0 1
17+
0 2
18+
0 3
19+
0 4
20+
0 5
21+
0 6
22+
0 7
23+
0 8
24+
0 9
25+
26+
-- !sql_source_content --
27+
0 0
28+
0 1
29+
0 2
30+
0 3
31+
0 4
32+
0 5
33+
0 6
34+
0 7
35+
0 8
36+
0 9
37+
38+
-- !target_sql_content --
39+
0 0
40+
0 1
41+
0 2
42+
0 3
43+
0 4
44+
0 5
45+
0 6
46+
0 7
47+
0 8
48+
0 9
49+
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
-- This file is automatically generated. You should know what you did if you want to edit this
2+
-- !sql_source_content --
3+
0 0
4+
0 1
5+
0 2
6+
0 3
7+
0 4
8+
0 5
9+
0 6
10+
0 7
11+
0 8
12+
0 9
13+
1 0
14+
1 1
15+
1 2
16+
1 3
17+
1 4
18+
1 5
19+
1 6
20+
1 7
21+
1 8
22+
1 9
23+
24+
-- !target_sql_content --
25+
0 0
26+
0 1
27+
0 2
28+
0 3
29+
0 4
30+
0 5
31+
0 6
32+
0 7
33+
0 8
34+
0 9
35+
1 0
36+
1 1
37+
1 2
38+
1 3
39+
1 4
40+
1 5
41+
1 6
42+
1 7
43+
1 8
44+
1 9
45+
46+
-- !sql_source_content --
47+
0 0
48+
0 1
49+
0 2
50+
0 3
51+
0 4
52+
0 5
53+
0 6
54+
0 7
55+
0 8
56+
0 9
57+
58+
-- !target_sql_content --
59+
0 0
60+
0 1
61+
0 2
62+
0 3
63+
0 4
64+
0 5
65+
0 6
66+
0 7
67+
0 8
68+
0 9
69+

0 commit comments

Comments
 (0)