Skip to content

Commit

Permalink
support command-line args, scale in, alive check
Browse files Browse the repository at this point in the history
  • Loading branch information
GMHDBJD committed Oct 28, 2024
1 parent ecd4397 commit c16245d
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 13 deletions.
8 changes: 8 additions & 0 deletions components/playground/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func buildCommands(tp CommandType, opt *BootOptions) (cmds []Command) {
{"ticdc", opt.TiCDC},
{"tikv-cdc", opt.TiKVCDC},
{"drainer", opt.Drainer},
{"dmmaster", opt.DMMaster},
{"dmworker", opt.DMWorker},
}

for _, cmd := range commands {
Expand Down Expand Up @@ -113,6 +115,8 @@ func newScaleOut() *cobra.Command {
cmd.Flags().StringVarP(&opt.TSO.Host, "tso.host", "", opt.TSO.Host, "Playground TSO host. If not provided, TSO will still use `host` flag as its host")
cmd.Flags().StringVarP(&opt.Scheduling.Host, "scheduling.host", "", opt.Scheduling.Host, "Playground Scheduling host. If not provided, Scheduling will still use `host` flag as its host")
cmd.Flags().StringVarP(&opt.TiProxy.Host, "tiproxy.host", "", opt.PD.Host, "Playground TiProxy host. If not provided, TiProxy will still use `host` flag as its host")
cmd.Flags().IntVarP(&opt.DMMaster.Num, "dmmaster", "", opt.DMMaster.Num, "DM-master instance number")
cmd.Flags().IntVarP(&opt.DMWorker.Num, "dmworker", "", opt.DMWorker.Num, "DM-worker instance number")

cmd.Flags().StringVarP(&opt.TiDB.ConfigPath, "db.config", "", opt.TiDB.ConfigPath, "TiDB instance configuration file")
cmd.Flags().StringVarP(&opt.TiKV.ConfigPath, "kv.config", "", opt.TiKV.ConfigPath, "TiKV instance configuration file")
Expand All @@ -123,6 +127,8 @@ func newScaleOut() *cobra.Command {
cmd.Flags().StringVarP(&opt.TiProxy.ConfigPath, "tiproxy.config", "", opt.TiProxy.ConfigPath, "TiProxy instance configuration file")
cmd.Flags().StringVarP(&opt.Pump.ConfigPath, "pump.config", "", opt.Pump.ConfigPath, "Pump instance configuration file")
cmd.Flags().StringVarP(&opt.Drainer.ConfigPath, "drainer.config", "", opt.Drainer.ConfigPath, "Drainer instance configuration file")
cmd.Flags().StringVarP(&opt.DMMaster.ConfigPath, "dmmaster.config", "", opt.DMMaster.ConfigPath, "DM-master instance configuration file")
cmd.Flags().StringVarP(&opt.DMWorker.ConfigPath, "dmworker.config", "", opt.DMWorker.ConfigPath, "DM-worker instance configuration file")

cmd.Flags().StringVarP(&opt.TiDB.BinPath, "db.binpath", "", opt.TiDB.BinPath, "TiDB instance binary path")
cmd.Flags().StringVarP(&opt.TiKV.BinPath, "kv.binpath", "", opt.TiKV.BinPath, "TiKV instance binary path")
Expand All @@ -135,6 +141,8 @@ func newScaleOut() *cobra.Command {
cmd.Flags().StringVarP(&opt.TiKVCDC.BinPath, "kvcdc.binpath", "", opt.TiKVCDC.BinPath, "TiKVCDC instance binary path")
cmd.Flags().StringVarP(&opt.Pump.BinPath, "pump.binpath", "", opt.Pump.BinPath, "Pump instance binary path")
cmd.Flags().StringVarP(&opt.Drainer.BinPath, "drainer.binpath", "", opt.Drainer.BinPath, "Drainer instance binary path")
cmd.Flags().StringVarP(&opt.DMMaster.BinPath, "dmmaster.binpath", "", opt.DMMaster.BinPath, "DM-master instance binary path")
cmd.Flags().StringVarP(&opt.DMWorker.BinPath, "dmworker.binpath", "", opt.DMWorker.BinPath, "DM-worker instance binary path")

return cmd
}
Expand Down
4 changes: 4 additions & 0 deletions components/playground/instance/dm_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,7 @@ func (m *DMMaster) Component() string {
func (m *DMMaster) LogFile() string {
return filepath.Join(m.Dir, "dm-master.log")
}

func (m *DMMaster) Addr() string {
return utils.JoinHostPort(m.Host, m.StatusPort)
}
6 changes: 0 additions & 6 deletions components/playground/instance/dm_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"path/filepath"
"strings"
"time"

"github.com/pingcap/tiup/pkg/utils"
)
Expand Down Expand Up @@ -66,11 +65,6 @@ func (w *DMWorker) Start(ctx context.Context) error {

logIfErr(w.Process.SetOutputFile(w.LogFile()))

// try to wait for the master to be ready
// this is a very ugly implementation, but it may mostly works
// TODO: find a better way to do this,
// e.g, let master support a HTTP API to check if it's ready
time.Sleep(time.Second * 3)
return w.Process.Start()
}

Expand Down
24 changes: 24 additions & 0 deletions components/playground/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,8 @@ Note: Version constraint [bold]%s[reset] is resolved to [green][bold]%s[reset].
rootCmd.Flags().IntVar(&options.TiKVCDC.Num, "kvcdc", 0, "TiKV-CDC instance number")
rootCmd.Flags().IntVar(&options.Pump.Num, "pump", 0, "Pump instance number")
rootCmd.Flags().IntVar(&options.Drainer.Num, "drainer", 0, "Drainer instance number")
rootCmd.Flags().IntVar(&options.DMMaster.Num, "dmmaster", 0, "DM-master instance number")
rootCmd.Flags().IntVar(&options.DMWorker.Num, "dmworker", 0, "DM-worker instance number")

rootCmd.Flags().IntVar(&options.TiDB.UpTimeout, "db.timeout", 60, "TiDB max wait time in seconds for starting, 0 means no limit")
rootCmd.Flags().IntVar(&options.TiFlash.UpTimeout, "tiflash.timeout", 120, "TiFlash max wait time in seconds for starting, 0 means no limit")
Expand Down Expand Up @@ -330,6 +332,8 @@ Note: Version constraint [bold]%s[reset] is resolved to [green][bold]%s[reset].
rootCmd.Flags().StringVar(&options.Drainer.ConfigPath, "drainer.config", "", "Drainer instance configuration file")
rootCmd.Flags().StringVar(&options.TiCDC.ConfigPath, "ticdc.config", "", "TiCDC instance configuration file")
rootCmd.Flags().StringVar(&options.TiKVCDC.ConfigPath, "kvcdc.config", "", "TiKV-CDC instance configuration file")
rootCmd.Flags().StringVar(&options.DMMaster.ConfigPath, "dmmaster.config", "", "DM-master instance configuration file")
rootCmd.Flags().StringVar(&options.DMWorker.ConfigPath, "dmworker.config", "", "DM-worker instance configuration file")

rootCmd.Flags().StringVar(&options.TiDB.BinPath, "db.binpath", "", "TiDB instance binary path")
rootCmd.Flags().StringVar(&options.TiKV.BinPath, "kv.binpath", "", "TiKV instance binary path")
Expand All @@ -345,6 +349,8 @@ Note: Version constraint [bold]%s[reset] is resolved to [green][bold]%s[reset].
rootCmd.Flags().StringVar(&options.TiKVCDC.BinPath, "kvcdc.binpath", "", "TiKV-CDC instance binary path")
rootCmd.Flags().StringVar(&options.Pump.BinPath, "pump.binpath", "", "Pump instance binary path")
rootCmd.Flags().StringVar(&options.Drainer.BinPath, "drainer.binpath", "", "Drainer instance binary path")
rootCmd.Flags().StringVar(&options.DMMaster.BinPath, "dmmaster.binpath", "", "DM-master instance binary path")
rootCmd.Flags().StringVar(&options.DMWorker.BinPath, "dmworker.binpath", "", "DM-worker instance binary path")

rootCmd.Flags().StringVar(&options.TiKVCDC.Version, "kvcdc.version", "", "TiKV-CDC instance version")

Expand Down Expand Up @@ -481,6 +487,24 @@ func checkStoreStatus(pdClient *api.PDClient, storeAddr string, timeout int) boo
}
}

func checkDMMasterStatus(dmMasterClient *api.DMMasterClient, dmMasterAddr string, timeout int) bool {
if timeout > 0 {
for i := 0; i < timeout; i++ {
if _, isActive, _, err := dmMasterClient.GetMaster(dmMasterAddr); err == nil && isActive {
return true
}
time.Sleep(time.Second)
}
return false
}
for {
if _, isActive, _, err := dmMasterClient.GetMaster(dmMasterAddr); err == nil && isActive {
return true
}
time.Sleep(time.Second)
}
}

func hasDashboard(pdAddr string) bool {
resp, err := http.Get(fmt.Sprintf("http://%s/dashboard", pdAddr))
if err != nil {
Expand Down
106 changes: 99 additions & 7 deletions components/playground/playground.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,15 @@ func (p *Playground) binlogClient() (*api.BinlogClient, error) {
return api.NewBinlogClient(addrs, 5*time.Second, nil)
}

func (p *Playground) dmMasterClient() *api.DMMasterClient {
var addrs []string
for _, inst := range p.dmMasters {
addrs = append(addrs, inst.Addr())
}

return api.NewDMMasterClient(addrs, 5*time.Second, nil)
}

func (p *Playground) pdClient() *api.PDClient {
var addrs []string
for _, inst := range p.pds {
Expand Down Expand Up @@ -384,6 +393,34 @@ func (p *Playground) handleScaleIn(w io.Writer, pid int) error {
return nil
}
}
case spec.ComponentDMWorker:
for i := 0; i < len(p.dmWorkers); i++ {
if p.dmWorkers[i].Pid() == pid {
inst := p.dmWorkers[i]

c := p.dmMasterClient()
err = c.OfflineWorker(inst.Name(), nil)
if err != nil {
return err
}
p.dmWorkers = append(p.dmWorkers[:i], p.dmWorkers[i+1:]...)
return nil
}
}
case spec.ComponentDMMaster:
for i := 0; i < len(p.dmMasters); i++ {
if p.dmMasters[i].Pid() == pid {
inst := p.dmMasters[i]

c := p.dmMasterClient()
err = c.OfflineMaster(inst.Name(), nil)
if err != nil {
return err
}
p.dmMasters = append(p.dmMasters[:i], p.dmMasters[i+1:]...)
return nil
}
}
default:
fmt.Fprintf(w, "unknown component in scale in: %s", cid)
return nil
Expand Down Expand Up @@ -444,6 +481,10 @@ func (p *Playground) sanitizeComponentConfig(cid string, cfg *instance.Config) e
return p.sanitizeConfig(p.bootOptions.Drainer, cfg)
case spec.ComponentTiProxy:
return p.sanitizeConfig(p.bootOptions.TiProxy, cfg)
case spec.ComponentDMMaster:
return p.sanitizeConfig(p.bootOptions.DMMaster, cfg)
case spec.ComponentDMWorker:
return p.sanitizeConfig(p.bootOptions.DMWorker, cfg)
default:
return fmt.Errorf("unknown %s in sanitizeConfig", cid)
}
Expand Down Expand Up @@ -927,6 +968,40 @@ func (p *Playground) waitAllTiFlashUp() {
}
}

func (p *Playground) waitAllDMMasterUp() {
if len(p.dmMasters) > 0 {
var wg sync.WaitGroup
bars := progress.NewMultiBar(colorstr.Sprintf("[dark_gray]Waiting for dm-master instances ready"))
for _, master := range p.dmMasters {
wg.Add(1)
prefix := master.Addr()
bar := bars.AddBar(prefix)
go func(masterInst *instance.DMMaster) {
defer wg.Done()
displayResult := &progress.DisplayProps{
Prefix: prefix,
}
if cmd := masterInst.Cmd(); cmd == nil {
displayResult.Mode = progress.ModeError
displayResult.Suffix = "initialize command failed"
} else if state := cmd.ProcessState; state != nil && state.Exited() {
displayResult.Mode = progress.ModeError
displayResult.Suffix = fmt.Sprintf("process exited with code: %d", state.ExitCode())
} else if s := checkDMMasterStatus(p.dmMasterClient(), masterInst.Name(), options.DMMaster.UpTimeout); !s {
displayResult.Mode = progress.ModeError
displayResult.Suffix = "failed to up after timeout"
} else {
displayResult.Mode = progress.ModeDone
}
bar.UpdateDisplay(displayResult)
}(master)
}
bars.StartRenderLoop()
wg.Wait()
bars.StopRenderLoop()
}
}

func (p *Playground) bindVersion(comp string, version string) (bindVersion string) {
bindVersion = version
switch comp {
Expand Down Expand Up @@ -967,8 +1042,8 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme

p.bootOptions = options

// All others components depend on the pd, we just ensure the pd count must be great than 0
if options.PDMode != "ms" && options.PD.Num < 1 {
// All others components depend on the pd except dm, we just ensure the pd count must be great than 0
if options.PDMode != "ms" && options.PD.Num < 1 && options.DMMaster.Num < 1 {
return fmt.Errorf("all components count must be great than 0 (pd=%v)", options.PD.Num)
}

Expand Down Expand Up @@ -1083,11 +1158,17 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme
}

anyPumpReady := false
allDMMasterReady := false
// Start all instance except tiflash.
err := p.WalkInstances(func(cid string, ins instance.Instance) error {
if cid == spec.ComponentTiFlash {
return nil
}
// wait dm-master up before dm-worker
if cid == spec.ComponentDMWorker && !allDMMasterReady {
p.waitAllDMMasterUp()
allDMMasterReady = true
}

err := p.startInstance(ctx, ins)
if err != nil {
Expand Down Expand Up @@ -1166,9 +1247,20 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme
}
}

if pdAddr := p.pds[0].Addr(); len(p.tidbs) > 0 && hasDashboard(pdAddr) {
fmt.Printf("TiDB Dashboard: ")
colorCmd.Printf("http://%s/dashboard\n", pdAddr)
if len(p.dmMasters) > 0 {
fmt.Printf("Connect DM: ")
endpoints := make([]string, 0, len(p.dmMasters))
for _, dmMaster := range p.dmMasters {
endpoints = append(endpoints, dmMaster.Addr())
}
colorCmd.Printf("tiup dmctl --master-addr %s\n", strings.Join(endpoints, ","))
}

if len(p.pds) > 0 {
if pdAddr := p.pds[0].Addr(); len(p.tidbs) > 0 && hasDashboard(pdAddr) {
fmt.Printf("TiDB Dashboard: ")
colorCmd.Printf("http://%s/dashboard\n", pdAddr)
}
}

if p.bootOptions.Mode == "tikv-slim" {
Expand Down Expand Up @@ -1286,13 +1378,13 @@ func (p *Playground) terminate(sig syscall.Signal) {
go kill("grafana", p.grafana.cmd.Process.Pid, p.grafana.wait)
}

for _, inst := range p.dmMasters {
for _, inst := range p.dmWorkers {
if inst.Process != nil && inst.Process.Cmd() != nil && inst.Process.Cmd().Process != nil {
kill(inst.Component(), inst.Pid(), inst.Wait)
}
}

for _, inst := range p.dmWorkers {
for _, inst := range p.dmMasters {
if inst.Process != nil && inst.Process.Cmd() != nil && inst.Process.Cmd().Process != nil {
kill(inst.Component(), inst.Pid(), inst.Wait)
}
Expand Down

0 comments on commit c16245d

Please sign in to comment.