This repository was archived by the owner on Mar 28, 2020. It is now read-only.

fix use workqueue. nothing should block event handler (#1410) #2151

Open · wants to merge 2 commits into base: master
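For context on the pattern this change moves to, here is a minimal sketch of the client-go indexer/informer/workqueue wiring, in the spirit of the diff below. It is not code from this PR: `runController`, the `source` ListerWatcher, and the `sync` callback are placeholder names standing in for the operator's real list-watch source and cluster-sync logic.

```go
package example

import (
	"context"
	"time"

	api "github.com/coreos/etcd-operator/pkg/apis/etcd/v1beta2"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

// runController wires the three pieces the PR introduces: an indexer (local
// cache), an informer (watch loop), and a rate-limited workqueue. The event
// handlers only enqueue object keys; the blocking work happens in a worker
// goroutine that drains the queue.
func runController(ctx context.Context, source cache.ListerWatcher, sync func(key string) error) {
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "etcd-operator")
	defer queue.ShutDown()

	enqueue := func(obj interface{}) {
		// DeletionHandlingMetaNamespaceKeyFunc also understands tombstones,
		// so it is safe for add, update, and delete events alike.
		if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err == nil {
			queue.Add(key)
		}
	}

	indexer, informer := cache.NewIndexerInformer(source, &api.EtcdCluster{}, 0, cache.ResourceEventHandlerFuncs{
		AddFunc:    enqueue,
		UpdateFunc: func(_, newObj interface{}) { enqueue(newObj) },
		DeleteFunc: enqueue,
	}, cache.Indexers{})
	_ = indexer // the real controller reads the object back with indexer.GetByKey(key)

	go informer.Run(ctx.Done())
	if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
		return
	}

	// One worker preserves the previous single-threaded semantics; the queue
	// guarantees a key is never handled by two workers at once.
	go wait.Until(func() {
		for {
			key, quit := queue.Get()
			if quit {
				return
			}
			err := sync(key.(string))
			queue.Done(key)
			if err != nil {
				queue.AddRateLimited(key) // retry with exponential back-off
			} else {
				queue.Forget(key) // clear the failure history for this key
			}
		}
	}, time.Second, ctx.Done())

	<-ctx.Done()
}
```

The diff splits this same wiring across controller.go (new fields), informer.go (event handlers and run loop), and sync.go (worker loop and retry policy).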
49 changes: 49 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion cmd/operator/main.go
@@ -145,7 +145,7 @@ func run(ctx context.Context) {
startChaos(context.Background(), cfg.KubeCli, cfg.Namespace, chaosLevel)

c := controller.New(cfg)
err := c.Start()
err := c.Start(ctx)
logrus.Fatalf("controller Start() failed: %v", err)
}

7 changes: 7 additions & 0 deletions pkg/controller/controller.go
@@ -27,6 +27,8 @@ import (
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
kwatch "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)

var initRetryWaitTime = 30 * time.Second
@@ -40,6 +42,11 @@ type Controller struct {
logger *logrus.Entry
Config

// k8s workqueue pattern
indexer cache.Indexer
informer cache.Controller
queue workqueue.RateLimitingInterface

clusters map[string]*cluster.Cluster
}

87 changes: 37 additions & 50 deletions pkg/controller/informer.go
@@ -25,8 +25,9 @@ import (

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
kwatch "k8s.io/apimachinery/pkg/watch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)

// TODO: get rid of this once we use workqueue
@@ -36,7 +37,7 @@ func init() {
pt = newPanicTimer(time.Minute, "unexpected long blocking (> 1 Minute) when handling cluster event")
}

func (c *Controller) Start() error {
func (c *Controller) Start(ctx context.Context) error {
// TODO: get rid of this init code. CRD and storage class will be managed outside of operator.
for {
err := c.initResource()
@@ -49,11 +50,12 @@ }
}

probe.SetReady()
c.run()
panic("unreachable")
go c.run(ctx)
<-ctx.Done()
return ctx.Err()
}

func (c *Controller) run() {
func (c *Controller) run(ctx context.Context) {
var ns string
if c.Config.ClusterWide {
ns = metav1.NamespaceAll
@@ -67,15 +69,29 @@ ns,
ns,
fields.Everything())

_, informer := cache.NewIndexerInformer(source, &api.EtcdCluster{}, 0, cache.ResourceEventHandlerFuncs{
c.queue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "etcd-operator")
c.indexer, c.informer = cache.NewIndexerInformer(source, &api.EtcdCluster{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: c.onAddEtcdClus,
UpdateFunc: c.onUpdateEtcdClus,
DeleteFunc: c.onDeleteEtcdClus,
}, cache.Indexers{})

ctx := context.TODO()
// TODO: use workqueue to avoid blocking
informer.Run(ctx.Done())
defer c.queue.ShutDown()

c.logger.Info("starting etcd controller")
go c.informer.Run(ctx.Done())

if !cache.WaitForCacheSync(ctx.Done(), c.informer.HasSynced) {
return
}

const numWorkers = 1
for i := 0; i < numWorkers; i++ {
go wait.Until(c.runWorker, time.Second, ctx.Done())
}

<-ctx.Done()
c.logger.Info("stopping etcd controller")
}

func (c *Controller) initResource() error {
@@ -89,56 +105,27 @@ func (c *Controller) initResource() error {
}

func (c *Controller) onAddEtcdClus(obj interface{}) {
c.syncEtcdClus(obj.(*api.EtcdCluster))
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
panic(err)
}
c.queue.Add(key)
}

func (c *Controller) onUpdateEtcdClus(oldObj, newObj interface{}) {
c.syncEtcdClus(newObj.(*api.EtcdCluster))
}

func (c *Controller) onDeleteEtcdClus(obj interface{}) {
clus, ok := obj.(*api.EtcdCluster)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
panic(fmt.Sprintf("unknown object from EtcdCluster delete event: %#v", obj))
}
clus, ok = tombstone.Obj.(*api.EtcdCluster)
if !ok {
panic(fmt.Sprintf("Tombstone contained object that is not an EtcdCluster: %#v", obj))
}
}
ev := &Event{
Type: kwatch.Deleted,
Object: clus,
}

pt.start()
_, err := c.handleClusterEvent(ev)
key, err := cache.MetaNamespaceKeyFunc(newObj)
if err != nil {
c.logger.Warningf("fail to handle event: %v", err)
panic(err)
}
pt.stop()
c.queue.Add(key)
}

func (c *Controller) syncEtcdClus(clus *api.EtcdCluster) {
ev := &Event{
Type: kwatch.Added,
Object: clus,
}
// re-watch or restart could give ADD event.
// If for an ADD event the cluster spec is invalid then it is not added to the local cache
// so modifying that cluster will result in another ADD event
if _, ok := c.clusters[getNamespacedName(clus)]; ok {
ev.Type = kwatch.Modified
}

pt.start()
_, err := c.handleClusterEvent(ev)
func (c *Controller) onDeleteEtcdClus(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
c.logger.Warningf("fail to handle event: %v", err)
panic(err)
}
pt.stop()
c.queue.Add(key)
}

func (c *Controller) managed(clus *api.EtcdCluster) bool {
113 changes: 113 additions & 0 deletions pkg/controller/sync.go
@@ -0,0 +1,113 @@
package controller

import (
api "github.com/coreos/etcd-operator/pkg/apis/etcd/v1beta2"
kwatch "k8s.io/apimachinery/pkg/watch"
)

const (
// Copy from deployment_controller.go:
// maxRetries is the number of times an etcd cluster will be retried before it is dropped out of the queue.
// With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the times
// an etcd cluster is going to be requeued:
//
// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
maxRetries = 15
)

func (c *Controller) runWorker() {
for c.processNextItem() {
}
}

func (c *Controller) processNextItem() bool {
// Wait until there is a new item in the working queue
key, quit := c.queue.Get()
if quit {
return false
}

// Tell the queue that we are done with processing this key. This unblocks the key for other workers.
// This allows safe parallel processing because the same key is never processed by
// two workers in parallel.
defer c.queue.Done(key)
err := c.processItem(key.(string))
c.handleErr(err, key)
return true
}

func (c *Controller) processItem(key string) error {
obj, exists, err := c.indexer.GetByKey(key)
if err != nil {
return err
}

if !exists {
if _, ok := c.clusters[key]; !ok {
return nil
}
c.clusters[key].Delete()
delete(c.clusters, key)
clustersDeleted.Inc()
clustersTotal.Dec()
return nil
}

clus := obj.(*api.EtcdCluster)

ev := &Event{
Type: kwatch.Added,
Object: clus,
}

if clus.DeletionTimestamp != nil {
ev.Type = kwatch.Deleted

pt.start()
_, err := c.handleClusterEvent(ev)
if err != nil {
c.logger.Warningf("fail to handle event: %v", err)
}
pt.stop()
// The cluster is being deleted; skip the add/modify handling below.
return err
}

// re-watch or restart could give ADD event.
// If for an ADD event the cluster spec is invalid then it is not added to the local cache
// so modifying that cluster will result in another ADD event
if _, ok := c.clusters[getNamespacedName(clus)]; ok {
ev.Type = kwatch.Modified
}

pt.start()
_, err = c.handleClusterEvent(ev)
if err != nil {
c.logger.Warningf("fail to handle event: %v", err)
return err
}
pt.stop()
return nil
}

func (c *Controller) handleErr(err error, key interface{}) {
if err == nil {
// Forget about the #AddRateLimited history of the key on every successful synchronization.
// This ensures that future processing of updates for this key is not delayed because of
// an outdated error history.
c.queue.Forget(key)
return
}

// This controller retries maxRetries times if something goes wrong. After that, it stops trying.
if c.queue.NumRequeues(key) < maxRetries {
c.logger.Errorf("error syncing etcd cluster (%v): %v", key, err)

// Re-enqueue the key rate limited. Based on the rate limiter on the
// queue and the re-enqueue history, the key will be processed later again.
c.queue.AddRateLimited(key)
return
}

c.queue.Forget(key)
// Report that, even after several retries, we could not successfully process this key
c.logger.Infof("Dropping etcd cluster(%v) out of the queue: %v", key, err)
}