Skip to content

Commit aedbbf6

Browse files
committed
fix: use the object heap to keep the min ddl ts order
Signed-off-by: SimFG <[email protected]>
1 parent d6206ad commit aedbbf6

File tree

4 files changed

+150
-8
lines changed

4 files changed

+150
-8
lines changed

Diff for: internal/rootcoord/scheduler.go

+23-3
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/milvus-io/milvus/internal/tso"
2929
"github.com/milvus-io/milvus/pkg/log"
3030
"github.com/milvus-io/milvus/pkg/util/lock"
31+
"github.com/milvus-io/milvus/pkg/util/typeutil"
3132
)
3233

3334
type IScheduler interface {
@@ -46,6 +47,7 @@ type scheduler struct {
4647
tsoAllocator tso.Allocator
4748

4849
taskChan chan task
50+
taskHeap typeutil.Heap[task]
4951

5052
lock sync.Mutex
5153

@@ -56,16 +58,22 @@ type scheduler struct {
5658
lockMapping map[LockLevel]*lock.KeyLock[string]
5759
}
5860

61+
func GetTaskHeapOrder(t task) Timestamp {
62+
return t.GetTs()
63+
}
64+
5965
func newScheduler(ctx context.Context, idAllocator allocator.Interface, tsoAllocator tso.Allocator) *scheduler {
6066
ctx1, cancel := context.WithCancel(ctx)
6167
// TODO
6268
n := 1024 * 10
69+
taskArr := make([]task, 0)
6370
s := &scheduler{
6471
ctx: ctx1,
6572
cancel: cancel,
6673
idAllocator: idAllocator,
6774
tsoAllocator: tsoAllocator,
6875
taskChan: make(chan task, n),
76+
taskHeap: typeutil.NewObjectArrayBasedMinimumHeap[task, Timestamp](taskArr, GetTaskHeapOrder),
6977
minDdlTs: *atomic.NewUint64(0),
7078
clusterLock: lock.NewKeyLock[string](),
7179
databaseLock: lock.NewKeyLock[string](),
@@ -93,7 +101,7 @@ func (s *scheduler) Stop() {
93101
}
94102

95103
func (s *scheduler) execute(task task) {
96-
defer s.setMinDdlTs(task.GetTs()) // we should update ts, whatever task succeeds or not.
104+
defer s.setMinDdlTs() // we should update ts, whatever task succeeds or not.
97105
task.SetInQueueDuration()
98106
if err := task.Prepare(task.GetCtx()); err != nil {
99107
task.NotifyDone(err)
@@ -153,6 +161,7 @@ func (s *scheduler) setTs(task task) error {
153161
return err
154162
}
155163
task.SetTs(ts)
164+
s.taskHeap.Push(task)
156165
return nil
157166
}
158167

@@ -186,18 +195,29 @@ func (s *scheduler) GetMinDdlTs() Timestamp {
186195
return s.minDdlTs.Load()
187196
}
188197

189-
func (s *scheduler) setMinDdlTs(ts Timestamp) {
190-
s.minDdlTs.Store(ts)
198+
func (s *scheduler) setMinDdlTs() {
199+
s.lock.Lock()
200+
defer s.lock.Unlock()
201+
202+
header := s.taskHeap.Peek()
203+
for header != nil && header.IsFinished() {
204+
s.minDdlTs.Store(header.GetTs())
205+
s.taskHeap.Pop()
206+
header = s.taskHeap.Peek()
207+
}
191208
}
192209

193210
func (s *scheduler) executeTaskWithLock(task task, lockerKey LockerKey) error {
194211
if lockerKey == nil {
195212
if err := s.setID(task); err != nil {
196213
return err
197214
}
215+
s.lock.Lock()
198216
if err := s.setTs(task); err != nil {
217+
s.lock.Unlock()
199218
return err
200219
}
220+
s.lock.Unlock()
201221
s.execute(task)
202222
return nil
203223
}

Diff for: internal/rootcoord/task.go

+13-5
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package rootcoord
1919
import (
2020
"context"
2121
"fmt"
22+
"sync/atomic"
2223
"time"
2324

2425
"go.uber.org/zap"
@@ -53,16 +54,18 @@ type task interface {
5354
Execute(ctx context.Context) error
5455
WaitToFinish() error
5556
NotifyDone(err error)
57+
IsFinished() bool
5658
SetInQueueDuration()
5759
GetLockerKey() LockerKey
5860
}
5961

6062
type baseTask struct {
61-
ctx context.Context
62-
core *Core
63-
done chan error
64-
ts Timestamp
65-
id UniqueID
63+
ctx context.Context
64+
core *Core
65+
done chan error
66+
isFinished atomic.Bool
67+
ts Timestamp
68+
id UniqueID
6669

6770
tr *timerecord.TimeRecorder
6871
queueDur time.Duration
@@ -116,12 +119,17 @@ func (b *baseTask) WaitToFinish() error {
116119

117120
func (b *baseTask) NotifyDone(err error) {
118121
b.done <- err
122+
b.isFinished.Store(true)
119123
}
120124

121125
func (b *baseTask) SetInQueueDuration() {
122126
b.queueDur = b.tr.ElapseSpan()
123127
}
124128

129+
func (b *baseTask) IsFinished() bool {
130+
return b.isFinished.Load()
131+
}
132+
125133
func (b *baseTask) GetLockerKey() LockerKey {
126134
return nil
127135
}

Diff for: pkg/util/typeutil/heap.go

+64
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,39 @@ func (h *heapArray[E]) Peek() interface{} {
6969
return (*h)[0]
7070
}
7171

72+
type objectHeapArray[O any, E constraints.Ordered] struct {
73+
objects []O
74+
getOrderFunc func(O) E
75+
}
76+
77+
func (h *objectHeapArray[O, E]) Len() int {
78+
return len(h.objects)
79+
}
80+
81+
func (h *objectHeapArray[O, E]) Less(i, j int) bool {
82+
return h.getOrderFunc(h.objects[i]) < h.getOrderFunc(h.objects[j])
83+
}
84+
85+
func (h *objectHeapArray[O, E]) Swap(i, j int) {
86+
h.objects[i], h.objects[j] = h.objects[j], h.objects[i]
87+
}
88+
89+
func (h *objectHeapArray[O, E]) Push(x interface{}) {
90+
h.objects = append(h.objects, x.(O))
91+
}
92+
93+
func (h *objectHeapArray[O, E]) Pop() interface{} {
94+
old := h.objects
95+
n := len(old)
96+
x := old[n-1]
97+
h.objects = old[0 : n-1]
98+
return x
99+
}
100+
101+
func (h *objectHeapArray[O, E]) Peek() interface{} {
102+
return h.objects[0]
103+
}
104+
72105
// reverseOrderedInterface is a heap base interface that reverses the order of the elements.
73106
type reverseOrderedInterface[E constraints.Ordered] struct {
74107
HeapInterface
@@ -107,6 +140,37 @@ func NewArrayBasedMinimumHeap[E constraints.Ordered](initial []E) Heap[E] {
107140
}
108141
}
109142

143+
func NewObjectArrayBasedMaximumHeap[O any, E constraints.Ordered](initial []O, getOrderFunc func(O) E) Heap[O] {
144+
if initial == nil {
145+
initial = make([]O, 0)
146+
}
147+
ha := &objectHeapArray[O, E]{
148+
objects: initial,
149+
getOrderFunc: getOrderFunc,
150+
}
151+
reverse := reverseOrderedInterface[E]{
152+
HeapInterface: ha,
153+
}
154+
heap.Init(reverse)
155+
return &heapImpl[O, reverseOrderedInterface[E]]{
156+
inner: reverse,
157+
}
158+
}
159+
160+
func NewObjectArrayBasedMinimumHeap[O any, E constraints.Ordered](initial []O, getOrderFunc func(O) E) Heap[O] {
161+
if initial == nil {
162+
initial = make([]O, 0)
163+
}
164+
ha := &objectHeapArray[O, E]{
165+
objects: initial,
166+
getOrderFunc: getOrderFunc,
167+
}
168+
heap.Init(ha)
169+
return &heapImpl[O, *objectHeapArray[O, E]]{
170+
inner: ha,
171+
}
172+
}
173+
110174
// heapImpl is a min-heap of E.
111175
type heapImpl[E any, H HeapInterface] struct {
112176
inner H

Diff for: pkg/util/typeutil/heap_test.go

+50
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,53 @@ func TestMaximumHeap(t *testing.T) {
3939
assert.Equal(t, i, heap.Pop())
4040
}
4141
}
42+
43+
type FooHeapObject struct {
44+
value int
45+
}
46+
47+
func GetFooHeapObjectOrderFunc(obj *FooHeapObject) int {
48+
return obj.value
49+
}
50+
51+
func TestMinimumObjectHeap(t *testing.T) {
52+
h := []*FooHeapObject{
53+
{value: 4},
54+
{value: 5},
55+
{value: 2},
56+
}
57+
heap := NewObjectArrayBasedMinimumHeap(h, GetFooHeapObjectOrderFunc)
58+
assert.Equal(t, 2, heap.Peek().value)
59+
assert.Equal(t, 3, heap.Len())
60+
heap.Push(&FooHeapObject{value: 3})
61+
assert.Equal(t, 2, heap.Peek().value)
62+
assert.Equal(t, 4, heap.Len())
63+
heap.Push(&FooHeapObject{value: 1})
64+
assert.Equal(t, 1, heap.Peek().value)
65+
assert.Equal(t, 5, heap.Len())
66+
for i := 1; i <= 5; i++ {
67+
assert.Equal(t, i, heap.Peek().value)
68+
assert.Equal(t, i, heap.Pop().value)
69+
}
70+
}
71+
72+
func TestMaximumObjectHeap(t *testing.T) {
73+
h := []*FooHeapObject{
74+
{value: 4},
75+
{value: 1},
76+
{value: 2},
77+
}
78+
heap := NewObjectArrayBasedMaximumHeap(h, GetFooHeapObjectOrderFunc)
79+
assert.Equal(t, 4, heap.Peek().value)
80+
assert.Equal(t, 3, heap.Len())
81+
heap.Push(&FooHeapObject{value: 3})
82+
assert.Equal(t, 4, heap.Peek().value)
83+
assert.Equal(t, 4, heap.Len())
84+
heap.Push(&FooHeapObject{value: 5})
85+
assert.Equal(t, 5, heap.Peek().value)
86+
assert.Equal(t, 5, heap.Len())
87+
for i := 5; i >= 1; i-- {
88+
assert.Equal(t, i, heap.Peek().value)
89+
assert.Equal(t, i, heap.Pop().value)
90+
}
91+
}

0 commit comments

Comments
 (0)