feat: adaptive lookback for monovertex #2373

Merged Feb 1, 2025 · 11 commits

Changes from 2 commits
11 changes: 11 additions & 0 deletions pkg/metrics/metrics.go
@@ -370,3 +370,14 @@ var (
		Help: "Total number of Write Errors while writing to a fallback sink",
	}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName})
)

// Daemon server metrics
var (
	// MonoVertexLookBack is a gauge used to expose the current lookback window value
	// being used by a given monovertex
	MonoVertexLookBack = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Subsystem: "monovtx",
		Name:      "lookback_window",
		Help:      "A metric to show the lookback window value being used by a given monovertex",
	}, []string{LabelMonoVertexName})
)
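As a usage sketch (the monovertex name here is hypothetical; the real update happens in rater.go below), a daemon component sets the gauge like this:

	// record that monovertex "my-mvtx" currently uses a 120-second lookback window
	metrics.MonoVertexLookBack.WithLabelValues("my-mvtx").Set(120)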
74 changes: 74 additions & 0 deletions pkg/mvtxdaemon/server/service/rater/helper.go
@@ -125,3 +125,77 @@ func calculatePodDelta(tc1, tc2 *TimestampedCounts) float64 {
	}
	return delta
}

// CalculateMaxLookback computes the maximum duration (in seconds) for which the count of messages processed
// by any pod remained unchanged within the specified range of indices in a queue of TimestampedCounts. It does
// this by analyzing each data point between startIndex and endIndex, checking the count changes for each pod,
// and recording the durations during which these counts stay constant. A pod's count only advances when that
// pod reads data, so this captures the required lookback for two scenarios:
// 1. A slow processing vertex
// 2. A slow data source - data arrives after long intervals
func CalculateMaxLookback(counts []*TimestampedCounts, startIndex, endIndex int) int64 {
	// Map to keep track of the last seen count and timestamp of each pod.
	lastSeen := make(map[string]struct {
		count    float64
		seenTime int64
	})

	// Map to store the maximum duration for which the value of any pod was unchanged.
	maxDuration := make(map[string]int64)
Member commented:
The calculation of maxDuration is independent among pods, right? Can we have a separate function to calculate maxDuration for a single pod? That way we can have only one for loop and save some of the `ok, found` map lookups.

Contributor Author replied:
I have kept a single function for easier understanding of the logic for now. I can change it if you feel it's required.

Member replied:
My point was to make the code easier to understand by changing the method to something like the below:

maxUnchangedDuration = make(map[string]int64)

for pod in pods:
    maxUnchangedDuration[podName] = calculateMaxUnchangedDurationForPod(pod)

globalMaxSecs = max value in maxUnchangedDuration

return globalMaxSecs

The calculateMaxUnchangedDurationForPod method wouldn't need to maintain the pod name to lastSeen/maxDuration mappings as we do right now.
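A minimal Go sketch of what such a per-pod helper could look like (hypothetical name and signature; it assumes the caller first extracts one pod's (timestamp, count) series from the queue):

	// calculateMaxUnchangedDurationForPod returns the longest stretch (in seconds)
	// during which a single pod's read count did not change. times and counts are
	// parallel slices for one pod, ordered by timestamp.
	func calculateMaxUnchangedDurationForPod(times []int64, counts []float64) int64 {
		if len(times) == 0 {
			return 0
		}
		var maxDur int64
		changed := false
		lastSeenTime := times[0]
		lastCount := counts[0]
		for i := 1; i < len(times); i++ {
			if counts[i] != lastCount {
				changed = true
				if d := times[i] - lastSeenTime; d > maxDur {
					maxDur = d
				}
				lastSeenTime, lastCount = times[i], counts[i]
			}
		}
		if !changed {
			// the pod never changed: credit the whole window,
			// mirroring CalculateMaxLookback's tail check for unchanged pods
			maxDur = times[len(times)-1] - lastSeenTime
		}
		return maxDur
	}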


	for i := startIndex; i < endIndex; i++ {
		// Get a snapshot of pod counts and the timestamp for the current index.
		item := counts[i].PodCountSnapshot()
		curTime := counts[i].PodTimestamp()

		// Iterate through each pod in the snapshot.
		for key, count := range item {
			if lastSeenData, found := lastSeen[key]; found {
				// If the read count has changed since we last saw it
				if lastSeenData.count != count {
					// Calculate the duration for which the count was unchanged.
					duration := curTime - lastSeenData.seenTime
					// Update maxDuration for the pod if this duration is the longest seen so far.
					// TODO: Can check if average or EWMA works better than max
					if currentMax, ok := maxDuration[key]; !ok || duration > currentMax {
						maxDuration[key] = duration
					}
					// Update the last seen count and timestamp for the pod.
					lastSeen[key] = struct {
						count    float64
						seenTime int64
					}{count, curTime}
				}
			} else {
				// First time seeing this pod; initialize its last seen data.
				lastSeen[key] = struct {
					count    float64
					seenTime int64
				}{count, curTime}
			}
		}
	}

	// Fetch the snapshot and timestamp at endIndex to account for counts that never changed.
	endVals := counts[endIndex].PodCountSnapshot()
	lastTime := counts[endIndex].PodTimestamp()

	// Check for pods that did not change at all during the iteration,
	// and update their maxDuration to the full period from first seen to lastTime.
	for key, data := range lastSeen {
		if _, ok := maxDuration[key]; !ok {
			if _, found := endVals[key]; found {
				maxDuration[key] = lastTime - data.seenTime
			}
		}
	}

	// Calculate the maximum duration found across all pods.
	globalMaxSecs := int64(0)
	for _, duration := range maxDuration {
		if duration > globalMaxSecs {
			globalMaxSecs = duration
		}
	}
	return globalMaxSecs
}
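As a quick usage sketch (reusing the newTimestampedCounts test helper defined in helper_test.go below), a pod whose read count never changes is credited with the full window:

	counts := []*TimestampedCounts{
		newTimestampedCounts(100, map[string]float64{"pod1": 42}),
		newTimestampedCounts(250, map[string]float64{"pod1": 42}),
		newTimestampedCounts(400, map[string]float64{"pod1": 42}),
	}
	// pod1 is unchanged across the whole range, so this returns 400 - 100 = 300 seconds
	maxSecs := CalculateMaxLookback(counts, 0, 2)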
134 changes: 134 additions & 0 deletions pkg/mvtxdaemon/server/service/rater/helper_test.go
@@ -17,6 +17,7 @@ limitations under the License.
package rater

import (
	"sync"
	"testing"
	"time"

@@ -286,3 +287,136 @@ func TestCalculateRate(t *testing.T) {
		assert.Equal(t, 23.0, CalculateRate(q, 100))
	})
}

// newTimestampedCounts is a helper to create a TimestampedCounts instance
func newTimestampedCounts(timestamp int64, counts map[string]float64) *TimestampedCounts {
	return &TimestampedCounts{
		timestamp:     timestamp,
		podReadCounts: counts,
		lock:          new(sync.RWMutex),
	}
}

// TestCalculateMaxLookback tests various scenarios of the CalculateMaxLookback function
func TestCalculateMaxLookback(t *testing.T) {
	tests := []struct {
		name        string
		counts      []*TimestampedCounts
		startIndex  int
		endIndex    int
		expectedMax int64
	}{
		{
			name: "Uniform data across the range",
			counts: []*TimestampedCounts{
				newTimestampedCounts(100, map[string]float64{"pod1": 100, "pod2": 200}),
				newTimestampedCounts(200, map[string]float64{"pod1": 100, "pod2": 200}),
				newTimestampedCounts(400, map[string]float64{"pod1": 100, "pod2": 200}),
			},
			startIndex:  0,
			endIndex:    2,
			expectedMax: 300,
		},
		{
			name: "Values change midway",
			counts: []*TimestampedCounts{
				newTimestampedCounts(100, map[string]float64{"pod1": 100, "pod2": 150}),
				newTimestampedCounts(240, map[string]float64{"pod1": 100, "pod2": 200}),
				newTimestampedCounts(360, map[string]float64{"pod1": 150, "pod2": 200}),
			},
			startIndex:  0,
			endIndex:    2,
			expectedMax: 260,
		},
		{
			name: "No data change across any pods",
			counts: []*TimestampedCounts{
				newTimestampedCounts(100, map[string]float64{"pod1": 500}),
				newTimestampedCounts(600, map[string]float64{"pod1": 500}),
			},
			startIndex:  0,
			endIndex:    1,
			expectedMax: 500, // the entire duration
		},
		{
			name: "Edge Case: One entry only",
			counts: []*TimestampedCounts{
				newTimestampedCounts(100, map[string]float64{"pod1": 100}),
			},
			startIndex:  0,
			endIndex:    0,
			expectedMax: 0, // no duration difference
		},
		{
			name: "Multiple pods with different activity patterns",
			counts: []*TimestampedCounts{
				newTimestampedCounts(100, map[string]float64{"pod1": 100, "pod2": 200}),
				newTimestampedCounts(300, map[string]float64{"pod1": 100, "pod2": 250}),
				newTimestampedCounts(500, map[string]float64{"pod1": 150, "pod2": 250}),
			},
			startIndex:  0,
			endIndex:    2,
			expectedMax: 400, // for pod1
		},
		{
			name: "Rapid changes in sequential entries",
			counts: []*TimestampedCounts{
				newTimestampedCounts(100, map[string]float64{"pod1": 300, "pod2": 400}),
				newTimestampedCounts(130, map[string]float64{"pod1": 500, "pod2": 400}),
				newTimestampedCounts(160, map[string]float64{"pod1": 500, "pod2": 600}),
			},
			startIndex:  0,
			endIndex:    2,
			expectedMax: 60, // pod2 is unchanged from the first to the last entry
		},
		{
			name: "Gaps in timestamps",
			counts: []*TimestampedCounts{
				newTimestampedCounts(100, map[string]float64{"pod1": 100}),
				newTimestampedCounts(1000, map[string]float64{"pod1": 100}), // large gap with no change
				newTimestampedCounts(1100, map[string]float64{"pod1": 200}),
			},
			startIndex:  0,
			endIndex:    2,
			expectedMax: 1000,
		},
		{
			name: "Large number of pods",
			counts: []*TimestampedCounts{
				newTimestampedCounts(10, map[string]float64{
					"pod1": 100, "pod2": 100, "pod3": 100, "pod4": 100, "pod5": 100,
					"pod6": 100, "pod7": 100, "pod8": 100, "pod9": 100, "pod10": 100}),
				newTimestampedCounts(20, map[string]float64{
					"pod1": 100, "pod2": 100, "pod3": 100, "pod4": 200, "pod5": 100,
					"pod6": 100, "pod7": 200, "pod8": 100, "pod9": 100, "pod10": 100}),
			},
			startIndex:  0,
			endIndex:    1,
			expectedMax: 10, // the pods that did not change are credited with the full 10s window
		},
		{
			name: "Large number of pods - No change",
			counts: []*TimestampedCounts{
				newTimestampedCounts(10, map[string]float64{
					"pod1": 100, "pod2": 100, "pod3": 100, "pod4": 100, "pod5": 100,
					"pod6": 100, "pod7": 100, "pod8": 100, "pod9": 100, "pod10": 100}),
				newTimestampedCounts(20, map[string]float64{
					"pod1": 100, "pod2": 100, "pod3": 100, "pod4": 100, "pod5": 100,
					"pod6": 100, "pod7": 100, "pod8": 100, "pod9": 100, "pod10": 100}),
				newTimestampedCounts(60, map[string]float64{
					"pod1": 100, "pod2": 100, "pod3": 100, "pod4": 100, "pod5": 100,
					"pod6": 100, "pod7": 100, "pod8": 100, "pod9": 100, "pod10": 100}),
			},
			startIndex:  0,
			endIndex:    2,
			expectedMax: 50, // no pod changed, so the max is the full 10 -> 60 window
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			maxDuration := CalculateMaxLookback(tt.counts, tt.startIndex, tt.endIndex)
			assert.Equal(t, tt.expectedMax, maxDuration)
		})
	}
}
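This table-driven test can be run on its own with standard Go tooling:

	go test ./pkg/mvtxdaemon/server/service/rater/ -run TestCalculateMaxLookback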
90 changes: 84 additions & 6 deletions pkg/mvtxdaemon/server/service/rater/rater.go
@@ -19,21 +19,25 @@ package rater
import (
	"crypto/tls"
	"fmt"
	"math"
	"net/http"
	"time"

	"github.com/prometheus/common/expfmt"
	"go.uber.org/atomic"
	"go.uber.org/zap"
	"golang.org/x/net/context"
	"google.golang.org/protobuf/types/known/wrapperspb"

	"github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
	"github.com/numaproj/numaflow/pkg/metrics"
	"github.com/numaproj/numaflow/pkg/shared/logging"
	sharedqueue "github.com/numaproj/numaflow/pkg/shared/queue"
)

const CountWindow = time.Second * 10
const monoVtxReadMetricName = "monovtx_read_total"
const MaxLookback = time.Minute * 10

// MonoVtxRatable is the interface for the Rater struct.
type MonoVtxRatable interface {
@@ -63,8 +67,9 @@ type Rater struct {
	podTracker *PodTracker
	// timestampedPodCounts is a queue of timestamped counts for the MonoVertex
	timestampedPodCounts *sharedqueue.OverflowQueue[*TimestampedCounts]
-	// userSpecifiedLookBackSeconds is the user-specified lookback seconds for that MonoVertex
-	userSpecifiedLookBackSeconds int64
+	// userSpecifiedLookBackSeconds is the current lookback seconds for the monovertex;
+	// it can be updated dynamically and defaults to the user-specified value in the spec
+	userSpecifiedLookBackSeconds *atomic.Float64
	options *options
}

@@ -95,20 +100,22 @@ func NewRater(ctx context.Context, mv *v1alpha1.MonoVertex, opts ...Option) *Rater {
		},
		Timeout: time.Second * 1,
	},
-	log:     logging.FromContext(ctx).Named("Rater"),
-	options: defaultOptions(),
+	log:                          logging.FromContext(ctx).Named("Rater"),
+	options:                      defaultOptions(),
+	userSpecifiedLookBackSeconds: atomic.NewFloat64(float64(mv.Spec.Scale.GetLookbackSeconds())),
	}

	rater.podTracker = NewPodTracker(ctx, mv)
	// maintain the total counts of the last 30 minutes (1800 seconds) since we support 1m, 5m, 15m lookback seconds.
	rater.timestampedPodCounts = sharedqueue.New[*TimestampedCounts](int(1800 / CountWindow.Seconds()))
-	rater.userSpecifiedLookBackSeconds = int64(mv.Spec.Scale.GetLookbackSeconds())

	for _, opt := range opts {
		if opt != nil {
			opt(rater.options)
		}
	}
	// initialise the metric value for the lookback window
	metrics.MonoVertexLookBack.WithLabelValues(mv.Name).Set(rater.userSpecifiedLookBackSeconds.Load())
	return &rater
}

@@ -203,7 +210,10 @@ func (r *Rater) GetRates() map[string]*wrapperspb.DoubleValue {
}

func (r *Rater) buildLookbackSecondsMap() map[string]int64 {
-	lookbackSecondsMap := map[string]int64{"default": r.userSpecifiedLookBackSeconds}
+	// since the lookback value can change dynamically,
+	// load the current value of the lookback seconds
+	lbValue := r.userSpecifiedLookBackSeconds.Load()
+	lookbackSecondsMap := map[string]int64{"default": int64(lbValue)}
	for k, v := range fixedLookbackSeconds {
		lookbackSecondsMap[k] = v
	}
@@ -223,6 +233,10 @@ func (r *Rater) Start(ctx context.Context) error {
		}
	}()

	// start the dynamic lookback check, which will
	// update the lookback period based on the data read time
	go r.startDynamicLookBack(ctx)

	// Worker group
	for i := 1; i <= r.options.workers; i++ {
		go r.monitor(ctx, i, keyCh)
@@ -269,3 +283,67 @@ func sleep(ctx context.Context, duration time.Duration) {
	case <-time.After(duration):
	}
}

func (r *Rater) startDynamicLookBack(ctx context.Context) {
	ticker := time.NewTicker(30 * time.Second)
	// Ensure the ticker is stopped to prevent a resource leak.
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			r.updateDynamicLookbackSecs()
		case <-ctx.Done():
			// exit if the context is canceled or expires
			return
		}
	}
}

// updateDynamicLookbackSecs adjusts the lookback duration based on the current
// processing time of the MonoVertex system.
func (r *Rater) updateDynamicLookbackSecs() {
	counts := r.timestampedPodCounts.Items()
	currentLookback := r.userSpecifiedLookBackSeconds.Load()
	vertexName := r.monoVertex.Name
	if len(counts) <= 1 {
		return
	}

	// We calculate the processing time over a window = 3 * currentLookback.
	// This ensures that we have enough data to capture one complete processing cycle.
	startIndex := findStartIndex(3*int64(currentLookback), counts)
	// We consider the second-to-last element as the end index because the last element
	// might be incomplete; we can be sure that the second-to-last element in the queue is complete.
	endIndex := len(counts) - 2
	if startIndex == indexNotFound {
		return
	}

	// time diff in seconds
	timeDiff := counts[endIndex].timestamp - counts[startIndex].timestamp
	if timeDiff == 0 {
		// no action required here
		return
	}
	maxProcessingTime := CalculateMaxLookback(counts, startIndex, endIndex)
	// Round up to the nearest minute so that, as the value steps up and down, the same
	// processing time always maps to a consistent lookback; then convert back to seconds.
	roundedProcessingTime := 60.0 * (math.Ceil(float64(maxProcessingTime) / 60.0))
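	// e.g. a maxProcessingTime of 130s becomes ceil(130/60) = 3 minutes, i.e. 180s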
	// Based on the value received we have two cases:
	// 1. Step up (value is > current):
	//    do not allow the value to be increased beyond the allowed MaxLookback (10 mins)
	// 2. Step down (value is <= current):
	//    do not allow the value to go lower than the lookback value specified in the spec
	if roundedProcessingTime > currentLookback {
		roundedProcessingTime = math.Min(roundedProcessingTime, MaxLookback.Seconds())
	} else {
		roundedProcessingTime = math.Max(roundedProcessingTime, float64(r.monoVertex.Spec.Scale.GetLookbackSeconds()))
	}
	// If the value has changed, update it
	if roundedProcessingTime != currentLookback {
		r.userSpecifiedLookBackSeconds.Store(roundedProcessingTime)
		r.log.Infof("Lookback updated for mvtx %s, Current: %f Updated: %f", vertexName, currentLookback, roundedProcessingTime)
		// update the metric value for the lookback window
		metrics.MonoVertexLookBack.WithLabelValues(r.monoVertex.Name).Set(roundedProcessingTime)
	}
}
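To make the step-up/step-down rules concrete, here is a small sketch of the clamping arithmetic in isolation (a hypothetical standalone helper, not part of this PR, which keeps the logic inline above; math is already imported in this file):

	// clampLookback applies the two rules above: increases are capped at the
	// global MaxLookback, and decreases never drop below the spec's lookback value.
	func clampLookback(rounded, current, specSeconds, maxSeconds float64) float64 {
		if rounded > current {
			return math.Min(rounded, maxSeconds)
		}
		return math.Max(rounded, specSeconds)
	}

	// e.g. with a spec lookback of 120s and MaxLookback of 600s:
	//   clampLookback(780, 120, 120, 600) == 600 (step up, capped at MaxLookback)
	//   clampLookback(60, 600, 120, 600)  == 120 (step down, floored at the spec value)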