Commit 204a9a1

Merge pull request kubernetes#116459 from ffromani/podresources-ratelimit-minimal
add podresources DOS prevention using rate limit
2 parents: 2bd69db + b837a0c

File tree

4 files changed: +132 -5 lines changed
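
For context, a minimal standalone sketch (not part of this commit) of the golang.org/x/time/rate token-bucket behaviour the new interceptor relies on: at most the burst size goes through immediately, then requests are admitted at the configured QPS, and everything above that is rejected.

package main

import (
	"fmt"

	"golang.org/x/time/rate"
)

func main() {
	// Same shape as the new kubelet defaults: refill at 100 tokens/s, burst of 10.
	limiter := rate.NewLimiter(rate.Limit(100), 10)

	granted := 0
	for i := 0; i < 50; i++ {
		// Allow() is non-blocking: it takes one token if available,
		// which is exactly how the unary interceptor below uses it.
		if limiter.Allow() {
			granted++
		}
	}
	// In a tight loop, roughly the burst (10) plus whatever refills during the
	// loop is granted; the rest would map to codes.ResourceExhausted rejections.
	fmt.Printf("granted %d of 50 back-to-back calls\n", granted)
}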
Lines changed: 67 additions & 0 deletions
@@ -0,0 +1,67 @@
/*
Copyright 2023 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package grpc

import (
	"context"

	gotimerate "golang.org/x/time/rate"
	"k8s.io/klog/v2"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

const (
	// DefaultQPS is determined by empirically reviewing known consumers of the API.
	// It is unlikely that there is a legitimate need to query podresources
	// more than 100 times per second; the other subsystems are not guaranteed to react
	// so fast in the first place.
	DefaultQPS = 100
	// DefaultBurstTokens is determined by empirically reviewing known consumers of the API.
	// See the documentation of DefaultQPS; the same caveats apply.
	DefaultBurstTokens = 10
)

var (
	ErrorLimitExceeded = status.Error(codes.ResourceExhausted, "rejected by rate limit")
)

// Limiter defines the interface to perform request rate limiting,
// based on the interface exposed by https://pkg.go.dev/golang.org/x/time/rate#Limiter
type Limiter interface {
	// Allow reports whether an event may happen now.
	Allow() bool
}

// LimiterUnaryServerInterceptor returns a new unary server interceptor that performs request rate limiting.
func LimiterUnaryServerInterceptor(limiter Limiter) grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		if !limiter.Allow() {
			return nil, ErrorLimitExceeded
		}
		return handler(ctx, req)
	}
}

func WithRateLimiter(qps, burstTokens int32) grpc.ServerOption {
	qpsVal := gotimerate.Limit(qps)
	burstVal := int(burstTokens)
	klog.InfoS("Setting rate limiting for podresources endpoint", "qps", qpsVal, "burstTokens", burstVal)
	return grpc.UnaryInterceptor(LimiterUnaryServerInterceptor(gotimerate.NewLimiter(qpsVal, burstVal)))
}
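
As a usage note, here is a hedged sketch of how the interceptor above could be exercised in isolation with a stub Limiter; this test is not part of the commit, and the FullMethod value is only illustrative.

package grpc

import (
	"context"
	"errors"
	"testing"

	"google.golang.org/grpc"
)

// stubLimiter satisfies Limiter with a canned answer.
type stubLimiter struct{ allow bool }

func (s stubLimiter) Allow() bool { return s.allow }

func TestLimiterUnaryServerInterceptor(t *testing.T) {
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		return "ok", nil
	}
	// The method name is illustrative only.
	info := &grpc.UnaryServerInfo{FullMethod: "/v1.PodResourcesLister/List"}

	// Allowed: the call is forwarded to the handler untouched.
	allowed := LimiterUnaryServerInterceptor(stubLimiter{allow: true})
	if resp, err := allowed(context.TODO(), nil, info, handler); err != nil || resp != "ok" {
		t.Fatalf("expected the handler to run, got resp=%v err=%v", resp, err)
	}

	// Rejected: the interceptor short-circuits with ErrorLimitExceeded
	// and the handler is never invoked.
	rejected := LimiterUnaryServerInterceptor(stubLimiter{allow: false})
	if _, err := rejected(context.TODO(), nil, info, handler); !errors.Is(err, ErrorLimitExceeded) {
		t.Fatalf("expected ErrorLimitExceeded, got %v", err)
	}
}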

pkg/kubelet/kubelet.go

Lines changed: 9 additions & 1 deletion
@@ -2751,7 +2751,15 @@ func (kl *Kubelet) ListenAndServePodResources() {
 		klog.V(2).InfoS("Failed to get local endpoint for PodResources endpoint", "err", err)
 		return
 	}
-	server.ListenAndServePodResources(socket, kl.podManager, kl.containerManager, kl.containerManager, kl.containerManager)
+
+	providers := server.PodResourcesProviders{
+		Pods:    kl.podManager,
+		Devices: kl.containerManager,
+		Cpus:    kl.containerManager,
+		Memory:  kl.containerManager,
+	}
+
+	server.ListenAndServePodResources(socket, providers)
 }

 // Delete the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around.

pkg/kubelet/server/server.go

Lines changed: 14 additions & 4 deletions
@@ -75,6 +75,7 @@ import (
 	"k8s.io/kubernetes/pkg/features"
 	kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
 	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
+	podresourcesgrpc "k8s.io/kubernetes/pkg/kubelet/apis/podresources/grpc"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/cri/streaming"
 	"k8s.io/kubernetes/pkg/kubelet/cri/streaming/portforward"
@@ -209,11 +210,20 @@ func ListenAndServeKubeletReadOnlyServer(
 	}
 }

+type PodResourcesProviders struct {
+	Pods    podresources.PodsProvider
+	Devices podresources.DevicesProvider
+	Cpus    podresources.CPUsProvider
+	Memory  podresources.MemoryProvider
+}
+
 // ListenAndServePodResources initializes a gRPC server to serve the PodResources service
-func ListenAndServePodResources(socket string, podsProvider podresources.PodsProvider, devicesProvider podresources.DevicesProvider, cpusProvider podresources.CPUsProvider, memoryProvider podresources.MemoryProvider) {
-	server := grpc.NewServer()
-	podresourcesapiv1alpha1.RegisterPodResourcesListerServer(server, podresources.NewV1alpha1PodResourcesServer(podsProvider, devicesProvider))
-	podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewV1PodResourcesServer(podsProvider, devicesProvider, cpusProvider, memoryProvider))
+func ListenAndServePodResources(socket string, providers PodResourcesProviders) {
+	server := grpc.NewServer(podresourcesgrpc.WithRateLimiter(podresourcesgrpc.DefaultQPS, podresourcesgrpc.DefaultBurstTokens))
+
+	podresourcesapiv1alpha1.RegisterPodResourcesListerServer(server, podresources.NewV1alpha1PodResourcesServer(providers.Pods, providers.Devices))
+	podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewV1PodResourcesServer(providers.Pods, providers.Devices, providers.Cpus, providers.Memory))
+
 	l, err := util.CreateListener(socket)
 	if err != nil {
 		klog.ErrorS(err, "Failed to create listener for podResources endpoint")

test/e2e_node/podresources_test.go

Lines changed: 42 additions & 0 deletions
@@ -18,6 +18,7 @@ package e2enode

 import (
 	"context"
+	"errors"
 	"fmt"
 	"os"
 	"strings"
@@ -31,6 +32,7 @@ import (
 	kubefeatures "k8s.io/kubernetes/pkg/features"
 	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
 	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
+	podresourcesgrpc "k8s.io/kubernetes/pkg/kubelet/apis/podresources/grpc"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
 	"k8s.io/kubernetes/pkg/kubelet/util"
@@ -861,7 +863,47 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P
 			ginkgo.By("Ensuring the metrics match the expectations a few more times")
 			gomega.Consistently(ctx, getPodResourcesMetrics, 1*time.Minute, 15*time.Second).Should(matchResourceMetrics)
 		})
+	})
+
+	ginkgo.Context("with the builtin rate limit values", func() {
+		ginkgo.It("should hit throttling when calling podresources List in a tight loop", func(ctx context.Context) {
+			// ensure APIs have been called at least once
+			endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
+			framework.ExpectNoError(err)

+			ginkgo.By("Connecting to the kubelet endpoint")
+			cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
+			framework.ExpectNoError(err)
+			defer conn.Close()
+
+			tries := podresourcesgrpc.DefaultQPS * 2 // This should also be greater than DefaultBurstTokens
+			errs := []error{}
+
+			ginkgo.By(fmt.Sprintf("Issuing %d List() calls in a tight loop", tries))
+			startTime := time.Now()
+			for try := 0; try < tries; try++ {
+				_, err = cli.List(ctx, &kubeletpodresourcesv1.ListPodResourcesRequest{})
+				errs = append(errs, err)
+			}
+			elapsed := time.Since(startTime)
+
+			ginkgo.By(fmt.Sprintf("Checking return codes for %d List() calls in %v", tries, elapsed))
+
+			framework.ExpectNoError(errs[0], "the first List() call unexpectedly failed with %v", errs[0])
+			// We would expect (burst) successes and then (tries-burst) errors on a clean test environment
+			// running with enough CPU power. CI is usually harsher, so we relax the constraints, expecting
+			// at least _a_ failure while we are likely to get many more. But we can't yet predict how many
+			// more to expect, so we prefer to relax the constraints rather than risk flakes at this stage.
+			errLimitExceededCount := 0
+			for _, err := range errs[1:] {
+				if errors.Is(err, podresourcesgrpc.ErrorLimitExceeded) {
+					errLimitExceededCount++
+				}
+			}
+			gomega.Expect(errLimitExceededCount).ToNot(gomega.BeZero(), "never hit the rate limit trying %d calls in %v", tries, elapsed)
+
+			framework.Logf("got %d/%d rate limit errors, at least one needed, the more the better", errLimitExceededCount, tries)
+		})
 	})
 })
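
For completeness, a hedged sketch (not part of the diff) of how a well-behaved consumer of the podresources socket could react to the new rejection. It assumes the generated v1 client from k8s.io/kubelet/pkg/apis/podresources/v1 (the kubeletpodresourcesv1 alias used in the test above); the listWithRetry helper name is hypothetical.

package main

import (
	"context"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
)

// listWithRetry retries a List call once after a short pause when the kubelet
// answers with codes.ResourceExhausted, i.e. when the new rate limit kicked in.
func listWithRetry(ctx context.Context, cli kubeletpodresourcesv1.PodResourcesListerClient) (*kubeletpodresourcesv1.ListPodResourcesResponse, error) {
	resp, err := cli.List(ctx, &kubeletpodresourcesv1.ListPodResourcesRequest{})
	if status.Code(err) != codes.ResourceExhausted {
		return resp, err
	}
	// Rejected by the rate limit: with a 100 QPS refill, a short pause is plenty
	// for an occasional caller; hammering the socket is the anti-pattern this
	// commit guards against.
	time.Sleep(100 * time.Millisecond)
	return cli.List(ctx, &kubeletpodresourcesv1.ListPodResourcesRequest{})
}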
