Skip to content

Commit cb56783

Browse files
authored
worker: fix subtask initialized state (pingcap#240)
1 parent a1be041 commit cb56783

File tree

2 files changed

+11
-1
lines changed

2 files changed

+11
-1
lines changed

dm/worker/subtask.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ func (st *SubTask) Init() error {
109109

110110
st.DDLInfo = make(chan *pb.DDLInfo, 1)
111111

112+
initializeUnitSuccess := true
112113
// when error occurred, initialized units should be closed
113114
// when continue sub task from loader / syncer, ahead units should be closed
114115
var needCloseUnits []unit.Unit
@@ -117,7 +118,7 @@ func (st *SubTask) Init() error {
117118
u.Close()
118119
}
119120

120-
st.initialized.Set(len(needCloseUnits) == 0)
121+
st.initialized.Set(initializeUnitSuccess)
121122
}()
122123

123124
// every unit does base initialization in `Init`, and this must pass before start running the sub task
@@ -126,6 +127,7 @@ func (st *SubTask) Init() error {
126127
for i, u := range st.units {
127128
err := u.Init()
128129
if err != nil {
130+
initializeUnitSuccess = false
129131
// when init fail, other units initialized before should be closed
130132
for j := 0; j < i; j++ {
131133
needCloseUnits = append(needCloseUnits, st.units[j])
@@ -140,6 +142,7 @@ func (st *SubTask) Init() error {
140142
u := st.units[i]
141143
isFresh, err := u.IsFreshTask()
142144
if err != nil {
145+
initializeUnitSuccess = false
143146
return terror.Annotatef(err, "fail to get fresh status of subtask %s %s", st.cfg.Name, u.Type())
144147
} else if !isFresh {
145148
skipIdx = i

tests/initial_unit/run.sh

+7
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,14 @@ function run() {
7575

7676
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
7777

78+
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
79+
"resume-task test" \
80+
"\"result\": true" 1 \
81+
"\"result\": false" 1 \
82+
"current stage is not paused not valid" 1
83+
7884
cleanup_process
85+
run_sql "drop database if exists initial_unit" $TIDB_PORT
7986
run_sql "drop database if exists dm_meta" $TIDB_PORT
8087
done
8188
}

0 commit comments

Comments
 (0)