Skip to content
This repository was archived by the owner on Jan 28, 2021. It is now read-only.

Commit 68b087c

Browse files
authoredOct 24, 2019
Merge pull request #855 from carlosms/progress-info
Add progress for each partition in SHOW PROCESSLIST
2 parents 19fc282 + a385e34 commit 68b087c

File tree

4 files changed

+158
-31
lines changed

4 files changed

+158
-31
lines changed
 

Diff for: ‎sql/analyzer/process.go

+12-3
Original file line numberDiff line numberDiff line change
@@ -44,16 +44,25 @@ func trackProcess(ctx *sql.Context, a *Analyzer, n sql.Node) (sql.Node, error) {
4444

4545
seen[name] = struct{}{}
4646

47-
notify := func() {
47+
onPartitionDone := func(partitionName string) {
4848
processList.UpdateProgress(ctx.Pid(), name, 1)
49+
processList.RemoveProgressItem(ctx.Pid(), partitionName)
50+
}
51+
52+
onPartitionStart := func(partitionName string) {
53+
processList.AddProgressItem(ctx.Pid(), partitionName, -1)
54+
}
55+
56+
onRowNext := func(partitionName string) {
57+
processList.UpdateProgress(ctx.Pid(), partitionName, 1)
4958
}
5059

5160
var t sql.Table
5261
switch table := n.Table.(type) {
5362
case sql.IndexableTable:
54-
t = plan.NewProcessIndexableTable(table, notify)
63+
t = plan.NewProcessIndexableTable(table, onPartitionDone, onPartitionStart, onRowNext)
5564
default:
56-
t = plan.NewProcessTable(table, notify)
65+
t = plan.NewProcessTable(table, onPartitionDone, onPartitionStart, onRowNext)
5766
}
5867

5968
return plan.NewResolvedTable(t), nil

Diff for: ‎sql/plan/process.go

+104-20
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func (p *QueryProcess) RowIter(ctx *sql.Context) (sql.RowIter, error) {
3737
return nil, err
3838
}
3939

40-
return &trackedRowIter{iter, p.Notify}, nil
40+
return &trackedRowIter{iter: iter, onDone: p.Notify}, nil
4141
}
4242

4343
func (p *QueryProcess) String() string { return p.Child.String() }
@@ -48,12 +48,14 @@ func (p *QueryProcess) String() string { return p.Child.String() }
4848
// partition is processed.
4949
type ProcessIndexableTable struct {
5050
sql.IndexableTable
51-
Notify NotifyFunc
51+
OnPartitionDone NamedNotifyFunc
52+
OnPartitionStart NamedNotifyFunc
53+
OnRowNext NamedNotifyFunc
5254
}
5355

5456
// NewProcessIndexableTable returns a new ProcessIndexableTable.
55-
func NewProcessIndexableTable(t sql.IndexableTable, notify NotifyFunc) *ProcessIndexableTable {
56-
return &ProcessIndexableTable{t, notify}
57+
func NewProcessIndexableTable(t sql.IndexableTable, onPartitionDone, onPartitionStart, OnRowNext NamedNotifyFunc) *ProcessIndexableTable {
58+
return &ProcessIndexableTable{t, onPartitionDone, onPartitionStart, OnRowNext}
5759
}
5860

5961
// Underlying implements sql.TableWrapper interface.
@@ -71,7 +73,7 @@ func (t *ProcessIndexableTable) IndexKeyValues(
7173
return nil, err
7274
}
7375

74-
return &trackedPartitionIndexKeyValueIter{iter, t.Notify}, nil
76+
return &trackedPartitionIndexKeyValueIter{iter, t.OnPartitionDone, t.OnPartitionStart, t.OnRowNext}, nil
7577
}
7678

7779
// PartitionRows implements the sql.Table interface.
@@ -81,22 +83,46 @@ func (t *ProcessIndexableTable) PartitionRows(ctx *sql.Context, p sql.Partition)
8183
return nil, err
8284
}
8385

84-
return &trackedRowIter{iter, t.Notify}, nil
86+
partitionName := partitionName(p)
87+
if t.OnPartitionStart != nil {
88+
t.OnPartitionStart(partitionName)
89+
}
90+
91+
var onDone NotifyFunc
92+
if t.OnPartitionDone != nil {
93+
onDone = func() {
94+
t.OnPartitionDone(partitionName)
95+
}
96+
}
97+
98+
var onNext NotifyFunc
99+
if t.OnRowNext != nil {
100+
onNext = func() {
101+
t.OnRowNext(partitionName)
102+
}
103+
}
104+
105+
return &trackedRowIter{iter: iter, onNext: onNext, onDone: onDone}, nil
85106
}
86107

