Skip to content

Commit ee18f60

Browse files
authored
apiserver: exclude APF queue wait time from SLO latency metrics (kubernetes#116420)
* apiserver: add latency tracker for priority & fairness queue wait time Signed-off-by: Andrew Sy Kim <[email protected]> * apiserver: exclude priority & fairness wait times to SLO/SLI latency metrics Signed-off-by: Andrew Sy Kim <[email protected]> * apiserver: update TestLatencyTrackersFrom to check latency from PriorityAndFairnessTracker Signed-off-by: Andrew Sy Kim <[email protected]> * flowcontrol: add helper function observeQueueWaitTime to consolidate metric and latency tracker calls Signed-off-by: Andrew Sy Kim <[email protected]> * flowcontrol: replace time.Now() / time.Since() with clock.Now() / clock.Since() for better testability Signed-off-by: Andrew Sy Kim <[email protected]> * flowcontrol: add unit test TestQueueWaitTimeLatencyTracker to validate queue wait times recorded by latency tracker Signed-off-by: Andrew Sy Kim <[email protected]> --------- Signed-off-by: Andrew Sy Kim <[email protected]>
1 parent 0e06be5 commit ee18f60

File tree

6 files changed

+202
-11
lines changed

6 files changed

+202
-11
lines changed

staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ var (
106106
&compbasemetrics.HistogramOpts{
107107
Subsystem: APIServerComponent,
108108
Name: "request_slo_duration_seconds",
109-
Help: "Response latency distribution (not counting webhook duration) in seconds for each verb, group, version, resource, subresource, scope and component.",
109+
Help: "Response latency distribution (not counting webhook duration and priority & fairness queue wait times) in seconds for each verb, group, version, resource, subresource, scope and component.",
110110
// This metric is supplementary to the requestLatencies metric.
111111
// It measures request duration excluding webhooks as they are mostly
112112
// dependant on user configuration.
@@ -121,7 +121,7 @@ var (
121121
&compbasemetrics.HistogramOpts{
122122
Subsystem: APIServerComponent,
123123
Name: "request_sli_duration_seconds",
124-
Help: "Response latency distribution (not counting webhook duration) in seconds for each verb, group, version, resource, subresource, scope and component.",
124+
Help: "Response latency distribution (not counting webhook duration and priority & fairness queue wait times) in seconds for each verb, group, version, resource, subresource, scope and component.",
125125
// This metric is supplementary to the requestLatencies metric.
126126
// It measures request duration excluding webhooks as they are mostly
127127
// dependant on user configuration.
@@ -544,7 +544,7 @@ func MonitorRequest(req *http.Request, verb, group, version, resource, subresour
544544
fieldValidationRequestLatencies.WithContext(req.Context()).WithLabelValues(fieldValidation)
545545

546546
if wd, ok := request.LatencyTrackersFrom(req.Context()); ok {
547-
sliLatency := elapsedSeconds - (wd.MutatingWebhookTracker.GetLatency() + wd.ValidatingWebhookTracker.GetLatency()).Seconds()
547+
sliLatency := elapsedSeconds - (wd.MutatingWebhookTracker.GetLatency() + wd.ValidatingWebhookTracker.GetLatency() + wd.APFQueueWaitTracker.GetLatency()).Seconds()
548548
requestSloLatencies.WithContext(req.Context()).WithLabelValues(reportedVerb, group, version, resource, subresource, scope, component).Observe(sliLatency)
549549
requestSliLatencies.WithContext(req.Context()).WithLabelValues(reportedVerb, group, version, resource, subresource, scope, component).Observe(sliLatency)
550550
}

staging/src/k8s.io/apiserver/pkg/endpoints/request/webhook_duration.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,10 @@ type LatencyTrackers struct {
116116
// Validate webhooks are done in parallel, so max function is used.
117117
ValidatingWebhookTracker DurationTracker
118118

119+
// APFQueueWaitTracker tracks the latency incurred by queue wait times
120+
// from priority & fairness.
121+
APFQueueWaitTracker DurationTracker
122+
119123
// StorageTracker tracks the latency incurred inside the storage layer,
120124
// it accounts for the time it takes to send data to the underlying
121125
// storage layer (etcd) and get the complete response back.
@@ -168,6 +172,7 @@ func WithLatencyTrackersAndCustomClock(parent context.Context, c clock.Clock) co
168172
return WithValue(parent, latencyTrackersKey, &LatencyTrackers{
169173
MutatingWebhookTracker: newSumLatencyTracker(c),
170174
ValidatingWebhookTracker: newMaxLatencyTracker(c),
175+
APFQueueWaitTracker: newMaxLatencyTracker(c),
171176
StorageTracker: newSumLatencyTracker(c),
172177
TransformTracker: newSumLatencyTracker(c),
173178
SerializationTracker: newSumLatencyTracker(c),
@@ -230,6 +235,14 @@ func TrackResponseWriteLatency(ctx context.Context, d time.Duration) {
230235
}
231236
}
232237

238+
// TrackAPFQueueWaitLatency is used to track latency incurred
239+
// by priority and fairness queues.
240+
func TrackAPFQueueWaitLatency(ctx context.Context, d time.Duration) {
241+
if tracker, ok := LatencyTrackersFrom(ctx); ok {
242+
tracker.APFQueueWaitTracker.TrackDuration(d)
243+
}
244+
}
245+
233246
// AuditAnnotationsFromLatencyTrackers will inspect each latency tracker
234247
// associated with the request context and return a set of audit
235248
// annotations that can be added to the API audit entry.

staging/src/k8s.io/apiserver/pkg/endpoints/request/webhook_duration_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func TestLatencyTrackersFrom(t *testing.T) {
3535
SumDurations: 1600,
3636
MaxDuration: 400,
3737
}
38-
t.Run("TestWebhookDurationFrom", func(t *testing.T) {
38+
t.Run("TestLatencyTrackersFrom", func(t *testing.T) {
3939
parent := context.TODO()
4040
_, ok := LatencyTrackersFrom(parent)
4141
if ok {
@@ -48,13 +48,14 @@ func TestLatencyTrackersFrom(t *testing.T) {
4848
if !ok {
4949
t.Error("expected LatencyTrackersFrom to be initialized")
5050
}
51-
if wd.MutatingWebhookTracker.GetLatency() != 0 || wd.ValidatingWebhookTracker.GetLatency() != 0 {
51+
if wd.MutatingWebhookTracker.GetLatency() != 0 || wd.ValidatingWebhookTracker.GetLatency() != 0 || wd.APFQueueWaitTracker.GetLatency() != 0 {
5252
t.Error("expected values to be initialized to 0")
5353
}
5454

5555
for _, d := range tc.Durations {
5656
wd.MutatingWebhookTracker.Track(func() { clk.Step(d) })
5757
wd.ValidatingWebhookTracker.Track(func() { clk.Step(d) })
58+
wd.APFQueueWaitTracker.Track(func() { clk.Step(d) })
5859
}
5960

6061
wd, ok = LatencyTrackersFrom(ctx)
@@ -69,5 +70,9 @@ func TestLatencyTrackersFrom(t *testing.T) {
6970
if wd.ValidatingWebhookTracker.GetLatency() != tc.MaxDuration {
7071
t.Errorf("expected validate duration: %q, but got: %q", tc.MaxDuration, wd.ValidatingWebhookTracker.GetLatency())
7172
}
73+
74+
if wd.APFQueueWaitTracker.GetLatency() != tc.MaxDuration {
75+
t.Errorf("expected priority & fairness duration: %q, but got: %q", tc.MaxDuration, wd.APFQueueWaitTracker.GetLatency())
76+
}
7277
})
7378
}

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1026,7 +1026,7 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig
10261026
noteFn(selectedFlowSchema, plState.pl, flowDistinguisher)
10271027
workEstimate := workEstimator()
10281028

1029-
startWaitingTime = time.Now()
1029+
startWaitingTime = cfgCtlr.clock.Now()
10301030
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues)
10311031
req, idle := plState.queues.StartRequest(ctx, &workEstimate, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn)
10321032
if idle {

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"strconv"
2222
"time"
2323

24+
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
2425
"k8s.io/apiserver/pkg/server/httplog"
2526
"k8s.io/apiserver/pkg/server/mux"
2627
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
@@ -161,7 +162,7 @@ func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest Reque
161162
queued := startWaitingTime != time.Time{}
162163
if req == nil {
163164
if queued {
164-
metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
165+
observeQueueWaitTime(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), cfgCtlr.clock.Since(startWaitingTime))
165166
}
166167
klog.V(7).Infof("Handle(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, isExempt=%v, reject", requestDigest, fs.Name, fs.Spec.DistinguisherMethod, pl.Name, isExempt)
167168
return
@@ -178,21 +179,26 @@ func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest Reque
178179
}()
179180
idle = req.Finish(func() {
180181
if queued {
181-
metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
182+
observeQueueWaitTime(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), cfgCtlr.clock.Since(startWaitingTime))
182183
}
183184
metrics.AddDispatch(ctx, pl.Name, fs.Name)
184185
fqs.OnRequestDispatched(req)
185186
executed = true
186-
startExecutionTime := time.Now()
187+
startExecutionTime := cfgCtlr.clock.Now()
187188
defer func() {
188-
executionTime := time.Since(startExecutionTime)
189+
executionTime := cfgCtlr.clock.Since(startExecutionTime)
189190
httplog.AddKeyValue(ctx, "apf_execution_time", executionTime)
190191
metrics.ObserveExecutionDuration(ctx, pl.Name, fs.Name, executionTime)
191192
}()
192193
execFn()
193194
})
194195
if queued && !executed {
195-
metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
196+
observeQueueWaitTime(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), cfgCtlr.clock.Since(startWaitingTime))
196197
}
197198
panicking = false
198199
}
200+
201+
func observeQueueWaitTime(ctx context.Context, priorityLevelName, flowSchemaName, execute string, waitTime time.Duration) {
202+
metrics.ObserveWaitingDuration(ctx, priorityLevelName, flowSchemaName, execute, waitTime)
203+
endpointsrequest.TrackAPFQueueWaitLatency(ctx, waitTime)
204+
}
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
Copyright 2023 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package flowcontrol
18+
19+
import (
20+
"context"
21+
"testing"
22+
"time"
23+
24+
flowcontrol "k8s.io/api/flowcontrol/v1beta3"
25+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26+
"k8s.io/apimachinery/pkg/runtime"
27+
"k8s.io/apimachinery/pkg/util/wait"
28+
"k8s.io/apiserver/pkg/endpoints/request"
29+
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
30+
fqs "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset"
31+
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/eventclock"
32+
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
33+
fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
34+
"k8s.io/client-go/informers"
35+
clientsetfake "k8s.io/client-go/kubernetes/fake"
36+
)
37+
38+
// TestQueueWaitTimeLatencyTracker tests the queue wait times recorded by the P&F latency tracker
39+
// when calling Handle.
40+
func TestQueueWaitTimeLatencyTracker(t *testing.T) {
41+
metrics.Register()
42+
43+
var fsObj *flowcontrol.FlowSchema
44+
var plcObj *flowcontrol.PriorityLevelConfiguration
45+
cfgObjs := []runtime.Object{}
46+
47+
plName := "test-pl"
48+
username := "test-user"
49+
fsName := "test-fs"
50+
lendable := int32(0)
51+
borrowingLimit := int32(0)
52+
fsObj = &flowcontrol.FlowSchema{
53+
ObjectMeta: metav1.ObjectMeta{
54+
Name: fsName,
55+
},
56+
Spec: flowcontrol.FlowSchemaSpec{
57+
MatchingPrecedence: 100,
58+
PriorityLevelConfiguration: flowcontrol.PriorityLevelConfigurationReference{
59+
Name: plName,
60+
},
61+
DistinguisherMethod: &flowcontrol.FlowDistinguisherMethod{
62+
Type: flowcontrol.FlowDistinguisherMethodByUserType,
63+
},
64+
Rules: []flowcontrol.PolicyRulesWithSubjects{{
65+
Subjects: []flowcontrol.Subject{{
66+
Kind: flowcontrol.SubjectKindUser,
67+
User: &flowcontrol.UserSubject{Name: username},
68+
}},
69+
NonResourceRules: []flowcontrol.NonResourcePolicyRule{{
70+
Verbs: []string{"*"},
71+
NonResourceURLs: []string{"*"},
72+
}},
73+
}},
74+
},
75+
}
76+
plcObj = &flowcontrol.PriorityLevelConfiguration{
77+
ObjectMeta: metav1.ObjectMeta{
78+
Name: plName,
79+
},
80+
Spec: flowcontrol.PriorityLevelConfigurationSpec{
81+
Type: flowcontrol.PriorityLevelEnablementLimited,
82+
Limited: &flowcontrol.LimitedPriorityLevelConfiguration{
83+
NominalConcurrencyShares: 100,
84+
LendablePercent: &lendable,
85+
BorrowingLimitPercent: &borrowingLimit,
86+
LimitResponse: flowcontrol.LimitResponse{
87+
Type: flowcontrol.LimitResponseTypeQueue,
88+
Queuing: &flowcontrol.QueuingConfiguration{
89+
Queues: 10,
90+
HandSize: 2,
91+
QueueLengthLimit: 10,
92+
},
93+
},
94+
},
95+
},
96+
}
97+
cfgObjs = append(cfgObjs, fsObj, plcObj)
98+
99+
clientset := clientsetfake.NewSimpleClientset(cfgObjs...)
100+
informerFactory := informers.NewSharedInformerFactory(clientset, time.Second)
101+
flowcontrolClient := clientset.FlowcontrolV1beta3()
102+
startTime := time.Now()
103+
clk, _ := eventclock.NewFake(startTime, 0, nil)
104+
controller := newTestableController(TestableConfig{
105+
Name: "Controller",
106+
Clock: clk,
107+
AsFieldManager: ConfigConsumerAsFieldManager,
108+
FoundToDangling: func(found bool) bool { return !found },
109+
InformerFactory: informerFactory,
110+
FlowcontrolClient: flowcontrolClient,
111+
ServerConcurrencyLimit: 24,
112+
RequestWaitLimit: time.Minute,
113+
ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec,
114+
ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec,
115+
QueueSetFactory: fqs.NewQueueSetFactory(clk),
116+
})
117+
118+
informerFactory.Start(nil)
119+
120+
status := informerFactory.WaitForCacheSync(nil)
121+
if names := unsynced(status); len(names) > 0 {
122+
t.Fatalf("WaitForCacheSync did not successfully complete, resources=%#v", names)
123+
}
124+
125+
go func() {
126+
controller.Run(nil)
127+
}()
128+
129+
// ensure that the controller has run its first loop.
130+
err := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) {
131+
return controller.hasPriorityLevelState(plcObj.Name), nil
132+
})
133+
if err != nil {
134+
t.Errorf("expected the controller to reconcile the priority level configuration object: %s, error: %s", plcObj.Name, err)
135+
}
136+
137+
reqInfo := &request.RequestInfo{
138+
IsResourceRequest: false,
139+
Path: "/foobar",
140+
Verb: "GET",
141+
}
142+
noteFn := func(fs *flowcontrol.FlowSchema, plc *flowcontrol.PriorityLevelConfiguration, fd string) {}
143+
workEstr := func() fcrequest.WorkEstimate { return fcrequest.WorkEstimate{InitialSeats: 1} }
144+
145+
flowUser := testUser{name: "test-user"}
146+
rd := RequestDigest{
147+
RequestInfo: reqInfo,
148+
User: flowUser,
149+
}
150+
151+
// Add 1 second to the fake clock during QueueNoteFn
152+
newTime := startTime.Add(time.Second)
153+
qnf := fq.QueueNoteFn(func(bool) { clk.FakePassiveClock.SetTime(newTime) })
154+
ctx := request.WithLatencyTrackers(context.Background())
155+
controller.Handle(ctx, rd, noteFn, workEstr, qnf, func() {})
156+
157+
latencyTracker, ok := request.LatencyTrackersFrom(ctx)
158+
if !ok {
159+
t.Fatalf("error getting latency tracker: %v", err)
160+
}
161+
162+
expectedLatency := time.Second // newTime - startTime
163+
latency := latencyTracker.APFQueueWaitTracker.GetLatency()
164+
if latency != expectedLatency {
165+
t.Errorf("unexpected latency, got %s, expected %s", latency, expectedLatency)
166+
}
167+
}

0 commit comments

Comments
 (0)