diff --git a/feature_store.go b/feature_store.go index 4c6939c..14fa879 100644 --- a/feature_store.go +++ b/feature_store.go @@ -30,7 +30,7 @@ type FeatureStore interface { // returns an empty map. All(kind VersionedDataKind) (map[string]VersionedData, error) // Init performs an update of the entire data store, replacing any existing data. - Init(map[VersionedDataKind]map[string]VersionedData) error + Init(data map[VersionedDataKind]map[string]VersionedData) error // Delete removes the specified item from the data store, unless its Version property is greater // than or equal to the specified version, in which case nothing happens. Removal should be done // by storing an item whose Deleted property is true (use VersionedDataKind.MakeDeleteItem()). diff --git a/ldconsul/consul.go b/ldconsul/consul.go index 8cb49b9..a6c3197 100644 --- a/ldconsul/consul.go +++ b/ldconsul/consul.go @@ -174,7 +174,7 @@ func NewConsulFeatureStore(options ...FeatureStoreOption) (ld.FeatureStore, erro if err != nil { return nil, err } - return utils.NewFeatureStoreWrapper(store), nil + return utils.NewNonAtomicFeatureStoreWrapper(store), nil } func newConsulFeatureStoreInternal(options ...FeatureStoreOption) (*featureStore, error) { @@ -237,7 +237,7 @@ func (store *featureStore) GetAllInternal(kind ld.VersionedDataKind) (map[string return results, nil } -func (store *featureStore) InitInternal(allData map[ld.VersionedDataKind]map[string]ld.VersionedData) error { +func (store *featureStore) InitCollectionsInternal(allData []utils.StoreCollection) error { kv := store.client.KV() // Start by reading the existing keys; we will later delete any of these that weren't in allData. @@ -252,14 +252,14 @@ func (store *featureStore) InitInternal(allData map[ld.VersionedDataKind]map[str ops := make([]*c.KVTxnOp, 0) - for kind, items := range allData { - for k, v := range items { - data, jsonErr := json.Marshal(v) + for _, coll := range allData { + for _, item := range coll.Items { + data, jsonErr := json.Marshal(item) if jsonErr != nil { return jsonErr } - key := store.featureKeyFor(kind, k) + key := store.featureKeyFor(coll.Kind, item.GetKey()) op := &c.KVTxnOp{Verb: c.KVSet, Key: key, Value: data} ops = append(ops, op) diff --git a/ldconsul/consul_test.go b/ldconsul/consul_test.go index a7542f5..023048e 100644 --- a/ldconsul/consul_test.go +++ b/ldconsul/consul_test.go @@ -29,7 +29,7 @@ func TestConsulFeatureStorePrefixes(t *testing.T) { func TestConsulFeatureStoreConcurrentModification(t *testing.T) { store1Core, err := newConsulFeatureStoreInternal() // we need the underlying implementation object so we can set testTxHook require.NoError(t, err) - store1 := utils.NewFeatureStoreWrapper(store1Core) + store1 := utils.NewNonAtomicFeatureStoreWrapper(store1Core) store2, err := NewConsulFeatureStore() require.NoError(t, err) diff --git a/lddynamodb/dynamodb.go b/lddynamodb/dynamodb.go index ca2d916..458ec3c 100644 --- a/lddynamodb/dynamodb.go +++ b/lddynamodb/dynamodb.go @@ -238,7 +238,7 @@ func NewDynamoDBFeatureStore(table string, options ...FeatureStoreOption) (ld.Fe if err != nil { return nil, err } - return utils.NewFeatureStoreWrapper(store), nil + return utils.NewNonAtomicFeatureStoreWrapper(store), nil } func newDynamoDBFeatureStoreInternal(table string, options ...FeatureStoreOption) (*dynamoDBFeatureStore, error) { @@ -273,7 +273,7 @@ func (store *dynamoDBFeatureStore) GetCacheTTL() time.Duration { return store.cacheTTL } -func (store *dynamoDBFeatureStore) InitInternal(allData map[ld.VersionedDataKind]map[string]ld.VersionedData) error { +func (store *dynamoDBFeatureStore) InitCollectionsInternal(allData []utils.StoreCollection) error { // Start by reading the existing keys; we will later delete any of these that weren't in allData. unusedOldKeys, err := store.readExistingKeys(allData) if err != nil { @@ -285,17 +285,18 @@ func (store *dynamoDBFeatureStore) InitInternal(allData map[ld.VersionedDataKind numItems := 0 // Insert or update every provided item - for kind, items := range allData { - for k, v := range items { - av, err := store.marshalItem(kind, v) + for _, coll := range allData { + for _, item := range coll.Items { + key := item.GetKey() + av, err := store.marshalItem(coll.Kind, item) if err != nil { - store.logger.Printf("ERROR: Failed to marshal item (key=%s): %s", k, err) + store.logger.Printf("ERROR: Failed to marshal item (key=%s): %s", key, err) return err } requests = append(requests, &dynamodb.WriteRequest{ PutRequest: &dynamodb.PutRequest{Item: av}, }) - nk := namespaceAndKey{namespace: store.namespaceForKind(kind), key: v.GetKey()} + nk := namespaceAndKey{namespace: store.namespaceForKind(coll.Kind), key: key} unusedOldKeys[nk] = false numItems++ } @@ -474,9 +475,10 @@ func (store *dynamoDBFeatureStore) makeQueryForKind(kind ld.VersionedDataKind) * } } -func (store *dynamoDBFeatureStore) readExistingKeys(newData map[ld.VersionedDataKind]map[string]ld.VersionedData) (map[namespaceAndKey]bool, error) { +func (store *dynamoDBFeatureStore) readExistingKeys(newData []utils.StoreCollection) (map[namespaceAndKey]bool, error) { keys := make(map[namespaceAndKey]bool) - for kind := range newData { + for _, coll := range newData { + kind := coll.Kind query := store.makeQueryForKind(kind) query.ProjectionExpression = aws.String("#namespace, #key") query.ExpressionAttributeNames = map[string]*string{ diff --git a/lddynamodb/dynamodb_test.go b/lddynamodb/dynamodb_test.go index 2e77375..44254c8 100644 --- a/lddynamodb/dynamodb_test.go +++ b/lddynamodb/dynamodb_test.go @@ -45,10 +45,10 @@ func TestDynamoDBFeatureStorePrefixes(t *testing.T) { func TestDynamoDBFeatureStoreConcurrentModification(t *testing.T) { store1Internal, err := newDynamoDBFeatureStoreInternal(testTableName, SessionOptions(makeTestOptions())) require.NoError(t, err) - store1 := utils.NewFeatureStoreWrapper(store1Internal) + store1 := utils.NewNonAtomicFeatureStoreWrapper(store1Internal) store2Internal, err := newDynamoDBFeatureStoreInternal(testTableName, SessionOptions(makeTestOptions())) require.NoError(t, err) - store2 := utils.NewFeatureStoreWrapper(store2Internal) + store2 := utils.NewNonAtomicFeatureStoreWrapper(store2Internal) ldtest.RunFeatureStoreConcurrentModificationTests(t, store1, store2, func(hook func()) { store1Internal.testUpdateHook = hook }) diff --git a/utils/dependency_ordering.go b/utils/dependency_ordering.go new file mode 100644 index 0000000..eb56771 --- /dev/null +++ b/utils/dependency_ordering.go @@ -0,0 +1,79 @@ +package utils + +import ( + "sort" + + ld "gopkg.in/launchdarkly/go-client.v4" +) + +func transformUnorderedDataToOrderedData(allData map[ld.VersionedDataKind]map[string]ld.VersionedData) []StoreCollection { + colls := make([]StoreCollection, 0, len(allData)) + for kind, itemsMap := range allData { + items := make([]ld.VersionedData, 0, len(itemsMap)) + if doesDataKindSupportDependencies(kind) { + addItemsInDependencyOrder(itemsMap, &items) + } else { + for _, item := range itemsMap { + items = append(items, item) + } + } + colls = append(colls, StoreCollection{Kind: kind, Items: items}) + } + sort.Slice(colls, func(i, j int) bool { + return dataKindPriority(colls[i].Kind) < dataKindPriority(colls[j].Kind) + }) + return colls +} + +func doesDataKindSupportDependencies(kind ld.VersionedDataKind) bool { + return kind == ld.Features +} + +func addItemsInDependencyOrder(itemsMap map[string]ld.VersionedData, out *[]ld.VersionedData) { + remainingItems := make(map[string]ld.VersionedData, len(itemsMap)) + for key, item := range itemsMap { // copy the map because we'll be consuming it + remainingItems[key] = item + } + for len(remainingItems) > 0 { + // pick a random item that hasn't been visited yet + for _, item := range remainingItems { + addWithDependenciesFirst(item, remainingItems, out) + break + } + } +} + +func addWithDependenciesFirst(startItem ld.VersionedData, remainingItems map[string]ld.VersionedData, out *[]ld.VersionedData) { + delete(remainingItems, startItem.GetKey()) // we won't need to visit this item again + for _, prereqKey := range getDependencyKeys(startItem) { + prereqItem := remainingItems[prereqKey] + if prereqItem != nil { + addWithDependenciesFirst(prereqItem, remainingItems, out) + } + } + *out = append(*out, startItem) +} + +func getDependencyKeys(item ld.VersionedData) []string { + var ret []string + switch i := item.(type) { + case *ld.FeatureFlag: + for _, p := range i.Prerequisites { + ret = append(ret, p.Key) + } + } + return ret +} + +// Logic for ensuring that segments are processed before features; if we get any other data types that +// haven't been accounted for here, they'll come after those two in an arbitrary order. +func dataKindPriority(kind ld.VersionedDataKind) int { + switch kind { + case ld.Segments: + return 0 + case ld.Features: + return 1 + default: + return len(kind.GetNamespace()) + 2 + } +} diff --git a/utils/feature_store_wrapper.go b/utils/feature_store_wrapper.go index c709efa..ab71c5d 100644 --- a/utils/feature_store_wrapper.go +++ b/utils/feature_store_wrapper.go @@ -25,12 +25,9 @@ func UnmarshalItem(kind ld.VersionedDataKind, raw []byte) (ld.VersionedData, err return nil, fmt.Errorf("unexpected data type from JSON unmarshal: %T", data) } -// FeatureStoreCore is an interface for a simplified subset of the functionality of -// ldclient.FeatureStore, to be used in conjunction with FeatureStoreWrapper. This allows -// developers of custom FeatureStore implementations to avoid repeating logic that would -// commonly be needed in any such implementation, such as caching. Instead, they can -// implement only FeatureStoreCore and then call NewFeatureStoreWrapper. -type FeatureStoreCore interface { +// FeatureStoreCoreBase defines methods that are common to the FeatureStoreCore and +// NonAtomicFeatureStoreCore interfaces. +type FeatureStoreCoreBase interface { // GetInternal queries a single item from the data store. The kind parameter distinguishes // between different categories of data (flags, segments) and the key is the unique key // within that category. If no such item exists, the method should return (nil, nil). @@ -41,11 +38,6 @@ type FeatureStoreCore interface { // a map of unique keys to items. It should not attempt to filter out any items based // on their Deleted property, nor to cache any items. GetAllInternal(kind ld.VersionedDataKind) (map[string]ld.VersionedData, error) - // InitInternal replaces the entire contents of the data store. It should either do - // this atomically (if the data store supports transactions), or if that is not - // possible, it should first add/update all items from the new data set and then - // delete any existing keys that were not in the new data set. - InitInternal(map[ld.VersionedDataKind]map[string]ld.VersionedData) error // UpsertInternal adds or updates a single item. If an item with the same key already // exists, it should update it only if the new item's GetVersion() value is greater // than the old one. It should return the final state of the item, i.e. if the update @@ -69,13 +61,62 @@ type FeatureStoreCore interface { GetCacheTTL() time.Duration } -// FeatureStoreWrapper is a partial implementation of ldclient.FeatureStore that delegates -// basic functionality to an instance of FeatureStoreCore. It provides optional caching +// FeatureStoreCore is an interface for a simplified subset of the functionality of +// ldclient.FeatureStore, to be used in conjunction with FeatureStoreWrapper. This allows +// developers of custom FeatureStore implementations to avoid repeating logic that would +// commonly be needed in any such implementation, such as caching. Instead, they can +// implement only FeatureStoreCore and then call NewFeatureStoreWrapper. +// +// This interface assumes that the feature store can update the data set atomically. If +// not, use NonAtomicFeatureStoreCore instead. FeatureStoreCoreBase defines the common methods. +type FeatureStoreCore interface { + FeatureStoreCoreBase + // InitInternal replaces the entire contents of the data store. This should be done + // atomically (i.e. within a transaction). + InitInternal(map[ld.VersionedDataKind]map[string]ld.VersionedData) error +} + +// NonAtomicFeatureStoreCore is an interface for a limited subset of the functionality of +// ldclient.FeatureStore, to be used in conjunction with FeatureStoreWrapper. This allows +// developers of custom FeatureStore implementations to avoid repeating logic that would +// commonly be needed in any such implementation, such as caching. Instead, they can +// implement only FeatureStoreCore and then call NewFeatureStoreWrapper. +// +// This interface assumes that the feature store cannot update the data set atomically and +// will require the SDK to specify the order of operations. If atomic updates are possible, +// then use FeatureStoreCore instead. FeatureStoreCoreBase defines the common methods. +// +// Note that this is somewhat different from the way the LaunchDarkly SDK addresses the +// atomicity issue on most other platforms. There, the feature stores just have one +// interface, which always receives the data as a map, but the SDK can control the +// iteration order of the map. That isn't possible in Go where maps never have a defined +// iteration order. +type NonAtomicFeatureStoreCore interface { + FeatureStoreCoreBase + // InitCollectionsInternal replaces the entire contents of the data store. The SDK will + // pass a data set with a defined ordering; the collections (kinds) should be processed in + // the specified order, and the items within each collection should be written in the + // specified order. The store should delete any obsolete items only after writing all of + // the items provided. + InitCollectionsInternal(allData []StoreCollection) error +} + +// StoreCollection is used by the NonAtomicFeatureStoreCore interface. +type StoreCollection struct { + Kind ld.VersionedDataKind + Items []ld.VersionedData +} + +// FeatureStoreWrapper is a partial implementation of ldclient.FeatureStore that delegates basic +// functionality to an instance of FeatureStoreCore. It provides optional caching, and will +// automatically provide the proper data ordering when using NonAtomicFeatureStoreCoreInitialization. type FeatureStoreWrapper struct { - core FeatureStoreCore - cache *cache.Cache - inited bool - initLock sync.RWMutex + core FeatureStoreCoreBase + coreAtomic FeatureStoreCore + coreNonAtomic NonAtomicFeatureStoreCore + cache *cache.Cache + inited bool + initLock sync.RWMutex } const initCheckedKey = "$initChecked" @@ -83,12 +124,25 @@ const initCheckedKey = "$initChecked" // NewFeatureStoreWrapper creates an instance of FeatureStoreWrapper that wraps an instance // of FeatureStoreCore. func NewFeatureStoreWrapper(core FeatureStoreCore) *FeatureStoreWrapper { - w := FeatureStoreWrapper{core: core} + w := FeatureStoreWrapper{core: core, coreAtomic: core} + w.cache = initCache(core) + return &w +} + +// NewNonAtomicFeatureStoreWrapper creates an instance of FeatureStoreWrapper that wraps an +// instance of NonAtomicFeatureStoreCore. +func NewNonAtomicFeatureStoreWrapper(core NonAtomicFeatureStoreCore) *FeatureStoreWrapper { + w := FeatureStoreWrapper{core: core, coreNonAtomic: core} + w.cache = initCache(core) + return &w +} + +func initCache(core FeatureStoreCoreBase) *cache.Cache { cacheTTL := core.GetCacheTTL() if cacheTTL > 0 { - w.cache = cache.New(cacheTTL, 5*time.Minute) + return cache.New(cacheTTL, 5*time.Minute) } - return &w + return nil } func featureStoreCacheKey(kind ld.VersionedDataKind, key string) string { @@ -101,7 +155,15 @@ func featureStoreAllItemsCacheKey(kind ld.VersionedDataKind) string { // Init performs an update of the entire data store, with optional caching. func (w *FeatureStoreWrapper) Init(allData map[ld.VersionedDataKind]map[string]ld.VersionedData) error { - err := w.core.InitInternal(allData) + var err error + if w.coreNonAtomic != nil { + // If the store uses non-atomic initialization, we'll need to put the data in the proper update + // order and call InitCollectionsInternal. + colls := transformUnorderedDataToOrderedData(allData) + err = w.coreNonAtomic.InitCollectionsInternal(colls) + } else { + err = w.coreAtomic.InitInternal(allData) + } if w.cache != nil { w.cache.Flush() if err == nil { diff --git a/utils/feature_store_wrapper_test.go b/utils/feature_store_wrapper_test.go index 665c36e..35155c6 100644 --- a/utils/feature_store_wrapper_test.go +++ b/utils/feature_store_wrapper_test.go @@ -1,6 +1,7 @@ package utils import ( + "strings" "testing" "time" @@ -17,6 +18,11 @@ type mockCore struct { initQueriedCount int } +// Test implementation of NonAtomicFeatureStoreCore - we test this in somewhat less deteail +type mockNonAtomicCore struct { + data []StoreCollection +} + func newCore(ttl time.Duration) *mockCore { return &mockCore{ cacheTTL: ttl, @@ -64,6 +70,31 @@ func (c *mockCore) InitializedInternal() bool { return c.inited } +func (c *mockNonAtomicCore) GetCacheTTL() time.Duration { + return 0 +} + +func (c *mockNonAtomicCore) InitCollectionsInternal(allData []StoreCollection) error { + c.data = allData + return nil +} + +func (c *mockNonAtomicCore) GetInternal(kind ld.VersionedDataKind, key string) (ld.VersionedData, error) { + return nil, nil // not used in tests +} + +func (c *mockNonAtomicCore) GetAllInternal(kind ld.VersionedDataKind) (map[string]ld.VersionedData, error) { + return nil, nil // not used in tests +} + +func (c *mockNonAtomicCore) UpsertInternal(kind ld.VersionedDataKind, item ld.VersionedData) (ld.VersionedData, error) { + return nil, nil // not used in tests +} + +func (c *mockNonAtomicCore) InitializedInternal() bool { + return false // not used in tests +} + func TestFeatureStoreWrapper(t *testing.T) { cacheTime := 30 * time.Second @@ -355,4 +386,71 @@ func TestFeatureStoreWrapper(t *testing.T) { assert.True(t, w.Initialized()) assert.Equal(t, 2, core.initQueriedCount) }) + + t.Run("Non-atomic init passes ordered data to core", func(t *testing.T) { + core := &mockNonAtomicCore{} + w := NewNonAtomicFeatureStoreWrapper(core) + + assert.NoError(t, w.Init(dependencyOrderingTestData)) + + receivedData := core.data + assert.Equal(t, 2, len(receivedData)) + assert.Equal(t, ld.Segments, receivedData[0].Kind) // Segments should always be first + assert.Equal(t, len(dependencyOrderingTestData[ld.Segments]), len(receivedData[0].Items)) + assert.Equal(t, ld.Features, receivedData[1].Kind) + assert.Equal(t, len(dependencyOrderingTestData[ld.Features]), len(receivedData[1].Items)) + + flags := receivedData[1].Items + findFlagIndex := func(key string) int { + for i, item := range flags { + if item.GetKey() == key { + return i + } + } + return -1 + } + + for _, item := range dependencyOrderingTestData[ld.Features] { + if flag, ok := item.(*ld.FeatureFlag); ok { + flagIndex := findFlagIndex(flag.Key) + for _, prereq := range flag.Prerequisites { + prereqIndex := findFlagIndex(prereq.Key) + if prereqIndex > flagIndex { + keys := make([]string, 0, len(flags)) + for _, item := range flags { + keys = append(keys, item.GetKey()) + } + assert.True(t, false, "%s depends on %s, but %s was listed first; keys in order are [%s]", + flag.Key, prereq.Key, strings.Join(keys, ", ")) + } + } + } + } + }) +} + +var dependencyOrderingTestData = map[ld.VersionedDataKind]map[string]ld.VersionedData{ + ld.Features: { + "a": &ld.FeatureFlag{ + Key: "a", + Prerequisites: []ld.Prerequisite{ + ld.Prerequisite{Key: "b"}, + ld.Prerequisite{Key: "c"}, + }, + }, + "b": &ld.FeatureFlag{ + Key: "b", + Prerequisites: []ld.Prerequisite{ + ld.Prerequisite{Key: "c"}, + ld.Prerequisite{Key: "e"}, + }, + }, + "c": &ld.FeatureFlag{Key: "c"}, + "d": &ld.FeatureFlag{Key: "d"}, + "e": &ld.FeatureFlag{Key: "e"}, + "f": &ld.FeatureFlag{Key: "f"}, + }, + ld.Segments: { + "1": &ld.Segment{Key: "1"}, + }, }