87108
var _ sql.IndexableTable = (*ProcessIndexableTable)(nil)
88109

110+
// NamedNotifyFunc is a function to notify about some event with a string argument.
111+
type NamedNotifyFunc func(name string)
112+
89113
// ProcessTable is a wrapper for sql.Tables inside a query process. It
90114
// notifies the process manager about the status of a query when a partition
91115
// is processed.
92116
type ProcessTable struct {
93117
sql.Table
94-
Notify NotifyFunc
118+
OnPartitionDone NamedNotifyFunc
119+
OnPartitionStart NamedNotifyFunc
120+
OnRowNext NamedNotifyFunc
95121
}
96122

97123
// NewProcessTable returns a new ProcessTable.
98-
func NewProcessTable(t sql.Table, notify NotifyFunc) *ProcessTable {
99-
return &ProcessTable{t, notify}
124+
func NewProcessTable(t sql.Table, onPartitionDone, onPartitionStart, OnRowNext NamedNotifyFunc) *ProcessTable {
125+
return &ProcessTable{t, onPartitionDone, onPartitionStart, OnRowNext}
100126
}
101127

102128
// Underlying implements sql.TableWrapper interface.
@@ -111,18 +137,38 @@ func (t *ProcessTable) PartitionRows(ctx *sql.Context, p sql.Partition) (sql.Row
111137
return nil, err
112138
}
113139

114-
return &trackedRowIter{iter, t.Notify}, nil
140+
partitionName := partitionName(p)
141+
if t.OnPartitionStart != nil {
142+
t.OnPartitionStart(partitionName)
143+
}
144+
145+
var onDone NotifyFunc
146+
if t.OnPartitionDone != nil {
147+
onDone = func() {
148+
t.OnPartitionDone(partitionName)
149+
}
150+
}
151+
152+
var onNext NotifyFunc
153+
if t.OnRowNext != nil {
154+
onNext = func() {
155+
t.OnRowNext(partitionName)
156+
}
157+
}
158+
159+
return &trackedRowIter{iter: iter, onNext: onNext, onDone: onDone}, nil
115160
}
116161

117162
type trackedRowIter struct {
118163
iter sql.RowIter
119-
notify NotifyFunc
164+
onDone NotifyFunc
165+
onNext NotifyFunc
120166
}
121167

122168
func (i *trackedRowIter) done() {
123-
if i.notify != nil {
124-
i.notify()
125-
i.notify = nil
169+
if i.onDone != nil {
170+
i.onDone()
171+
i.onDone = nil
126172
}
127173
}
128174

@@ -134,6 +180,11 @@ func (i *trackedRowIter) Next() (sql.Row, error) {
134180
}
135181
return nil, err
136182
}
183+
184+
if i.onNext != nil {
185+
i.onNext()
186+
}
187+
137188
return row, nil
138189
}
139190

