Skip to content
This repository was archived by the owner on Jan 28, 2021. It is now read-only.

Commit 62780e1

Browse files
authored
Format SHOW PROCESSLIST progress as a tree (#861)
Format SHOW PROCESSLIST progress as a tree
2 parents 5b68206 + e81f029 commit 62780e1

6 files changed

+191
-47
lines changed

sql/analyzer/process.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -40,21 +40,21 @@ func trackProcess(ctx *sql.Context, a *Analyzer, n sql.Node) (sql.Node, error) {
4040
}
4141
total = count
4242
}
43-
processList.AddProgressItem(ctx.Pid(), name, total)
43+
processList.AddTableProgress(ctx.Pid(), name, total)
4444

4545
seen[name] = struct{}{}
4646

4747
onPartitionDone := func(partitionName string) {
48-
processList.UpdateProgress(ctx.Pid(), name, 1)
49-
processList.RemoveProgressItem(ctx.Pid(), partitionName)
48+
processList.UpdateTableProgress(ctx.Pid(), name, 1)
49+
processList.RemovePartitionProgress(ctx.Pid(), name, partitionName)
5050
}
5151

5252
onPartitionStart := func(partitionName string) {
53-
processList.AddProgressItem(ctx.Pid(), partitionName, -1)
53+
processList.AddPartitionProgress(ctx.Pid(), name, partitionName, -1)
5454
}
5555

5656
onRowNext := func(partitionName string) {
57-
processList.UpdateProgress(ctx.Pid(), partitionName, 1)
57+
processList.UpdatePartitionProgress(ctx.Pid(), name, partitionName, 1)
5858
}
5959

6060
var t sql.Table

sql/analyzer/process_test.go

