Skip to content

Commit e501025

Browse files
authored
enhance: simplify compaction tasks to reduce their memory overhead (#39121)
See #39080 --------- Signed-off-by: Ted Xu <[email protected]>
1 parent f5234c3 commit e501025

File tree

7 files changed

+34
-254
lines changed

7 files changed

+34
-254
lines changed

internal/datacoord/compaction.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
"time"
2525

2626
"github.com/cockroachdb/errors"
27-
"go.opentelemetry.io/otel"
2827
"go.uber.org/zap"
2928

3029
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@@ -592,8 +591,6 @@ func (c *compactionPlanHandler) removeTasksByChannel(channel string) {
592591
}
593592

594593
func (c *compactionPlanHandler) submitTask(t CompactionTask) error {
595-
_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", t.GetTaskProto().GetType()))
596-
t.SetSpan(span)
597594
if err := c.queueTasks.Enqueue(t); err != nil {
598595
return err
599596
}
@@ -603,8 +600,6 @@ func (c *compactionPlanHandler) submitTask(t CompactionTask) error {
603600

604601
// restoreTask used to restore Task from etcd
605602
func (c *compactionPlanHandler) restoreTask(t CompactionTask) {
606-
_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", t.GetTaskProto().GetType()))
607-
t.SetSpan(span)
608603
c.executingGuard.Lock()
609604
c.executingTasks[t.GetTaskProto().GetPlanID()] = t
610605
c.executingGuard.Unlock()

internal/datacoord/compaction_task.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
package datacoord
1818

1919
import (
20-
"go.opentelemetry.io/otel/trace"
21-
2220
"github.com/milvus-io/milvus/pkg/proto/datapb"
2321
)
2422

@@ -40,13 +38,10 @@ type CompactionTask interface {
4038

4139
SetTask(*datapb.CompactionTask)
4240
GetTaskProto() *datapb.CompactionTask
43-
SetPlan(plan *datapb.CompactionPlan)
4441
ShadowClone(opts ...compactionTaskOpt) *datapb.CompactionTask
4542

4643
SetNodeID(UniqueID) error
4744
NeedReAssignNodeID() bool
48-
GetSpan() trace.Span
49-
SetSpan(trace.Span)
5045
SaveTaskMeta() error
5146
}
5247

internal/datacoord/compaction_task_clustering.go

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525

2626
"github.com/cockroachdb/errors"
2727
"github.com/samber/lo"
28-
"go.opentelemetry.io/otel/trace"
2928
"go.uber.org/atomic"
3029
"go.uber.org/zap"
3130
"google.golang.org/protobuf/proto"
@@ -52,15 +51,13 @@ type clusteringCompactionTask struct {
5251
plan *datapb.CompactionPlan
5352
result *datapb.CompactionPlanResult
5453

55-
span trace.Span
5654
allocator allocator.Allocator
5755
meta CompactionMeta
5856
sessions session.DataNodeManager
5957
handler Handler
6058
analyzeScheduler *taskScheduler
6159

6260
maxRetryTimes int32
63-
slotUsage int64
6461
}
6562

6663
func (t *clusteringCompactionTask) GetTaskProto() *datapb.CompactionTask {
@@ -79,7 +76,6 @@ func newClusteringCompactionTask(t *datapb.CompactionTask, allocator allocator.A
7976
handler: handler,
8077
analyzeScheduler: analyzeScheduler,
8178
maxRetryTimes: 3,
82-
slotUsage: paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64(),
8379
}
8480
task.taskProto.Store(t)
8581
return task
@@ -272,7 +268,6 @@ func (t *clusteringCompactionTask) processExecuting() error {
272268
switch result.GetState() {
273269
case datapb.CompactionTaskState_completed:
274270
t.result = result
275-
result := t.result
276271
if len(result.GetSegments()) == 0 {
277272
log.Warn("illegal compaction results, this should not happen")
278273
return merr.WrapErrCompactionResult("compaction result is empty")
@@ -766,24 +761,10 @@ func (t *clusteringCompactionTask) GetResult() *datapb.CompactionPlanResult {
766761
return t.result
767762
}
768763

769-
func (t *clusteringCompactionTask) GetSpan() trace.Span {
770-
return t.span
771-
}
772-
773-
func (t *clusteringCompactionTask) EndSpan() {
774-
if t.span != nil {
775-
t.span.End()
776-
}
777-
}
778-
779764
func (t *clusteringCompactionTask) SetResult(result *datapb.CompactionPlanResult) {
780765
t.result = result
781766
}
782767

783-
func (t *clusteringCompactionTask) SetSpan(span trace.Span) {
784-
t.span = span
785-
}
786-
787768
func (t *clusteringCompactionTask) SetPlan(plan *datapb.CompactionPlan) {
788769
t.plan = plan
789770
}
@@ -805,5 +786,5 @@ func (t *clusteringCompactionTask) NeedReAssignNodeID() bool {
805786
}
806787

807788
func (t *clusteringCompactionTask) GetSlotUsage() int64 {
808-
return t.slotUsage
789+
return paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64()
809790
}

internal/datacoord/compaction_task_l0.go

Lines changed: 6 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323

2424
"github.com/cockroachdb/errors"
2525
"github.com/samber/lo"
26-
"go.opentelemetry.io/otel/trace"
2726
"go.uber.org/atomic"
2827
"go.uber.org/zap"
2928
"google.golang.org/protobuf/proto"
@@ -42,15 +41,10 @@ var _ CompactionTask = (*l0CompactionTask)(nil)
4241

4342
type l0CompactionTask struct {
4443
taskProto atomic.Value // *datapb.CompactionTask
45-
plan *datapb.CompactionPlan
46-
result *datapb.CompactionPlanResult
4744

48-
span trace.Span
4945
allocator allocator.Allocator
5046
sessions session.DataNodeManager
5147
meta CompactionMeta
52-
53-
slotUsage int64
5448
}
5549

5650
func (t *l0CompactionTask) GetTaskProto() *datapb.CompactionTask {
@@ -66,7 +60,6 @@ func newL0CompactionTask(t *datapb.CompactionTask, allocator allocator.Allocator
6660
allocator: allocator,
6761
meta: meta,
6862
sessions: session,
69-
slotUsage: paramtable.Get().DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64(),
7063
}
7164
task.taskProto.Store(t)
7265
return task
@@ -96,8 +89,7 @@ func (t *l0CompactionTask) processPipelining() bool {
9689
}
9790

9891
log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("nodeID", t.GetTaskProto().GetNodeID()))
99-
var err error
100-
t.plan, err = t.BuildCompactionRequest()
92+
plan, err := t.BuildCompactionRequest()
10193
if err != nil {
10294
log.Warn("l0CompactionTask failed to build compaction request", zap.Error(err))
10395
err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed), setFailReason(err.Error()))
@@ -109,7 +101,7 @@ func (t *l0CompactionTask) processPipelining() bool {
109101
return t.processFailed()
110102
}
111103

112-
err = t.sessions.Compaction(context.TODO(), t.GetTaskProto().GetNodeID(), t.GetPlan())
104+
err = t.sessions.Compaction(context.TODO(), t.GetTaskProto().GetNodeID(), plan)
113105
if err != nil {
114106
log.Warn("l0CompactionTask failed to notify compaction tasks to DataNode", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
115107
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
@@ -132,8 +124,7 @@ func (t *l0CompactionTask) processExecuting() bool {
132124
}
133125
switch result.GetState() {
134126
case datapb.CompactionTaskState_completed:
135-
t.result = result
136-
if err := t.saveSegmentMeta(); err != nil {
127+
if err := t.saveSegmentMeta(result); err != nil {
137128
log.Warn("l0CompactionTask failed to save segment meta", zap.Error(err))
138129
return false
139130
}
@@ -142,6 +133,7 @@ func (t *l0CompactionTask) processExecuting() bool {
142133
log.Warn("l0CompactionTask failed to save task meta_saved state", zap.Error(err))
143134
return false
144135
}
136+
UpdateCompactionSegmentSizeMetrics(result.GetSegments())
145137
return t.processMetaSaved()
146138
case datapb.CompactionTaskState_failed:
147139
if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed)); err != nil {
@@ -173,7 +165,6 @@ func (t *l0CompactionTask) processCompleted() bool {
173165
}
174166

175167
t.resetSegmentCompacting()
176-
UpdateCompactionSegmentSizeMetrics(t.result.GetSegments())
177168
task := t.taskProto.Load().(*datapb.CompactionTask)
178169
log.Info("l0CompactionTask processCompleted done", zap.Int64("planID", task.GetPlanID()),
179170
zap.Duration("costs", time.Duration(task.GetEndTime()-task.GetStartTime())*time.Second))
@@ -212,40 +203,10 @@ func (t *l0CompactionTask) Clean() bool {
212203
return t.doClean() == nil
213204
}
214205

215-
func (t *l0CompactionTask) GetResult() *datapb.CompactionPlanResult {
216-
return t.result
217-
}
218-
219-
func (t *l0CompactionTask) SetResult(result *datapb.CompactionPlanResult) {
220-
t.result = result
221-
}
222-
223206
func (t *l0CompactionTask) SetTask(task *datapb.CompactionTask) {
224207
t.taskProto.Store(task)
225208
}
226209

227-
func (t *l0CompactionTask) GetSpan() trace.Span {
228-
return t.span
229-
}
230-
231-
func (t *l0CompactionTask) SetSpan(span trace.Span) {
232-
t.span = span
233-
}
234-
235-
func (t *l0CompactionTask) EndSpan() {
236-
if t.span != nil {
237-
t.span.End()
238-
}
239-
}
240-
241-
func (t *l0CompactionTask) SetPlan(plan *datapb.CompactionPlan) {
242-
t.plan = plan
243-
}
244-
245-
func (t *l0CompactionTask) GetPlan() *datapb.CompactionPlan {
246-
return t.plan
247-
}
248-
249210
func (t *l0CompactionTask) GetLabel() string {
250211
return fmt.Sprintf("%d-%s", t.GetTaskProto().PartitionID, t.GetTaskProto().GetChannel())
251212
}
@@ -373,8 +334,7 @@ func (t *l0CompactionTask) saveTaskMeta(task *datapb.CompactionTask) error {
373334
return t.meta.SaveCompactionTask(context.TODO(), task)
374335
}
375336

376-
func (t *l0CompactionTask) saveSegmentMeta() error {
377-
result := t.result
337+
func (t *l0CompactionTask) saveSegmentMeta(result *datapb.CompactionPlanResult) error {
378338
var operators []UpdateOperator
379339
for _, seg := range result.GetSegments() {
380340
operators = append(operators, AddBinlogsOperator(seg.GetSegmentID(), nil, nil, seg.GetDeltalogs(), nil))
@@ -392,5 +352,5 @@ func (t *l0CompactionTask) saveSegmentMeta() error {
392352
}
393353

394354
func (t *l0CompactionTask) GetSlotUsage() int64 {
395-
return t.slotUsage
355+
return paramtable.Get().DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64()
396356
}

internal/datacoord/compaction_task_l0_test.go

Lines changed: 0 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
"github.com/samber/lo"
2525
"github.com/stretchr/testify/mock"
2626
"github.com/stretchr/testify/suite"
27-
"go.opentelemetry.io/otel/trace"
2827

2928
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
3029
"github.com/milvus-io/milvus/internal/datacoord/allocator"
@@ -444,7 +443,6 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
444443
t := s.generateTestL0Task(datapb.CompactionTaskState_meta_saved)
445444
t.updateAndSaveTaskMeta(setNodeID(100))
446445
s.Require().True(t.GetTaskProto().GetNodeID() > 0)
447-
t.result = &datapb.CompactionPlanResult{}
448446

449447
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything, false).RunAndReturn(func(ctx context.Context, segIDs []int64, isCompacting bool) {
450448
s.ElementsMatch(segIDs, t.GetTaskProto().GetInputSegments())
@@ -461,7 +459,6 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
461459
t := s.generateTestL0Task(datapb.CompactionTaskState_meta_saved)
462460
t.updateAndSaveTaskMeta(setNodeID(100))
463461
s.Require().True(t.GetTaskProto().GetNodeID() > 0)
464-
t.result = &datapb.CompactionPlanResult{}
465462

466463
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything, mock.Anything).Return(errors.New("mock error")).Once()
467464

@@ -475,7 +472,6 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
475472
t := s.generateTestL0Task(datapb.CompactionTaskState_completed)
476473
t.updateAndSaveTaskMeta(setNodeID(100))
477474
s.Require().True(t.GetTaskProto().GetNodeID() > 0)
478-
t.result = &datapb.CompactionPlanResult{}
479475
s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetTaskProto().GetNodeID(), mock.Anything).Return(errors.New("mock error")).Once()
480476
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything, false).RunAndReturn(func(ctx context.Context, segIDs []int64, isCompacting bool) {
481477
s.ElementsMatch(segIDs, t.GetTaskProto().GetInputSegments())
@@ -491,7 +487,6 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
491487
t := s.generateTestL0Task(datapb.CompactionTaskState_completed)
492488
t.updateAndSaveTaskMeta(setNodeID(100))
493489
s.Require().True(t.GetTaskProto().GetNodeID() > 0)
494-
t.result = &datapb.CompactionPlanResult{}
495490
s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetTaskProto().GetNodeID(), mock.Anything).Return(nil).Once()
496491
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything, false).RunAndReturn(func(ctx context.Context, segIDs []int64, isCompacting bool) {
497492
s.ElementsMatch(segIDs, t.GetTaskProto().GetInputSegments())
@@ -531,38 +526,3 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
531526
s.True(got)
532527
})
533528
}
534-
535-
func (s *L0CompactionTaskSuite) TestSetterGetter() {
536-
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything, mock.Anything).Return(nil)
537-
t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining)
538-
539-
span := t.GetSpan()
540-
s.Nil(span)
541-
s.NotPanics(t.EndSpan)
542-
543-
t.SetSpan(trace.SpanFromContext(context.TODO()))
544-
s.NotPanics(t.EndSpan)
545-
546-
rst := t.GetResult()
547-
s.Nil(rst)
548-
t.SetResult(&datapb.CompactionPlanResult{PlanID: 19530})
549-
s.NotNil(t.GetResult())
550-
551-
label := t.GetLabel()
552-
s.Equal("10-ch-1", label)
553-
554-
t.updateAndSaveTaskMeta(setStartTime(100))
555-
s.EqualValues(100, t.GetTaskProto().GetStartTime())
556-
557-
t.SetTask(nil)
558-
t.SetPlan(&datapb.CompactionPlan{PlanID: 19530})
559-
s.NotNil(t.GetPlan())
560-
561-
s.Run("set NodeID", func() {
562-
t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining)
563-
564-
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything, mock.Anything).Return(nil)
565-
t.SetNodeID(1000)
566-
s.EqualValues(1000, t.GetTaskProto().GetNodeID())
567-
})
568-
}

0 commit comments

Comments
 (0)