Skip to content

Commit 9808173

Browse files
authored
Merge pull request kubernetes-sigs#675 from konryd/leaserollout
KNP agent should pick the larger of the two server counts
2 parents 23b100b + 1b41692 commit 9808173

File tree

5 files changed

+154
-7
lines changed

5 files changed

+154
-7
lines changed

cmd/agent/app/options/options.go

+11
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ type GrpcProxyAgentOptions struct {
8989
LeaseNamespace string
9090
// Labels on which lease objects are managed.
9191
LeaseLabel string
92+
// ServerCountSource describes how server counts should be combined.
93+
ServerCountSource string
9294
// Path to kubeconfig (used by kubernetes client for lease listing)
9395
KubeconfigPath string
9496
// Content type of requests sent to apiserver.
@@ -108,6 +110,7 @@ func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption)
108110
WarnOnChannelLimit: o.WarnOnChannelLimit,
109111
SyncForever: o.SyncForever,
110112
XfrChannelSize: o.XfrChannelSize,
113+
ServerCountSource: o.ServerCountSource,
111114
}
112115
}
113116

@@ -138,6 +141,7 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet {
138141
flags.BoolVar(&o.CountServerLeases, "count-server-leases", o.CountServerLeases, "Enables lease counting system to determine the number of proxy servers to connect to.")
139142
flags.StringVar(&o.LeaseNamespace, "lease-namespace", o.LeaseNamespace, "Namespace where lease objects are managed.")
140143
flags.StringVar(&o.LeaseLabel, "lease-label", o.LeaseLabel, "The labels on which the lease objects are managed.")
144+
flags.StringVar(&o.ServerCountSource, "server-count-source", o.ServerCountSource, "Defines how the server counts from lease and from server responses are combined. Possible values: 'default' to use only one source (server or leases depending on other flags), 'max' to take the larger value.")
141145
flags.StringVar(&o.KubeconfigPath, "kubeconfig", o.KubeconfigPath, "Path to the kubeconfig file")
142146
flags.StringVar(&o.APIContentType, "kube-api-content-type", o.APIContentType, "Content type of requests sent to apiserver.")
143147
return flags
@@ -168,6 +172,7 @@ func (o *GrpcProxyAgentOptions) Print() {
168172
klog.V(1).Infof("CountServerLeases set to %v.\n", o.CountServerLeases)
169173
klog.V(1).Infof("LeaseNamespace set to %s.\n", o.LeaseNamespace)
170174
klog.V(1).Infof("LeaseLabel set to %s.\n", o.LeaseLabel)
175+
klog.V(1).Infof("ServerCountSource set to %s.\n", o.ServerCountSource)
171176
klog.V(1).Infof("ChannelSize set to %d.\n", o.XfrChannelSize)
172177
klog.V(1).Infof("APIContentType set to %v.\n", o.APIContentType)
173178
}
@@ -232,6 +237,11 @@ func (o *GrpcProxyAgentOptions) Validate() error {
232237
return err
233238
}
234239
}
240+
if o.ServerCountSource != "" {
241+
if o.ServerCountSource != "default" && o.ServerCountSource != "max" {
242+
return fmt.Errorf("--server-count-source must be one of '', 'default', 'max', got %v", o.ServerCountSource)
243+
}
244+
}
235245

236246
return nil
237247
}
@@ -281,6 +291,7 @@ func NewGrpcProxyAgentOptions() *GrpcProxyAgentOptions {
281291
CountServerLeases: false,
282292
LeaseNamespace: "kube-system",
283293
LeaseLabel: "k8s-app=konnectivity-server",
294+
ServerCountSource: "default",
284295
KubeconfigPath: "",
285296
APIContentType: runtime.ContentTypeProtobuf,
286297
}

