Skip to content

Commit

Permalink
Store mastership term/master in Configuration store (#1642)
Browse files Browse the repository at this point in the history
* Store mastership term/master in Configuration store to prevent potential deletion of mastership state

* Refactor mastership controller to reconcile Configuration IDs instead of topo entity IDs to ensure masters are reconciled for all possible configurations
  • Loading branch information
kuujo authored Apr 7, 2023
1 parent 1a7f6e1 commit e087d6f
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 108 deletions.
54 changes: 19 additions & 35 deletions pkg/controller/configuration/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,28 +85,13 @@ func (r *Reconciler) reconcileConfiguration(ctx context.Context, config *configa
return controller.Result{}, err
}
log.Debugf("Target entity '%s' not found", config.TargetID)

// If the target was deleted after a master was already elected for it,
// SOUND THE ALARM! and revert back to the SYNCHRONIZING state.
log.Errorf("Mastership state lost for target '%s'. Future configuration changes may not be applicable!")
config.Status.State = configapi.ConfigurationStatus_UNKNOWN
config.Status.Mastership.Master = ""
config.Status.Mastership.Term = 0
if err := r.updateConfigurationStatus(ctx, config); err != nil {
return controller.Result{}, err
}
return controller.Result{}, nil
}

// Get the target configurable configuration
configurable := topoapi.Configurable{}
_ = target.GetAspect(&configurable)

// Get the target mastership state
mastership := topoapi.MastershipState{}
_ = target.GetAspect(&mastership)
mastershipTerm := configapi.MastershipTerm(mastership.Term)