+10-4
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,16 @@ func TestTrackProcess(t *testing.T) {
3535
require.Len(processes, 1)
3636
require.Equal("SELECT foo", processes[0].Query)
3737
require.Equal(sql.QueryProcess, processes[0].Type)
38-
require.Equal(map[string]sql.Progress{
39-
"foo": sql.Progress{Total: 2},
40-
"bar": sql.Progress{Total: 4},
41-
}, processes[0].Progress)
38+
require.Equal(
39+
map[string]sql.TableProgress{
40+
"foo": sql.TableProgress{
41+
Progress: sql.Progress{Name: "foo", Done: 0, Total: 2},
42+
PartitionsProgress: map[string]sql.PartitionProgress{}},
43+
"bar": sql.TableProgress{
44+
Progress: sql.Progress{Name: "bar", Done: 0, Total: 4},
45+
PartitionsProgress: map[string]sql.PartitionProgress{}},
46+
},
47+
processes[0].Progress)
4248

4349
proc, ok := result.(*plan.QueryProcess)
4450
require.True(ok)

sql/plan/processlist.go

+20-6
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package plan
22

33
import (
4-
"fmt"
54
"sort"
65
"strings"
76

@@ -77,21 +76,36 @@ func (p *ShowProcessList) RowIter(ctx *sql.Context) (sql.RowIter, error) {
7776

7877
for i, proc := range processes {
7978
var status []string
80-
for name, progress := range proc.Progress {
81-
status = append(status, fmt.Sprintf("%s(%s)", name, progress))
79+
var names []string
80+
for name := range proc.Progress {
81+
names = append(names, name)
82+
}
83+
sort.Strings(names)
84+
85+
for _, name := range names {
86+
progress := proc.Progress[name]
87+
88+
printer := sql.NewTreePrinter()
89+
_ = printer.WriteNode("\n" + progress.String())
90+
children := []string{}
91+
for _, partitionProgress := range progress.PartitionsProgress {
92+
children = append(children, partitionProgress.String())
93+
}
94+
sort.Strings(children)
95+
_ = printer.WriteChildren(children...)
96+
97+
status = append(status, printer.String())
8298
}
8399

84100
if len(status) == 0 {
85101
status = []string{"running"}
86102
}
87103

88-
sort.Strings(status)
89-
90104
rows[i] = process{
91105
id: int64(proc.Connection),
92106
user: proc.User,
93107
time: int64(proc.Seconds()),
94-
state: strings.Join(status, ", "),
108+
state: strings.Join(status, ""),
95109
command: proc.Type.String(),
96110
host: ctx.Session.Client().Address,
97111
info: proc.Query,

sql/plan/processlist_test.go

+18-9
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,21 @@ func TestShowProcessList(t *testing.T) {
2121
ctx, err := p.AddProcess(ctx, sql.QueryProcess, "SELECT foo")
2222
require.NoError(err)
2323

24-
p.AddProgressItem(ctx.Pid(), "a", 5)
25-
p.AddProgressItem(ctx.Pid(), "b", 6)
24+
p.AddTableProgress(ctx.Pid(), "a", 5)
25+
p.AddTableProgress(ctx.Pid(), "b", 6)
2626

2727
ctx = sql.NewContext(context.Background(), sql.WithPid(2), sql.WithSession(sess))
2828
ctx, err = p.AddProcess(ctx, sql.CreateIndexProcess, "SELECT bar")
2929
require.NoError(err)
3030

31-
p.AddProgressItem(ctx.Pid(), "foo", 2)
31+
p.AddTableProgress(ctx.Pid(), "foo", 2)
3232

33-
p.UpdateProgress(1, "a", 3)
34-
p.UpdateProgress(1, "a", 1)
35-
p.UpdateProgress(1, "b", 2)
36-
p.UpdateProgress(2, "foo", 1)
33+
p.UpdateTableProgress(1, "a", 3)
34+
p.UpdateTableProgress(1, "a", 1)
35+
p.UpdatePartitionProgress(1, "a", "a-1", 7)
36+
p.UpdatePartitionProgress(1, "a", "a-2", 9)
37+
p.UpdateTableProgress(1, "b", 2)
38+
p.UpdateTableProgress(2, "foo", 1)
3739

3840
n.ProcessList = p
3941
n.Database = "foo"
@@ -44,8 +46,15 @@ func TestShowProcessList(t *testing.T) {
4446
require.NoError(err)
4547

4648
expected := []sql.Row{
47-
{int64(1), "foo", addr, "foo", "query", int64(0), "a(4/5), b(2/6)", "SELECT foo"},
48-
{int64(1), "foo", addr, "foo", "create_index", int64(0), "foo(1/2)", "SELECT bar"},
49+
{int64(1), "foo", addr, "foo", "query", int64(0),
50+
`
51+
a (4/5 partitions)
52+
├─ a-1 (7/? rows)
53+
└─ a-2 (9/? rows)
54+
55+
b (2/6 partitions)
56+
`, "SELECT foo"},
57+
{int64(1), "foo", addr, "foo", "create_index", int64(0), "\nfoo (1/2 partitions)\n", "SELECT bar"},
4958
}
5059

5160
require.ElementsMatch(expected, rows)

sql/processlist.go

+111-13
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,46 @@ import (
1212

1313
// Progress between done items and total items.
1414
type Progress struct {
15+
Name string
1516
Done int64
1617
Total int64
1718
}
1819

19-
func (p Progress) String() string {
20+
func (p Progress) totalString() string {
2021
var total = "?"
2122
if p.Total > 0 {
2223
total = fmt.Sprint(p.Total)
2324
}
25+
return total
26+
}
27+
28+
// TableProgress keeps track of a table progress, and for each of its partitions
29+
type TableProgress struct {
30+
Progress
31+
PartitionsProgress map[string]PartitionProgress
32+
}
33+
34+
func NewTableProgress(name string, total int64) TableProgress {
35+
return TableProgress{
36+
Progress: Progress{
37+
Name: name,
38+
Total: total,
39+
},
40+
PartitionsProgress: make(map[string]PartitionProgress),
41+
}
42+
}
43+
44+
func (p TableProgress) String() string {
45+
return fmt.Sprintf("%s (%d/%s partitions)", p.Name, p.Done, p.totalString())
46+
}
2447

25-
return fmt.Sprintf("%d/%s", p.Done, total)
48+
// PartitionProgress keeps track of a partition progress
49+
type PartitionProgress struct {
50+
Progress
51+
}
52+
53+
func (p PartitionProgress) String() string {
54+
return fmt.Sprintf("%s (%d/%s rows)", p.Name, p.Done, p.totalString())
2655
}
2756

2857
// ProcessType is the type of process.
@@ -53,7 +82,7 @@ type Process struct {
5382
User string
5483
Type ProcessType
5584
Query string
56-
Progress map[string]Progress
85+
Progress map[string]TableProgress
5786
StartedAt time.Time
5887
Kill context.CancelFunc
5988
}
@@ -108,7 +137,7 @@ func (pl *ProcessList) AddProcess(
108137
Connection: ctx.ID(),
109138
Type: typ,
110139
Query: query,
111-
Progress: make(map[string]Progress),
140+
Progress: make(map[string]TableProgress),
112141
User: ctx.Session.Client().User,
113142
StartedAt: time.Now(),
114143
Kill: cancel,
@@ -117,9 +146,9 @@ func (pl *ProcessList) AddProcess(
117146
return ctx, nil
118147
}
119148

120-
// UpdateProgress updates the progress of the item with the given name for the
149+
// UpdateTableProgress updates the progress of the table with the given name for the
121150
// process with the given pid.
122-
func (pl *ProcessList) UpdateProgress(pid uint64, name string, delta int64) {
151+
func (pl *ProcessList) UpdateTableProgress(pid uint64, name string, delta int64) {
123152
pl.mu.Lock()
124153
defer pl.mu.Unlock()
125154

@@ -130,16 +159,41 @@ func (pl *ProcessList) UpdateProgress(pid uint64, name string, delta int64) {
130159

131160
progress, ok := p.Progress[name]
132161
if !ok {
133-
progress = Progress{Total: -1}
162+
progress = NewTableProgress(name, -1)
134163
}
135164

136165
progress.Done += delta
137166
p.Progress[name] = progress
138167
}
139168

140-
// AddProgressItem adds a new item to track progress from to the process with
169+
// UpdatePartitionProgress updates the progress of the table partition with the
170+
// given name for the process with the given pid.
171+
func (pl *ProcessList) UpdatePartitionProgress(pid uint64, tableName, partitionName string, delta int64) {
172+
pl.mu.Lock()
173+
defer pl.mu.Unlock()
174+
175+
p, ok := pl.procs[pid]
176+
if !ok {
177+
return
178+
}
179+
180+
tablePg, ok := p.Progress[tableName]
181+
if !ok {
182+
return
183+
}
184+
185+
partitionPg, ok := tablePg.PartitionsProgress[partitionName]
186+
if !ok {
187+
partitionPg = PartitionProgress{Progress: Progress{Name: partitionName, Total: -1}}
188+
}
189+
190+
partitionPg.Done += delta
191+
tablePg.PartitionsProgress[partitionName] = partitionPg
192+
}
193+
194+
// AddTableProgress adds a new item to track progress from to the process with
141195
// the given pid. If the pid does not exist, it will do nothing.
142-
func (pl *ProcessList) AddProgressItem(pid uint64, name string, total int64) {
196+
func (pl *ProcessList) AddTableProgress(pid uint64, name string, total int64) {
143197
pl.mu.Lock()
144198
defer pl.mu.Unlock()
145199

@@ -152,13 +206,38 @@ func (pl *ProcessList) AddProgressItem(pid uint64, name string, total int64) {
152206
pg.Total = total
153207
p.Progress[name] = pg
154208
} else {
155-
p.Progress[name] = Progress{Total: total}
209+
p.Progress[name] = NewTableProgress(name, total)
156210
}
157211
}
158212

159-
// RemoveProgressItem removes an existing item tracking progress from the
213+
// AddPartitionProgress adds a new item to track progress from to the process with
214+
// the given pid. If the pid or the table does not exist, it will do nothing.
215+
func (pl *ProcessList) AddPartitionProgress(pid uint64, tableName, partitionName string, total int64) {
216+
pl.mu.Lock()
217+
defer pl.mu.Unlock()
218+
219+
p, ok := pl.procs[pid]
220+
if !ok {
221+
return
222+
}
223+
224+
tablePg, ok := p.Progress[tableName]
225+
if !ok {
226+
return
227+
}
228+
229+
if pg, ok := tablePg.PartitionsProgress[partitionName]; ok {
230+
pg.Total = total
231+
tablePg.PartitionsProgress[partitionName] = pg
232+
} else {
233+
tablePg.PartitionsProgress[partitionName] =
234+
PartitionProgress{Progress: Progress{Name: partitionName, Total: total}}
235+
}
236+
}
237+
238+
// RemoveTableProgress removes an existing item tracking progress from the
160239
// process with the given pid, if it exists.
161-
func (pl *ProcessList) RemoveProgressItem(pid uint64, name string) {
240+
func (pl *ProcessList) RemoveTableProgress(pid uint64, name string) {
162241
pl.mu.Lock()
163242
defer pl.mu.Unlock()
164243

@@ -170,6 +249,25 @@ func (pl *ProcessList) RemoveProgressItem(pid uint64, name string) {
170249
delete(p.Progress, name)
171250
}
172251

252+
// RemovePartitionProgress removes an existing item tracking progress from the
253+
// process with the given pid, if it exists.
254+
func (pl *ProcessList) RemovePartitionProgress(pid uint64, tableName, partitionName string) {
255+
pl.mu.Lock()
256+
defer pl.mu.Unlock()
257+
258+
p, ok := pl.procs[pid]
259+
if !ok {
260+
return
261+
}
262+
263+
tablePg, ok := p.Progress[tableName]
264+
if !ok {
265+
return
266+
}
267+
268+
delete(tablePg.PartitionsProgress, partitionName)
269+
}
270+
173271
// Kill terminates all queries for a given connection id.
174272
func (pl *ProcessList) Kill(connID uint32) {
175273
pl.mu.Lock()
@@ -220,7 +318,7 @@ func (pl *ProcessList) Processes() []Process {
220318

221319
for _, proc := range pl.procs {
222320
p := *proc
223-
var progress = make(map[string]Progress, len(p.Progress))
321+
var progress = make(map[string]TableProgress, len(p.Progress))
224322
for n, p := range p.Progress {
225323
progress[n] = p
226324
}

0 commit comments

Comments
 (0)