
Commit b837a0c

kubelet: podresources: DOS prevention with builtin ratelimit
Implement DOS prevention by wiring a global rate limit into the podresources API. The goal here is not to introduce a general rate-limiting solution for the kubelet (we need more research and discussion to get there), but rather to prevent misuse of the API.

Known limitations:
- The rate limit values (QPS, burst tokens) are hardcoded to "high enough" values. Enabling user configuration would require more discussion and sweeping changes to the other kubelet endpoints, so it is postponed for now.
- The rate limiting is global. A malicious client can starve other clients by consuming the QPS quota.

Add an e2e test to exercise the flow, because the wiring itself is mostly boilerplate and API adaptation.
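For context on how the hardcoded defaults behave: the limiter used below is a plain token bucket from golang.org/x/time/rate, so a tight loop of calls gets roughly the burst admitted up front and the rest rejected until tokens refill at the QPS rate. A minimal standalone sketch, not part of this commit, with values mirroring DefaultQPS and DefaultBurstTokens:

package main

import (
	"fmt"

	"golang.org/x/time/rate"
)

func main() {
	// Same shape as DefaultQPS / DefaultBurstTokens in the new file below.
	limiter := rate.NewLimiter(rate.Limit(100), 10)

	admitted, rejected := 0, 0
	for i := 0; i < 200; i++ { // fire 200 requests back to back, like the e2e test's tight loop
		if limiter.Allow() {
			admitted++
		} else {
			rejected++
		}
	}
	// Expect roughly the burst (10) admitted immediately, plus a few tokens
	// refilled while the loop runs; everything else is rejected.
	fmt.Printf("admitted=%d rejected=%d\n", admitted, rejected)
}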
1 parent 09517c2 commit b837a0c

File tree

3 files changed: +111 / -1 lines changed

pkg/kubelet/apis/podresources/grpc (new file; package path per the imports in server.go below)

Lines changed: 67 additions & 0 deletions
@@ -0,0 +1,67 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package grpc
+
+import (
+	"context"
+
+	gotimerate "golang.org/x/time/rate"
+	"k8s.io/klog/v2"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+const (
+	// DefaultQPS is determined by empirically reviewing known consumers of the API.
+	// It's at least unlikely that there is a legitimate need to query podresources
+	// more than 100 times per second, the other subsystems are not guaranteed to react
+	// so fast in the first place.
+	DefaultQPS = 100
+	// DefaultBurstTokens is determined by empirically reviewing known consumers of the API.
+	// See the documentation of DefaultQPS, same caveats apply.
+	DefaultBurstTokens = 10
+)
+
+var (
+	ErrorLimitExceeded = status.Error(codes.ResourceExhausted, "rejected by rate limit")
+)
+
+// Limiter defines the interface to perform request rate limiting,
+// based on the interface exposed by https://pkg.go.dev/golang.org/x/time/rate#Limiter
+type Limiter interface {
+	// Allow reports whether an event may happen now.
+	Allow() bool
+}
+
+// LimiterUnaryServerInterceptor returns a new unary server interceptors that performs request rate limiting.
+func LimiterUnaryServerInterceptor(limiter Limiter) grpc.UnaryServerInterceptor {
+	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+		if !limiter.Allow() {
+			return nil, ErrorLimitExceeded
+		}
+		return handler(ctx, req)
+	}
+}
+
+func WithRateLimiter(qps, burstTokens int32) grpc.ServerOption {
+	qpsVal := gotimerate.Limit(qps)
+	burstVal := int(burstTokens)
+	klog.InfoS("Setting rate limiting for podresources endpoint", "qps", qpsVal, "burstTokens", burstVal)
+	return grpc.UnaryInterceptor(LimiterUnaryServerInterceptor(gotimerate.NewLimiter(qpsVal, burstVal)))
+}
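Because Limiter is an interface, the interceptor above can be exercised in isolation. A minimal test sketch, not included in this commit (the fakeLimiter type and the test itself are hypothetical; only LimiterUnaryServerInterceptor, ErrorLimitExceeded, and the Limiter interface come from the file above):

package grpc_test

import (
	"context"
	"errors"
	"testing"

	"google.golang.org/grpc"

	podresourcesgrpc "k8s.io/kubernetes/pkg/kubelet/apis/podresources/grpc"
)

// fakeLimiter satisfies the Limiter interface with a fixed verdict.
type fakeLimiter struct{ allow bool }

func (f fakeLimiter) Allow() bool { return f.allow }

func TestLimiterUnaryServerInterceptor(t *testing.T) {
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		return "ok", nil
	}
	// The interceptor does not inspect the method name; this value is arbitrary.
	info := &grpc.UnaryServerInfo{FullMethod: "/v1.PodResourcesLister/List"}

	// When the limiter admits the request, the handler response is returned.
	intercept := podresourcesgrpc.LimiterUnaryServerInterceptor(fakeLimiter{allow: true})
	if resp, err := intercept(context.Background(), nil, info, handler); err != nil || resp != "ok" {
		t.Fatalf("expected handler to run, got resp=%v err=%v", resp, err)
	}

	// When the limiter rejects, the call fails with ErrorLimitExceeded.
	intercept = podresourcesgrpc.LimiterUnaryServerInterceptor(fakeLimiter{allow: false})
	if _, err := intercept(context.Background(), nil, info, handler); !errors.Is(err, podresourcesgrpc.ErrorLimitExceeded) {
		t.Fatalf("expected ErrorLimitExceeded, got %v", err)
	}
}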

pkg/kubelet/server/server.go

Lines changed: 2 additions & 1 deletion
@@ -75,6 +75,7 @@ import (
 	"k8s.io/kubernetes/pkg/features"
 	kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
 	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
+	podresourcesgrpc "k8s.io/kubernetes/pkg/kubelet/apis/podresources/grpc"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/cri/streaming"
 	"k8s.io/kubernetes/pkg/kubelet/cri/streaming/portforward"
@@ -218,7 +219,7 @@ type PodResourcesProviders struct {

 // ListenAndServePodResources initializes a gRPC server to serve the PodResources service
 func ListenAndServePodResources(socket string, providers PodResourcesProviders) {
-	server := grpc.NewServer()
+	server := grpc.NewServer(podresourcesgrpc.WithRateLimiter(podresourcesgrpc.DefaultQPS, podresourcesgrpc.DefaultBurstTokens))

 	podresourcesapiv1alpha1.RegisterPodResourcesListerServer(server, podresources.NewV1alpha1PodResourcesServer(providers.Pods, providers.Devices))
 	podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewV1PodResourcesServer(providers.Pods, providers.Devices, providers.Cpus, providers.Memory))

test/e2e_node/podresources_test.go

Lines changed: 42 additions & 0 deletions
@@ -18,6 +18,7 @@ package e2enode

 import (
 	"context"
+	"errors"
 	"fmt"
 	"os"
 	"strings"
@@ -31,6 +32,7 @@ import (
 	kubefeatures "k8s.io/kubernetes/pkg/features"
 	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
 	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
+	podresourcesgrpc "k8s.io/kubernetes/pkg/kubelet/apis/podresources/grpc"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
 	"k8s.io/kubernetes/pkg/kubelet/util"
@@ -861,7 +863,47 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P
 			ginkgo.By("Ensuring the metrics match the expectations a few more times")
 			gomega.Consistently(ctx, getPodResourcesMetrics, 1*time.Minute, 15*time.Second).Should(matchResourceMetrics)
 		})
+	})
+
+	ginkgo.Context("with the builtin rate limit values", func() {
+		ginkgo.It("should hit throttling when calling podresources List in a tight loop", func(ctx context.Context) {
+			// ensure APIs have been called at least once
+			endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
+			framework.ExpectNoError(err)

+			ginkgo.By("Connecting to the kubelet endpoint")
+			cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
+			framework.ExpectNoError(err)
+			defer conn.Close()
+
+			tries := podresourcesgrpc.DefaultQPS * 2 // This should also be greater than DefaultBurstTokens
+			errs := []error{}
+
+			ginkgo.By(fmt.Sprintf("Issuing %d List() calls in a tight loop", tries))
+			startTime := time.Now()
+			for try := 0; try < tries; try++ {
+				_, err = cli.List(ctx, &kubeletpodresourcesv1.ListPodResourcesRequest{})
+				errs = append(errs, err)
+			}
+			elapsed := time.Since(startTime)
+
+			ginkgo.By(fmt.Sprintf("Checking return codes for %d List() calls in %v", tries, elapsed))
+
+			framework.ExpectNoError(errs[0], "the first List() call unexpectedly failed with %v", errs[0])
+			// we would expect (burst) successes and then (tries-burst) errors on a clean test environment running with
+			// enough CPU power. CI is usually harsher. So we relax constraints, expecting at least _a_ failure, while
+			// we are likely to get much more. But we can't predict yet how more we should expect, so we prefer to relax
+			// constraints than to risk flakes at this stage.
+			errLimitExceededCount := 0
+			for _, err := range errs[1:] {
+				if errors.Is(err, podresourcesgrpc.ErrorLimitExceeded) {
+					errLimitExceededCount++
+				}
+			}
+			gomega.Expect(errLimitExceededCount).ToNot(gomega.BeZero(), "never hit the rate limit trying %d calls in %v", tries, elapsed)
+
+			framework.Logf("got %d/%d rate limit errors, at least one needed, the more the better", errLimitExceededCount, tries)
+		})
 	})
 })

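On the consumer side, the new behavior surfaces as a gRPC ResourceExhausted status (the message "rejected by rate limit" from the interceptor). A minimal client-side sketch, not part of this commit, of how a podresources consumer might treat that status as retriable and back off; callWithBackoff and the stub call are hypothetical helpers standing in for a real List() call:

package main

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// callWithBackoff retries a unary call while the kubelet reports
// ResourceExhausted, doubling the wait between attempts.
func callWithBackoff(ctx context.Context, call func(context.Context) error) error {
	backoff := 100 * time.Millisecond
	for attempt := 0; attempt < 5; attempt++ {
		err := call(ctx)
		if status.Code(err) != codes.ResourceExhausted {
			return err // success, or an error that is not throttling
		}
		fmt.Printf("throttled by kubelet, retrying in %v\n", backoff)
		select {
		case <-time.After(backoff):
		case <-ctx.Done():
			return ctx.Err()
		}
		backoff *= 2
	}
	return fmt.Errorf("still rate limited after retries")
}

func main() {
	// Stub standing in for a real List() call: throttled twice, then succeeds.
	calls := 0
	stub := func(ctx context.Context) error {
		calls++
		if calls <= 2 {
			return status.Error(codes.ResourceExhausted, "rejected by rate limit")
		}
		return nil
	}
	if err := callWithBackoff(context.Background(), stub); err != nil {
		fmt.Println("giving up:", err)
		return
	}
	fmt.Println("succeeded after", calls, "attempts")
}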