// If the target is persistent, mark the configuration PERSISTED.
if configurable.Persistent {
if config.Status.State != configapi.ConfigurationStatus_PERSISTED {
Expand All @@ -120,25 +105,22 @@ func (r *Reconciler) reconcileConfiguration(ctx context.Context, config *configa
return controller.Result{}, nil
}

// If the mastership term has changed, update the configuration and mark it SYNCHRONIZING for the next term.
if mastershipTerm > config.Status.Mastership.Term {
log.Infof("Synchronizing Configuration '%s'", config.ID)
config.Status.State = configapi.ConfigurationStatus_SYNCHRONIZING
config.Status.Mastership.Master = mastership.NodeId
config.Status.Mastership.Term = mastershipTerm
if err := r.updateConfigurationStatus(ctx, config); err != nil {
return controller.Result{}, err
}
return controller.Result{}, nil
}

// If the configuration is not SYNCHRONIZING, skip synchronization.
if config.Status.State != configapi.ConfigurationStatus_SYNCHRONIZING {
// If the configuration term is greater than the applied term, set the state to SYNCHRONIZING
if config.Status.Mastership.Term > config.Status.Applied.Mastership.Term {
log.Infof("Configuration '%s' mastership term has increased, synchronizing...", config.ID)
config.Status.State = configapi.ConfigurationStatus_SYNCHRONIZING
if err := r.updateConfigurationStatus(ctx, config); err != nil {
return controller.Result{}, err
}
return controller.Result{}, nil
}
return controller.Result{}, nil
}

// If the master node ID is not set, skip reconciliation.
if mastership.NodeId == "" {
// If the master ID is not set, skip reconciliation.
if config.Status.Mastership.Master == "" {
log.Debugf("No master for target '%s'", config.TargetID)
return controller.Result{}, nil
}
Expand All @@ -147,6 +129,8 @@ func (r *Reconciler) reconcileConfiguration(ctx context.Context, config *configa
if config.Status.Applied.Index == 0 {
log.Infof("Skipping synchronization of Configuration '%s': no applied changes to synchronize", config.ID)
config.Status.State = configapi.ConfigurationStatus_SYNCHRONIZED
config.Status.Applied.Mastership.Master = config.Status.Mastership.Master
config.Status.Applied.Mastership.Term = config.Status.Mastership.Term
if err := r.updateConfigurationStatus(ctx, config); err != nil {
return controller.Result{}, err
}
Expand All @@ -155,10 +139,10 @@ func (r *Reconciler) reconcileConfiguration(ctx context.Context, config *configa

// If we've made it this far, we know there's a master relation.
// Get the relation and check whether this node is the source
relation, err := r.topo.Get(ctx, topoapi.ID(mastership.NodeId))
relation, err := r.topo.Get(ctx, topoapi.ID(config.Status.Mastership.Master))
if err != nil {
if !errors.IsNotFound(err) {
log.Errorf("Failed fetching master Relation '%s' from topo", mastership.NodeId, err)
log.Errorf("Failed fetching master Relation '%s' from topo", config.Status.Mastership.Master, err)
return controller.Result{}, err
}
log.Warnf("Master relation not found for target '%s'", config.TargetID)
Expand Down Expand Up @@ -200,7 +184,7 @@ func (r *Reconciler) reconcileConfiguration(ctx context.Context, config *configa
Id: "onos-config",
},
ElectionId: &gnmi_ext.Uint128{
Low: uint64(mastershipTerm),
Low: uint64(config.Status.Mastership.Term),
},
},
},
Expand All @@ -214,7 +198,7 @@ func (r *Reconciler) reconcileConfiguration(ctx context.Context, config *configa
// Rather than reverting to the STALE state now, wait for this node to see the mastership state change
// to avoid flapping between states while the system converges.
if errors.IsForbidden(err) {
log.Warnf("Configuration '%s' mastership superseded for term %d", config.ID, mastershipTerm)
log.Warnf("Configuration '%s' mastership superseded for term %d", config.ID, config.Status.Mastership.Term)
return controller.Result{}, nil
}
log.Errorf("Failed sending SetRequest %+v", setRequest, err)
Expand All @@ -225,8 +209,8 @@ func (r *Reconciler) reconcileConfiguration(ctx context.Context, config *configa

// Update the configuration state and path statuses
log.Infof("Configuration '%s' synchronization complete", config.ID)
config.Status.Applied.Mastership.Master = mastership.NodeId
config.Status.Applied.Mastership.Term = mastershipTerm
config.Status.Applied.Mastership.Master = config.Status.Mastership.Master
config.Status.Applied.Mastership.Term = config.Status.Mastership.Term
config.Status.State = configapi.ConfigurationStatus_SYNCHRONIZED
if err := r.updateConfigurationStatus(ctx, config); err != nil {
return controller.Result{}, err
Expand Down
52 changes: 24 additions & 28 deletions pkg/controller/mastership/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ package mastership

import (
"context"
configapi "github.com/onosproject/onos-api/go/onos/config/v2"
"github.com/onosproject/onos-config/pkg/controller/utils"
"github.com/onosproject/onos-config/pkg/store/configuration"
"math/rand"
"time"

Expand All @@ -24,36 +26,38 @@ const defaultTimeout = 30 * time.Second
var log = logging.GetLogger("controller", "mastership")

// NewController returns a new mastership controller
func NewController(topo topo.Store) *controller.Controller {
func NewController(topo topo.Store, configurations configuration.Store) *controller.Controller {
c := controller.NewController("mastership")
c.Watch(&TopoWatcher{
topo: topo,
})

c.Watch(&ConfigurationStoreWatcher{
configurations: configurations,
})
c.Reconcile(&Reconciler{
topo: topo,
topo: topo,
configurations: configurations,
})
return c
}

// Reconciler is mastership reconciler
type Reconciler struct {
topo topo.Store
topo topo.Store
configurations configuration.Store
}

// Reconcile reconciles the mastership state for a gnmi target
func (r *Reconciler) Reconcile(id controller.ID) (controller.Result, error) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()

targetID := id.Value.(topoapi.ID)
log.Infof("Reconciling mastership election for the gNMI target %s", targetID)
targetEntity, err := r.topo.Get(ctx, targetID)
configID := id.Value.(configapi.ConfigurationID)
config, err := r.configurations.Get(ctx, configID)
if err != nil {
if errors.IsNotFound(err) {
return controller.Result{}, nil
}
log.Warnf("Failed to reconcile mastership election for the gNMI target with ID %s: %s", targetEntity.ID, err)
return controller.Result{}, err
}

Expand All @@ -66,25 +70,23 @@ func (r *Reconciler) Reconcile(id controller.ID) (controller.Result, error) {
},
})
if err != nil {
log.Warnf("Updating MastershipState for target '%s' failed: %v", targetEntity.GetID(), err)
log.Warnf("Updating MastershipState for Configuration '%s' failed: %v", config.ID, err)
return controller.Result{}, err
}
targetRelations := make(map[topoapi.ID]topoapi.Object)
for _, object := range objects {
if object.GetRelation().TgtEntityID == targetID {
if object.GetRelation().TgtEntityID == topoapi.ID(config.TargetID) {
targetRelations[object.ID] = object
}
}

mastership := &topoapi.MastershipState{}
_ = targetEntity.GetAspect(mastership)
if _, ok := targetRelations[topoapi.ID(mastership.NodeId)]; !ok {
if _, ok := targetRelations[topoapi.ID(config.Status.Mastership.Master)]; !ok {
if len(targetRelations) == 0 {
if mastership.NodeId == "" {
if config.Status.Mastership.Master == "" {
return controller.Result{}, nil
}
log.Infof("Master in term %d resigned for the gNMI target '%s'", mastership.Term, targetEntity.GetID())
mastership.NodeId = ""
log.Infof("Master in term %d resigned for Configuration '%s'", config.Status.Mastership.Term, config.ID)
config.Status.Mastership.Master = ""
} else {
// Select a random master to assign to the gnmi target
relations := make([]topoapi.Object, 0, len(targetRelations))
Expand All @@ -94,22 +96,16 @@ func (r *Reconciler) Reconcile(id controller.ID) (controller.Result, error) {
relation := relations[rand.Intn(len(relations))]

// Increment the mastership term and assign the selected master
mastership.Term++
mastership.NodeId = string(relation.ID)
log.Infof("Elected new master '%s' in term %d for the gNMI target '%s'", mastership.NodeId, mastership.Term, targetEntity.GetID())
}

err = targetEntity.SetAspect(mastership)
if err != nil {
log.Warnf("Updating MastershipState for gNMI target '%s' failed: %v", targetEntity.GetID(), err)
return controller.Result{}, err
config.Status.Mastership.Term++
config.Status.Mastership.Master = string(relation.ID)
log.Infof("Elected new master '%s' in term %d for Configuration '%s'", config.Status.Mastership.Master, config.Status.Mastership.Term, config.ID)
}

// Update the gNMI target entity
err = r.topo.Update(ctx, targetEntity)
// Update the Configuration status
err = r.configurations.UpdateStatus(ctx, config)
if err != nil {
if !errors.IsNotFound(err) && !errors.IsConflict(err) {
log.Warnf("Updating MastershipState for gNMI target '%s' failed: %v", targetEntity.GetID(), err)
log.Warnf("Updating mastership for Configuration '%s' failed: %v", config.ID, err)
return controller.Result{}, err
}
return controller.Result{}, nil
Expand Down
110 changes: 95 additions & 15 deletions pkg/controller/mastership/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ package mastership

import (
"context"
configapi "github.com/onosproject/onos-api/go/onos/config/v2"
"github.com/onosproject/onos-config/pkg/store/configuration"
"github.com/onosproject/onos-lib-go/pkg/errors"
"sync"

"github.com/onosproject/onos-config/pkg/store/topo"
Expand Down Expand Up @@ -42,29 +45,64 @@ func (w *TopoWatcher) Start(ch chan<- controller.ID) error {
w.cancel = cancel

go func() {
defer close(ch)
for event := range eventCh {
log.Debugf("Received topo event '%s'", event.Object.ID)
if relation, ok := event.Object.Obj.(*topoapi.Object_Relation); ok &&
relation.Relation.KindID == topoapi.CONTROLS {
srcEntity, err := w.topo.Get(ctx, relation.Relation.SrcEntityID)
switch e := event.Object.Obj.(type) {
// If the event is a relation change and the source is ONOS_CONFIG kind, enqueue the target
// Configuration to be reconciled.
case *topoapi.Object_Relation:
// Get the source entity
srcEntity, err := w.topo.Get(ctx, e.Relation.SrcEntityID)
if err != nil {
log.Warn(err)
} else if srcEntity.GetEntity().KindID == topoapi.ONOS_CONFIG {
ch <- controller.NewID(relation.Relation.TgtEntityID)
if !errors.IsNotFound(err) {
log.Warn(err)
}
continue
}

}
if _, ok := event.Object.Obj.(*topoapi.Object_Entity); ok {
// If the entity object has configurable aspect then the controller
// can make a connection to it
err = event.Object.GetAspect(&topoapi.Configurable{})
if err == nil {
ch <- controller.NewID(event.Object.ID)
// Check that the source is ONOS_CONFIG kind
if srcEntity.GetEntity().KindID != topoapi.ONOS_CONFIG {
continue
}

// Get the target entity
tgtEntity, err := w.topo.Get(ctx, e.Relation.TgtEntityID)
if err != nil {
if !errors.IsNotFound(err) {
log.Warn(err)
}
continue
}
}

// Check that the target has the Configurable aspect
configurable := &topoapi.Configurable{}
if err := tgtEntity.GetAspect(configurable); err != nil {
continue
}

// Enqueue the associated Configuration for reconciliation
ch <- controller.NewID(configuration.NewID(
configapi.TargetID(tgtEntity.ID),
configapi.TargetType(configurable.Type),
configapi.TargetVersion(configurable.Version)))

// If the event is an entity change and the entity has a Configurable aspect, enqueue the
// associated Configuration for reconciliation.
case *topoapi.Object_Entity:
// Check that the entity has the Configurable aspect
configurable := &topoapi.Configurable{}
if err := event.Object.GetAspect(configurable); err != nil {
continue
}

// Enqueue the associated Configuration for reconciliation
ch <- controller.NewID(configuration.NewID(
configapi.TargetID(event.Object.ID),
configapi.TargetType(configurable.Type),
configapi.TargetVersion(configurable.Version)))
}
}
close(ch)
}()

return nil
Expand All @@ -79,3 +117,45 @@ func (w *TopoWatcher) Stop() {
}
w.mu.Unlock()
}

// ConfigurationStoreWatcher configuration store watcher
type ConfigurationStoreWatcher struct {
configurations configuration.Store
cancel context.CancelFunc
mu sync.Mutex
}

// Start starts the watcher
func (w *ConfigurationStoreWatcher) Start(ch chan<- controller.ID) error {
w.mu.Lock()
defer w.mu.Unlock()
if w.cancel != nil {
return nil
}

eventCh := make(chan configapi.ConfigurationEvent, queueSize)
ctx, cancel := context.WithCancel(context.Background())

err := w.configurations.Watch(ctx, eventCh, configuration.WithReplay())
if err != nil {
cancel()
return err
}
w.cancel = cancel
go func() {
for event := range eventCh {
ch <- controller.NewID(event.Configuration.ID)
}
}()
return nil
}

// Stop stops the watcher
func (w *ConfigurationStoreWatcher) Stop() {
w.mu.Lock()
if w.cancel != nil {
w.cancel()
w.cancel = nil
}
w.mu.Unlock()
}
Loading

0 comments on commit e087d6f

Please sign in to comment.