Skip to content
This repository was archived by the owner on Jan 28, 2021. It is now read-only.

Add progress for each partition in SHOW PROCESSLIST #855

Merged
merged 2 commits into from
Oct 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions sql/analyzer/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,25 @@ func trackProcess(ctx *sql.Context, a *Analyzer, n sql.Node) (sql.Node, error) {

seen[name] = struct{}{}

notify := func() {
onPartitionDone := func(partitionName string) {
processList.UpdateProgress(ctx.Pid(), name, 1)
processList.RemoveProgressItem(ctx.Pid(), partitionName)
}

onPartitionStart := func(partitionName string) {
processList.AddProgressItem(ctx.Pid(), partitionName, -1)
}

onRowNext := func(partitionName string) {
processList.UpdateProgress(ctx.Pid(), partitionName, 1)
}

var t sql.Table
switch table := n.Table.(type) {
case sql.IndexableTable:
t = plan.NewProcessIndexableTable(table, notify)
t = plan.NewProcessIndexableTable(table, onPartitionDone, onPartitionStart, onRowNext)
default:
t = plan.NewProcessTable(table, notify)
t = plan.NewProcessTable(table, onPartitionDone, onPartitionStart, onRowNext)
}

return plan.NewResolvedTable(t), nil
Expand Down
124 changes: 104 additions & 20 deletions sql/plan/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (p *QueryProcess) RowIter(ctx *sql.Context) (sql.RowIter, error) {
return nil, err
}

return &trackedRowIter{iter, p.Notify}, nil
return &trackedRowIter{iter: iter, onDone: p.Notify}, nil
}

func (p *QueryProcess) String() string { return p.Child.String() }
Expand All @@ -48,12 +48,14 @@ func (p *QueryProcess) String() string { return p.Child.String() }
// partition is processed.
type ProcessIndexableTable struct {
sql.IndexableTable
Notify NotifyFunc
OnPartitionDone NamedNotifyFunc
OnPartitionStart NamedNotifyFunc
OnRowNext NamedNotifyFunc
}

// NewProcessIndexableTable returns a new ProcessIndexableTable.
func NewProcessIndexableTable(t sql.IndexableTable, notify NotifyFunc) *ProcessIndexableTable {
return &ProcessIndexableTable{t, notify}
func NewProcessIndexableTable(t sql.IndexableTable, onPartitionDone, onPartitionStart, OnRowNext NamedNotifyFunc) *ProcessIndexableTable {
return &ProcessIndexableTable{t, onPartitionDone, onPartitionStart, OnRowNext}
}

// Underlying implements sql.TableWrapper interface.
Expand All @@ -71,7 +73,7 @@ func (t *ProcessIndexableTable) IndexKeyValues(
return nil, err
}

return &trackedPartitionIndexKeyValueIter{iter, t.Notify}, nil
return &trackedPartitionIndexKeyValueIter{iter, t.OnPartitionDone, t.OnPartitionStart, t.OnRowNext}, nil
}

// PartitionRows implements the sql.Table interface.
Expand All @@ -81,22 +83,46 @@ func (t *ProcessIndexableTable) PartitionRows(ctx *sql.Context, p sql.Partition)
return nil, err
}

return &trackedRowIter{iter, t.Notify}, nil
partitionName := partitionName(p)
if t.OnPartitionStart != nil {
t.OnPartitionStart(partitionName)
}

var onDone NotifyFunc
if t.OnPartitionDone != nil {
onDone = func() {
t.OnPartitionDone(partitionName)
}
}

var onNext NotifyFunc
if t.OnRowNext != nil {
onNext = func() {
t.OnRowNext(partitionName)
}
}

return &trackedRowIter{iter: iter, onNext: onNext, onDone: onDone}, nil
}

var _ sql.IndexableTable = (*ProcessIndexableTable)(nil)

// NamedNotifyFunc is a function to notify about some event with a string argument.
type NamedNotifyFunc func(name string)

// ProcessTable is a wrapper for sql.Tables inside a query process. It
// notifies the process manager about the status of a query when a partition
// is processed.
type ProcessTable struct {
sql.Table
Notify NotifyFunc
OnPartitionDone NamedNotifyFunc
OnPartitionStart NamedNotifyFunc
OnRowNext NamedNotifyFunc
}

// NewProcessTable returns a new ProcessTable.
func NewProcessTable(t sql.Table, notify NotifyFunc) *ProcessTable {
return &ProcessTable{t, notify}
func NewProcessTable(t sql.Table, onPartitionDone, onPartitionStart, OnRowNext NamedNotifyFunc) *ProcessTable {
return &ProcessTable{t, onPartitionDone, onPartitionStart, OnRowNext}
}

// Underlying implements sql.TableWrapper interface.
Expand All @@ -111,18 +137,38 @@ func (t *ProcessTable) PartitionRows(ctx *sql.Context, p sql.Partition) (sql.Row
return nil, err
}

return &trackedRowIter{iter, t.Notify}, nil
partitionName := partitionName(p)
if t.OnPartitionStart != nil {
t.OnPartitionStart(partitionName)
}

var onDone NotifyFunc
if t.OnPartitionDone != nil {
onDone = func() {
t.OnPartitionDone(partitionName)
}
}

var onNext NotifyFunc
if t.OnRowNext != nil {
onNext = func() {
t.OnRowNext(partitionName)
}
}

return &trackedRowIter{iter: iter, onNext: onNext, onDone: onDone}, nil
}

