From 641fe922b6d4ae2b0cbd5f1dc6658a631860508f Mon Sep 17 00:00:00 2001 From: meiji163 Date: Thu, 24 Oct 2024 17:00:54 -0700 Subject: [PATCH] add flag for number of Coordinator workers --- go/base/context.go | 2 +- go/cmd/gh-ost/main.go | 1 + go/logic/coordinator.go | 2 -- go/logic/migrator.go | 4 +--- 4 files changed, 3 insertions(+), 6 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index f8c179984..f12f62e35 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -161,7 +161,7 @@ type MigrationContext struct { CutOverType CutOver ReplicaServerId uint - // Number of workers used by the Coordinator + // Number of workers used by the trx coordinator NumWorkers int Hostname string diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index 3e6057995..4b3e0bed1 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -109,6 +109,7 @@ func main() { defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking") cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout)") niceRatio := flag.Float64("nice-ratio", 0, "force being 'nice', imply sleep time per chunk time; range: [0.0..100.0]. Example values: 0 is aggressive. 1: for every 1ms spent copying rows, sleep additional 1ms (effectively doubling runtime); 0.7: for every 10ms spend in a rowcopy chunk, spend 7ms sleeping immediately after") + flag.IntVar(&migrationContext.NumWorkers, "workers", 8, "Number of concurrent workers for applying DML events. Each worker uses one goroutine.") maxLagMillis := flag.Int64("max-lag-millis", 1500, "replication lag at which to throttle operation") replicationLagQuery := flag.String("replication-lag-query", "", "Deprecated. gh-ost uses an internal, subsecond resolution query") diff --git a/go/logic/coordinator.go b/go/logic/coordinator.go index 56e137649..ceb8f1d9a 100644 --- a/go/logic/coordinator.go +++ b/go/logic/coordinator.go @@ -276,8 +276,6 @@ func NewCoordinator(migrationContext *base.MigrationContext, applier *Applier, t waitingJobs: make(map[int64][]chan struct{}), events: make(chan *replication.BinlogEvent, 1000), - - workerQueue: make(chan *Worker, 16), } } diff --git a/go/logic/migrator.go b/go/logic/migrator.go index bd0e6aea7..b2c4538a8 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -332,8 +332,6 @@ func (this *Migrator) Migrate() (err error) { return err } - // TODO(meiji163): configure workers - this.migrationContext.NumWorkers = 16 this.trxCoordinator = NewCoordinator(this.migrationContext, this.applier, this.throttler, this.onChangelogEvent) if err := this.initiateStreaming(); err != nil { @@ -364,7 +362,7 @@ func (this *Migrator) Migrate() (err error) { } } - this.migrationContext.Log.Info("starting applier workers") + this.migrationContext.Log.Infof("starting %d applier workers", this.migrationContext.NumWorkers) this.trxCoordinator.InitializeWorkers(this.migrationContext.NumWorkers) initialLag, _ := this.inspector.getReplicationLag()