From ea5c55f1acf35866db54683b39a2a443007b2abe Mon Sep 17 00:00:00 2001 From: Jordan Halterman Date: Fri, 7 Apr 2023 04:36:48 -0700 Subject: [PATCH] Store Configuration paths/values in separate Atomix map (#1606) * Modify Configuration store to store committed/applied paths/values in a separate Atomix map * Update Atomix Go SDK to v0.13.0 * Use Atomix Map transactions to update configuration paths/values * Prune Configuration values in Configuration store to remove redundant paths from store map * Update path/value when index is changed in Configuration store * Decrease log level of regular messages in controllers * Optimize Watch methods in stores to reduce primitive writes * Fix unit test failures * Remove unnecessary 'replace' statement from go.mod --- go.mod | 10 +- go.sum | 16 +- pkg/controller/configuration/controller.go | 2 +- pkg/controller/connection/controller.go | 2 +- pkg/controller/node/controller.go | 2 +- pkg/controller/proposal/controller.go | 9 +- pkg/controller/target/controller.go | 2 +- pkg/controller/transaction/controller.go | 2 +- pkg/controller/utils/utils.go | 3 +- pkg/controller/utils/utils_test.go | 2 +- pkg/store/configuration/configuration.go | 373 +++++++++++++++++---- pkg/store/transaction/store.go | 214 ++++++++---- 12 files changed, 485 insertions(+), 152 deletions(-) diff --git a/go.mod b/go.mod index f5622498f..645ee919a 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/onosproject/onos-config go 1.19 require ( - github.com/atomix/go-sdk v0.12.7 + github.com/atomix/go-sdk v0.13.2 github.com/gogo/protobuf v1.3.2 github.com/golang/mock v1.6.0 github.com/golang/protobuf v1.5.2 @@ -26,9 +26,9 @@ require ( require ( github.com/Shopify/sarama v1.31.1 // indirect - github.com/atomix/atomix/api v0.9.2 // indirect - github.com/atomix/atomix/protocols/rsm v0.5.6 // indirect - github.com/atomix/atomix/runtime v0.9.0 // indirect + github.com/atomix/atomix/api v1.1.0 // indirect + github.com/atomix/atomix/protocols/rsm v1.1.0 // indirect + github.com/atomix/atomix/runtime v1.1.0 // indirect github.com/atomix/atomix/sidecar v0.4.4 // indirect github.com/bits-and-blooms/bitset v1.3.1 // indirect github.com/bits-and-blooms/bloom/v3 v3.3.1 // indirect @@ -87,5 +87,3 @@ require ( gopkg.in/square/go-jose.v2 v2.5.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) - -replace github.com/docker/docker => github.com/docker/engine v1.4.2-0.20200229013735-71373c6105e3 diff --git a/go.sum b/go.sum index 0b3887559..f0aca4f78 100644 --- a/go.sum +++ b/go.sum @@ -47,16 +47,16 @@ github.com/Shopify/sarama v1.31.1/go.mod h1:99E1xQ1Ql2bYcuJfwdXY3cE17W8+549Ty8PG github.com/Shopify/toxiproxy/v2 v2.3.0 h1:62YkpiP4bzdhKMH+6uC5E95y608k3zDwdzuBMsnn3uQ= github.com/Shopify/toxiproxy/v2 v2.3.0/go.mod h1:KvQTtB6RjCJY4zqNJn7C7JDFgsG5uoHYDirfUfpIm0c= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= -github.com/atomix/atomix/api v0.9.2 h1:BpT4zsXiKQWoIHQzGeCDQuC4BxcWvfrsMrPuUoSNLTI= -github.com/atomix/atomix/api v0.9.2/go.mod h1:Fz8zXQH6n28U0NTu5xctKhkNrN5RsWgX56lrMhqXlPg= -github.com/atomix/atomix/protocols/rsm v0.5.6 h1:55u/qIIUuLakypf1d1CQ5xxj/E720f6FRGDtv1VUOSs= -github.com/atomix/atomix/protocols/rsm v0.5.6/go.mod h1:SgkK3PVLqx1KmNjxw8OY7AY7T6t6TuPLmnP6Rr/oFAQ= -github.com/atomix/atomix/runtime v0.9.0 h1:yDViPymoOkR5gMJkk25CZP2Tjf2j1JMAWNd/I38ws8I= -github.com/atomix/atomix/runtime v0.9.0/go.mod h1:abB/WaP50Fm6MVi8CHRsfIOt8urDzcpQvMDFmhhlVgY= +github.com/atomix/atomix/api v1.1.0 h1:zUbuD4yPu+jBT8NkxvDKx+m8QiRqhVmFUMgRvQoC1Tc= +github.com/atomix/atomix/api v1.1.0/go.mod h1:Fz8zXQH6n28U0NTu5xctKhkNrN5RsWgX56lrMhqXlPg= +github.com/atomix/atomix/protocols/rsm v1.1.0 h1:IFsU/VqoFjjRWRc+ET0B0aYqMG3+oTzDwuiYhVbBQVo= +github.com/atomix/atomix/protocols/rsm v1.1.0/go.mod h1:TT5+SaXyrpS3CAHeGYftmi9DEybCsI8bZY/sFMJijTQ= +github.com/atomix/atomix/runtime v1.1.0 h1:K1fUQqfngOqkXTk+1CwbPLO9STQ6gUVe6qXLlg9ageM= +github.com/atomix/atomix/runtime v1.1.0/go.mod h1:7PtAhumBMs3TE3L/qXUSr4cNqqOWKGBFTvcHw+ZIQZ8= github.com/atomix/atomix/sidecar v0.4.4 h1:CjLPA3p1V83Tx2yxUGz4JLyQkeyXC8/1xXaiZVknJsY= github.com/atomix/atomix/sidecar v0.4.4/go.mod h1:psygbD10K+EdWS1XINexXP0QmiAd+yY2tGkIGqrVMHM= -github.com/atomix/go-sdk v0.12.7 h1:mNj44w930GMb6Sik60LbPhcS52LLqnWvQBVmSRT4jIU= -github.com/atomix/go-sdk v0.12.7/go.mod h1:Ua1SPP/5qms2DZrpVeH+RrIvbviIEExKEijlgZHMWng= +github.com/atomix/go-sdk v0.13.2 h1:KeCP4S84iCbQhq/Ci+3t9eYpKD5l57EfKaBVG46bcNI= +github.com/atomix/go-sdk v0.13.2/go.mod h1:AmNgqS80WqBj6AxdudXhMPhyEB+KW1kZIebWQpBgm9A= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/bits-and-blooms/bitset v1.3.1 h1:y+qrlmq3XsWi+xZqSaueaE8ry8Y127iMxlMfqcK8p0g= github.com/bits-and-blooms/bitset v1.3.1/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA= diff --git a/pkg/controller/configuration/controller.go b/pkg/controller/configuration/controller.go index d5a648406..b4e52f7e2 100644 --- a/pkg/controller/configuration/controller.go +++ b/pkg/controller/configuration/controller.go @@ -71,7 +71,7 @@ func (r *Reconciler) Reconcile(id controller.ID) (controller.Result, error) { return controller.Result{}, nil } - log.Infof("Reconciling Configuration '%s'", config.ID) + log.Debugf("Reconciling Configuration '%s'", config.ID) log.Debug(config) return r.reconcileConfiguration(ctx, config) } diff --git a/pkg/controller/connection/controller.go b/pkg/controller/connection/controller.go index 965201756..25993f45c 100644 --- a/pkg/controller/connection/controller.go +++ b/pkg/controller/connection/controller.go @@ -52,7 +52,7 @@ func (r *Reconciler) Reconcile(id controller.ID) (controller.Result, error) { defer cancel() connID := id.Value.(gnmi.ConnID) - log.Infof("Reconciling Conn '%s'", connID) + log.Debugf("Reconciling Conn '%s'", connID) conn, ok := r.conns.Get(ctx, connID) if !ok { return r.deleteRelation(ctx, connID) diff --git a/pkg/controller/node/controller.go b/pkg/controller/node/controller.go index ce1124f83..5f8043afd 100644 --- a/pkg/controller/node/controller.go +++ b/pkg/controller/node/controller.go @@ -88,7 +88,7 @@ func (r *Reconciler) Reconcile(id controller.ID) (controller.Result, error) { defer cancel() onosConfigID := id.Value.(topoapi.ID) - log.Infof("Reconciling onos-config entity with ID: %s", onosConfigID) + log.Debugf("Reconciling onos-config entity with ID: %s", onosConfigID) object, err := r.topo.Get(ctx, onosConfigID) if err == nil { // Reconciles an onos-config entity that’s not local so the controller should requeue diff --git a/pkg/controller/proposal/controller.go b/pkg/controller/proposal/controller.go index 3beec3fcd..04e536010 100644 --- a/pkg/controller/proposal/controller.go +++ b/pkg/controller/proposal/controller.go @@ -94,7 +94,7 @@ func (r *Reconciler) Reconcile(id controller.ID) (controller.Result, error) { return controller.Result{}, nil } - log.Infof("Reconciling Proposal '%s'", proposal.ID) + log.Debugf("Reconciling Proposal '%s'", proposal.ID) log.Debug(proposal) return r.reconcileProposal(ctx, proposal) } @@ -456,12 +456,10 @@ func (r *Reconciler) reconcileCommit(ctx context.Context, proposal *configapi.Pr if config.Values == nil { config.Values = make(map[string]*configapi.PathValue) } - updatedChangeValues := controllerutils.AddDeleteChildren(changeValues, config.Values) + updatedChangeValues := controllerutils.AddDeleteChildren(proposal.TransactionIndex, changeValues, config.Values) for path, updatedChangeValue := range updatedChangeValues { _, _ = applyChangeToConfig(config.Values, path, updatedChangeValue) } - config.Values = tree.PrunePathMap(config.Values, true) - config.Status.Committed.Index = proposal.TransactionIndex err = r.configurations.Update(ctx, config) if err != nil { @@ -632,7 +630,7 @@ func (r *Reconciler) reconcileApply(ctx context.Context, proposal *configapi.Pro changeValues = proposal.Status.RollbackValues } - updatedChangeValues := controllerutils.AddDeleteChildren(changeValues, config.Values) + updatedChangeValues := controllerutils.AddDeleteChildren(proposal.TransactionIndex, changeValues, config.Values) // Create a list of PathValue pairs from which to construct a gNMI Set for the Proposal. pathValues := make([]*configapi.PathValue, 0, len(updatedChangeValues)) for _, changeValue := range updatedChangeValues { @@ -741,7 +739,6 @@ func (r *Reconciler) reconcileApply(ctx context.Context, proposal *configapi.Pro for path, changeValue := range updatedChangeValues { config.Status.Applied.Values[path] = changeValue } - config.Status.Applied.Values = tree.PrunePathMap(config.Status.Applied.Values, true) if err := r.configurations.UpdateStatus(ctx, config); err != nil { log.Warnf("Failed reconciling Transaction %d Proposal to target '%s'", proposal.TransactionIndex, proposal.TargetID, err) diff --git a/pkg/controller/target/controller.go b/pkg/controller/target/controller.go index fa9b1c7ec..d13fa575a 100644 --- a/pkg/controller/target/controller.go +++ b/pkg/controller/target/controller.go @@ -51,7 +51,7 @@ func (r *Reconciler) Reconcile(id controller.ID) (controller.Result, error) { defer cancel() targetID := id.Value.(topoapi.ID) - log.Infof("Reconciling Target '%s'", targetID) + log.Debugf("Reconciling Target '%s'", targetID) target, err := r.topo.Get(ctx, targetID) if err != nil { if !errors.IsNotFound(err) { diff --git a/pkg/controller/transaction/controller.go b/pkg/controller/transaction/controller.go index 71d26d677..335e0ca1f 100644 --- a/pkg/controller/transaction/controller.go +++ b/pkg/controller/transaction/controller.go @@ -62,7 +62,7 @@ func (r *Reconciler) Reconcile(id controller.ID) (controller.Result, error) { return controller.Result{}, nil } - log.Infof("Reconciling Transaction %d", transaction.Index) + log.Debugf("Reconciling Transaction %d", transaction.Index) log.Debug(transaction) return r.reconcileTransaction(ctx, transaction) } diff --git a/pkg/controller/utils/utils.go b/pkg/controller/utils/utils.go index abb0cfe54..59fb461bb 100644 --- a/pkg/controller/utils/utils.go +++ b/pkg/controller/utils/utils.go @@ -23,7 +23,7 @@ func GetOnosConfigID() topoapi.ID { } // AddDeleteChildren adds all children of the intermediate path which is required to be deleted -func AddDeleteChildren(changeValues map[string]*configapi.PathValue, configStore map[string]*configapi.PathValue) map[string]*configapi.PathValue { +func AddDeleteChildren(index configapi.Index, changeValues map[string]*configapi.PathValue, configStore map[string]*configapi.PathValue) map[string]*configapi.PathValue { // defining new changeValues map, where we will include old changeValues map and new pathValues to be cascading deleted var updChangeValues = make(map[string]*configapi.PathValue) for _, changeValue := range changeValues { @@ -32,6 +32,7 @@ func AddDeleteChildren(changeValues map[string]*configapi.PathValue, configStore for _, value := range configStore { if strings.HasPrefix(value.Path, changeValue.Path) && !strings.EqualFold(value.Path, changeValue.Path) { updChangeValues[value.Path] = value + updChangeValues[value.Path].Index = index updChangeValues[value.Path].Deleted = true } } diff --git a/pkg/controller/utils/utils_test.go b/pkg/controller/utils/utils_test.go index 3156484a5..d50d44d88 100644 --- a/pkg/controller/utils/utils_test.go +++ b/pkg/controller/utils/utils_test.go @@ -107,7 +107,7 @@ func Test_CascadingDeleteAlgorithm(t *testing.T) { log.Infof("changeValues is\n%v", changeValues) // cascading delete algorithm - updatedChangeValues := AddDeleteChildren(changeValues, store) + updatedChangeValues := AddDeleteChildren(1, changeValues, store) log.Infof("updChangeValues is\n%v", updatedChangeValues) assert.Equal(t, 5, len(updatedChangeValues)) log.Infof("updChangeValues has %d PathValues to delete", len(updatedChangeValues)) diff --git a/pkg/store/configuration/configuration.go b/pkg/store/configuration/configuration.go index 48f72b81d..382d2a441 100644 --- a/pkg/store/configuration/configuration.go +++ b/pkg/store/configuration/configuration.go @@ -10,7 +10,10 @@ import ( "github.com/atomix/go-sdk/pkg/primitive" _map "github.com/atomix/go-sdk/pkg/primitive/map" "github.com/atomix/go-sdk/pkg/types" + "github.com/google/uuid" + "github.com/onosproject/onos-config/pkg/utils/tree" "io" + "sync" "time" configapi "github.com/onosproject/onos-api/go/onos/config/v2" @@ -59,9 +62,18 @@ func NewAtomixStore(client primitive.Client) (Store, error) { if err != nil { return nil, errors.FromAtomix(err) } - return &configurationStore{ + store := &configurationStore{ + client: client, configurations: configurations, - }, nil + committed: make(map[configapi.ConfigurationID]_map.Map[string, *configapi.PathValue]), + applied: make(map[configapi.ConfigurationID]_map.Map[string, *configapi.PathValue]), + watchers: make(map[uuid.UUID]chan<- configapi.ConfigurationEvent), + idWatchers: make(map[configapi.ConfigurationID]map[uuid.UUID]chan<- configapi.ConfigurationEvent), + } + if err := store.open(); err != nil { + return nil, err + } + return store, nil } type watchOptions struct { @@ -102,6 +114,86 @@ func WithConfigurationID(id configapi.ConfigurationID) WatchOption { type configurationStore struct { configurations _map.Map[configapi.ConfigurationID, *configapi.Configuration] + client primitive.Client + committed map[configapi.ConfigurationID]_map.Map[string, *configapi.PathValue] + applied map[configapi.ConfigurationID]_map.Map[string, *configapi.PathValue] + watchers map[uuid.UUID]chan<- configapi.ConfigurationEvent + idWatchers map[configapi.ConfigurationID]map[uuid.UUID]chan<- configapi.ConfigurationEvent + mu sync.RWMutex +} + +func (s *configurationStore) open() error { + events, err := s.configurations.Events(context.Background()) + if err != nil { + return err + } + go func() { + for { + event, err := events.Next() + if err == io.EOF { + break + } + if err != nil { + log.Error(err) + continue + } + + var configurationEvent configapi.ConfigurationEvent + switch e := event.(type) { + case *_map.Inserted[configapi.ConfigurationID, *configapi.Configuration]: + configuration := e.Entry.Value + configuration.Version = uint64(e.Entry.Version) + if err := s.populate(context.Background(), configuration); err != nil { + log.Error(err) + continue + } + configurationEvent = configapi.ConfigurationEvent{ + Type: configapi.ConfigurationEvent_CREATED, + Configuration: *configuration, + } + case *_map.Updated[configapi.ConfigurationID, *configapi.Configuration]: + configuration := e.Entry.Value + configuration.Version = uint64(e.Entry.Version) + if err := s.populate(context.Background(), configuration); err != nil { + log.Error(err) + continue + } + configurationEvent = configapi.ConfigurationEvent{ + Type: configapi.ConfigurationEvent_UPDATED, + Configuration: *configuration, + } + case *_map.Removed[configapi.ConfigurationID, *configapi.Configuration]: + configuration := e.Entry.Value + configuration.Version = uint64(e.Entry.Version) + if err := s.populate(context.Background(), configuration); err != nil { + log.Error(err) + continue + } + configurationEvent = configapi.ConfigurationEvent{ + Type: configapi.ConfigurationEvent_DELETED, + Configuration: *configuration, + } + } + + var watchers []chan<- configapi.ConfigurationEvent + s.mu.RLock() + for _, ch := range s.watchers { + watchers = append(watchers, ch) + } + idWatchers, ok := s.idWatchers[configurationEvent.Configuration.ID] + if ok { + for _, ch := range idWatchers { + watchers = append(watchers, ch) + } + } + s.mu.RUnlock() + + for _, ch := range watchers { + ch <- configurationEvent + } + } + }() + return nil } func (s *configurationStore) Get(ctx context.Context, id configapi.ConfigurationID) (*configapi.Configuration, error) { @@ -114,6 +206,10 @@ func (s *configurationStore) Get(ctx context.Context, id configapi.Configuration configuration := entry.Value configuration.Key = string(entry.Key) configuration.Version = uint64(entry.Version) + if err := s.populate(ctx, configuration); err != nil { + log.Error(err) + return nil, err + } return configuration, nil } @@ -131,10 +227,21 @@ func (s *configurationStore) Create(ctx context.Context, configuration *configap return errors.NewInvalid("cannot create configuration with version") } + if configuration.Values != nil { + committed, err := s.getCommitted(ctx, configuration.ID) + if err != nil { + return err + } + if err := s.store(ctx, committed, configuration.Values); err != nil { + return err + } + } + configuration.Key = string(configuration.ID) configuration.Revision = 1 configuration.Created = time.Now() configuration.Updated = time.Now() + configuration.Values = nil // Create the entry in the underlying map primitive. entry, err := s.configurations.Insert(ctx, configuration.ID, configuration) @@ -158,8 +265,20 @@ func (s *configurationStore) Update(ctx context.Context, configuration *configap if configuration.Version == 0 { return errors.NewInvalid("configuration must contain a version on update") } + + if configuration.Values != nil { + committed, err := s.getCommitted(ctx, configuration.ID) + if err != nil { + return err + } + if err := s.store(ctx, committed, configuration.Values); err != nil { + return err + } + } + configuration.Revision++ configuration.Updated = time.Now() + configuration.Values = nil // Update the entry in the underlying map primitive using the configuration version // as an optimistic lock. @@ -184,7 +303,19 @@ func (s *configurationStore) UpdateStatus(ctx context.Context, configuration *co if configuration.Version == 0 { return errors.NewInvalid("configuration must contain a version on update") } + + if configuration.Status.Applied.Values != nil { + applied, err := s.getApplied(ctx, configuration.ID) + if err != nil { + return err + } + if err := s.store(ctx, applied, configuration.Status.Applied.Values); err != nil { + return err + } + } + configuration.Updated = time.Now() + configuration.Status.Applied.Values = nil // Update the entry in the underlying map primitive using the configuration version // as an optimistic lock. @@ -214,6 +345,10 @@ func (s *configurationStore) List(ctx context.Context) ([]*configapi.Configurati } configuration := entry.Value configuration.Version = uint64(entry.Version) + if err := s.populate(ctx, configuration); err != nil { + log.Error(err) + return nil, err + } configurations = append(configurations, configuration) } } @@ -224,41 +359,69 @@ func (s *configurationStore) Watch(ctx context.Context, ch chan<- configapi.Conf opt.apply(&options) } - var eventsOpts []_map.EventsOption + id := uuid.New() + eventCh := make(chan configapi.ConfigurationEvent) + s.mu.Lock() if options.configurationID != "" { - eventsOpts = append(eventsOpts, _map.WithKey[configapi.ConfigurationID](options.configurationID)) - } - events, err := s.configurations.Events(ctx, eventsOpts...) - if err != nil { - return errors.FromAtomix(err) + watchers, ok := s.idWatchers[options.configurationID] + if !ok { + watchers = make(map[uuid.UUID]chan<- configapi.ConfigurationEvent) + s.idWatchers[options.configurationID] = watchers + } + watchers[id] = eventCh + } else { + s.watchers[id] = eventCh } - - if options.replay { - if options.configurationID != "" { - entry, err := s.configurations.Get(ctx, options.configurationID) - if err != nil { - err = errors.FromAtomix(err) - if !errors.IsNotFound(err) { - return err + s.mu.Unlock() + + go func() { + defer func() { + s.mu.Lock() + if options.configurationID != "" { + watchers, ok := s.idWatchers[options.configurationID] + if ok { + delete(watchers, id) + if len(watchers) == 0 { + delete(s.idWatchers, options.configurationID) + } } - go propagateEvents(events, ch) } else { - go func() { + delete(s.watchers, id) + } + s.mu.Unlock() + }() + + if options.replay { + if options.configurationID != "" { + entry, err := s.configurations.Get(ctx, options.configurationID) + if err != nil { + err = errors.FromAtomix(err) + if !errors.IsNotFound(err) { + log.Error(err) + } + } else { configuration := entry.Value configuration.Version = uint64(entry.Version) + if ctx.Err() != nil { + close(ch) + return + } + if err := s.populate(ctx, configuration); err != nil { + log.Error(err) + return + } ch <- configapi.ConfigurationEvent{ Type: configapi.ConfigurationEvent_REPLAYED, Configuration: *configuration, } - propagateEvents(events, ch) - }() - } - } else { - entries, err := s.configurations.List(ctx) - if err != nil { - return errors.FromAtomix(err) - } - go func() { + } + } else { + entries, err := s.configurations.List(ctx) + if err != nil { + log.Error(err) + close(ch) + return + } for { entry, err := entries.Next() if err == io.EOF { @@ -268,56 +431,154 @@ func (s *configurationStore) Watch(ctx context.Context, ch chan<- configapi.Conf log.Error(err) continue } + if ctx.Err() != nil { + close(ch) + return + } configuration := entry.Value configuration.Version = uint64(entry.Version) + if err := s.populate(ctx, configuration); err != nil { + log.Error(err) + return + } ch <- configapi.ConfigurationEvent{ Type: configapi.ConfigurationEvent_REPLAYED, Configuration: *configuration, } } - propagateEvents(events, ch) - }() + } } - } else { - go propagateEvents(events, ch) - } + + for { + select { + case event := <-eventCh: + ch <- event + case <-ctx.Done(): + close(ch) + go func() { + for range eventCh { + } + }() + return + } + } + }() return nil } -func propagateEvents(events _map.EventStream[configapi.ConfigurationID, *configapi.Configuration], ch chan<- configapi.ConfigurationEvent) { +func (s *configurationStore) populate(ctx context.Context, configuration *configapi.Configuration) error { + committed, err := s.getCommitted(ctx, configuration.ID) + if err != nil { + return err + } + stream, err := committed.List(ctx) + if err != nil { + return err + } for { - event, err := events.Next() + entry, err := stream.Next() if err == io.EOF { break } if err != nil { - log.Error(err) - continue + return err } - switch e := event.(type) { - case *_map.Inserted[configapi.ConfigurationID, *configapi.Configuration]: - configuration := e.Entry.Value - configuration.Version = uint64(e.Entry.Version) - ch <- configapi.ConfigurationEvent{ - Type: configapi.ConfigurationEvent_CREATED, - Configuration: *configuration, - } - case *_map.Updated[configapi.ConfigurationID, *configapi.Configuration]: - configuration := e.Entry.Value - configuration.Version = uint64(e.Entry.Version) - ch <- configapi.ConfigurationEvent{ - Type: configapi.ConfigurationEvent_UPDATED, - Configuration: *configuration, + if configuration.Values == nil { + configuration.Values = make(map[string]*configapi.PathValue) + } + configuration.Values[entry.Key] = entry.Value + } + + applied, err := s.getApplied(ctx, configuration.ID) + if err != nil { + return err + } + stream, err = applied.List(ctx) + if err != nil { + return err + } + for { + entry, err := stream.Next() + if err == io.EOF { + break + } + if err != nil { + return err + } + if configuration.Status.Applied.Values == nil { + configuration.Status.Applied.Values = make(map[string]*configapi.PathValue) + } + configuration.Status.Applied.Values[entry.Key] = entry.Value + } + return nil +} + +func (s *configurationStore) getCommitted(ctx context.Context, id configapi.ConfigurationID) (_map.Map[string, *configapi.PathValue], error) { + return s.getTarget(ctx, s.committed, id) +} + +func (s *configurationStore) getApplied(ctx context.Context, id configapi.ConfigurationID) (_map.Map[string, *configapi.PathValue], error) { + return s.getTarget(ctx, s.applied, id) +} + +func (s *configurationStore) store(ctx context.Context, store _map.Map[string, *configapi.PathValue], values map[string]*configapi.PathValue) error { + prunedValues := tree.PrunePathMap(values, true) + transaction := store.Transaction(ctx) + for _, pv := range values { + entry, err := store.Get(ctx, pv.Path) + if err != nil { + err = errors.FromAtomix(err) + if !errors.IsNotFound(err) { + return err } - case *_map.Removed[configapi.ConfigurationID, *configapi.Configuration]: - configuration := e.Entry.Value - configuration.Version = uint64(e.Entry.Version) - ch <- configapi.ConfigurationEvent{ - Type: configapi.ConfigurationEvent_DELETED, - Configuration: *configuration, + if _, ok := prunedValues[pv.Path]; ok { + transaction.Insert(pv.Path, pv) } + } else if _, ok := prunedValues[pv.Path]; !ok { + transaction.Remove(pv.Path, _map.IfVersion(entry.Version)) + } else if pv.Index != entry.Value.Index { + transaction.Update(pv.Path, pv, _map.IfVersion(entry.Version)) + } + } + if _, err := transaction.Commit(); err != nil { + err = errors.FromAtomix(err) + if errors.IsNotFound(err) || errors.IsAlreadyExists(err) || errors.IsConflict(err) { + return errors.NewConflict(err.Error()) } + return err + } + return nil +} + +func (s *configurationStore) getTarget( + ctx context.Context, + targets map[configapi.ConfigurationID]_map.Map[string, *configapi.PathValue], + id configapi.ConfigurationID) (_map.Map[string, *configapi.PathValue], error) { + s.mu.RLock() + target, ok := targets[id] + s.mu.RUnlock() + if ok { + return target, nil + } + + s.mu.Lock() + defer s.mu.Unlock() + + target, ok = targets[id] + if ok { + return target, nil + } + + var err error + target, err = _map.NewBuilder[string, *configapi.PathValue](s.client, fmt.Sprintf("configurations-%s", id)). + Tag("onos-config", "path-value"). + Codec(types.Proto[*configapi.PathValue](&configapi.PathValue{})). + Get(ctx) + if err != nil { + return nil, errors.FromAtomix(err) } + targets[id] = target + return target, nil } func (s *configurationStore) Close(ctx context.Context) error { diff --git a/pkg/store/transaction/store.go b/pkg/store/transaction/store.go index 5a264ba4f..55de66df5 100644 --- a/pkg/store/transaction/store.go +++ b/pkg/store/transaction/store.go @@ -8,7 +8,9 @@ import ( "github.com/atomix/go-sdk/pkg/primitive" "github.com/atomix/go-sdk/pkg/primitive/indexedmap" "github.com/atomix/go-sdk/pkg/types" + "github.com/google/uuid" "io" + "sync" "time" "github.com/onosproject/onos-lib-go/pkg/errors" @@ -58,9 +60,15 @@ func NewAtomixStore(client primitive.Client) (Store, error) { if err != nil { return nil, errors.FromAtomix(err) } - return &transactionStore{ + store := &transactionStore{ transactions: transactions, - }, nil + watchers: make(map[uuid.UUID]chan<- configapi.TransactionEvent), + idWatchers: make(map[configapi.TransactionID]map[uuid.UUID]chan<- configapi.TransactionEvent), + } + if err := store.open(); err != nil { + return nil, err + } + return store, nil } type watchOptions struct { @@ -101,6 +109,74 @@ func WithTransactionID(id configapi.TransactionID) WatchOption { type transactionStore struct { transactions indexedmap.IndexedMap[configapi.TransactionID, *configapi.Transaction] + watchers map[uuid.UUID]chan<- configapi.TransactionEvent + idWatchers map[configapi.TransactionID]map[uuid.UUID]chan<- configapi.TransactionEvent + mu sync.RWMutex +} + +func (s *transactionStore) open() error { + events, err := s.transactions.Events(context.Background()) + if err != nil { + return err + } + go func() { + for { + event, err := events.Next() + if err == io.EOF { + break + } + if err != nil { + log.Error(err) + continue + } + + var transactionEvent configapi.TransactionEvent + switch e := event.(type) { + case *indexedmap.Inserted[configapi.TransactionID, *configapi.Transaction]: + transaction := e.Entry.Value + transaction.Index = configapi.Index(e.Entry.Index) + transaction.Version = uint64(e.Entry.Version) + transactionEvent = configapi.TransactionEvent{ + Type: configapi.TransactionEvent_CREATED, + Transaction: *transaction, + } + case *indexedmap.Updated[configapi.TransactionID, *configapi.Transaction]: + transaction := e.Entry.Value + transaction.Index = configapi.Index(e.Entry.Index) + transaction.Version = uint64(e.Entry.Version) + transactionEvent = configapi.TransactionEvent{ + Type: configapi.TransactionEvent_UPDATED, + Transaction: *transaction, + } + case *indexedmap.Removed[configapi.TransactionID, *configapi.Transaction]: + transaction := e.Entry.Value + transaction.Index = configapi.Index(e.Entry.Index) + transaction.Version = uint64(e.Entry.Version) + transactionEvent = configapi.TransactionEvent{ + Type: configapi.TransactionEvent_DELETED, + Transaction: *transaction, + } + } + + var watchers []chan<- configapi.TransactionEvent + s.mu.RLock() + for _, ch := range s.watchers { + watchers = append(watchers, ch) + } + idWatchers, ok := s.idWatchers[transactionEvent.Transaction.ID] + if ok { + for _, ch := range idWatchers { + watchers = append(watchers, ch) + } + } + s.mu.RUnlock() + + for _, ch := range watchers { + ch <- transactionEvent + } + } + }() + return nil } // Get gets a transaction @@ -225,42 +301,66 @@ func (s *transactionStore) Watch(ctx context.Context, ch chan<- configapi.Transa opt.apply(&options) } - var eventsOpts []indexedmap.EventsOption + id := uuid.New() + eventCh := make(chan configapi.TransactionEvent) + s.mu.Lock() if options.transactionID != "" { - eventsOpts = append(eventsOpts, indexedmap.WithKey[configapi.TransactionID](options.transactionID)) - } - events, err := s.transactions.Events(ctx, eventsOpts...) - if err != nil { - return errors.FromAtomix(err) + watchers, ok := s.idWatchers[options.transactionID] + if !ok { + watchers = make(map[uuid.UUID]chan<- configapi.TransactionEvent) + s.idWatchers[options.transactionID] = watchers + } + watchers[id] = eventCh + } else { + s.watchers[id] = eventCh } - - if options.replay { - if options.transactionID != "" { - entry, err := s.transactions.Get(ctx, options.transactionID) - if err != nil { - err = errors.FromAtomix(err) - if !errors.IsNotFound(err) { - return err + s.mu.Unlock() + + go func() { + defer func() { + s.mu.Lock() + if options.transactionID != "" { + watchers, ok := s.idWatchers[options.transactionID] + if ok { + delete(watchers, id) + if len(watchers) == 0 { + delete(s.idWatchers, options.transactionID) + } } - go propagateEvents(events, ch) } else { - go func() { + delete(s.watchers, id) + } + s.mu.Unlock() + }() + + if options.replay { + if options.transactionID != "" { + entry, err := s.transactions.Get(ctx, options.transactionID) + if err != nil { + err = errors.FromAtomix(err) + if !errors.IsNotFound(err) { + log.Error(err) + } + } else { transaction := entry.Value transaction.Index = configapi.Index(entry.Index) transaction.Version = uint64(entry.Version) + if ctx.Err() != nil { + close(ch) + return + } ch <- configapi.TransactionEvent{ Type: configapi.TransactionEvent_REPLAYED, Transaction: *transaction, } - propagateEvents(events, ch) - }() - } - } else { - entries, err := s.transactions.List(ctx) - if err != nil { - return errors.FromAtomix(err) - } - go func() { + } + } else { + entries, err := s.transactions.List(ctx) + if err != nil { + log.Error(err) + close(ch) + return + } for { entry, err := entries.Next() if err == io.EOF { @@ -270,6 +370,10 @@ func (s *transactionStore) Watch(ctx context.Context, ch chan<- configapi.Transa log.Error(err) continue } + if ctx.Err() != nil { + close(ch) + return + } transaction := entry.Value transaction.Index = configapi.Index(entry.Index) transaction.Version = uint64(entry.Version) @@ -278,52 +382,24 @@ func (s *transactionStore) Watch(ctx context.Context, ch chan<- configapi.Transa Transaction: *transaction, } } - propagateEvents(events, ch) - }() + } } - } else { - go propagateEvents(events, ch) - } - return nil -} -func propagateEvents(events indexedmap.EventStream[configapi.TransactionID, *configapi.Transaction], ch chan<- configapi.TransactionEvent) { - for { - event, err := events.Next() - if err == io.EOF { - break - } - if err != nil { - log.Error(err) - continue - } - switch e := event.(type) { - case *indexedmap.Inserted[configapi.TransactionID, *configapi.Transaction]: - transaction := e.Entry.Value - transaction.Index = configapi.Index(e.Entry.Index) - transaction.Version = uint64(e.Entry.Version) - ch <- configapi.TransactionEvent{ - Type: configapi.TransactionEvent_CREATED, - Transaction: *transaction, - } - case *indexedmap.Updated[configapi.TransactionID, *configapi.Transaction]: - transaction := e.Entry.Value - transaction.Index = configapi.Index(e.Entry.Index) - transaction.Version = uint64(e.Entry.Version) - ch <- configapi.TransactionEvent{ - Type: configapi.TransactionEvent_UPDATED, - Transaction: *transaction, - } - case *indexedmap.Removed[configapi.TransactionID, *configapi.Transaction]: - transaction := e.Entry.Value - transaction.Index = configapi.Index(e.Entry.Index) - transaction.Version = uint64(e.Entry.Version) - ch <- configapi.TransactionEvent{ - Type: configapi.TransactionEvent_DELETED, - Transaction: *transaction, + for { + select { + case event := <-eventCh: + ch <- event + case <-ctx.Done(): + close(ch) + go func() { + for range eventCh { + } + }() + return } } - } + }() + return nil } // Close closes the store