type trackedRowIter struct {
iter sql.RowIter
notify NotifyFunc
onDone NotifyFunc
onNext NotifyFunc
}

func (i *trackedRowIter) done() {
if i.notify != nil {
i.notify()
i.notify = nil
if i.onDone != nil {
i.onDone()
i.onDone = nil
}
}

Expand All @@ -134,6 +180,11 @@ func (i *trackedRowIter) Next() (sql.Row, error) {
}
return nil, err
}

if i.onNext != nil {
i.onNext()
}

return row, nil
}

Expand All @@ -144,7 +195,9 @@ func (i *trackedRowIter) Close() error {

type trackedPartitionIndexKeyValueIter struct {
sql.PartitionIndexKeyValueIter
notify NotifyFunc
OnPartitionDone NamedNotifyFunc
OnPartitionStart NamedNotifyFunc
OnRowNext NamedNotifyFunc
}

func (i *trackedPartitionIndexKeyValueIter) Next() (sql.Partition, sql.IndexKeyValueIter, error) {
Expand All @@ -153,18 +206,38 @@ func (i *trackedPartitionIndexKeyValueIter) Next() (sql.Partition, sql.IndexKeyV
return nil, nil, err
}

return p, &trackedIndexKeyValueIter{iter, i.notify}, nil
partitionName := partitionName(p)
if i.OnPartitionStart != nil {
i.OnPartitionStart(partitionName)
}

var onDone NotifyFunc
if i.OnPartitionDone != nil {
onDone = func() {
i.OnPartitionDone(partitionName)
}
}

var onNext NotifyFunc
if i.OnRowNext != nil {
onNext = func() {
i.OnRowNext(partitionName)
}
}

return p, &trackedIndexKeyValueIter{iter, onDone, onNext}, nil
}

type trackedIndexKeyValueIter struct {
iter sql.IndexKeyValueIter
notify NotifyFunc
onDone NotifyFunc
onNext NotifyFunc
}

func (i *trackedIndexKeyValueIter) done() {
if i.notify != nil {
i.notify()
i.notify = nil
if i.onDone != nil {
i.onDone()
i.onDone = nil
}
}

Expand All @@ -185,5 +258,16 @@ func (i *trackedIndexKeyValueIter) Next() ([]interface{}, []byte, error) {
return nil, nil, err
}

if i.onNext != nil {
i.onNext()
}

return v, k, nil
}

func partitionName(p sql.Partition) string {
if n, ok := p.(sql.Nameable); ok {
return n.Name()
}
return string(p.Key())
}
36 changes: 28 additions & 8 deletions sql/plan/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ func TestProcessTable(t *testing.T) {
table.Insert(sql.NewEmptyContext(), sql.NewRow(int64(3)))
table.Insert(sql.NewEmptyContext(), sql.NewRow(int64(4)))

var notifications int
var partitionDoneNotifications int
var partitionStartNotifications int
var rowNextNotifications int

node := NewProject(
[]sql.Expression{
Expand All @@ -70,8 +72,14 @@ func TestProcessTable(t *testing.T) {
NewResolvedTable(
NewProcessTable(
table,
func() {
notifications++
func(partitionName string) {
partitionDoneNotifications++
},
func(partitionName string) {
partitionStartNotifications++
},
func(partitionName string) {
rowNextNotifications++
},
),
),
Expand All @@ -91,7 +99,9 @@ func TestProcessTable(t *testing.T) {
}

require.ElementsMatch(expected, rows)
require.Equal(2, notifications)
require.Equal(2, partitionDoneNotifications)
require.Equal(2, partitionStartNotifications)
require.Equal(4, rowNextNotifications)
}

func TestProcessIndexableTable(t *testing.T) {
Expand All @@ -106,12 +116,20 @@ func TestProcessIndexableTable(t *testing.T) {
table.Insert(sql.NewEmptyContext(), sql.NewRow(int64(3)))
table.Insert(sql.NewEmptyContext(), sql.NewRow(int64(4)))

var notifications int
var partitionDoneNotifications int
var partitionStartNotifications int
var rowNextNotifications int

pt := NewProcessIndexableTable(
table,
func() {
notifications++
func(partitionName string) {
partitionDoneNotifications++
},
func(partitionName string) {
partitionStartNotifications++
},
func(partitionName string) {
rowNextNotifications++
},
)

Expand Down Expand Up @@ -144,5 +162,7 @@ func TestProcessIndexableTable(t *testing.T) {
}

require.ElementsMatch(expectedValues, values)
require.Equal(2, notifications)
require.Equal(2, partitionDoneNotifications)
require.Equal(2, partitionStartNotifications)
require.Equal(4, rowNextNotifications)
}
14 changes: 14 additions & 0 deletions sql/processlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,20 @@ func (pl *ProcessList) AddProgressItem(pid uint64, name string, total int64) {
}
}

// RemoveProgressItem removes an existing item tracking progress from the
// process with the given pid, if it exists.
func (pl *ProcessList) RemoveProgressItem(pid uint64, name string) {
pl.mu.Lock()
defer pl.mu.Unlock()

p, ok := pl.procs[pid]
if !ok {
return
}

delete(p.Progress, name)
}

// Kill terminates all queries for a given connection id.
func (pl *ProcessList) Kill(connID uint32) {
pl.mu.Lock()
Expand Down