cmd/agent/app/options/options_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,10 @@ func TestValidate(t *testing.T) {
156156
fieldMap: map[string]interface{}{"XfrChannelSize": -10},
157157
expected: fmt.Errorf("channel size -10 must be greater than 0"),
158158
},
159+
"ServerCountSource": {
160+
fieldMap: map[string]interface{}{"ServerCountSource": "foobar"},
161+
expected: fmt.Errorf("--server-count-source must be one of '', 'default', 'max', got foobar"),
162+
},
159163
} {
160164
t.Run(desc, func(t *testing.T) {
161165
testAgentOptions := NewGrpcProxyAgentOptions()

pkg/agent/clientset.go

+42-7
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,12 @@ import (
3030
"sigs.k8s.io/apiserver-network-proxy/pkg/agent/metrics"
3131
)
3232

33+
const (
34+
fromResponses = "KNP server response headers"
35+
fromLeases = "KNP lease count"
36+
fromFallback = "fallback to 1"
37+
)
38+
3339
// ClientSet consists of clients connected to each instance of an HA proxy server.
3440
type ClientSet struct {
3541
mu sync.Mutex //protects the clients.
@@ -39,7 +45,7 @@ type ClientSet struct {
3945
agentID string // ID of this agent
4046
address string // proxy server address. Assuming HA proxy server
4147

42-
leaseCounter *ServerLeaseCounter // counts number of proxy server leases
48+
leaseCounter ServerCounter // counts number of proxy server leases
4349
lastReceivedServerCount int // last server count received from a proxy server
4450
lastServerCount int // last server count value from either lease system or proxy server, former takes priority
4551

@@ -68,6 +74,7 @@ type ClientSet struct {
6874
xfrChannelSize int
6975

7076
syncForever bool // Continue syncing (support dynamic server count).
77+
serverCountSource string
7178
}
7279

7380
func (cs *ClientSet) ClientsCount() int {
@@ -147,7 +154,8 @@ type ClientSetConfig struct {
147154
WarnOnChannelLimit bool
148155
SyncForever bool
149156
XfrChannelSize int
150-
ServerLeaseCounter *ServerLeaseCounter
157+
ServerLeaseCounter ServerCounter
158+
ServerCountSource string
151159
}
152160

153161
func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *ClientSet {
@@ -167,6 +175,7 @@ func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *C
167175
xfrChannelSize: cc.XfrChannelSize,
168176
stopCh: stopCh,
169177
leaseCounter: cc.ServerLeaseCounter,
178+
serverCountSource: cc.ServerCountSource,
170179
}
171180
}
172181

@@ -218,15 +227,41 @@ func (cs *ClientSet) sync() {
218227
}
219228

220229
func (cs *ClientSet) ServerCount() int {
230+
221231
var serverCount int
222-
if cs.leaseCounter != nil {
223-
serverCount = cs.leaseCounter.Count()
224-
} else {
225-
serverCount = cs.lastReceivedServerCount
232+
var countSourceLabel string
233+
234+
switch cs.serverCountSource {
235+
case "", "default":
236+
if cs.leaseCounter != nil {
237+
serverCount = cs.leaseCounter.Count()
238+
countSourceLabel = fromLeases
239+
} else {
240+
serverCount = cs.lastReceivedServerCount
241+
countSourceLabel = fromResponses
242+
}
243+
case "max":
244+
countFromLeases := 0
245+
if cs.leaseCounter != nil {
246+
countFromLeases = cs.leaseCounter.Count()
247+
}
248+
countFromResponses := cs.lastReceivedServerCount
249+
250+
serverCount = countFromLeases
251+
countSourceLabel = fromLeases
252+
if countFromResponses > serverCount {
253+
serverCount = countFromResponses
254+
countSourceLabel = fromResponses
255+
}
256+
if serverCount == 0 {
257+
serverCount = 1
258+
countSourceLabel = fromFallback
259+
}
260+
226261
}
227262

228263
if serverCount != cs.lastServerCount {
229-
klog.Warningf("change detected in proxy server count (was: %d, now: %d)", cs.lastServerCount, serverCount)
264+
klog.Warningf("change detected in proxy server count (was: %d, now: %d, source: %q)", cs.lastServerCount, serverCount, countSourceLabel)
230265
cs.lastServerCount = serverCount
231266
}
232267

pkg/agent/clientset_test.go

+93
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package agent
18+
19+
import (
20+
"testing"
21+
)
22+
23+
type FakeServerCounter struct {
24+
count int
25+
}
26+
27+
func (f *FakeServerCounter) Count() int {
28+
return f.count
29+
}
30+
31+
func TestServerCount(t *testing.T) {
32+
testCases := []struct{
33+
name string
34+
serverCountSource string
35+
leaseCounter ServerCounter
36+
responseCount int
37+
want int
38+
} {
39+
{
40+
name: "higher from response",
41+
serverCountSource: "max",
42+
responseCount: 42,
43+
leaseCounter: &FakeServerCounter{24},
44+
want: 42,
45+
},
46+
{
47+
name: "higher from leases",
48+
serverCountSource: "max",
49+
responseCount: 3,
50+
leaseCounter: &FakeServerCounter{6},
51+
want: 6,
52+
},
53+
{
54+
name: "both zero",
55+
serverCountSource: "max",
56+
responseCount: 0,
57+
leaseCounter: &FakeServerCounter{0},
58+
want: 1,
59+
},
60+
61+
{
62+
name: "response picked by default when no lease counter",
63+
serverCountSource: "default",
64+
responseCount: 3,
65+
leaseCounter: nil,
66+
want: 3,
67+
},
68+
{
69+
name: "lease counter always picked when present",
70+
serverCountSource: "default",
71+
responseCount: 6,
72+
leaseCounter: &FakeServerCounter{3},
73+
want: 3,
74+
},
75+
}
76+
77+
for _, tc := range testCases {
78+
t.Run(tc.name, func(t *testing.T) {
79+
80+
cs := &ClientSet{
81+
clients: make(map[string]*Client),
82+
leaseCounter: tc.leaseCounter,
83+
serverCountSource: tc.serverCountSource,
84+
85+
}
86+
cs.lastReceivedServerCount = tc.responseCount
87+
if got := cs.ServerCount(); got != tc.want {
88+
t.Errorf("cs.ServerCount() = %v, want: %v", got, tc.want)
89+
}
90+
})
91+
}
92+
93+
}

pkg/agent/lease_counter.go

+4
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ import (
3636
coordinationv1lister "k8s.io/client-go/listers/coordination/v1"
3737
)
3838

39+
type ServerCounter interface {
40+
Count() int
41+
}
42+
3943
// A ServerLeaseCounter counts leases in the k8s apiserver to determine the
4044
// current proxy server count.
4145
type ServerLeaseCounter struct {

0 commit comments

Comments
 (0)