Skip to content

Commit ac095f9

Browse files
committed
fix limit
1 parent 47627e4 commit ac095f9

File tree

4 files changed

+34
-13
lines changed

4 files changed

+34
-13
lines changed

Diff for: eplan/eplan.go

+7-5
Original file line numberDiff line numberDiff line change
@@ -335,13 +335,15 @@ func createEPlan(node PlanNode, ePlanNodes *[]ENode, executorHeap *util.Heap, pn
335335
if err != nil {
336336
return res, err
337337
}
338+
inputs := []pb.Location{}
338339
for _, inputNode := range inputNodes {
339-
for _, input := range inputNode.GetOutputs() {
340-
output := executorHeap.GetExecutorLoc()
341-
output.ChannelIndex = 0
342-
res = append(res, NewEPlanLimitNode(nodea, input, output))
343-
}
340+
inputs = append(inputs, inputNode.GetOutputs()...)
344341
}
342+
343+
limitNodeLoc := executorHeap.GetExecutorLoc()
344+
limitNodeLoc.ChannelIndex = 0
345+
res = append(res, NewEPlanLimitNode(nodea, inputs, limitNodeLoc))
346+
345347
*ePlanNodes = append(*ePlanNodes, res...)
346348
return res, nil
347349

Diff for: eplan/limit.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88

99
type EPlanLimitNode struct {
1010
Location pb.Location
11-
Input pb.Location
11+
Inputs []pb.Location
1212
Output pb.Location
1313
LimitNumber *int64
1414
Metadata *metadata.Metadata
@@ -19,7 +19,7 @@ func (self *EPlanLimitNode) GetNodeType() EPlanNodeType {
1919
}
2020

2121
func (self *EPlanLimitNode) GetInputs() []pb.Location {
22-
return []pb.Location{self.Input}
22+
return self.Inputs
2323
}
2424

2525
func (self *EPlanLimitNode) GetOutputs() []pb.Location {
@@ -30,10 +30,10 @@ func (self *EPlanLimitNode) GetLocation() pb.Location {
3030
return self.Location
3131
}
3232

33-
func NewEPlanLimitNode(node *PlanLimitNode, input, output pb.Location) *EPlanLimitNode {
33+
func NewEPlanLimitNode(node *PlanLimitNode, inputs []pb.Location, output pb.Location) *EPlanLimitNode {
3434
return &EPlanLimitNode{
3535
Location: output,
36-
Input: input,
36+
Inputs: inputs,
3737
Output: output,
3838
LimitNumber: node.LimitNumber,
3939
Metadata: node.GetMetadata(),

Diff for: executor/limit.go

+22-4
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@ func (self *Executor) SetInstructionLimit(instruction *pb.Instruction) (err erro
2323
}
2424
self.Instruction = instruction
2525
self.EPlanNode = &enode
26-
self.InputLocations = []*pb.Location{&enode.Input}
26+
self.InputLocations = []*pb.Location{}
27+
for i := 0; i < len(enode.Inputs); i++ {
28+
self.InputLocations = append(self.InputLocations, &(enode.Inputs[i]))
29+
}
2730
self.OutputLocations = []*pb.Location{&enode.Output}
2831
return nil
2932
}
@@ -79,9 +82,24 @@ func (self *Executor) RunLimit() (err error) {
7982
if err != nil {
8083
return err
8184
}
82-
readRowCnt += int64(rg.GetRowsNumber())
83-
if err = rbWriter.Write(rg); err != nil {
84-
return err
85+
if readRowCnt+int64(rg.GetRowsNumber()) <= *(enode.LimitNumber) {
86+
readRowCnt += int64(rg.GetRowsNumber())
87+
if err = rbWriter.Write(rg); err != nil {
88+
return err
89+
}
90+
91+
} else {
92+
for readRowCnt < *(enode.LimitNumber) {
93+
row, err := rg.Read()
94+
if err != nil {
95+
return err
96+
}
97+
if err = rbWriter.WriteRow(row); err != nil {
98+
return err
99+
}
100+
readRowCnt++
101+
102+
}
85103
}
86104
}
87105
}

Diff for: master/ui_task_info.go

+1
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ func DrawNode(canvas *svg.SVG, node *SVGNode) {
116116
"DISTINCT LOCAL": "stroke-width:0; fill:rgb(15,18,98);",
117117
"DISTINCT GLOBAL": "stroke-width:0; fill:rgb(15,58,148);",
118118
"SHOW": "stroke-width:0; fill:rgb(125,148,148);",
119+
"BALANCE": "stroke-width:0; fill:rgb(125,18,148);",
119120
"UNKNOWN": "stroke-width:0; fill:rgb(181,181,181);",
120121
}
121122
style := NodeStyle[node.NodeType]

0 commit comments

Comments
 (0)