feat: adaptive lookback for monovertex #2373

Merged · 11 commits · Feb 1, 2025
2 changes: 2 additions & 0 deletions docs/user-guide/reference/autoscaling.md
@@ -49,6 +49,8 @@ spec:
don't want the vertices to be scaled down to `0`. In this case, you need to increase `lookbackSeconds` to cover more than
5 minutes, so that the calculated average rate and pending messages won't be `0` during the silent period, preventing a
scale-down to `0`.
The maximum value allowed to be configured is `600`.
On top of this, dynamic lookback adjustment tunes this parameter based on real-time processing data.
- `scaleUpCooldownSeconds` - After a scaling operation, how many seconds to wait for the same vertex if the follow-up
operation is a scale-up, defaults to `90`. Please make sure the time is longer than it takes a pod to become `Running` and
start processing, because the autoscaling algorithm divides the TPS by the number of pods even if a pod is not yet `Running`.
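
For reference, a minimal sketch of where `lookbackSeconds` sits in a MonoVertex spec; the metadata name is illustrative, and the clamp to `600` comes from `GetLookbackSeconds` in this PR:

```yaml
apiVersion: numaflow.numaproj.io/v1alpha1
kind: MonoVertex
metadata:
  name: my-monovertex        # illustrative name
spec:
  scale:
    lookbackSeconds: 300     # values above 600 are clamped to 600
```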
1 change: 1 addition & 0 deletions pkg/apis/numaflow/v1alpha1/const.go
@@ -183,6 +183,7 @@ const (
DefaultTargetProcessingSeconds = 20 // Default targeted time in seconds to finish processing all the pending messages for a source
DefaultTargetBufferAvailability = 50 // Default targeted percentage of buffer availability
DefaultReplicasPerScale = 2 // Default maximum replicas to be scaled up or down at once
MaxLookbackSeconds = 600 // Max lookback seconds for calculating avg rate and pending

// Default persistent buffer queue options
DefaultPBQChannelBufferSize = 100 // Default channel size in int (what should be right value?)
3 changes: 2 additions & 1 deletion pkg/apis/numaflow/v1alpha1/scale.go
@@ -69,7 +69,8 @@ type Scale struct {

func (s Scale) GetLookbackSeconds() int {
if s.LookbackSeconds != nil {
return int(*s.LookbackSeconds)
// do not allow the value to be larger than the MaxLookbackSeconds in our config
return min(MaxLookbackSeconds, int(*s.LookbackSeconds))
}
return DefaultLookbackSeconds
}
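
A minimal, self-contained sketch of the clamping behavior; the types are re-declared locally here, and the `LookbackSeconds` field is assumed to be `*uint32` as implied by the `int(*s.LookbackSeconds)` conversion above:

```go
package main

import "fmt"

const (
	DefaultLookbackSeconds = 120
	MaxLookbackSeconds     = 600
)

// Scale mirrors only the relevant field of the v1alpha1 Scale struct.
type Scale struct {
	LookbackSeconds *uint32
}

// GetLookbackSeconds clamps user-configured values at MaxLookbackSeconds.
func (s Scale) GetLookbackSeconds() int {
	if s.LookbackSeconds != nil {
		return min(MaxLookbackSeconds, int(*s.LookbackSeconds)) // builtin min requires Go 1.21+
	}
	return DefaultLookbackSeconds
}

func main() {
	over := uint32(900)
	fmt.Println(Scale{LookbackSeconds: &over}.GetLookbackSeconds()) // 600 (clamped)
	fmt.Println(Scale{}.GetLookbackSeconds())                       // 120 (default)
}
```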
14 changes: 14 additions & 0 deletions pkg/metrics/metrics.go
@@ -370,3 +370,17 @@ var (
Help: "Total number of Write Errors while writing to a fallback sink",
}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName})
)

// Daemon server metrics
var (
// MonoVertexLookBackSecs is a gauge indicating the current lookback window value being used
// by a given monovertex, i.e. how many seconds to look back when calculating the vertex's average
// processing rate (TPS) and pending messages; it defaults to 120. Rate and pending-message metrics
// are critical for autoscaling.
MonoVertexLookBackSecs = promauto.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: "monovtx",
Name: "lookback_window_seconds",
Help: "A metric to show what is the lookback window value being used by a given monovertex. " +
"Look back Seconds is critical in autoscaling calculations",
}, []string{LabelMonoVertexName})
)
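
A hedged sketch of how the rater could publish the adaptively computed window; the call site and variable names below are assumptions, not part of this diff, though `WithLabelValues(...).Set(...)` is the standard Prometheus `GaugeVec` API:

```go
// Hypothetical call site: after the adaptive algorithm settles on a new
// lookback window, export it so the value is observable in Prometheus.
newLookback := 180.0 // seconds, e.g. derived from CalculateMaxLookback
metrics.MonoVertexLookBackSecs.WithLabelValues("my-monovertex").Set(newLookback)
```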
92 changes: 92 additions & 0 deletions pkg/mvtxdaemon/server/service/rater/helper.go
@@ -125,3 +125,95 @@ func calculatePodDelta(tc1, tc2 *TimestampedCounts) float64 {
}
return delta
}

// podLastSeen stores the last seen timestamp and count of each pod
type podLastSeen struct {
count float64
seenTime int64
}

type podMaxDuration struct {
// Map to keep track of the last seen count and timestamp of each pod.
lastSeen map[string]podLastSeen
// Map to store the maximum duration for which the value of any pod was unchanged.
maxUnchangedDuration map[string]int64
}

// CalculateMaxLookback computes the maximum duration (in seconds) for which the count of messages processed by any pod
// remained unchanged within a specified range of indices in a queue of TimestampedCounts. It does this by analyzing each
// data point between startIndex and endIndex, checking the count changes for each pod, and noting the durations
// during which these counts stay constant. The counts only change when data is read by the pod, so this
// encapsulates the lookback needed for two scenarios:
// 1. A slow-processing vertex
// 2. A slow data source - data arrives only after long intervals
func CalculateMaxLookback(counts []*TimestampedCounts, startIndex, endIndex int) int64 {
lookBackData := podMaxDuration{
lastSeen: make(map[string]podLastSeen),
maxUnchangedDuration: make(map[string]int64),
}
processTimeline(counts, startIndex, endIndex, &lookBackData)
finalizeDurations(counts[endIndex], &lookBackData)
return findGlobalMaxDuration(lookBackData.maxUnchangedDuration)
}
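
A small worked example of the function's semantics (a fragment, assuming `fmt` is imported; it uses the unexported test helper `newTimestampedCounts` defined in `helper_test.go` below):

```go
// pod1's count sits at 100 from t=100s until the change is observed at t=360s,
// so the longest unchanged stretch is 360-100 = 260 seconds.
counts := []*TimestampedCounts{
	newTimestampedCounts(100, map[string]float64{"pod1": 100}),
	newTimestampedCounts(240, map[string]float64{"pod1": 100}),
	newTimestampedCounts(360, map[string]float64{"pod1": 150}),
}
fmt.Println(CalculateMaxLookback(counts, 0, 2)) // 260
```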

// processTimeline processes the timeline of counts and updates the maxUnchangedDuration for each pod.
func processTimeline(counts []*TimestampedCounts, startIndex, endIndex int, data *podMaxDuration) {
for i := startIndex; i <= endIndex; i++ {
item := counts[i].PodCountSnapshot()
curTime := counts[i].PodTimestamp()

for key, curCount := range item {
lastSeenData, found := data.lastSeen[key]
if found && lastSeenData.count == curCount {
continue
}

// The read count increased: record how long the previous value stayed unchanged
if found && curCount > lastSeenData.count {
duration := curTime - lastSeenData.seenTime
if currentMax, ok := data.maxUnchangedDuration[key]; !ok || duration > currentMax {
data.maxUnchangedDuration[key] = duration
}
}
// lastSeen is updated in 3 cases:
// 1. this is the first time the pod entry is seen,
// 2. the value increased, or
// 3. the value decreased, which is treated as a new entry for the pod (e.g. after a restart)
data.lastSeen[key] = podLastSeen{curCount, curTime}
}
}
}

// finalizeDurations checks for pods that did not change at all during the iteration,
// and updates their maxUnchangedDuration to the full period from first seen to lastTime.
// Note: there is a case where one pod was getting data earlier but then stopped altogether,
// for example one Kafka partition not getting data after a while. That case is not covered
// by our logic, and we would keep increasing the lookback in such a scenario.
func finalizeDurations(lastCount *TimestampedCounts, data *podMaxDuration) {
endVals := lastCount.PodCountSnapshot()
lastTime := lastCount.PodTimestamp()
for key, lastSeenData := range data.lastSeen {
endDuration := lastTime - lastSeenData.seenTime
// This condition covers two scenarios:
// 1. There is an entry in lastSeen but not in maxUnchangedDuration: the pod was seen,
// but its value never changed. In this case update the max duration, but only when the count > 0.
// 2. The value has not changed up to the boundary, and this duration is larger than the current max.
if _, exists := endVals[key]; exists && (lastSeenData.count != 0) {
if currentMax, ok := data.maxUnchangedDuration[key]; !ok || endDuration > currentMax {
data.maxUnchangedDuration[key] = endDuration
}
}
}
}
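
To make the finalize step concrete, a sketch (again a fragment using the test helper) of a pod whose count never changes inside the window; its whole observed span is credited, because the non-zero count shows it did read data at some point:

```go
counts := []*TimestampedCounts{
	newTimestampedCounts(100, map[string]float64{"pod1": 500}),
	newTimestampedCounts(600, map[string]float64{"pod1": 500}),
}
fmt.Println(CalculateMaxLookback(counts, 0, 1)) // 500, credited by finalizeDurations
```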

// Calculate the maximum duration found across all pods.
func findGlobalMaxDuration(maxUnchangedDuration map[string]int64) int64 {
globalMaxSecs := int64(0)
for _, duration := range maxUnchangedDuration {
if duration > globalMaxSecs {
globalMaxSecs = duration
}
}
return globalMaxSecs
}
127 changes: 127 additions & 0 deletions pkg/mvtxdaemon/server/service/rater/helper_test.go
@@ -17,6 +17,7 @@ limitations under the License.
package rater

import (
"sync"
"testing"
"time"

@@ -286,3 +287,129 @@ func TestCalculateRate(t *testing.T) {
assert.Equal(t, 23.0, CalculateRate(q, 100))
})
}

// Helper function to create a TimestampedCounts instance
func newTimestampedCounts(timestamp int64, counts map[string]float64) *TimestampedCounts {
return &TimestampedCounts{
timestamp: timestamp,
podReadCounts: counts,
lock: new(sync.RWMutex),
}
}

// TestCalculateMaxLookback tests various scenarios on the CalculateMaxLookback function
func TestCalculateMaxLookback(t *testing.T) {
tests := []struct {
name string
counts []*TimestampedCounts
startIndex int
endIndex int
expectedMax int64
}{
{
name: "Uniform data across the range",
counts: []*TimestampedCounts{
newTimestampedCounts(100, map[string]float64{"pod1": 100, "pod2": 200}),
newTimestampedCounts(200, map[string]float64{"pod1": 100, "pod2": 200}),
newTimestampedCounts(400, map[string]float64{"pod1": 100, "pod2": 200}),
},
startIndex: 0,
endIndex: 2,
expectedMax: 300,
},
{
name: "Values change midway",
counts: []*TimestampedCounts{
newTimestampedCounts(100, map[string]float64{"pod1": 100, "pod2": 150}),
newTimestampedCounts(240, map[string]float64{"pod1": 100, "pod2": 200}),
newTimestampedCounts(360, map[string]float64{"pod1": 150, "pod2": 200}),
},
startIndex: 0,
endIndex: 2,
expectedMax: 260,
},
{
name: "No data change across any pods",
counts: []*TimestampedCounts{
newTimestampedCounts(100, map[string]float64{"pod1": 500}),
newTimestampedCounts(600, map[string]float64{"pod1": 500}),
},
startIndex: 0,
endIndex: 1,
expectedMax: 500, // Entire duration
},
{
name: "Edge Case: One entry only",
counts: []*TimestampedCounts{
newTimestampedCounts(100, map[string]float64{"pod1": 100}),
},
startIndex: 0,
endIndex: 0,
expectedMax: 0, // No duration difference
},
{
name: "Rapid changes in sequential entries",
counts: []*TimestampedCounts{
newTimestampedCounts(100, map[string]float64{"pod1": 500, "pod2": 400}),
newTimestampedCounts(130, map[string]float64{"pod1": 600, "pod2": 400}),
newTimestampedCounts(160, map[string]float64{"pod1": 600, "pod2": 600}),
},
startIndex: 0,
endIndex: 2,
expectedMax: 60,
},
{
// Here the pod has an initial read count, and then we see the pod count drop to 0.
// This is equated to a refresh in counts, and thus the unchanged-duration tracking
// restarts from the zero reading.
name: "Pod goes to zero",
counts: []*TimestampedCounts{
newTimestampedCounts(0, map[string]float64{"pod1": 50}), // Initial count
newTimestampedCounts(30, map[string]float64{"pod1": 50}),
newTimestampedCounts(60, map[string]float64{"pod1": 0}), // Count falls to zero
newTimestampedCounts(120, map[string]float64{"pod1": 25}), // Count returns
newTimestampedCounts(180, map[string]float64{"pod1": 25}), // Count stays stable
newTimestampedCounts(240, map[string]float64{"pod1": 25}), // Count stays stable again
},
startIndex: 0,
endIndex: 5,
expectedMax: 120, // from index 3,5
},
{
name: "Pod goes to zero - 2",
counts: []*TimestampedCounts{
newTimestampedCounts(0, map[string]float64{"pod1": 60}),
newTimestampedCounts(60, map[string]float64{"pod1": 60}),
newTimestampedCounts(120, map[string]float64{"pod1": 70}),
newTimestampedCounts(180, map[string]float64{"pod1": 0}),
newTimestampedCounts(240, map[string]float64{"pod1": 25}),
newTimestampedCounts(300, map[string]float64{"pod1": 25}),
},
startIndex: 0,
endIndex: 5,
expectedMax: 120, // here idx 0,2 should be used, after going to zero it resets
},
{
// This is a case where one pod never got any data, which we treat as read count = 0 throughout.
// In such a case we should not use this pod for the calculation.
name: "One pod no data, other >0 ",
counts: []*TimestampedCounts{
newTimestampedCounts(0, map[string]float64{"pod1": 0, "pod2": 5}),
newTimestampedCounts(60, map[string]float64{"pod1": 0, "pod2": 5}),
newTimestampedCounts(120, map[string]float64{"pod1": 0, "pod2": 5}),
newTimestampedCounts(180, map[string]float64{"pod1": 0, "pod2": 5}),
newTimestampedCounts(240, map[string]float64{"pod1": 0, "pod2": 6}),
newTimestampedCounts(300, map[string]float64{"pod1": 0, "pod2": 6}),
},
startIndex: 0,
endIndex: 5,
expectedMax: 240,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
maxDuration := CalculateMaxLookback(tt.counts, tt.startIndex, tt.endIndex)
assert.Equal(t, tt.expectedMax, maxDuration)
})
}
}