Skip to content

Commit

Permalink
track time waiting on event channels
Browse files Browse the repository at this point in the history
  • Loading branch information
meiji163 committed Oct 24, 2024
1 parent fa7c484 commit 126c981
Showing 1 changed file with 27 additions and 9 deletions.
36 changes: 27 additions & 9 deletions go/logic/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Coordinator struct {

throttler *Throttler

// Atomic counter for number of active workers
// Atomic counter for number of active workers (not in workerQueue)
busyWorkers atomic.Int64

// Mutex protecting currentCoordinates
Expand Down Expand Up @@ -75,12 +75,21 @@ type Worker struct {
}

type stats struct {
dmlRate float64
trxRate float64
dmlRate float64
trxRate float64

// Number of DML events applied
dmlEventsApplied int64
executedJobs int64
busyTime time.Duration
waitTime time.Duration

// Number of transactions processed
executedJobs int64

// Time spent applying DML events
busyTime time.Duration

// Time spent waiting on transaction dependecies
// or waiting on events to arrive in queue.
waitTime time.Duration
}

func (w *Worker) ProcessEvents() error {
Expand All @@ -92,7 +101,11 @@ func (w *Worker) ProcessEvents() error {
if w.coordinator.finishedMigrating.Load() {
return nil
}

// Wait for first event
waitStart := time.Now()
ev := <-w.eventQueue
w.waitTimeNs.Add(time.Since(waitStart).Nanoseconds())

// Verify this is a GTID Event
gtidEvent, ok := ev.Event.(*replication.GTIDEvent)
Expand All @@ -114,7 +127,11 @@ func (w *Worker) ProcessEvents() error {
dmlEvents := make([]*binlog.BinlogDMLEvent, 0, int(atomic.LoadInt64(&w.coordinator.migrationContext.DMLBatchSize)))
events:
for {
// wait for next event in the transaction
waitStart := time.Now()
ev := <-w.eventQueue
w.waitTimeNs.Add(time.Since(waitStart).Nanoseconds())

if ev == nil {
fmt.Printf("Worker %d ending transaction early\n", w.id)
break events
Expand Down Expand Up @@ -364,6 +381,8 @@ func (c *Coordinator) ProcessEventsUntilNextChangelogEvent() (*binlog.BinlogDMLE
return nil, nil
}

// ProcessEventsUntilDrained reads binlog events and sends them to the workers to process.
// It exits when the event queue is empty and all the workers are returned to the workerQueue.
func (c *Coordinator) ProcessEventsUntilDrained() error {
for {
select {
Expand Down Expand Up @@ -399,7 +418,6 @@ func (c *Coordinator) ProcessEventsUntilDrained() error {
switch binlogEvent := ev.Event.(type) {
case *replication.QueryEvent:
if bytes.Equal([]byte("BEGIN"), binlogEvent.Query) {
// c.migrationContext.Log.Infof("BEGIN for transaction in schema %s", binlogEvent.Schema)
} else {
worker.eventQueue <- nil
continue
Expand Down Expand Up @@ -427,8 +445,7 @@ func (c *Coordinator) ProcessEventsUntilDrained() error {
// No events in the queue. Check if all workers are sleeping now
default:
{
busyWorkers := c.busyWorkers.Load()
if busyWorkers == 0 {
if c.busyWorkers.Load() == 0 {
return nil
}
}
Expand All @@ -450,6 +467,7 @@ func (c *Coordinator) InitializeWorkers(count int) {
}
}

// GetWorkerStats collects profiling stats for ProcessEvents from each worker.
func (c *Coordinator) GetWorkerStats() []stats {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down

0 comments on commit 126c981

Please sign in to comment.