@@ -144,7 +195,9 @@ func (i *trackedRowIter) Close() error {
144195

145196
type trackedPartitionIndexKeyValueIter struct {
146197
sql.PartitionIndexKeyValueIter
147-
notify NotifyFunc
198+
OnPartitionDone NamedNotifyFunc
199+
OnPartitionStart NamedNotifyFunc
200+
OnRowNext NamedNotifyFunc
148201
}
149202

150203
func (i *trackedPartitionIndexKeyValueIter) Next() (sql.Partition, sql.IndexKeyValueIter, error) {
@@ -153,18 +206,38 @@ func (i *trackedPartitionIndexKeyValueIter) Next() (sql.Partition, sql.IndexKeyV
153206
return nil, nil, err
154207
}
155208

156-
return p, &trackedIndexKeyValueIter{iter, i.notify}, nil
209+
partitionName := partitionName(p)
210+
if i.OnPartitionStart != nil {
211+
i.OnPartitionStart(partitionName)
212+
}
213+
214+
var onDone NotifyFunc
215+
if i.OnPartitionDone != nil {
216+
onDone = func() {
217+
i.OnPartitionDone(partitionName)
218+
}
219+
}
220+
221+
var onNext NotifyFunc
222+
if i.OnRowNext != nil {
223+
onNext = func() {
224+
i.OnRowNext(partitionName)
225+
}
226+
}
227+
228+
return p, &trackedIndexKeyValueIter{iter, onDone, onNext}, nil
157229
}
158230

159231
type trackedIndexKeyValueIter struct {
160232
iter sql.IndexKeyValueIter
161-
notify NotifyFunc
233+
onDone NotifyFunc
234+
onNext NotifyFunc
162235
}
163236

164237
func (i *trackedIndexKeyValueIter) done() {
165-
if i.notify != nil {
166-
i.notify()
167-
i.notify = nil
238+
if i.onDone != nil {
239+
i.onDone()
240+
i.onDone = nil
168241
}
169242
}
170243

@@ -185,5 +258,16 @@ func (i *trackedIndexKeyValueIter) Next() ([]interface{}, []byte, error) {
185258
return nil, nil, err
186259
}
187260

261+
if i.onNext != nil {
262+
i.onNext()
263+
}
264+
188265
return v, k, nil
189266
}
267+
268+
func partitionName(p sql.Partition) string {
269+
if n, ok := p.(sql.Nameable); ok {
270+
return n.Name()
271+
}
272+
return string(p.Key())
273+
}

Diff for: ‎sql/plan/process_test.go

+28-8
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,9 @@ func TestProcessTable(t *testing.T) {
6161
table.Insert(sql.NewEmptyContext(), sql.NewRow(int64(3)))
6262
table.Insert(sql.NewEmptyContext(), sql.NewRow(int64(4)))
6363

64-
var notifications int
64+
var partitionDoneNotifications int
65+
var partitionStartNotifications int
66+
var rowNextNotifications int
6567

6668
node := NewProject(
6769
[]sql.Expression{
@@ -70,8 +72,14 @@ func TestProcessTable(t *testing.T) {
7072
NewResolvedTable(
7173
NewProcessTable(
7274
table,
73-
func() {
74-
notifications++
75+
func(partitionName string) {
76+
partitionDoneNotifications++
77+
},
78+
func(partitionName string) {
79+
partitionStartNotifications++
80+
},
81+
func(partitionName string) {
82+
rowNextNotifications++
7583
},
7684
),
7785
),
@@ -91,7 +99,9 @@ func TestProcessTable(t *testing.T) {
9199
}
92100

93101
require.ElementsMatch(expected, rows)
94-
require.Equal(2, notifications)
102+
require.Equal(2, partitionDoneNotifications)
103+
require.Equal(2, partitionStartNotifications)
104+
require.Equal(4, rowNextNotifications)
95105
}
96106

97107
func TestProcessIndexableTable(t *testing.T) {
@@ -106,12 +116,20 @@ func TestProcessIndexableTable(t *testing.T) {
106116
table.Insert(sql.NewEmptyContext(), sql.NewRow(int64(3)))
107117
table.Insert(sql.NewEmptyContext(), sql.NewRow(int64(4)))
108118

109-
var notifications int
119+
var partitionDoneNotifications int
120+
var partitionStartNotifications int
121+
var rowNextNotifications int
110122

111123
pt := NewProcessIndexableTable(
112124
table,
113-
func() {
114-
notifications++
125+
func(partitionName string) {
126+
partitionDoneNotifications++
127+
},
128+
func(partitionName string) {
129+
partitionStartNotifications++
130+
},
131+
func(partitionName string) {
132+
rowNextNotifications++
115133
},
116134
)
117135

@@ -144,5 +162,7 @@ func TestProcessIndexableTable(t *testing.T) {
144162
}
145163

146164
require.ElementsMatch(expectedValues, values)
147-
require.Equal(2, notifications)
165+
require.Equal(2, partitionDoneNotifications)
166+
require.Equal(2, partitionStartNotifications)
167+
require.Equal(4, rowNextNotifications)
148168
}

Diff for: ‎sql/processlist.go

+14
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,20 @@ func (pl *ProcessList) AddProgressItem(pid uint64, name string, total int64) {
156156
}
157157
}
158158

159+
// RemoveProgressItem removes an existing item tracking progress from the
160+
// process with the given pid, if it exists.
161+
func (pl *ProcessList) RemoveProgressItem(pid uint64, name string) {
162+
pl.mu.Lock()
163+
defer pl.mu.Unlock()
164+
165+
p, ok := pl.procs[pid]
166+
if !ok {
167+
return
168+
}
169+
170+
delete(p.Progress, name)
171+
}
172+
159173
// Kill terminates all queries for a given connection id.
160174
func (pl *ProcessList) Kill(connID uint32) {
161175
pl.mu.Lock()

0 commit comments

Comments
 (0)
This repository has been archived.