
Commit 7e6a856

Author: k8s-merge-robot

Merge pull request kubernetes#28843 from gmarek/limiterPerZone

Automatic merge from submit-queue.

Separate rate limiters for Pod evictions for different zones in NodeController.

Ref. kubernetes#28832. NodeController needs to be able to adjust the eviction rate limits for each zone separately, so this PR splits the single shared rate limiters into one pair per zone.

cc @davidopp

2 parents af1891c + 5677a98, commit 7e6a856
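To make the mechanism concrete before reading the diff, here is a minimal, self-contained sketch of the per-zone rate-limiting pattern this PR introduces. It is an illustration, not the NodeController code: it substitutes golang.org/x/time/rate for the repo's pkg/util/flowcontrol token bucket and a plain eviction function for RateLimitedTimedQueue, and the zone keys and QPS values below are made up.

// Minimal sketch of per-zone eviction rate limiting (illustration only).
package main

import (
	"fmt"

	"golang.org/x/time/rate"
)

// zoneEvictor keeps one token bucket per zone, created lazily, so that a
// flood of evictions in one broken zone cannot starve the other zones.
type zoneEvictor struct {
	qps      float32
	burst    int
	limiters map[string]*rate.Limiter
}

func newZoneEvictor(qps float32, burst int) *zoneEvictor {
	return &zoneEvictor{qps: qps, burst: burst, limiters: make(map[string]*rate.Limiter)}
}

// tryEvict consumes a token from the zone's own bucket. The first node seen
// in a zone creates that zone's limiter, mirroring how the PR creates a new
// RateLimitedTimedQueue when monitorNodeStatus observes a new zone.
func (z *zoneEvictor) tryEvict(zone, node string) bool {
	lim, ok := z.limiters[zone]
	if !ok {
		lim = rate.NewLimiter(rate.Limit(z.qps), z.burst)
		z.limiters[zone] = lim
	}
	if !lim.Allow() {
		return false // over this zone's budget; the caller retries later
	}
	fmt.Printf("evicting pods from node %s in zone %s\n", node, zone)
	return true
}

func main() {
	e := newZoneEvictor(0.1, 1) // one eviction per 10s per zone, burst of 1

	fmt.Println(e.tryEvict("us-east-1a", "node-1")) // true: fresh bucket
	fmt.Println(e.tryEvict("us-east-1a", "node-2")) // false: zone a's bucket is empty
	fmt.Println(e.tryEvict("us-east-1b", "node-3")) // true: zone b has its own bucket
}

The essential property is the same as in the diff below: each zone gets its own token bucket, created lazily the first time the zone is observed.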

File tree: 5 files changed (+184, -81 lines)

cmd/kube-controller-manager/app/controllermanager.go (+1, -3)

@@ -72,7 +72,6 @@ import (
 	"k8s.io/kubernetes/pkg/serviceaccount"
 	"k8s.io/kubernetes/pkg/util/configz"
 	"k8s.io/kubernetes/pkg/util/crypto"
-	"k8s.io/kubernetes/pkg/util/flowcontrol"
 	"k8s.io/kubernetes/pkg/util/wait"

 	"github.com/golang/glog"
@@ -239,8 +238,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 		glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", s.ServiceCIDR, err)
 	}
 	nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
-		s.PodEvictionTimeout.Duration, flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, int(s.DeletingPodsBurst)),
-		flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, int(s.DeletingPodsBurst)),
+		s.PodEvictionTimeout.Duration, s.DeletingPodsQps, int(s.DeletingPodsBurst),
 		s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
 	nodeController.Run(s.NodeSyncPeriod.Duration)
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
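With this change the controller manager no longer builds the two token-bucket limiters itself; it hands NewNodeController the raw QPS and burst values and lets NodeController construct one limiter per zone internally. The flowcontrol import becomes unused here and is dropped.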

contrib/mesos/pkg/controllermanager/controllermanager.go (+1, -3)

@@ -59,7 +59,6 @@ import (
 	quotainstall "k8s.io/kubernetes/pkg/quota/install"
 	"k8s.io/kubernetes/pkg/serviceaccount"
 	"k8s.io/kubernetes/pkg/util/crypto"
-	"k8s.io/kubernetes/pkg/util/flowcontrol"
 	"k8s.io/kubernetes/pkg/util/wait"

 	"k8s.io/kubernetes/contrib/mesos/pkg/profile"
@@ -155,8 +154,7 @@ func (s *CMServer) Run(_ []string) error {
 	_, clusterCIDR, _ := net.ParseCIDR(s.ClusterCIDR)
 	_, serviceCIDR, _ := net.ParseCIDR(s.ServiceCIDR)
 	nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
-		s.PodEvictionTimeout.Duration, flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, int(s.DeletingPodsBurst)),
-		flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, int(s.DeletingPodsBurst)),
+		s.PodEvictionTimeout.Duration, s.DeletingPodsQps, int(s.DeletingPodsBurst),
 		s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
 	nodeController.Run(s.NodeSyncPeriod.Duration)


pkg/controller/node/nodecontroller.go (+77, -50)

@@ -73,13 +73,12 @@ type nodeStatusData struct {
 }

 type NodeController struct {
-	allocateNodeCIDRs       bool
-	cloud                   cloudprovider.Interface
-	clusterCIDR             *net.IPNet
-	serviceCIDR             *net.IPNet
-	deletingPodsRateLimiter flowcontrol.RateLimiter
-	knownNodeSet            map[string]*api.Node
-	kubeClient              clientset.Interface
+	allocateNodeCIDRs bool
+	cloud             cloudprovider.Interface
+	clusterCIDR       *net.IPNet
+	serviceCIDR       *net.IPNet
+	knownNodeSet      map[string]*api.Node
+	kubeClient        clientset.Interface
 	// Method for easy mocking in unittest.
 	lookupIP func(host string) ([]net.IP, error)
 	// Value used if sync_nodes_status=False. NodeController will not proactively
@@ -112,9 +111,11 @@ type NodeController struct {
 	// Lock to access evictor workers
 	evictorLock *sync.Mutex
 	// workers that evicts pods from unresponsive nodes.
-	podEvictor         *RateLimitedTimedQueue
-	terminationEvictor *RateLimitedTimedQueue
-	podEvictionTimeout time.Duration
+	zonePodEvictor         map[string]*RateLimitedTimedQueue
+	zoneTerminationEvictor map[string]*RateLimitedTimedQueue
+	evictionLimiterQPS     float32
+	evictionLimiterBurst   int
+	podEvictionTimeout     time.Duration
 	// The maximum duration before a pod evicted from a node can be forcefully terminated.
 	maximumGracePeriod time.Duration
 	recorder           record.EventRecorder
@@ -142,8 +143,8 @@ func NewNodeController(
 	cloud cloudprovider.Interface,
 	kubeClient clientset.Interface,
 	podEvictionTimeout time.Duration,
-	deletionEvictionLimiter flowcontrol.RateLimiter,
-	terminationEvictionLimiter flowcontrol.RateLimiter,
+	evictionLimiterQPS float32,
+	evictionLimiterBurst int,
 	nodeMonitorGracePeriod time.Duration,
 	nodeStartupGracePeriod time.Duration,
 	nodeMonitorPeriod time.Duration,
@@ -184,8 +185,8 @@ func NewNodeController(
 		podEvictionTimeout:     podEvictionTimeout,
 		maximumGracePeriod:     5 * time.Minute,
 		evictorLock:            &evictorLock,
-		podEvictor:             NewRateLimitedTimedQueue(deletionEvictionLimiter),
-		terminationEvictor:     NewRateLimitedTimedQueue(terminationEvictionLimiter),
+		zonePodEvictor:         make(map[string]*RateLimitedTimedQueue),
+		zoneTerminationEvictor: make(map[string]*RateLimitedTimedQueue),
 		nodeStatusMap:          make(map[string]nodeStatusData),
 		nodeMonitorGracePeriod: nodeMonitorGracePeriod,
 		nodeMonitorPeriod:      nodeMonitorPeriod,
@@ -198,6 +199,8 @@ func NewNodeController(
 		forcefullyDeletePod:       func(p *api.Pod) error { return forcefullyDeletePod(kubeClient, p) },
 		nodeExistsInCloudProvider: func(nodeName string) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) },
 		computeZoneStateFunc:      ComputeZoneState,
+		evictionLimiterQPS:        evictionLimiterQPS,
+		evictionLimiterBurst:      evictionLimiterBurst,
 		zoneStates:                make(map[string]zoneState),
 	}

@@ -309,45 +312,49 @@ func (nc *NodeController) Run(period time.Duration) {
 	go wait.Until(func() {
 		nc.evictorLock.Lock()
 		defer nc.evictorLock.Unlock()
-		nc.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
-			remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nc.daemonSetStore)
-			if err != nil {
-				utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
-				return false, 0
-			}
+		for k := range nc.zonePodEvictor {
+			nc.zonePodEvictor[k].Try(func(value TimedValue) (bool, time.Duration) {
+				remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nc.daemonSetStore)
+				if err != nil {
+					utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
+					return false, 0
+				}

-			if remaining {
-				nc.terminationEvictor.Add(value.Value)
-			}
-			return true, 0
-		})
+				if remaining {
+					nc.zoneTerminationEvictor[k].Add(value.Value)
+				}
+				return true, 0
+			})
+		}
 	}, nodeEvictionPeriod, wait.NeverStop)

 	// TODO: replace with a controller that ensures pods that are terminating complete
 	// in a particular time period
 	go wait.Until(func() {
 		nc.evictorLock.Lock()
 		defer nc.evictorLock.Unlock()
-		nc.terminationEvictor.Try(func(value TimedValue) (bool, time.Duration) {
-			completed, remaining, err := terminatePods(nc.kubeClient, nc.recorder, value.Value, value.AddedAt, nc.maximumGracePeriod)
-			if err != nil {
-				utilruntime.HandleError(fmt.Errorf("unable to terminate pods on node %q: %v", value.Value, err))
-				return false, 0
-			}
+		for k := range nc.zoneTerminationEvictor {
+			nc.zoneTerminationEvictor[k].Try(func(value TimedValue) (bool, time.Duration) {
+				completed, remaining, err := terminatePods(nc.kubeClient, nc.recorder, value.Value, value.AddedAt, nc.maximumGracePeriod)
+				if err != nil {
+					utilruntime.HandleError(fmt.Errorf("unable to terminate pods on node %q: %v", value.Value, err))
+					return false, 0
+				}

-			if completed {
-				glog.V(2).Infof("All pods terminated on %s", value.Value)
-				recordNodeEvent(nc.recorder, value.Value, api.EventTypeNormal, "TerminatedAllPods", fmt.Sprintf("Terminated all Pods on Node %s.", value.Value))
-				return true, 0
-			}
+				if completed {
+					glog.V(2).Infof("All pods terminated on %s", value.Value)
+					recordNodeEvent(nc.recorder, value.Value, api.EventTypeNormal, "TerminatedAllPods", fmt.Sprintf("Terminated all Pods on Node %s.", value.Value))
+					return true, 0
+				}

-			glog.V(2).Infof("Pods terminating since %s on %q, estimated completion %s", value.AddedAt, value.Value, remaining)
-			// clamp very short intervals
-			if remaining < nodeEvictionPeriod {
-				remaining = nodeEvictionPeriod
-			}
-			return false, remaining
-		})
+				glog.V(2).Infof("Pods terminating since %s on %q, estimated completion %s", value.AddedAt, value.Value, remaining)
+				// clamp very short intervals
+				if remaining < nodeEvictionPeriod {
+					remaining = nodeEvictionPeriod
+				}
+				return false, remaining
+			})
+		}
 	}, nodeEvictionPeriod, wait.NeverStop)

 	go wait.Until(func() {
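Both eviction workers now iterate over every per-zone queue on each tick, still under the shared evictorLock. The rate limiting lives inside each zone's RateLimitedTimedQueue, so a zone with many unresponsive nodes drains only its own token budget and cannot slow evictions in healthy zones.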
@@ -372,8 +379,19 @@ func (nc *NodeController) monitorNodeStatus() error {
 	for i := range added {
 		glog.V(1).Infof("NodeController observed a new Node: %#v", added[i].Name)
 		recordNodeEvent(nc.recorder, added[i].Name, api.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", added[i].Name))
-		nc.cancelPodEviction(added[i])
 		nc.knownNodeSet[added[i].Name] = added[i]
+		// When adding new Nodes we need to check if new zone appeared, and if so add new evictor.
+		zone := utilnode.GetZoneKey(added[i])
+		if _, found := nc.zonePodEvictor[zone]; !found {
+			nc.zonePodEvictor[zone] =
+				NewRateLimitedTimedQueue(
+					flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, nc.evictionLimiterBurst))
+		}
+		if _, found := nc.zoneTerminationEvictor[zone]; !found {
+			nc.zoneTerminationEvictor[zone] = NewRateLimitedTimedQueue(
+				flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, nc.evictionLimiterBurst))
+		}
+		nc.cancelPodEviction(added[i])
 	}

 	for i := range deleted {
@@ -689,10 +707,11 @@ func (nc *NodeController) checkForNodeAddedDeleted(nodes *api.NodeList) (added,
 // cancelPodEviction removes any queued evictions, typically because the node is available again. It
 // returns true if an eviction was queued.
 func (nc *NodeController) cancelPodEviction(node *api.Node) bool {
+	zone := utilnode.GetZoneKey(node)
 	nc.evictorLock.Lock()
 	defer nc.evictorLock.Unlock()
-	wasDeleting := nc.podEvictor.Remove(node.Name)
-	wasTerminating := nc.terminationEvictor.Remove(node.Name)
+	wasDeleting := nc.zonePodEvictor[zone].Remove(node.Name)
+	wasTerminating := nc.zoneTerminationEvictor[zone].Remove(node.Name)
 	if wasDeleting || wasTerminating {
 		glog.V(2).Infof("Cancelling pod Eviction on Node: %v", node.Name)
 		return true
@@ -703,10 +722,18 @@ func (nc *NodeController) cancelPodEviction(node *api.Node) bool {
 // evictPods queues an eviction for the provided node name, and returns false if the node is already
 // queued for eviction.
 func (nc *NodeController) evictPods(node *api.Node) bool {
-	if nc.zoneStates[utilnode.GetZoneKey(node)] == stateFullSegmentation {
-		return false
-	}
 	nc.evictorLock.Lock()
 	defer nc.evictorLock.Unlock()
-	return nc.podEvictor.Add(node.Name)
+	foundHealty := false
+	for _, state := range nc.zoneStates {
+		if state != stateFullSegmentation {
+			foundHealty = true
+			break
+		}
+	}
+	if !foundHealty {
+		return false
+	}
+	zone := utilnode.GetZoneKey(node)
+	return nc.zonePodEvictor[zone].Add(node.Name)
 }
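The gate in evictPods also changes shape: the old code skipped eviction only when the node's own zone was fully segmented, while the new code refuses to queue any eviction unless at least one zone is still healthy. Presumably this guards against the case where every zone appears segmented because NodeController itself has lost connectivity, in which case mass eviction would be the wrong response.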
