diff --git a/CHANGELOG.md b/CHANGELOG.md index 0be9c3a..045652c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,17 @@ All notable changes to the LaunchDarkly Go SDK will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org). +## [4.0.0] - 2018-05-10 + +### Changed: +- To reduce the network bandwidth used for analytics events, feature request events are now sent as counters rather than individual events, and user details are now sent only at intervals rather than in each event. These behaviors can be modified through the LaunchDarkly UI and with the new configuration option `InlineUsersInEvents`. For more details, see [Analytics Data Stream Reference](https://docs.launchdarkly.com/v2.0/docs/analytics-data-stream-reference). +- When sending analytics events, if there is a connection error or an HTTP 5xx response, the client will try to send the events again one more time after a one-second delay. +- The `Close` method on the client now conforms to the `io.Closer` interface. + +### Added: +- The new global `VersionedDataKinds` is an array of all existing `VersionedDataKind` instances. This is mainly useful if you are writing a custom `FeatureStore` implementation. (Thanks, [mlafeldt](https://github.com/launchdarkly/go-client/pull/117)!) + + ## [3.1.0] - 2018-03-19 ### Added - Convenience functions `NewUser` and `NewAnonymousUser`, for creating a user struct given only the key. (Thanks, [mlafeldt](https://github.com/launchdarkly/go-client/pull/109)!) diff --git a/Gopkg.lock b/Gopkg.lock index 9ab6aed..091eb0a 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -21,7 +21,10 @@ [[projects]] name = "github.com/garyburd/redigo" - packages = ["internal","redis"] + packages = [ + "internal", + "redis" + ] revision = "d1ed5c67e5794de818ea85e6b522fda02623a484" version = "v1.4.0" diff --git a/event_processor.go b/event_processor.go new file mode 100644 index 0000000..22ce668 --- /dev/null +++ b/event_processor.go @@ -0,0 +1,440 @@ +package ldclient + +import ( + "bytes" + "encoding/json" + "io/ioutil" + "math/rand" + "net/http" + "sync" + "time" +) + +// EventProcessor defines the interface for dispatching analytics events. +type EventProcessor interface { + // Records an event asynchronously. + SendEvent(Event) + // Specifies that any buffered events should be sent as soon as possible, rather than waiting + // for the next flush interval. This method is asynchronous, so events still may not be sent + // until a later time. + Flush() + // Shuts down all event processor activity, after first ensuring that all events have been + // delivered. Subsequent calls to SendEvent() or Flush() will be ignored. + Close() error +} + +type nullEventProcessor struct{} + +type defaultEventProcessor struct { + inputCh chan eventDispatcherMessage + closeOnce sync.Once +} + +type eventDispatcher struct { + sdkKey string + config Config + lastKnownPastTime uint64 + disabled bool + stateLock sync.Mutex +} + +type eventBuffer struct { + events []Event + summarizer eventSummarizer + capacity int + capacityExceeded bool + logger Logger +} + +type flushPayload struct { + events []Event + summary eventSummary +} + +type sendEventsTask struct { + client *http.Client + eventsURI string + logger Logger + sdkKey string + userAgent string + formatter eventOutputFormatter +} + +// Payload of the inputCh channel. +type eventDispatcherMessage interface{} + +type sendEventMessage struct { + event Event +} + +type flushEventsMessage struct{} + +type shutdownEventsMessage struct { + replyCh chan struct{} +} + +type syncEventsMessage struct { + replyCh chan struct{} +} + +const ( + maxFlushWorkers = 5 + eventSchemaHeader = "X-LaunchDarkly-Event-Schema" + currentEventSchema = "3" +) + +func newNullEventProcessor() *nullEventProcessor { + return &nullEventProcessor{} +} + +func (n *nullEventProcessor) SendEvent(e Event) {} + +func (n *nullEventProcessor) Flush() {} + +func (n *nullEventProcessor) Close() error { + return nil +} + +// NewDefaultEventProcessor creates an instance of the default implementation of analytics event processing. +// This is normally only used internally; it is public because the Go SDK code is reused by other LaunchDarkly +// components. +func NewDefaultEventProcessor(sdkKey string, config Config, client *http.Client) *defaultEventProcessor { + if client == nil { + client = &http.Client{} + } + inputCh := make(chan eventDispatcherMessage, config.Capacity) + startEventDispatcher(sdkKey, config, client, inputCh) + return &defaultEventProcessor{ + inputCh: inputCh, + } +} + +func (ep *defaultEventProcessor) SendEvent(e Event) { + ep.inputCh <- sendEventMessage{event: e} +} + +func (ep *defaultEventProcessor) Flush() { + ep.inputCh <- flushEventsMessage{} +} + +func (ep *defaultEventProcessor) Close() error { + ep.closeOnce.Do(func() { + ep.inputCh <- flushEventsMessage{} + m := shutdownEventsMessage{replyCh: make(chan struct{})} + ep.inputCh <- m + <-m.replyCh + }) + return nil +} + +// used only for testing - ensures that all pending messages and flushes have completed +func (ep *defaultEventProcessor) waitUntilInactive() { + m := syncEventsMessage{replyCh: make(chan struct{})} + ep.inputCh <- m + <-m.replyCh // Now we know that all events prior to this call have been processed +} + +func startEventDispatcher(sdkKey string, config Config, client *http.Client, + inputCh <-chan eventDispatcherMessage) { + ed := &eventDispatcher{ + sdkKey: sdkKey, + config: config, + } + + // Start a fixed-size pool of workers that wait on flushTriggerCh. This is the + // maximum number of flushes we can do concurrently. + flushCh := make(chan *flushPayload, 1) + var workersGroup sync.WaitGroup + for i := 0; i < maxFlushWorkers; i++ { + startFlushTask(sdkKey, config, client, flushCh, &workersGroup, + func(r *http.Response) { ed.handleResponse(r) }) + } + go ed.runMainLoop(inputCh, flushCh, &workersGroup) +} + +func (ed *eventDispatcher) runMainLoop(inputCh <-chan eventDispatcherMessage, + flushCh chan<- *flushPayload, workersGroup *sync.WaitGroup) { + if err := recover(); err != nil { + ed.config.Logger.Printf("Unexpected panic in event processing thread: %+v", err) + } + + buffer := eventBuffer{ + events: make([]Event, 0, ed.config.Capacity), + summarizer: newEventSummarizer(), + capacity: ed.config.Capacity, + logger: ed.config.Logger, + } + userKeys := newLruCache(ed.config.UserKeysCapacity) + + flushInterval := ed.config.FlushInterval + if flushInterval <= 0 { + flushInterval = DefaultConfig.FlushInterval + } + userKeysFlushInterval := ed.config.UserKeysFlushInterval + if userKeysFlushInterval <= 0 { + userKeysFlushInterval = DefaultConfig.UserKeysFlushInterval + } + flushTicker := time.NewTicker(flushInterval) + usersResetTicker := time.NewTicker(userKeysFlushInterval) + + for { + // Drain the response channel with a higher priority than anything else + // to ensure that the flush workers don't get blocked. + select { + case message := <-inputCh: + switch m := message.(type) { + case sendEventMessage: + ed.processEvent(m.event, &buffer, &userKeys) + case flushEventsMessage: + ed.triggerFlush(&buffer, flushCh, workersGroup) + case syncEventsMessage: + workersGroup.Wait() + m.replyCh <- struct{}{} + case shutdownEventsMessage: + flushTicker.Stop() + usersResetTicker.Stop() + workersGroup.Wait() // Wait for all in-progress flushes to complete + close(flushCh) // Causes all idle flush workers to terminate + m.replyCh <- struct{}{} + return + } + case <-flushTicker.C: + ed.triggerFlush(&buffer, flushCh, workersGroup) + case <-usersResetTicker.C: + userKeys.clear() + } + } +} + +func (ed *eventDispatcher) processEvent(evt Event, buffer *eventBuffer, userKeys *lruCache) { + + // Always record the event in the summarizer. + buffer.addToSummary(evt) + + // Decide whether to add the event to the payload. Feature events may be added twice, once for + // the event (if tracked) and once for debugging. + willAddFullEvent := false + var debugEvent Event + switch evt := evt.(type) { + case FeatureRequestEvent: + if ed.shouldSampleEvent() { + willAddFullEvent = evt.TrackEvents + if ed.shouldDebugEvent(&evt) { + de := evt + de.Debug = true + debugEvent = de + } + } + default: + willAddFullEvent = ed.shouldSampleEvent() + } + + // For each user we haven't seen before, we add an index event - unless this is already + // an identify event for that user. This should be added before the event that referenced + // the user, and can be omitted if that event will contain an inline user. + if !(willAddFullEvent && ed.config.InlineUsersInEvents) { + user := evt.GetBase().User + if !noticeUser(userKeys, &user) { + if _, ok := evt.(IdentifyEvent); !ok { + indexEvent := IndexEvent{ + BaseEvent{CreationDate: evt.GetBase().CreationDate, User: user}, + } + buffer.addEvent(indexEvent) + } + } + } + if willAddFullEvent { + buffer.addEvent(evt) + } + if debugEvent != nil { + buffer.addEvent(debugEvent) + } +} + +// Add to the set of users we've noticed, and return true if the user was already known to us. +func noticeUser(userKeys *lruCache, user *User) bool { + if user == nil || user.Key == nil { + return true + } + return userKeys.add(*user.Key) +} + +func (ed *eventDispatcher) shouldSampleEvent() bool { + return ed.config.SamplingInterval == 0 || rand.Int31n(ed.config.SamplingInterval) == 0 +} + +func (ed *eventDispatcher) shouldDebugEvent(evt *FeatureRequestEvent) bool { + if evt.DebugEventsUntilDate == nil { + return false + } + // The "last known past time" comes from the last HTTP response we got from the server. + // In case the client's time is set wrong, at least we know that any expiration date + // earlier than that point is definitely in the past. If there's any discrepancy, we + // want to err on the side of cutting off event debugging sooner. + ed.stateLock.Lock() // This should be done infrequently since it's only for debug events + defer ed.stateLock.Unlock() + return *evt.DebugEventsUntilDate > ed.lastKnownPastTime && + *evt.DebugEventsUntilDate > now() +} + +// Signal that we would like to do a flush as soon as possible. +func (ed *eventDispatcher) triggerFlush(buffer *eventBuffer, flushCh chan<- *flushPayload, + workersGroup *sync.WaitGroup) { + if ed.isDisabled() { + buffer.clear() + return + } + // Is there anything to flush? + payload := buffer.getPayload() + if len(payload.events) == 0 && len(payload.summary.counters) == 0 { + return + } + workersGroup.Add(1) // Increment the count of active flushes + select { + case flushCh <- &payload: + // If the channel wasn't full, then there is a worker available who will pick up + // this flush payload and send it. The event buffer and summary state can now be + // cleared from the main goroutine. + buffer.clear() + default: + // We can't start a flush right now because we're waiting for one of the workers + // to pick up the last one. Do not reset the event buffer or summary state. + workersGroup.Done() + } +} + +func (ed *eventDispatcher) isDisabled() bool { + // Since we're using a mutex, we should avoid calling this often. + ed.stateLock.Lock() + defer ed.stateLock.Unlock() + return ed.disabled +} + +func (ed *eventDispatcher) handleResponse(resp *http.Response) { + err := checkStatusCode(resp.StatusCode, resp.Request.URL.String()) + if err != nil { + ed.config.Logger.Printf("Unexpected status code when sending events: %+v", err) + if err != nil && err.Code == 401 { + ed.config.Logger.Printf("Received 401 error, no further events will be posted since SDK key is invalid") + ed.stateLock.Lock() + defer ed.stateLock.Unlock() + ed.disabled = true + } + } else { + dt, err := http.ParseTime(resp.Header.Get("Date")) + if err == nil { + ed.stateLock.Lock() + defer ed.stateLock.Unlock() + ed.lastKnownPastTime = toUnixMillis(dt) + } + } +} + +func (b *eventBuffer) addEvent(event Event) { + if len(b.events) >= b.capacity { + if !b.capacityExceeded { + b.capacityExceeded = true + b.logger.Printf("WARN: Exceeded event queue capacity. Increase capacity to avoid dropping events.") + } + return + } + b.capacityExceeded = false + b.events = append(b.events, event) +} + +func (b *eventBuffer) addToSummary(event Event) { + b.summarizer.summarizeEvent(event) +} + +func (b *eventBuffer) getPayload() flushPayload { + return flushPayload{ + events: b.events, + summary: b.summarizer.snapshot(), + } +} + +func (b *eventBuffer) clear() { + b.events = make([]Event, 0, b.capacity) + b.summarizer.reset() +} + +func startFlushTask(sdkKey string, config Config, client *http.Client, flushCh <-chan *flushPayload, + workersGroup *sync.WaitGroup, responseFn func(*http.Response)) { + ef := eventOutputFormatter{ + userFilter: newUserFilter(config), + inlineUsers: config.InlineUsersInEvents, + } + t := sendEventsTask{ + client: client, + eventsURI: config.EventsUri + "/bulk", + logger: config.Logger, + sdkKey: sdkKey, + userAgent: config.UserAgent, + formatter: ef, + } + go t.run(flushCh, responseFn, workersGroup) +} + +func (t *sendEventsTask) run(flushCh <-chan *flushPayload, responseFn func(*http.Response), + workersGroup *sync.WaitGroup) { + for { + payload, more := <-flushCh + if !more { + // Channel has been closed - we're shutting down + break + } + outputEvents := t.formatter.makeOutputEvents(payload.events, payload.summary) + if len(outputEvents) > 0 { + resp := t.postEvents(outputEvents) + if resp != nil { + responseFn(resp) + } + } + workersGroup.Done() // Decrement the count of in-progress flushes + } +} + +func (t *sendEventsTask) postEvents(outputEvents []interface{}) *http.Response { + jsonPayload, marshalErr := json.Marshal(outputEvents) + if marshalErr != nil { + t.logger.Printf("Unexpected error marshalling event json: %+v", marshalErr) + return nil + } + + var resp *http.Response + var respErr error + for attempt := 0; attempt < 2; attempt++ { + if attempt > 0 { + t.logger.Printf("Will retry posting events after 1 second") + time.Sleep(1 * time.Second) + } + req, reqErr := http.NewRequest("POST", t.eventsURI, bytes.NewReader(jsonPayload)) + if reqErr != nil { + t.logger.Printf("Unexpected error while creating event request: %+v", reqErr) + return nil + } + + req.Header.Add("Authorization", t.sdkKey) + req.Header.Add("Content-Type", "application/json") + req.Header.Add("User-Agent", t.userAgent) + req.Header.Add(eventSchemaHeader, currentEventSchema) + + resp, respErr = t.client.Do(req) + + if resp != nil && resp.Body != nil { + ioutil.ReadAll(resp.Body) + resp.Body.Close() + } + + if respErr != nil { + t.logger.Printf("Unexpected error while sending events: %+v", respErr) + continue + } else if resp.StatusCode >= 500 { + t.logger.Printf("Received error status %d when sending events", resp.StatusCode) + continue + } else { + break + } + } + return resp +} diff --git a/event_processor_test.go b/event_processor_test.go new file mode 100644 index 0000000..0ea27cc --- /dev/null +++ b/event_processor_test.go @@ -0,0 +1,653 @@ +package ldclient + +import ( + "encoding/json" + "io/ioutil" + "log" + "net/http" + "os" + "sort" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +var BuiltinAttributes = []string{ + "avatar", + "country", + "email", + "firstName", + "ip", + "lastName", + "name", + "secondary", +} + +var epDefaultConfig = Config{ + SendEvents: true, + Capacity: 1000, + FlushInterval: 1 * time.Hour, + Logger: log.New(os.Stderr, "[LaunchDarkly]", log.LstdFlags), + UserKeysCapacity: 1000, + UserKeysFlushInterval: 1 * time.Hour, +} + +var epDefaultUser = User{ + Key: strPtr("userKey"), + Name: strPtr("Red"), +} + +var userJson = map[string]interface{}{"key": "userKey", "name": "Red"} +var filteredUserJson = map[string]interface{}{"key": "userKey", "privateAttrs": []interface{}{"name"}} + +const ( + sdkKey = "SDK_KEY" +) + +type stubTransport struct { + messageSent chan *http.Request + statusCode int + serverTime uint64 + error error +} + +var epoch = time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) + +func init() { + sort.Strings(BuiltinAttributes) +} + +func TestIdentifyEventIsQueued(t *testing.T) { + ep, st := createEventProcessor(epDefaultConfig) + defer ep.Close() + + ie := NewIdentifyEvent(epDefaultUser) + ep.SendEvent(ie) + + output := flushAndGetEvents(ep, st) + if assert.Equal(t, 1, len(output)) { + assertIdentifyEventMatches(t, ie, userJson, output[0]) + } +} + +func TestUserDetailsAreScrubbedInIdentifyEvent(t *testing.T) { + config := epDefaultConfig + config.AllAttributesPrivate = true + ep, st := createEventProcessor(config) + defer ep.Close() + + ie := NewIdentifyEvent(epDefaultUser) + ep.SendEvent(ie) + + output := flushAndGetEvents(ep, st) + if assert.Equal(t, 1, len(output)) { + assertIdentifyEventMatches(t, ie, filteredUserJson, output[0]) + } +} + +func TestFeatureEventIsSummarizedAndNotTrackedByDefault(t *testing.T) { + ep, st := createEventProcessor(epDefaultConfig) + defer ep.Close() + + flag := FeatureFlag{ + Key: "flagkey", + Version: 11, + } + value := "value" + fe := NewFeatureRequestEvent(flag.Key, &flag, epDefaultUser, intPtr(2), value, nil, nil) + ep.SendEvent(fe) + + output := flushAndGetEvents(ep, st) + if assert.Equal(t, 2, len(output)) { + assertIndexEventMatches(t, fe, userJson, output[0]) + assertSummaryEventHasCounter(t, flag, intPtr(2), value, 1, output[1]) + } +} + +func TestIndividualFeatureEventIsQueuedWhenTrackEventsIsTrue(t *testing.T) { + ep, st := createEventProcessor(epDefaultConfig) + defer ep.Close() + + flag := FeatureFlag{ + Key: "flagkey", + Version: 11, + TrackEvents: true, + } + value := "value" + fe := NewFeatureRequestEvent(flag.Key, &flag, epDefaultUser, intPtr(2), value, nil, nil) + ep.SendEvent(fe) + + output := flushAndGetEvents(ep, st) + if assert.Equal(t, 3, len(output)) { + assertIndexEventMatches(t, fe, userJson, output[0]) + assertFeatureEventMatches(t, fe, flag, value, false, nil, output[1]) + assertSummaryEventHasCounter(t, flag, intPtr(2), value, 1, output[2]) + } +} + +func TestUserDetailsAreScrubbedInIndexEvent(t *testing.T) { + config := epDefaultConfig + config.AllAttributesPrivate = true + ep, st := createEventProcessor(config) + defer ep.Close() + + flag := FeatureFlag{ + Key: "flagkey", + Version: 11, + TrackEvents: true, + } + value := "value" + fe := NewFeatureRequestEvent(flag.Key, &flag, epDefaultUser, intPtr(2), value, nil, nil) + ep.SendEvent(fe) + + output := flushAndGetEvents(ep, st) + if assert.Equal(t, 3, len(output)) { + assertIndexEventMatches(t, fe, filteredUserJson, output[0]) + assertFeatureEventMatches(t, fe, flag, value, false, nil, output[1]) + assertSummaryEventHasCounter(t, flag, intPtr(2), value, 1, output[2]) + } +} + +func TestFeatureEventCanContainInlineUser(t *testing.T) { + config := epDefaultConfig + config.InlineUsersInEvents = true + ep, st := createEventProcessor(config) + defer ep.Close() + + flag := FeatureFlag{ + Key: "flagkey", + Version: 11, + TrackEvents: true, + } + value := "value" + fe := NewFeatureRequestEvent(flag.Key, &flag, epDefaultUser, intPtr(2), value, nil, nil) + ep.SendEvent(fe) + + output := flushAndGetEvents(ep, st) + if assert.Equal(t, 2, len(output)) { + assertFeatureEventMatches(t, fe, flag, value, false, &userJson, output[0]) + assertSummaryEventHasCounter(t, flag, intPtr(2), value, 1, output[1]) + } +} + +func TestUserDetailsAreScrubbedInFeatureEvent(t *testing.T) { + config := epDefaultConfig + config.InlineUsersInEvents = true + config.AllAttributesPrivate = true + ep, st := createEventProcessor(config) + defer ep.Close() + + flag := FeatureFlag{ + Key: "flagkey", + Version: 11, + TrackEvents: true, + } + value := "value" + fe := NewFeatureRequestEvent(flag.Key, &flag, epDefaultUser, intPtr(2), value, nil, nil) + ep.SendEvent(fe) + + output := flushAndGetEvents(ep, st) + if assert.Equal(t, 2, len(output)) { + assertFeatureEventMatches(t, fe, flag, value, false, &filteredUserJson, output[0]) + assertSummaryEventHasCounter(t, flag, intPtr(2), value, 1, output[1]) + } +} + +func TestIndexEventIsGeneratedForNonTrackedFeatureEventEvenIfInliningIsOn(t *testing.T) { + config := epDefaultConfig + config.InlineUsersInEvents = true + ep, st := createEventProcessor(config) + defer ep.Close() + + flag := FeatureFlag{ + Key: "flagkey", + Version: 11, + TrackEvents: false, + } + value := "value" + fe := NewFeatureRequestEvent(flag.Key, &flag, epDefaultUser, intPtr(2), value, nil, nil) + ep.SendEvent(fe) + + output := flushAndGetEvents(ep, st) + if assert.Equal(t, 2, len(output)) { + assertIndexEventMatches(t, fe, userJson, output[0]) // we get this because we are *not* getting the full event + assertSummaryEventHasCounter(t, flag, intPtr(2), value, 1, output[1]) + } +} + +func TestDebugEventIsAddedIfFlagIsTemporarilyInDebugMode(t *testing.T) { + ep, st := createEventProcessor(epDefaultConfig) + defer ep.Close() + + futureTime := now() + 1000000 + flag := FeatureFlag{ + Key: "flagkey", + Version: 11, + TrackEvents: false, + DebugEventsUntilDate: &futureTime, + } + value := "value" + fe := NewFeatureRequestEvent(flag.Key, &flag, epDefaultUser, intPtr(2), value, nil, nil) + ep.SendEvent(fe) + + output := flushAndGetEvents(ep, st) + if assert.Equal(t, 3, len(output)) { + assertIndexEventMatches(t, fe, userJson, output[0]) + assertFeatureEventMatches(t, fe, flag, value, true, &userJson, output[1]) + assertSummaryEventHasCounter(t, flag, intPtr(2), value, 1, output[2]) + } +} + +func TestEventCanBeBothTrackedAndDebugged(t *testing.T) { + ep, st := createEventProcessor(epDefaultConfig) + defer ep.Close() + + futureTime := now() + 1000000 + flag := FeatureFlag{ + Key: "flagkey", + Version: 11, + TrackEvents: true, + DebugEventsUntilDate: &futureTime, + } + value := "value" + fe := NewFeatureRequestEvent(flag.Key, &flag, epDefaultUser, intPtr(2), value, nil, nil) + ep.SendEvent(fe) + + output := flushAndGetEvents(ep, st) + if assert.Equal(t, 4, len(output)) { + assertIndexEventMatches(t, fe, userJson, output[0]) + assertFeatureEventMatches(t, fe, flag, value, false, nil, output[1]) + assertFeatureEventMatches(t, fe, flag, value, true, &userJson, output[2]) + assertSummaryEventHasCounter(t, flag, intPtr(2), value, 1, output[3]) + } +} + +func TestDebugModeExpiresBasedOnClientTimeIfClienttTimeIsLater(t *testing.T) { + ep, st := createEventProcessor(epDefaultConfig) + defer ep.Close() + + // Pick a server time that is somewhat behind the client time + serverTime := now() - 20000 + st.serverTime = serverTime + + // Send and flush an event we don't care about, just to set the last server time + ie := NewIdentifyEvent(User{Key: strPtr("otherUser")}) + ep.SendEvent(ie) + ep.Flush() + ep.waitUntilInactive() + st.getNextRequest() + + // Now send an event with debug mode on, with a "debug until" time that is further in + // the future than the server time, but in the past compared to the client. + debugUntil := serverTime + 1000 + flag := FeatureFlag{ + Key: "flagkey", + Version: 11, + TrackEvents: false, + DebugEventsUntilDate: &debugUntil, + } + fe := NewFeatureRequestEvent(flag.Key, &flag, epDefaultUser, nil, nil, nil, nil) + ep.SendEvent(fe) + + output := flushAndGetEvents(ep, st) + if assert.Equal(t, 2, len(output)) { + assertIndexEventMatches(t, fe, userJson, output[0]) + // should get a summary event only, not a debug event + assertSummaryEventHasCounter(t, flag, nil, nil, 1, output[1]) + } +} + +func TestDebugModeExpiresBasedOnServerTimeIfServerTimeIsLater(t *testing.T) { + ep, st := createEventProcessor(epDefaultConfig) + defer ep.Close() + + // Pick a server time that is somewhat ahead of the client time + serverTime := now() + 20000 + st.serverTime = serverTime + + // Send and flush an event we don't care about, just to set the last server time + ie := NewIdentifyEvent(User{Key: strPtr("otherUser")}) + ep.SendEvent(ie) + ep.Flush() + ep.waitUntilInactive() + st.getNextRequest() + + // Now send an event with debug mode on, with a "debug until" time that is further in + // the future than the client time, but in the past compared to the server. + debugUntil := serverTime - 1000 + flag := FeatureFlag{ + Key: "flagkey", + Version: 11, + TrackEvents: false, + DebugEventsUntilDate: &debugUntil, + } + fe := NewFeatureRequestEvent(flag.Key, &flag, epDefaultUser, nil, nil, nil, nil) + ep.SendEvent(fe) + + output := flushAndGetEvents(ep, st) + if assert.Equal(t, 2, len(output)) { + assertIndexEventMatches(t, fe, userJson, output[0]) + // should get a summary event only, not a debug event + assertSummaryEventHasCounter(t, flag, nil, nil, 1, output[1]) + } +} + +func TestTwoFeatureEventsForSameUserGenerateOnlyOneIndexEvent(t *testing.T) { + ep, st := createEventProcessor(epDefaultConfig) + defer ep.Close() + + flag1 := FeatureFlag{ + Key: "flagkey1", + Version: 11, + TrackEvents: true, + } + flag2 := FeatureFlag{ + Key: "flagkey2", + Version: 22, + TrackEvents: true, + } + value := "value" + fe1 := NewFeatureRequestEvent(flag1.Key, &flag1, epDefaultUser, intPtr(2), value, nil, nil) + fe2 := NewFeatureRequestEvent(flag2.Key, &flag2, epDefaultUser, intPtr(2), value, nil, nil) + ep.SendEvent(fe1) + ep.SendEvent(fe2) + + output := flushAndGetEvents(ep, st) + if assert.Equal(t, 4, len(output)) { + assertIndexEventMatches(t, fe1, userJson, output[0]) + assertFeatureEventMatches(t, fe1, flag1, value, false, nil, output[1]) + assertFeatureEventMatches(t, fe2, flag2, value, false, nil, output[2]) + assertSummaryEventHasCounter(t, flag1, intPtr(2), value, 1, output[3]) + assertSummaryEventHasCounter(t, flag2, intPtr(2), value, 1, output[3]) + } +} + +func TestNonTrackedEventsAreSummarized(t *testing.T) { + ep, st := createEventProcessor(epDefaultConfig) + defer ep.Close() + + flag1 := FeatureFlag{ + Key: "flagkey1", + Version: 11, + TrackEvents: false, + } + flag2 := FeatureFlag{ + Key: "flagkey2", + Version: 22, + TrackEvents: false, + } + value := "value" + fe1 := NewFeatureRequestEvent(flag1.Key, &flag1, epDefaultUser, intPtr(2), value, nil, nil) + fe2 := NewFeatureRequestEvent(flag2.Key, &flag2, epDefaultUser, intPtr(3), value, nil, nil) + fe3 := NewFeatureRequestEvent(flag2.Key, &flag2, epDefaultUser, intPtr(3), value, nil, nil) + ep.SendEvent(fe1) + ep.SendEvent(fe2) + ep.SendEvent(fe3) + + output := flushAndGetEvents(ep, st) + if assert.Equal(t, 2, len(output)) { + assertIndexEventMatches(t, fe1, userJson, output[0]) + + seo := output[1] + assertSummaryEventHasCounter(t, flag1, intPtr(2), value, 1, seo) + assertSummaryEventHasCounter(t, flag2, intPtr(3), value, 2, seo) + assert.Equal(t, float64(fe1.CreationDate), seo["startDate"]) + assert.Equal(t, float64(fe2.CreationDate), seo["endDate"]) + } +} + +func TestCustomEventIsQueuedWithUser(t *testing.T) { + ep, st := createEventProcessor(epDefaultConfig) + defer ep.Close() + + data := map[string]interface{}{ + "thing": "stuff", + } + ce := NewCustomEvent("eventkey", epDefaultUser, data) + ep.SendEvent(ce) + + output := flushAndGetEvents(ep, st) + if assert.Equal(t, 2, len(output)) { + assertIndexEventMatches(t, ce, userJson, output[0]) + + ceo := output[1] + expected := map[string]interface{}{ + "kind": "custom", + "creationDate": float64(ce.CreationDate), + "key": ce.Key, + "data": data, + "userKey": *epDefaultUser.Key, + } + assert.Equal(t, expected, ceo) + } +} + +func TestCustomEventCanContainInlineUser(t *testing.T) { + config := epDefaultConfig + config.InlineUsersInEvents = true + ep, st := createEventProcessor(config) + defer ep.Close() + + data := map[string]interface{}{ + "thing": "stuff", + } + ce := NewCustomEvent("eventkey", epDefaultUser, data) + ep.SendEvent(ce) + + output := flushAndGetEvents(ep, st) + if assert.Equal(t, 1, len(output)) { + ceo := output[0] + expected := map[string]interface{}{ + "kind": "custom", + "creationDate": float64(ce.CreationDate), + "key": ce.Key, + "data": data, + "user": jsonMap(epDefaultUser), + } + assert.Equal(t, expected, ceo) + } +} + +func TestClosingEventProcessorForcesSynchronousFlush(t *testing.T) { + ep, st := createEventProcessor(epDefaultConfig) + defer ep.Close() + + ie := NewIdentifyEvent(epDefaultUser) + ep.SendEvent(ie) + ep.Close() + + output := getEventsFromRequest(st) + if assert.Equal(t, 1, len(output)) { + assertIdentifyEventMatches(t, ie, userJson, output[0]) + } +} + +func TestNothingIsSentIfThereAreNoEvents(t *testing.T) { + ep, st := createEventProcessor(epDefaultConfig) + defer ep.Close() + + ep.Flush() + ep.waitUntilInactive() + + msg := st.getNextRequest() + assert.Nil(t, msg) +} + +func TestSdkKeyIsSent(t *testing.T) { + ep, st := createEventProcessor(epDefaultConfig) + defer ep.Close() + + ie := NewIdentifyEvent(epDefaultUser) + ep.SendEvent(ie) + ep.Flush() + ep.waitUntilInactive() + + msg := st.getNextRequest() + assert.Equal(t, sdkKey, msg.Header.Get("Authorization")) +} + +func TestUserAgentIsSent(t *testing.T) { + config := epDefaultConfig + config.UserAgent = "SecretAgent" + ep, st := createEventProcessor(config) + defer ep.Close() + + ie := NewIdentifyEvent(epDefaultUser) + ep.SendEvent(ie) + ep.Flush() + ep.waitUntilInactive() + + msg := st.getNextRequest() + assert.Equal(t, config.UserAgent, msg.Header.Get("User-Agent")) +} + +func TestFlushIsRetriedOnceAfter5xxError(t *testing.T) { + ep, st := createEventProcessor(epDefaultConfig) + defer ep.Close() + + st.statusCode = 503 + + ie := NewIdentifyEvent(epDefaultUser) + ep.SendEvent(ie) + ep.Flush() + ep.waitUntilInactive() + + msg := st.getNextRequest() + assert.NotNil(t, msg) + msg = st.getNextRequest() + assert.NotNil(t, msg) + msg = st.getNextRequest() + assert.Nil(t, msg) +} + +func jsonMap(o interface{}) map[string]interface{} { + bytes, _ := json.Marshal(o) + var result map[string]interface{} + json.Unmarshal(bytes, &result) + return result +} + +func assertIdentifyEventMatches(t *testing.T, sourceEvent Event, encodedUser map[string]interface{}, output map[string]interface{}) { + expected := map[string]interface{}{ + "kind": "identify", + "key": *sourceEvent.GetBase().User.Key, + "creationDate": float64(sourceEvent.GetBase().CreationDate), + "user": encodedUser, + } + assert.Equal(t, expected, output) +} + +func assertIndexEventMatches(t *testing.T, sourceEvent Event, encodedUser map[string]interface{}, output map[string]interface{}) { + expected := map[string]interface{}{ + "kind": "index", + "creationDate": float64(sourceEvent.GetBase().CreationDate), + "user": encodedUser, + } + assert.Equal(t, expected, output) +} + +func assertFeatureEventMatches(t *testing.T, sourceEvent FeatureRequestEvent, flag FeatureFlag, + value interface{}, debug bool, inlineUser *map[string]interface{}, output map[string]interface{}) { + kind := "feature" + if debug { + kind = "debug" + } + expected := map[string]interface{}{ + "kind": kind, + "creationDate": float64(sourceEvent.CreationDate), + "key": flag.Key, + "version": float64(flag.Version), + "value": value, + "default": nil, + } + if sourceEvent.Variation != nil { + expected["variation"] = float64(*sourceEvent.Variation) + } + if inlineUser == nil { + expected["userKey"] = *sourceEvent.User.Key + } else { + expected["user"] = *inlineUser + } + assert.Equal(t, expected, output) +} + +func assertSummaryEventHasFlag(t *testing.T, flag FeatureFlag, output map[string]interface{}) bool { + if assert.Equal(t, "summary", output["kind"]) { + flags, _ := output["features"].(map[string]interface{}) + return assert.NotNil(t, flags) && assert.NotNil(t, flags[flag.Key]) + } + return false +} + +func assertSummaryEventHasCounter(t *testing.T, flag FeatureFlag, variation *int, value interface{}, count int, output map[string]interface{}) { + if assertSummaryEventHasFlag(t, flag, output) { + f, _ := output["features"].(map[string]interface{})[flag.Key].(map[string]interface{}) + assert.NotNil(t, f) + expected := map[string]interface{}{ + "value": value, + "count": float64(count), + "version": float64(flag.Version), + } + if variation != nil { + expected["variation"] = float64(*variation) + } + assert.Contains(t, f["counters"], expected) + } +} + +func createEventProcessor(config Config) (*defaultEventProcessor, *stubTransport) { + transport := &stubTransport{ + statusCode: 200, + messageSent: make(chan *http.Request, 100), + } + client := &http.Client{ + Transport: transport, + } + return NewDefaultEventProcessor(sdkKey, config, client), transport +} + +func flushAndGetEvents(ep *defaultEventProcessor, st *stubTransport) []map[string]interface{} { + ep.Flush() + ep.waitUntilInactive() + return getEventsFromRequest(st) +} + +func getEventsFromRequest(st *stubTransport) (output []map[string]interface{}) { + msg := st.getNextRequest() + if msg == nil { + return + } + bytes, err := ioutil.ReadAll(msg.Body) + if err != nil { + return + } + json.Unmarshal(bytes, &output) + return +} + +func (t *stubTransport) RoundTrip(request *http.Request) (*http.Response, error) { + t.messageSent <- request + if t.error != nil { + return nil, t.error + } + resp := http.Response{ + StatusCode: t.statusCode, + Header: make(http.Header), + Request: request, + } + if t.serverTime != 0 { + ts := epoch.Add(time.Duration(t.serverTime) * time.Millisecond) + resp.Header.Add("Date", ts.Format(http.TimeFormat)) + } + return &resp, nil +} + +func (t *stubTransport) getNextRequest() *http.Request { + select { + case msg := <-t.messageSent: + return msg + default: + return nil + } +} diff --git a/event_summarizer.go b/event_summarizer.go new file mode 100644 index 0000000..e0e927a --- /dev/null +++ b/event_summarizer.go @@ -0,0 +1,87 @@ +package ldclient + +// Manages the state of summarizable information for the EventProcessor, including the +// event counters and user deduplication. Note that the methods for this type are +// deliberately not thread-safe, because they should always be called from EventProcessor's +// single event-processing goroutine. +type eventSummarizer struct { + eventsState eventSummary +} + +type eventSummary struct { + counters map[counterKey]*counterValue + startDate uint64 + endDate uint64 +} + +type counterKey struct { + key string + variation int + version int +} + +const ( + nilVariation = -1 +) + +type counterValue struct { + count int + flagValue interface{} + flagDefault interface{} +} + +func newEventSummarizer() eventSummarizer { + return eventSummarizer{eventsState: newEventSummary()} +} + +func newEventSummary() eventSummary { + return eventSummary{ + counters: make(map[counterKey]*counterValue), + } +} + +// Adds this event to our counters, if it is a type of event we need to count. +func (s *eventSummarizer) summarizeEvent(evt Event) { + var fe FeatureRequestEvent + var ok bool + if fe, ok = evt.(FeatureRequestEvent); !ok { + return + } + + key := counterKey{key: fe.Key} + if fe.Variation != nil { + key.variation = *fe.Variation + } else { + key.variation = nilVariation + } + if fe.Version != nil { + key.version = *fe.Version + } + + if value, ok := s.eventsState.counters[key]; ok { + value.count++ + } else { + s.eventsState.counters[key] = &counterValue{ + count: 1, + flagValue: fe.Value, + flagDefault: fe.Default, + } + } + + creationDate := fe.CreationDate + if s.eventsState.startDate == 0 || creationDate < s.eventsState.startDate { + s.eventsState.startDate = creationDate + } + if creationDate > s.eventsState.endDate { + s.eventsState.endDate = creationDate + } +} + +// Returns a snapshot of the current summarized event data. +func (s *eventSummarizer) snapshot() eventSummary { + return s.eventsState +} + +func (s *eventSummarizer) reset() { + s.eventsState = newEventSummary() +} diff --git a/event_summarizer_test.go b/event_summarizer_test.go new file mode 100644 index 0000000..e72989a --- /dev/null +++ b/event_summarizer_test.go @@ -0,0 +1,106 @@ +package ldclient + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +var user = NewUser("key") + +func TestSummarizeEventDoesNothingForIdentifyEvent(t *testing.T) { + es := newEventSummarizer() + snapshot := es.snapshot() + + event := NewIdentifyEvent(user) + es.summarizeEvent(event) + + assert.Equal(t, snapshot, es.snapshot()) +} + +func TestSummarizeEventDoesNothingForCustomEvent(t *testing.T) { + es := newEventSummarizer() + snapshot := es.snapshot() + + event := NewCustomEvent("whatever", user, nil) + es.summarizeEvent(event) + + assert.Equal(t, snapshot, es.snapshot()) +} + +func TestSummarizeEventSetsStartAndEndDates(t *testing.T) { + es := newEventSummarizer() + flag := FeatureFlag{ + Key: "key", + } + event1 := NewFeatureRequestEvent(flag.Key, &flag, user, nil, nil, nil, nil) + event2 := NewFeatureRequestEvent(flag.Key, &flag, user, nil, nil, nil, nil) + event3 := NewFeatureRequestEvent(flag.Key, &flag, user, nil, nil, nil, nil) + event1.BaseEvent.CreationDate = 2000 + event2.BaseEvent.CreationDate = 1000 + event3.BaseEvent.CreationDate = 1500 + es.summarizeEvent(event1) + es.summarizeEvent(event2) + data := es.snapshot() + + assert.Equal(t, uint64(1000), data.startDate) + assert.Equal(t, uint64(2000), data.endDate) +} + +func TestSummarizeEventIncrementsCounters(t *testing.T) { + es := newEventSummarizer() + flag1 := FeatureFlag{ + Key: "key1", + Version: 11, + } + flag2 := FeatureFlag{ + Key: "key2", + Version: 22, + } + unknownFlagKey := "badkey" + variation1 := 1 + variation2 := 2 + event1 := NewFeatureRequestEvent(flag1.Key, &flag1, user, &variation1, "value1", "default1", nil) + event2 := NewFeatureRequestEvent(flag1.Key, &flag1, user, &variation2, "value2", "default1", nil) + event3 := NewFeatureRequestEvent(flag2.Key, &flag2, user, &variation1, "value99", "default2", nil) + event4 := NewFeatureRequestEvent(flag1.Key, &flag1, user, &variation1, "value1", "default1", nil) + event5 := NewFeatureRequestEvent(unknownFlagKey, nil, user, nil, "default3", "default3", nil) + es.summarizeEvent(event1) + es.summarizeEvent(event2) + es.summarizeEvent(event3) + es.summarizeEvent(event4) + es.summarizeEvent(event5) + data := es.snapshot() + + expectedCounters := map[counterKey]*counterValue{ + counterKey{flag1.Key, variation1, flag1.Version}: &counterValue{2, "value1", "default1"}, + counterKey{flag1.Key, variation2, flag1.Version}: &counterValue{1, "value2", "default1"}, + counterKey{flag2.Key, variation1, flag2.Version}: &counterValue{1, "value99", "default2"}, + counterKey{unknownFlagKey, -1, 0}: &counterValue{1, "default3", "default3"}, + } + assert.Equal(t, expectedCounters, data.counters) +} + +func TestCounterForNilVariationIsDistinctFromOthers(t *testing.T) { + es := newEventSummarizer() + flag := FeatureFlag{ + Key: "key1", + Version: 11, + } + variation1 := 1 + variation2 := 2 + event1 := NewFeatureRequestEvent(flag.Key, &flag, user, &variation1, "value1", "default1", nil) + event2 := NewFeatureRequestEvent(flag.Key, &flag, user, &variation2, "value2", "default1", nil) + event3 := NewFeatureRequestEvent(flag.Key, &flag, user, nil, "default1", "default1", nil) + es.summarizeEvent(event1) + es.summarizeEvent(event2) + es.summarizeEvent(event3) + data := es.snapshot() + + expectedCounters := map[counterKey]*counterValue{ + counterKey{flag.Key, variation1, flag.Version}: &counterValue{1, "value1", "default1"}, + counterKey{flag.Key, variation2, flag.Version}: &counterValue{1, "value2", "default1"}, + counterKey{flag.Key, -1, flag.Version}: &counterValue{1, "default1", "default1"}, + } + assert.Equal(t, expectedCounters, data.counters) +} diff --git a/events.go b/events.go index 59ce591..0e93f85 100644 --- a/events.go +++ b/events.go @@ -1,226 +1,88 @@ package ldclient import ( - "bytes" - "encoding/json" - "errors" - "io/ioutil" - "math/rand" - "net/http" - "sync" "time" ) -type eventProcessor struct { - queue []Event - sdkKey string - config Config - closed bool - mu *sync.Mutex - client *http.Client - closer chan struct{} -} - +// An Event represents an analytics event generated by the client, which will be passed to +// the EventProcessor. The event data that the EventProcessor actually sends to LaunchDarkly +// may be slightly different. type Event interface { GetBase() BaseEvent - GetKind() string } +// BaseEvent provides properties common to all events. type BaseEvent struct { - CreationDate uint64 `json:"creationDate"` - Key string `json:"key"` - Kind string `json:"kind"` - User User `json:"user"` + CreationDate uint64 + User User } +// FeatureRequestEvent is generated by evaluating a feature flag or one of a flag's prerequisites. type FeatureRequestEvent struct { BaseEvent - Value interface{} `json:"value"` - Default interface{} `json:"default"` - Version *int `json:"version,omitempty"` - PrereqOf *string `json:"prereqOf,omitempty"` -} - -const ( - FEATURE_REQUEST_EVENT = "feature" - CUSTOM_EVENT = "custom" - IDENTIFY_EVENT = "identify" -) - -func newEventProcessor(sdkKey string, config Config) *eventProcessor { - res := &eventProcessor{ - queue: make([]Event, 0), - sdkKey: sdkKey, - config: config, - client: &http.Client{}, - closer: make(chan struct{}), - mu: &sync.Mutex{}, - } - - go func() { - if err := recover(); err != nil { - res.config.Logger.Printf("Unexpected panic in event processing thread: %+v", err) - } - - ticker := time.NewTicker(config.FlushInterval) - for { - select { - case <-ticker.C: - res.flush() - case <-res.closer: - ticker.Stop() - return - } - } - }() - - return res -} - -func (ep *eventProcessor) close() { - ep.mu.Lock() - closed := ep.closed - ep.closed = true - ep.mu.Unlock() - - if !closed { - close(ep.closer) - ep.flush() - } + Key string + Variation *int + Value interface{} + Default interface{} + Version *int + PrereqOf *string + TrackEvents bool + DebugEventsUntilDate *uint64 + Debug bool +} + +// CustomEvent is generated by calling the client's Track method. +type CustomEvent struct { + BaseEvent + Key string + Data interface{} } -func (ep *eventProcessor) flush() { - uri := ep.config.EventsUri + "/bulk" - ep.mu.Lock() - - if len(ep.queue) == 0 || ep.closed { - ep.mu.Unlock() - return - } - - events := ep.queue - ep.queue = make([]Event, 0) - ep.mu.Unlock() - - payload, marshalErr := json.Marshal(events) - - if marshalErr != nil { - ep.config.Logger.Printf("Unexpected error marshalling event json: %+v", marshalErr) - } - - req, reqErr := http.NewRequest("POST", uri, bytes.NewReader(payload)) - - if reqErr != nil { - ep.config.Logger.Printf("Unexpected error while creating event request: %+v", reqErr) - } - - req.Header.Add("Authorization", ep.sdkKey) - req.Header.Add("Content-Type", "application/json") - req.Header.Add("User-Agent", ep.config.UserAgent) - - resp, respErr := ep.client.Do(req) - - defer func() { - if resp != nil && resp.Body != nil { - ioutil.ReadAll(resp.Body) - resp.Body.Close() - } - }() - - if respErr != nil { - ep.config.Logger.Printf("Unexpected error while sending events: %+v", respErr) - return - } - err := checkStatusCode(resp.StatusCode, uri) - if err != nil { - ep.config.Logger.Printf("Unexpected status code when sending events: %+v", err) - if err != nil && err.Code == 401 { - ep.config.Logger.Printf("Received 401 error, no further events will be posted since SDK key is invalid") - ep.mu.Lock() - ep.closed = true - ep.mu.Unlock() - } - } +// IdentifyEvent is generated by calling the client's Identify method. +type IdentifyEvent struct { + BaseEvent } -func (ep *eventProcessor) sendEvent(evt Event) error { - if !ep.config.SendEvents { - return nil - } - - if ep.config.SamplingInterval > 0 && rand.Int31n(ep.config.SamplingInterval) != 0 { - return nil - } - - scrubbedUser := scrubUser(evt.GetBase().User, ep.config.AllAttributesPrivate, ep.config.PrivateAttributeNames) - var newEvent Event - switch evt := evt.(type) { - case FeatureRequestEvent: - evt.User = scrubbedUser - newEvent = evt - case CustomEvent: - evt.User = scrubbedUser - newEvent = evt - case IdentifyEvent: - evt.User = scrubbedUser - newEvent = evt - default: - return errors.New("unknown event type") - } - - ep.mu.Lock() - defer ep.mu.Unlock() - - if ep.closed { - return nil - } - if len(ep.queue) >= ep.config.Capacity { - return errors.New("Exceeded event queue capacity. Increase capacity to avoid dropping events.") - } - ep.queue = append(ep.queue, newEvent) - return nil +// IndexEvent is generated internally to capture user details from other events. +type IndexEvent struct { + BaseEvent } // Used to just create the event. Normally, you don't need to call this; // the event is created and queued automatically during feature flag evaluation. -func NewFeatureRequestEvent(key string, user User, value, defaultVal interface{}, version *int, prereqOf *string) FeatureRequestEvent { - return FeatureRequestEvent{ +func NewFeatureRequestEvent(key string, flag *FeatureFlag, user User, variation *int, value, defaultVal interface{}, prereqOf *string) FeatureRequestEvent { + fre := FeatureRequestEvent{ BaseEvent: BaseEvent{ CreationDate: now(), - Key: key, User: user, - Kind: FEATURE_REQUEST_EVENT, }, - Value: value, - Default: defaultVal, - Version: version, - PrereqOf: prereqOf, + Key: key, + Variation: variation, + Value: value, + Default: defaultVal, + PrereqOf: prereqOf, } + if flag != nil { + fre.Version = &flag.Version + fre.TrackEvents = flag.TrackEvents + fre.DebugEventsUntilDate = flag.DebugEventsUntilDate + } + return fre } func (evt FeatureRequestEvent) GetBase() BaseEvent { return evt.BaseEvent } -func (evt FeatureRequestEvent) GetKind() string { - return evt.Kind -} - -type CustomEvent struct { - BaseEvent - Data interface{} `json:"data"` -} - // Constructs a new custom event, but does not send it. Typically, Track should be used to both create the // event and send it to LaunchDarkly. func NewCustomEvent(key string, user User, data interface{}) CustomEvent { return CustomEvent{ BaseEvent: BaseEvent{ CreationDate: now(), - Key: key, User: user, - Kind: CUSTOM_EVENT, }, + Key: key, Data: data, } } @@ -229,29 +91,13 @@ func (evt CustomEvent) GetBase() BaseEvent { return evt.BaseEvent } -func (evt CustomEvent) GetKind() string { - return evt.Kind -} - -type IdentifyEvent struct { - BaseEvent -} - // Constructs a new identify event, but does not send it. Typically, Identify should be used to both create the // event and send it to LaunchDarkly. func NewIdentifyEvent(user User) IdentifyEvent { - var key string - if user.Key == nil { - key = "" - } else { - key = *user.Key - } return IdentifyEvent{ BaseEvent: BaseEvent{ CreationDate: now(), - Key: key, User: user, - Kind: IDENTIFY_EVENT, }, } } @@ -260,8 +106,8 @@ func (evt IdentifyEvent) GetBase() BaseEvent { return evt.BaseEvent } -func (evt IdentifyEvent) GetKind() string { - return evt.Kind +func (evt IndexEvent) GetBase() BaseEvent { + return evt.BaseEvent } func now() uint64 { @@ -273,77 +119,3 @@ func toUnixMillis(t time.Time) uint64 { return uint64(ms) } - -func scrubUser(user User, allAttributesPrivate bool, globalPrivateAttributes []string) User { - user.PrivateAttributes = nil - - if len(user.PrivateAttributeNames) == 0 && len(globalPrivateAttributes) == 0 && !allAttributesPrivate { - return user - } - - isPrivate := map[string]bool{} - for _, n := range globalPrivateAttributes { - isPrivate[n] = true - } - for _, n := range user.PrivateAttributeNames { - isPrivate[n] = true - } - - if user.Custom != nil { - var custom = map[string]interface{}{} - for k, v := range *user.Custom { - if allAttributesPrivate || isPrivate[k] { - user.PrivateAttributes = append(user.PrivateAttributes, k) - } else { - custom[k] = v - } - } - user.Custom = &custom - } - - if !isEmpty(user.Avatar) && (allAttributesPrivate || isPrivate["avatar"]) { - user.Avatar = nil - user.PrivateAttributes = append(user.PrivateAttributes, "avatar") - } - - if !isEmpty(user.Country) && (allAttributesPrivate || isPrivate["country"]) { - user.Country = nil - user.PrivateAttributes = append(user.PrivateAttributes, "country") - } - - if !isEmpty(user.Ip) && (allAttributesPrivate || isPrivate["ip"]) { - user.Ip = nil - user.PrivateAttributes = append(user.PrivateAttributes, "ip") - } - - if !isEmpty(user.FirstName) && (allAttributesPrivate || isPrivate["firstName"]) { - user.FirstName = nil - user.PrivateAttributes = append(user.PrivateAttributes, "firstName") - } - - if !isEmpty(user.LastName) && (allAttributesPrivate || isPrivate["lastName"]) { - user.LastName = nil - user.PrivateAttributes = append(user.PrivateAttributes, "lastName") - } - - if !isEmpty(user.Name) && (allAttributesPrivate || isPrivate["name"]) { - user.Name = nil - user.PrivateAttributes = append(user.PrivateAttributes, "name") - } - - if !isEmpty(user.Secondary) && (allAttributesPrivate || isPrivate["secondary"]) { - user.Secondary = nil - user.PrivateAttributes = append(user.PrivateAttributes, "secondary") - } - - if !isEmpty(user.Email) && (allAttributesPrivate || isPrivate["email"]) { - user.Email = nil - user.PrivateAttributes = append(user.PrivateAttributes, "email") - } - - return user -} - -func isEmpty(s *string) bool { - return s == nil || *s == "" -} diff --git a/events_output.go b/events_output.go new file mode 100644 index 0000000..9ae995c --- /dev/null +++ b/events_output.go @@ -0,0 +1,188 @@ +package ldclient + +// The types in this file are for analytics event data structures that we send to +// LaunchDarkly. + +// Serializable form of a feature request event. This differs from the event that was +// passed in to us in that it usually has a user key instead of a user object. +type featureRequestEventOutput struct { + Kind string `json:"kind"` + CreationDate uint64 `json:"creationDate"` + Key string `json:"key"` + UserKey *string `json:"userKey,omitempty"` + User *User `json:"user,omitempty"` + Variation *int `json:"variation,omitempty"` + Value interface{} `json:"value"` + Default interface{} `json:"default"` + Version *int `json:"version,omitempty"` + PrereqOf *string `json:"prereqOf,omitempty"` +} + +// Serializable form of an identify event. +type identifyEventOutput struct { + Kind string `json:"kind"` + CreationDate uint64 `json:"creationDate"` + Key *string `json:"key"` + User *User `json:"user"` +} + +// Serializable form of a custom event. It has a user key instead of a user object. +type customEventOutput struct { + Kind string `json:"kind"` + CreationDate uint64 `json:"creationDate"` + Key string `json:"key"` + UserKey *string `json:"userKey,omitempty"` + User *User `json:"user,omitempty"` + Data interface{} `json:"data,omitempty"` +} + +// Serializable form of an index event. This is not generated by an explicit client call, +// but is created automatically whenever we see a user we haven't seen before in a feature +// request event or custom event. +type indexEventOutput struct { + Kind string `json:"kind"` + CreationDate uint64 `json:"creationDate"` + User *User `json:"user"` +} + +// Serializable form of a summary event, containing data generated by EventSummarizer. +type summaryEventOutput struct { + Kind string `json:"kind"` + StartDate uint64 `json:"startDate"` + EndDate uint64 `json:"endDate"` + Features map[string]flagSummaryData `json:"features"` +} + +type flagSummaryData struct { + Default interface{} `json:"default"` + Counters []flagCounterData `json:"counters"` +} + +type flagCounterData struct { + Value interface{} `json:"value"` + Variation *int `json:"variation,omitempty"` + Version *int `json:"version,omitempty"` + Count int `json:"count"` + Unknown *bool `json:"unknown,omitempty"` +} + +const ( + FeatureRequestEventKind = "feature" + FeatureDebugEventKind = "debug" + CustomEventKind = "custom" + IdentifyEventKind = "identify" + IndexEventKind = "index" + SummaryEventKind = "summary" +) + +type eventOutputFormatter struct { + userFilter userFilter + inlineUsers bool +} + +func (ef eventOutputFormatter) makeOutputEvents(events []Event, summary eventSummary) []interface{} { + out := make([]interface{}, 0, len(events)+1) // leave room for summary, if any + for _, e := range events { + oe := ef.makeOutputEvent(e) + if oe != nil { + out = append(out, oe) + } + } + if len(summary.counters) > 0 { + out = append(out, ef.makeSummaryEvent(summary)) + } + return out +} + +func (ef eventOutputFormatter) makeOutputEvent(evt interface{}) interface{} { + switch evt := evt.(type) { + case FeatureRequestEvent: + fe := featureRequestEventOutput{ + CreationDate: evt.BaseEvent.CreationDate, + Key: evt.Key, + Variation: evt.Variation, + Value: evt.Value, + Default: evt.Default, + Version: evt.Version, + PrereqOf: evt.PrereqOf, + } + if ef.inlineUsers || evt.Debug { + fe.User = ef.userFilter.scrubUser(evt.User) + } else { + fe.UserKey = evt.User.Key + } + if evt.Debug { + fe.Kind = FeatureDebugEventKind + } else { + fe.Kind = FeatureRequestEventKind + } + return fe + case CustomEvent: + ce := customEventOutput{ + Kind: CustomEventKind, + CreationDate: evt.BaseEvent.CreationDate, + Key: evt.Key, + Data: evt.Data, + } + if ef.inlineUsers { + ce.User = ef.userFilter.scrubUser(evt.User) + } else { + ce.UserKey = evt.User.Key + } + return ce + case IdentifyEvent: + return identifyEventOutput{ + Kind: IdentifyEventKind, + CreationDate: evt.BaseEvent.CreationDate, + Key: evt.User.Key, + User: ef.userFilter.scrubUser(evt.User), + } + case IndexEvent: + return indexEventOutput{ + Kind: IndexEventKind, + CreationDate: evt.BaseEvent.CreationDate, + User: ef.userFilter.scrubUser(evt.User), + } + default: + return nil + } +} + +// Transforms the summary data into the format used for event sending. +func (ef eventOutputFormatter) makeSummaryEvent(snapshot eventSummary) summaryEventOutput { + features := make(map[string]flagSummaryData) + for key, value := range snapshot.counters { + var flagData flagSummaryData + var known bool + if flagData, known = features[key.key]; !known { + flagData = flagSummaryData{ + Default: value.flagDefault, + Counters: make([]flagCounterData, 0, 2), + } + } + data := flagCounterData{ + Value: value.flagValue, + Count: value.count, + } + if key.variation != nilVariation { + v := key.variation + data.Variation = &v + } + if key.version == 0 { + unknown := true + data.Unknown = &unknown + } else { + version := key.version + data.Version = &version + } + flagData.Counters = append(flagData.Counters, data) + features[key.key] = flagData + } + + return summaryEventOutput{ + Kind: SummaryEventKind, + StartDate: snapshot.startDate, + EndDate: snapshot.endDate, + Features: features, + } +} diff --git a/flag.go b/flag.go index e62da2d..4335f83 100644 --- a/flag.go +++ b/flag.go @@ -14,18 +14,20 @@ const ( ) type FeatureFlag struct { - Key string `json:"key" bson:"key"` - Version int `json:"version" bson:"version"` - On bool `json:"on" bson:"on"` - Prerequisites []Prerequisite `json:"prerequisites" bson:"prerequisites"` - Salt string `json:"salt" bson:"salt"` - Sel string `json:"sel" bson:"sel"` - Targets []Target `json:"targets" bson:"targets"` - Rules []Rule `json:"rules" bson:"rules"` - Fallthrough VariationOrRollout `json:"fallthrough" bson:"fallthrough"` - OffVariation *int `json:"offVariation" bson:"offVariation"` - Variations []interface{} `json:"variations" bson:"variations"` - Deleted bool `json:"deleted" bson:"deleted"` + Key string `json:"key" bson:"key"` + Version int `json:"version" bson:"version"` + On bool `json:"on" bson:"on"` + Prerequisites []Prerequisite `json:"prerequisites" bson:"prerequisites"` + Salt string `json:"salt" bson:"salt"` + Sel string `json:"sel" bson:"sel"` + Targets []Target `json:"targets" bson:"targets"` + Rules []Rule `json:"rules" bson:"rules"` + Fallthrough VariationOrRollout `json:"fallthrough" bson:"fallthrough"` + OffVariation *int `json:"offVariation" bson:"offVariation"` + Variations []interface{} `json:"variations" bson:"variations"` + TrackEvents bool `json:"trackEvents" bson:"trackEvents"` + DebugEventsUntilDate *uint64 `json:"debugEventsUntilDate" bson:"debugEventsUntilDate"` + Deleted bool `json:"deleted" bson:"deleted"` } func (f *FeatureFlag) GetKey() string { @@ -145,31 +147,32 @@ func bucketableStringValue(uValue interface{}) (string, bool) { type EvalResult struct { Value interface{} + Variation *int Explanation *Explanation PrerequisiteRequestEvents []FeatureRequestEvent //to be sent to LD } -func (f FeatureFlag) Evaluate(user User, store FeatureStore) (interface{}, []FeatureRequestEvent) { +func (f FeatureFlag) Evaluate(user User, store FeatureStore) (interface{}, *int, []FeatureRequestEvent) { var prereqEvents []FeatureRequestEvent if f.On { evalResult, err := f.EvaluateExplain(user, store) prereqEvents = evalResult.PrerequisiteRequestEvents if err != nil { - return nil, prereqEvents + return nil, nil, prereqEvents } if evalResult.Value != nil { - return evalResult.Value, prereqEvents + return evalResult.Value, evalResult.Variation, prereqEvents } // If the value is nil, but the error is not, fall through and use the off variation } if f.OffVariation != nil && *f.OffVariation < len(f.Variations) { value := f.Variations[*f.OffVariation] - return value, prereqEvents + return value, f.OffVariation, prereqEvents } - return nil, prereqEvents + return nil, nil, prereqEvents } func (f FeatureFlag) EvaluateExplain(user User, store FeatureStore) (*EvalResult, error) { @@ -177,16 +180,17 @@ func (f FeatureFlag) EvaluateExplain(user User, store FeatureStore) (*EvalResult return nil, nil } events := make([]FeatureRequestEvent, 0) - value, explanation, err := f.evaluateExplain(user, store, &events) + value, index, explanation, err := f.evaluateExplain(user, store, &events) return &EvalResult{ Value: value, + Variation: index, Explanation: explanation, PrerequisiteRequestEvents: events, }, err } -func (f FeatureFlag) evaluateExplain(user User, store FeatureStore, events *[]FeatureRequestEvent) (interface{}, *Explanation, error) { +func (f FeatureFlag) evaluateExplain(user User, store FeatureStore, events *[]FeatureRequestEvent) (interface{}, *int, *Explanation, error) { var failedPrereq *Prerequisite for _, prereq := range f.Prerequisites { data, err := store.Get(Features, prereq.Key) @@ -196,12 +200,12 @@ func (f FeatureFlag) evaluateExplain(user User, store FeatureStore, events *[]Fe } prereqFeatureFlag, _ := data.(*FeatureFlag) if prereqFeatureFlag.On { - prereqValue, _, err := prereqFeatureFlag.evaluateExplain(user, store, events) + prereqValue, prereqIndex, _, err := prereqFeatureFlag.evaluateExplain(user, store, events) if err != nil { failedPrereq = &prereq } - *events = append(*events, NewFeatureRequestEvent(prereq.Key, user, prereqValue, nil, &prereqFeatureFlag.Version, &f.Key)) + *events = append(*events, NewFeatureRequestEvent(prereq.Key, prereqFeatureFlag, user, prereqIndex, prereqValue, nil, &f.Key)) variation, verr := prereqFeatureFlag.getVariation(&prereq.Variation) if prereqValue == nil || verr != nil || prereqValue != variation { failedPrereq = &prereq @@ -217,16 +221,16 @@ func (f FeatureFlag) evaluateExplain(user User, store FeatureStore, events *[]Fe Prerequisite: failedPrereq, } //return the last prereq to fail - return nil, &explanation, nil + return nil, nil, &explanation, nil } index, explanation := f.evaluateExplainIndex(store, user) variation, verr := f.getVariation(index) if verr != nil { - return nil, explanation, verr + return nil, index, explanation, verr } - return variation, explanation, nil + return variation, index, explanation, nil } func (f FeatureFlag) getVariation(index *int) (interface{}, error) { diff --git a/flag_test.go b/flag_test.go index 3d1df8b..153ea29 100644 --- a/flag_test.go +++ b/flag_test.go @@ -22,8 +22,9 @@ func TestFlagReturnsOffVariationIfFlagIsOff(t *testing.T) { Variations: []interface{}{"fall", "off", "on"}, } - value, events := f.Evaluate(flagUser, emptyFeatureStore) + value, index, events := f.Evaluate(flagUser, emptyFeatureStore) assert.Equal(t, "off", value) + assert.Equal(t, intPtr(1), index) assert.Equal(t, 0, len(events)) } @@ -35,8 +36,9 @@ func TestFlagReturnsNilIfFlagIsOffAndOffVariationIsUnspecified(t *testing.T) { Variations: []interface{}{"fall", "off", "on"}, } - value, events := f.Evaluate(flagUser, emptyFeatureStore) - assert.Equal(t, nil, value) + value, index, events := f.Evaluate(flagUser, emptyFeatureStore) + assert.Nil(t, value) + assert.Nil(t, index) assert.Equal(t, 0, len(events)) } @@ -50,8 +52,9 @@ func TestFlagReturnsOffVariationIfPrerequisiteIsNotFound(t *testing.T) { Variations: []interface{}{"fall", "off", "on"}, } - value, events := f0.Evaluate(flagUser, emptyFeatureStore) + value, index, events := f0.Evaluate(flagUser, emptyFeatureStore) assert.Equal(t, "off", value) + assert.Equal(t, intPtr(1), index) assert.Equal(t, 0, len(events)) } @@ -76,15 +79,16 @@ func TestFlagReturnsOffVariationAndEventIfPrerequisiteIsNotMet(t *testing.T) { featureStore := NewInMemoryFeatureStore(nil) featureStore.Upsert(Features, &f1) - value, events := f0.Evaluate(flagUser, featureStore) + value, index, events := f0.Evaluate(flagUser, featureStore) assert.Equal(t, "off", value) + assert.Equal(t, intPtr(1), index) assert.Equal(t, 1, len(events)) e := events[0] assert.Equal(t, f1.Key, e.Key) - assert.Equal(t, "feature", e.Kind) assert.Equal(t, "nogo", e.Value) assert.Equal(t, intPtr(f1.Version), e.Version) + assert.Equal(t, intPtr(0), e.Variation) assert.Equal(t, strPtr(f0.Key), e.PrereqOf) } @@ -109,14 +113,15 @@ func TestFlagReturnsFallthroughVariationAndEventIfPrerequisiteIsMetAndThereAreNo featureStore := NewInMemoryFeatureStore(nil) featureStore.Upsert(Features, &f1) - value, events := f0.Evaluate(flagUser, featureStore) + value, index, events := f0.Evaluate(flagUser, featureStore) assert.Equal(t, "fall", value) + assert.Equal(t, intPtr(0), index) assert.Equal(t, 1, len(events)) e := events[0] assert.Equal(t, f1.Key, e.Key) - assert.Equal(t, "feature", e.Kind) assert.Equal(t, "go", e.Value) + assert.Equal(t, intPtr(1), e.Variation) assert.Equal(t, intPtr(f1.Version), e.Version) assert.Equal(t, strPtr(f0.Key), e.PrereqOf) } @@ -151,23 +156,24 @@ func TestMultipleLevelsOfPrerequisiteProduceMultipleEvents(t *testing.T) { featureStore.Upsert(Features, &f1) featureStore.Upsert(Features, &f2) - value, events := f0.Evaluate(flagUser, featureStore) + value, index, events := f0.Evaluate(flagUser, featureStore) assert.Equal(t, "fall", value) + assert.Equal(t, intPtr(0), index) assert.Equal(t, 2, len(events)) // events are generated recursively, so the deepest level of prerequisite appears first e0 := events[0] assert.Equal(t, f2.Key, e0.Key) - assert.Equal(t, "feature", e0.Kind) assert.Equal(t, "go", e0.Value) + assert.Equal(t, intPtr(1), e0.Variation) assert.Equal(t, intPtr(f2.Version), e0.Version) assert.Equal(t, strPtr(f1.Key), e0.PrereqOf) e1 := events[1] assert.Equal(t, f1.Key, e1.Key) - assert.Equal(t, "feature", e1.Kind) assert.Equal(t, "go", e1.Value) + assert.Equal(t, intPtr(1), e1.Variation) assert.Equal(t, intPtr(f1.Version), e1.Version) assert.Equal(t, strPtr(f0.Key), e1.PrereqOf) } @@ -183,8 +189,9 @@ func TestFlagMatchesUserFromTargets(t *testing.T) { } user := NewUser("userkey") - value, events := f.Evaluate(user, emptyFeatureStore) + value, index, events := f.Evaluate(user, emptyFeatureStore) assert.Equal(t, "on", value) + assert.Equal(t, intPtr(2), index) assert.Equal(t, 0, len(events)) } @@ -209,8 +216,9 @@ func TestFlagMatchesUserFromRules(t *testing.T) { } user := NewUser("userkey") - value, events := f.Evaluate(user, emptyFeatureStore) + value, index, events := f.Evaluate(user, emptyFeatureStore) assert.Equal(t, "on", value) + assert.Equal(t, intPtr(2), index) assert.Equal(t, 0, len(events)) } @@ -223,7 +231,7 @@ func TestClauseCanMatchBuiltInAttribute(t *testing.T) { f := booleanFlagWithClause(clause) user := User{Key: strPtr("key"), Name: strPtr("Bob")} - value, _ := f.Evaluate(user, emptyFeatureStore) + value, _, _ := f.Evaluate(user, emptyFeatureStore) assert.Equal(t, true, value) } @@ -237,7 +245,7 @@ func TestClauseCanMatchCustomAttribute(t *testing.T) { custom := map[string]interface{}{"legs": 4} user := User{Key: strPtr("key"), Custom: &custom} - value, _ := f.Evaluate(user, emptyFeatureStore) + value, _, _ := f.Evaluate(user, emptyFeatureStore) assert.Equal(t, true, value) } @@ -250,7 +258,7 @@ func TestClauseReturnsFalseForMissingAttribute(t *testing.T) { f := booleanFlagWithClause(clause) user := User{Key: strPtr("key"), Name: strPtr("Bob")} - value, _ := f.Evaluate(user, emptyFeatureStore) + value, _, _ := f.Evaluate(user, emptyFeatureStore) assert.Equal(t, false, value) } @@ -264,7 +272,7 @@ func TestClauseCanBeNegated(t *testing.T) { f := booleanFlagWithClause(clause) user := User{Key: strPtr("key"), Name: strPtr("Bob")} - value, _ := f.Evaluate(user, emptyFeatureStore) + value, _, _ := f.Evaluate(user, emptyFeatureStore) assert.Equal(t, false, value) } @@ -278,7 +286,7 @@ func TestClauseForMissingAttributeIsFalseEvenIfNegated(t *testing.T) { f := booleanFlagWithClause(clause) user := User{Key: strPtr("key"), Name: strPtr("Bob")} - value, _ := f.Evaluate(user, emptyFeatureStore) + value, _, _ := f.Evaluate(user, emptyFeatureStore) assert.Equal(t, false, value) } @@ -291,7 +299,7 @@ func TestClauseWithUnknownOperatorDoesNotMatch(t *testing.T) { f := booleanFlagWithClause(clause) user := User{Key: strPtr("key"), Name: strPtr("Bob")} - value, _ := f.Evaluate(user, emptyFeatureStore) + value, _, _ := f.Evaluate(user, emptyFeatureStore) assert.Equal(t, false, value) } @@ -317,7 +325,7 @@ func TestClauseWithUnknownOperatorDoesNotStopSubsequentRuleFromMatching(t *testi } user := User{Key: strPtr("key"), Name: strPtr("Bob")} - value, _ := f.Evaluate(user, emptyFeatureStore) + value, _, _ := f.Evaluate(user, emptyFeatureStore) assert.Equal(t, true, value) } @@ -332,7 +340,7 @@ func TestSegmentMatchClauseRetrievesSegmentFromStore(t *testing.T) { featureStore.Upsert(Segments, &segment) user := NewUser("foo") - value, _ := f.Evaluate(user, featureStore) + value, _, _ := f.Evaluate(user, featureStore) assert.Equal(t, true, value) } @@ -341,7 +349,7 @@ func TestSegmentMatchClauseFallsThroughIfSegmentNotFound(t *testing.T) { f := booleanFlagWithClause(clause) user := NewUser("foo") - value, _ := f.Evaluate(user, emptyFeatureStore) + value, _, _ := f.Evaluate(user, emptyFeatureStore) assert.Equal(t, false, value) } @@ -356,7 +364,7 @@ func TestCanMatchJustOneSegmentFromList(t *testing.T) { featureStore.Upsert(Segments, &segment) user := NewUser("foo") - value, _ := f.Evaluate(user, featureStore) + value, _, _ := f.Evaluate(user, featureStore) assert.Equal(t, true, value) } diff --git a/ldclient.go b/ldclient.go index 65dc951..c63c36f 100644 --- a/ldclient.go +++ b/ldclient.go @@ -14,7 +14,7 @@ import ( "time" ) -const Version = "3.1.0" +const Version = "4.0.0" // The LaunchDarkly client. Client instances are thread-safe. // Applications should instantiate a single instance for the lifetime @@ -22,7 +22,7 @@ const Version = "3.1.0" type LDClient struct { sdkKey string config Config - eventProcessor *eventProcessor + eventProcessor EventProcessor updateProcessor UpdateProcessor store FeatureStore } @@ -50,8 +50,21 @@ type Config struct { Offline bool AllAttributesPrivate bool PrivateAttributeNames []string - UpdateProcessor UpdateProcessor - UserAgent string + // An object that is responsible for receiving feature flag updates from LaunchDarkly. + // If nil, a default implementation will be used depending on the rest of the configuration + // (streaming, polling, etc.); a custom implementation can be substituted for testing. + UpdateProcessor UpdateProcessor + // An object that is responsible for recording or sending analytics events. If nil, a + // default implementation will be used; a custom implementation can be substituted for testing. + EventProcessor EventProcessor + // The number of user keys that the event processor can remember at any one time, so that + // duplicate user details will not be sent in analytics events. + UserKeysCapacity int + // The interval at which the event processor will reset its set of known user keys. + UserKeysFlushInterval time.Duration + // Set to true if you need to see the full user details in every analytics event. + InlineUsersInEvents bool + UserAgent string } // The minimum value for Config.PollInterval. If you specify a smaller interval, @@ -60,7 +73,7 @@ const MinimumPollInterval = 30 * time.Second type UpdateProcessor interface { Initialized() bool - Close() + Close() error Start(closeWhenReady chan<- struct{}) } @@ -70,20 +83,22 @@ type UpdateProcessor interface { // var config = DefaultConfig // config.Capacity = 2000 var DefaultConfig = Config{ - BaseUri: "https://app.launchdarkly.com", - StreamUri: "https://stream.launchdarkly.com", - EventsUri: "https://events.launchdarkly.com", - Capacity: 1000, - FlushInterval: 5 * time.Second, - PollInterval: MinimumPollInterval, - Logger: log.New(os.Stderr, "[LaunchDarkly]", log.LstdFlags), - Timeout: 3000 * time.Millisecond, - Stream: true, - FeatureStore: nil, - UseLdd: false, - SendEvents: true, - Offline: false, - UserAgent: "", + BaseUri: "https://app.launchdarkly.com", + StreamUri: "https://stream.launchdarkly.com", + EventsUri: "https://events.launchdarkly.com", + Capacity: 1000, + FlushInterval: 5 * time.Second, + PollInterval: MinimumPollInterval, + Logger: log.New(os.Stderr, "[LaunchDarkly]", log.LstdFlags), + Timeout: 3000 * time.Millisecond, + Stream: true, + FeatureStore: nil, + UseLdd: false, + SendEvents: true, + Offline: false, + UserKeysCapacity: 1000, + UserKeysFlushInterval: 5 * time.Minute, + UserAgent: "", } var ErrInitializationTimeout = errors.New("Timeout encountered waiting for LaunchDarkly client initialization") @@ -120,11 +135,17 @@ func MakeCustomClient(sdkKey string, config Config, waitFor time.Duration) (*LDC if config.Offline { config.Logger.Println("Started LaunchDarkly in offline mode") - client.config.SendEvents = false + client.eventProcessor = newNullEventProcessor() return &client, nil } - client.eventProcessor = newEventProcessor(sdkKey, config) + if config.EventProcessor != nil { + client.eventProcessor = config.EventProcessor + } else if config.SendEvents { + client.eventProcessor = NewDefaultEventProcessor(sdkKey, config, nil) + } else { + client.eventProcessor = newNullEventProcessor() + } if config.UseLdd { config.Logger.Println("Started LaunchDarkly in LDD mode") @@ -168,7 +189,8 @@ func (client *LDClient) Identify(user User) error { client.config.Logger.Printf("WARN: Identify called with empty/nil user key!") } evt := NewIdentifyEvent(user) - return client.eventProcessor.sendEvent(evt) + client.eventProcessor.SendEvent(evt) + return nil } // Tracks that a user has performed an event. Custom data can be attached to the @@ -181,7 +203,8 @@ func (client *LDClient) Track(key string, user User, data interface{}) error { client.config.Logger.Printf("WARN: Track called with empty/nil user key!") } evt := NewCustomEvent(key, user, data) - return client.eventProcessor.sendEvent(evt) + client.eventProcessor.SendEvent(evt) + return nil } // Returns whether the LaunchDarkly client is in offline mode. @@ -206,23 +229,21 @@ func (client *LDClient) Initialized() bool { // Shuts down the LaunchDarkly client. After calling this, the LaunchDarkly client // should no longer be used. -func (client *LDClient) Close() { +func (client *LDClient) Close() error { client.config.Logger.Println("Closing LaunchDarkly Client") if client.IsOffline() { - return + return nil } - client.eventProcessor.close() + client.eventProcessor.Close() if !client.config.UseLdd { client.updateProcessor.Close() } + return nil } // Immediately flushes queued events. func (client *LDClient) Flush() { - if client.IsOffline() { - return - } - client.eventProcessor.flush() + client.eventProcessor.Flush() } // Returns a map from feature flag keys to values for @@ -259,7 +280,7 @@ func (client *LDClient) AllFlags(user User) map[string]interface{} { } for _, item := range items { if flag, ok := item.(*FeatureFlag); ok { - result, _ := client.evalFlag(*flag, user) + result, _, _ := client.evalFlag(*flag, user) results[flag.Key] = result } } @@ -267,7 +288,7 @@ func (client *LDClient) AllFlags(user User) map[string]interface{} { return results } -func (client *LDClient) evalFlag(flag FeatureFlag, user User) (interface{}, []FeatureRequestEvent) { +func (client *LDClient) evalFlag(flag FeatureFlag, user User) (interface{}, *int, []FeatureRequestEvent) { return flag.Evaluate(user, client.store) } @@ -280,12 +301,6 @@ func (client *LDClient) BoolVariation(key string, user User, defaultVal bool) (b return result, err } -// Deprecated: Use BoolVariation(). -func (client *LDClient) Toggle(key string, user User, defaultVal bool) (bool, error) { - client.config.Logger.Println("WARN: Deprecated Toggle() called on LDClient. Use BoolVariation() instead.") - return client.BoolVariation(key, user, defaultVal) -} - // Returns the value of a feature flag (whose variations are integers) for the given user. // Returns defaultVal if there is an error, if the flag doesn't exist, or the feature is turned off. func (client *LDClient) IntVariation(key string, user User, defaultVal int) (int, error) { @@ -316,18 +331,18 @@ func (client *LDClient) JsonVariation(key string, user User, defaultVal json.Raw if client.IsOffline() { return defaultVal, nil } - value, version, err := client.Evaluate(key, user, defaultVal) + value, index, flag, err := client.evaluateInternal(key, user, defaultVal) if err != nil { - client.sendFlagRequestEvent(key, user, defaultVal, defaultVal, version) + client.sendFlagRequestEvent(key, flag, user, index, defaultVal, defaultVal) return defaultVal, err } valueJsonRawMessage, err := ToJsonRawMessage(value) if err != nil { - client.sendFlagRequestEvent(key, user, defaultVal, defaultVal, version) + client.sendFlagRequestEvent(key, flag, user, index, defaultVal, defaultVal) return defaultVal, err } - client.sendFlagRequestEvent(key, user, valueJsonRawMessage, defaultVal, version) + client.sendFlagRequestEvent(key, flag, user, index, valueJsonRawMessage, defaultVal) return valueJsonRawMessage, nil } @@ -337,34 +352,36 @@ func (client *LDClient) variation(key string, user User, defaultVal interface{}, if client.IsOffline() { return defaultVal, nil } - value, version, err := client.Evaluate(key, user, defaultVal) + value, index, flag, err := client.evaluateInternal(key, user, defaultVal) if err != nil { - client.sendFlagRequestEvent(key, user, defaultVal, defaultVal, version) + client.sendFlagRequestEvent(key, flag, user, index, defaultVal, defaultVal) return defaultVal, err } valueType := reflect.TypeOf(value) if expectedType != valueType { - client.sendFlagRequestEvent(key, user, defaultVal, defaultVal, version) + client.sendFlagRequestEvent(key, flag, user, index, defaultVal, defaultVal) return defaultVal, fmt.Errorf("Feature flag returned value: %+v of incompatible type: %+v; Expected: %+v", value, valueType, expectedType) } - client.sendFlagRequestEvent(key, user, value, defaultVal, version) + client.sendFlagRequestEvent(key, flag, user, index, value, defaultVal) return value, nil } -func (client *LDClient) sendFlagRequestEvent(key string, user User, value, defaultVal interface{}, version *int) error { +func (client *LDClient) sendFlagRequestEvent(key string, flag *FeatureFlag, user User, variation *int, value, defaultVal interface{}) { if client.IsOffline() { - return nil + return } - evt := NewFeatureRequestEvent(key, user, value, defaultVal, version, nil) - return client.eventProcessor.sendEvent(evt) + evt := NewFeatureRequestEvent(key, flag, user, variation, value, defaultVal, nil) + client.eventProcessor.SendEvent(evt) } func (client *LDClient) Evaluate(key string, user User, defaultVal interface{}) (interface{}, *int, error) { - if user.Key == nil { - return defaultVal, nil, fmt.Errorf("User.Key cannot be nil for user: %+v when evaluating flag: %s", user, key) - } - if *user.Key == "" { + value, index, _, err := client.evaluateInternal(key, user, defaultVal) + return value, index, err +} + +func (client *LDClient) evaluateInternal(key string, user User, defaultVal interface{}) (interface{}, *int, *FeatureFlag, error) { + if user.Key != nil && *user.Key == "" { client.config.Logger.Printf("WARN: User.Key is blank when evaluating flag: %s. Flag evaluation will proceed, but the user will not be stored in LaunchDarkly.", key) } @@ -376,7 +393,7 @@ func (client *LDClient) Evaluate(key string, user User, defaultVal interface{}) if client.store.Initialized() { client.config.Logger.Printf("WARN: Feature flag evaluation called before LaunchDarkly client initialization completed; using last known values from feature store") } else { - return defaultVal, nil, ErrClientNotInitialized + return defaultVal, nil, nil, ErrClientNotInitialized } } @@ -384,29 +401,28 @@ func (client *LDClient) Evaluate(key string, user User, defaultVal interface{}) if storeErr != nil { client.config.Logger.Printf("Encountered error fetching feature from store: %+v", storeErr) - return defaultVal, nil, storeErr + return defaultVal, nil, nil, storeErr } if data != nil { feature, ok = data.(*FeatureFlag) if !ok { - return defaultVal, nil, fmt.Errorf("Unexpected data type (%T) found in store for feature key: %s. Returning default value.", data, key) + return defaultVal, nil, nil, fmt.Errorf("Unexpected data type (%T) found in store for feature key: %s. Returning default value.", data, key) } } else { - return defaultVal, nil, fmt.Errorf("Unknown feature key: %s Verify that this feature key exists. Returning default value.", key) + return defaultVal, nil, nil, fmt.Errorf("Unknown feature key: %s Verify that this feature key exists. Returning default value.", key) } - result, prereqEvents := client.evalFlag(*feature, user) - if !client.IsOffline() { - for _, event := range prereqEvents { - err := client.eventProcessor.sendEvent(event) - if err != nil { - client.config.Logger.Printf("WARN: Error sending feature request event to LaunchDarkly: %+v", err) - } - } + if user.Key == nil { + return defaultVal, nil, feature, fmt.Errorf("User.Key cannot be nil for user: %+v when evaluating flag: %s", user, key) + } + + result, index, prereqEvents := client.evalFlag(*feature, user) + for _, event := range prereqEvents { + client.eventProcessor.SendEvent(event) } if result != nil { - return result, &feature.Version, nil + return result, index, feature, nil } - return defaultVal, &feature.Version, nil + return defaultVal, index, feature, nil } diff --git a/ldclient_test.go b/ldclient_test.go index 2ecd28c..b7c4ca6 100644 --- a/ldclient_test.go +++ b/ldclient_test.go @@ -7,13 +7,29 @@ import ( "os" "testing" "time" + + "github.com/stretchr/testify/assert" ) -type TestUpdateProcessor struct{} +type testUpdateProcessor struct{} + +func (u testUpdateProcessor) Initialized() bool { return true } +func (u testUpdateProcessor) Close() error { return nil } +func (u testUpdateProcessor) Start(chan<- struct{}) {} -func (u TestUpdateProcessor) Initialized() bool { return true } -func (u TestUpdateProcessor) Close() {} -func (u TestUpdateProcessor) Start(chan<- struct{}) {} +type testEventProcessor struct { + events []Event +} + +func (t *testEventProcessor) SendEvent(e Event) { + t.events = append(t.events, e) +} + +func (t *testEventProcessor) Flush() {} + +func (t *testEventProcessor) Close() error { + return nil +} func TestOfflineModeAlwaysReturnsDefaultValue(t *testing.T) { config := Config{ @@ -27,125 +43,84 @@ func TestOfflineModeAlwaysReturnsDefaultValue(t *testing.T) { } client, _ := MakeCustomClient("api_key", config, 0) defer client.Close() - client.config.Offline = true - key := "foo" - user := User{Key: &key} - //Toggle - expected := true - actual, err := client.Toggle("featureKey", user, expected) - if err != nil { - t.Errorf("Unexpected error in Toggle: %+v", err) - } - if actual != expected { - t.Errorf("Offline mode should return default value, but doesn't") - } + user := NewUser("foo") + + //BoolVariation + actual, err := client.BoolVariation("featureKey", user, true) + assert.NoError(t, err) + assert.True(t, actual) //IntVariation expectedInt := 100 actualInt, err := client.IntVariation("featureKey", user, expectedInt) - if err != nil { - t.Errorf("Unexpected error in IntVariation: %+v", err) - } - if actualInt != expectedInt { - t.Errorf("Offline mode should return default value: %+v, instead returned: %+v", expectedInt, actualInt) - } + assert.NoError(t, err) + assert.Equal(t, expectedInt, actualInt) //Float64Variation expectedFloat64 := 100.0 actualFloat64, err := client.Float64Variation("featureKey", user, expectedFloat64) - if err != nil { - t.Errorf("Unexpected error in Float64Variation: %+v", err) - } - if actualFloat64 != expectedFloat64 { - t.Errorf("Offline mode should return default value, but doesn't") - } + assert.NoError(t, err) + assert.Equal(t, expectedFloat64, actualFloat64) //StringVariation expectedString := "expected" actualString, err := client.StringVariation("featureKey", user, expectedString) - if err != nil { - t.Errorf("Unexpected error in StringVariation: %+v", err) - } - if actualString != expectedString { - t.Errorf("Offline mode should return default value, but doesn't") - } + assert.NoError(t, err) + assert.Equal(t, expectedString, actualString) //JsonVariation expectedJsonString := `{"fieldName":"fieldValue"}` expectedJson := json.RawMessage([]byte(expectedJsonString)) actualJson, err := client.JsonVariation("featureKey", user, expectedJson) - if err != nil { - t.Errorf("Unexpected error in JsonVariation: %+v", err) - } - if string([]byte(actualJson)) != string([]byte(expectedJson)) { - t.Errorf("Offline mode should return default value (%+v), instead got: %+v", expectedJson, actualJson) - } + assert.NoError(t, err) + assert.Equal(t, string([]byte(expectedJson)), string([]byte(actualJson))) client.Close() } -func TestToggle(t *testing.T) { +func TestBoolVariation(t *testing.T) { expected := true + variations := []interface{}{false, true} - variations := make([]interface{}, 2) - variations[0] = false - variations[1] = expected - - client := makeClientWithFeatureFlag(variations) + client := makeTestClient() defer client.Close() + client.store.Upsert(Features, featureFlagWithVariations("validFeatureKey", variations)) - userKey := "userKey" - actual, err := client.Toggle("validFeatureKey", User{Key: &userKey}, false) + actual, err := client.BoolVariation("validFeatureKey", NewUser("userKey"), false) - if err != nil { - t.Errorf("Unexpected error when calling Toggle: %+v", err) - } - if actual != expected { - t.Errorf("Got unexpected result when calling Toggle: %+v but expected: %+v", actual, expected) - } + assert.NoError(t, err) + assert.Equal(t, expected, actual) } func TestIntVariation(t *testing.T) { expected := float64(100) - variations := make([]interface{}, 2) - variations[0] = float64(-1) - variations[1] = expected + variations := []interface{}{float64(-1), expected} - client := makeClientWithFeatureFlag(variations) + client := makeTestClient() defer client.Close() + client.store.Upsert(Features, featureFlagWithVariations("validFeatureKey", variations)) - userKey := "userKey" - actual, err := client.IntVariation("validFeatureKey", User{Key: &userKey}, 10000) + actual, err := client.IntVariation("validFeatureKey", NewUser("userKey"), 10000) - if err != nil { - t.Errorf("Unexpected error when calling IntVariation: %+v", err) - } - if actual != int(expected) { - t.Errorf("Got unexpected result when calling IntVariation: %+v but expected: %+v", actual, expected) - } + assert.NoError(t, err) + assert.Equal(t, int(expected), actual) } func TestFloat64Variation(t *testing.T) { expected := 100.01 - variations := make([]interface{}, 2) - variations[0] = -1.0 - variations[1] = expected + variations := []interface{}{-1.0, expected} - client := makeClientWithFeatureFlag(variations) + client := makeTestClient() defer client.Close() + client.store.Upsert(Features, featureFlagWithVariations("validFeatureKey", variations)) - userKey := "userKey" - actual, err := client.Float64Variation("validFeatureKey", User{Key: &userKey}, 0.0) + actual, err := client.Float64Variation("validFeatureKey", NewUser("userKey"), 0.0) - if err != nil { - t.Errorf("Unexpected error when calling Float64Variation: %+v", err) - } - if actual != expected { - t.Errorf("Got unexpected result when calling Float64Variation: %+v but expected: %+v", actual, expected) - } + assert.NoError(t, err) + assert.Equal(t, expected, actual) } func TestJsonVariation(t *testing.T) { @@ -154,19 +129,15 @@ func TestJsonVariation(t *testing.T) { var variations []interface{} json.Unmarshal([]byte(fmt.Sprintf(`[{"jsonFieldName1" : "jsonFieldValue"},%s]`, expectedJsonString)), &variations) - client := makeClientWithFeatureFlag(variations) + client := makeTestClient() defer client.Close() + client.store.Upsert(Features, featureFlagWithVariations("validFeatureKey", variations)) - userKey := "userKey" var actual json.RawMessage - actual, err := client.JsonVariation("validFeatureKey", User{Key: &userKey}, []byte(`{"default":"default"}`)) + actual, err := client.JsonVariation("validFeatureKey", NewUser("userKey"), []byte(`{"default":"default"}`)) - if err != nil { - t.Errorf("Unexpected error when calling JsonVariation: %+v", err) - } - if string(actual) != expectedJsonString { - t.Errorf("Got unexpected result when calling JsonVariation: %+v but expected: %+v", string(actual), expectedJsonString) - } + assert.NoError(t, err) + assert.Equal(t, expectedJsonString, string(actual)) } func TestSecureModeHash(t *testing.T) { @@ -179,44 +150,199 @@ func TestSecureModeHash(t *testing.T) { hash := client.SecureModeHash(User{Key: &key}) - if hash != expected { - t.Errorf("Got unexpected result when calling SecureModeHash: %s but expected %s", hash, expected) + assert.Equal(t, expected, hash) +} + +func TestEvaluatingExistingFlagSendsEvent(t *testing.T) { + flag := featureFlagWithVariations("flagKey", []interface{}{"a", "b"}) + client := makeTestClient() + defer client.Close() + client.store.Upsert(Features, flag) + + user := NewUser("userKey") + _, err := client.StringVariation(flag.Key, user, "x") + assert.NoError(t, err) + + events := client.eventProcessor.(*testEventProcessor).events + + assert.Equal(t, 1, len(events)) + e := events[0].(FeatureRequestEvent) + expectedEvent := FeatureRequestEvent{ + BaseEvent: BaseEvent{ + CreationDate: e.CreationDate, + User: user, + }, + Key: flag.Key, + Version: &flag.Version, + Value: "b", + Variation: intPtr(1), + Default: "x", + PrereqOf: nil, } + assert.Equal(t, expectedEvent, e) +} + +func TestEvaluatingUnknownFlagSendsEvent(t *testing.T) { + client := makeTestClient() + defer client.Close() + + user := NewUser("userKey") + _, err := client.StringVariation("flagKey", user, "x") + assert.Error(t, err) + + events := client.eventProcessor.(*testEventProcessor).events + assert.Equal(t, 1, len(events)) + + e := events[0].(FeatureRequestEvent) + expectedEvent := FeatureRequestEvent{ + BaseEvent: BaseEvent{ + CreationDate: e.CreationDate, + User: user, + }, + Key: "flagKey", + Version: nil, + Value: "x", + Variation: nil, + Default: "x", + PrereqOf: nil, + } + assert.Equal(t, expectedEvent, e) +} + +func TestEvaluatingFlagWithNilUserKeySendsEvent(t *testing.T) { + flag := featureFlagWithVariations("flagKey", []interface{}{"a", "b"}) + client := makeTestClient() + defer client.Close() + client.store.Upsert(Features, flag) + + user := User{Name: strPtr("Bob")} + _, err := client.StringVariation(flag.Key, user, "x") + assert.Error(t, err) + + events := client.eventProcessor.(*testEventProcessor).events + + assert.Equal(t, 1, len(events)) + e := events[0].(FeatureRequestEvent) + expectedEvent := FeatureRequestEvent{ + BaseEvent: BaseEvent{ + CreationDate: e.CreationDate, + User: user, + }, + Key: flag.Key, + Version: &flag.Version, + Value: "x", + Variation: nil, + Default: "x", + PrereqOf: nil, + } + assert.Equal(t, expectedEvent, e) +} + +func TestEvaluatingFlagWithPrerequisiteSendsPrerequisiteEvent(t *testing.T) { + client := makeTestClient() + defer client.Close() + + flag0 := featureFlagWithVariations("flag0", []interface{}{"a", "b"}) + flag0.Prerequisites = []Prerequisite{ + Prerequisite{Key: "flag1", Variation: 1}, + } + flag1 := featureFlagWithVariations("flag1", []interface{}{"c", "d"}) + client.store.Upsert(Features, flag0) + client.store.Upsert(Features, flag1) + + user := NewUser("userKey") + _, err := client.StringVariation(flag0.Key, user, "x") + assert.NoError(t, err) + + events := client.eventProcessor.(*testEventProcessor).events + assert.Equal(t, 2, len(events)) + + e0 := events[0].(FeatureRequestEvent) + expected0 := FeatureRequestEvent{ + BaseEvent: BaseEvent{ + CreationDate: e0.CreationDate, + User: user, + }, + Key: flag1.Key, + Version: &flag1.Version, + Value: "d", + Variation: intPtr(1), + Default: nil, + PrereqOf: &flag0.Key, + } + assert.Equal(t, expected0, e0) + + e1 := events[1].(FeatureRequestEvent) + expected1 := FeatureRequestEvent{ + BaseEvent: BaseEvent{ + CreationDate: e1.CreationDate, + User: user, + }, + Key: flag0.Key, + Version: &flag0.Version, + Value: "b", + Variation: intPtr(1), + Default: "x", + PrereqOf: nil, + } + assert.Equal(t, expected1, e1) +} + +func TestIdentifySendsIdentifyEvent(t *testing.T) { + client := makeTestClient() + defer client.Close() + + user := NewUser("userKey") + err := client.Identify(user) + assert.NoError(t, err) + + events := client.eventProcessor.(*testEventProcessor).events + assert.Equal(t, 1, len(events)) + e := events[0].(IdentifyEvent) + assert.Equal(t, user, e.User) +} + +func TestTrackSendsCustomEvent(t *testing.T) { + client := makeTestClient() + defer client.Close() + + user := NewUser("userKey") + key := "eventKey" + data := map[string]interface{}{"thing": "stuff"} + err := client.Track(key, user, data) + assert.NoError(t, err) + + events := client.eventProcessor.(*testEventProcessor).events + assert.Equal(t, 1, len(events)) + e := events[0].(CustomEvent) + assert.Equal(t, user, e.User) + assert.Equal(t, key, e.Key) + assert.Equal(t, data, e.Data) } // Creates LdClient loaded with one feature flag with key: "validFeatureKey". // Variations param should have at least 2 items with variations[1] being the expected // fallthrough value when passing in a valid user -func makeClientWithFeatureFlag(variations []interface{}) *LDClient { +func makeTestClient() *LDClient { config := Config{ - BaseUri: "https://localhost:3000", - Capacity: 1000, - FlushInterval: 5 * time.Second, - Logger: log.New(os.Stderr, "[LaunchDarkly]", log.LstdFlags), - Timeout: 1500 * time.Millisecond, - Stream: true, - Offline: false, - SendEvents: false, - } - - client := LDClient{ - sdkKey: "sdkKey", - config: config, - eventProcessor: newEventProcessor("sdkKey", config), - updateProcessor: TestUpdateProcessor{}, - store: NewInMemoryFeatureStore(nil), + Logger: log.New(os.Stderr, "[LaunchDarkly]", log.LstdFlags), + Offline: false, + SendEvents: true, + FeatureStore: NewInMemoryFeatureStore(nil), + UpdateProcessor: testUpdateProcessor{}, + EventProcessor: &testEventProcessor{}, + UserKeysFlushInterval: 30 * time.Second, } - featureFlag := featureFlagWithVariations(variations) - client.store.Upsert(Features, &featureFlag) - return &client + client, _ := MakeCustomClient("sdkKey", config, 0*time.Second) + return client } -func featureFlagWithVariations(variations []interface{}) FeatureFlag { +func featureFlagWithVariations(key string, variations []interface{}) *FeatureFlag { fallThroughVariation := 1 - return FeatureFlag{ - Key: "validFeatureKey", + return &FeatureFlag{ + Key: key, Version: 1, On: true, Fallthrough: VariationOrRollout{Variation: &fallThroughVariation}, diff --git a/lru_cache.go b/lru_cache.go new file mode 100644 index 0000000..93d0a3e --- /dev/null +++ b/lru_cache.go @@ -0,0 +1,44 @@ +package ldclient + +import ( + "container/list" +) + +type lruCache struct { + values map[interface{}]*list.Element + lruList *list.List + capacity int +} + +func newLruCache(capacity int) lruCache { + return lruCache{ + values: make(map[interface{}]*list.Element), + lruList: list.New(), + capacity: capacity, + } +} + +func (c *lruCache) clear() { + c.values = make(map[interface{}]*list.Element) + c.lruList.Init() +} + +// Stores a value in the cache, returning true (and marking it as recently used) if it was +// already there, or false if it was newly added. +func (c *lruCache) add(value interface{}) bool { + if c.capacity == 0 { + return false + } + if e, ok := c.values[value]; ok { + c.lruList.MoveToFront(e) + return true + } + for len(c.values) >= c.capacity { + oldest := c.lruList.Back() + delete(c.values, oldest.Value) + c.lruList.Remove(oldest) + } + e := c.lruList.PushFront(value) + c.values[value] = e + return false +} diff --git a/lru_cache_test.go b/lru_cache_test.go new file mode 100644 index 0000000..621dccb --- /dev/null +++ b/lru_cache_test.go @@ -0,0 +1,47 @@ +package ldclient + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestLRUCache(t *testing.T) { + t.Run("add returns false for never-seen value", func(t *testing.T) { + cache := newLruCache(10) + assert.False(t, cache.add("a")) + }) + + t.Run("add returns true for already-seen value", func(t *testing.T) { + cache := newLruCache(10) + cache.add("a") + assert.True(t, cache.add("a")) + }) + + t.Run("oldest value is discarded when capacity is exceeded", func(t *testing.T) { + cache := newLruCache(2) + cache.add("a") + cache.add("b") + cache.add("c") + assert.True(t, cache.add("c")) + assert.True(t, cache.add("b")) + assert.False(t, cache.add("a")) + }) + + t.Run("re-adding an existing value makes it new again", func(t *testing.T) { + cache := newLruCache(2) + cache.add("a") + cache.add("b") + cache.add("a") + cache.add("c") + assert.True(t, cache.add("c")) + assert.True(t, cache.add("a")) + assert.False(t, cache.add("b")) + }) + + t.Run("zero-length cache treats values as new", func(t *testing.T) { + cache := newLruCache(0) + assert.False(t, cache.add("a")) + assert.False(t, cache.add("a")) + }) +} diff --git a/polling.go b/polling.go index 89f16a9..86e5f30 100644 --- a/polling.go +++ b/polling.go @@ -75,11 +75,12 @@ func (pp *pollingProcessor) poll() error { return nil } -func (pp *pollingProcessor) Close() { +func (pp *pollingProcessor) Close() error { pp.closeOnce.Do(func() { pp.config.Logger.Printf("Closing Polling Processor") close(pp.quit) }) + return nil } func (pp *pollingProcessor) Initialized() bool { diff --git a/streaming.go b/streaming.go index 01bd3af..4f0c049 100644 --- a/streaming.go +++ b/streaming.go @@ -228,11 +228,12 @@ func (sp *streamProcessor) checkUnauthorized(err error) bool { return false } -func (sp *streamProcessor) Close() { +func (sp *streamProcessor) Close() error { sp.closeOnce.Do(func() { sp.config.Logger.Printf("Closing event stream.") // TODO: enable this when we trust stream.Close() never to panic (see https://github.com/donovanhide/eventsource/pull/33) // sp.stream.Close() close(sp.halt) }) + return nil } diff --git a/user_filter.go b/user_filter.go new file mode 100644 index 0000000..ab4a79b --- /dev/null +++ b/user_filter.go @@ -0,0 +1,92 @@ +package ldclient + +type userFilter struct { + allAttributesPrivate bool + globalPrivateAttributes []string +} + +func newUserFilter(config Config) userFilter { + return userFilter{ + allAttributesPrivate: config.AllAttributesPrivate, + globalPrivateAttributes: config.PrivateAttributeNames, + } +} + +func (uf userFilter) scrubUser(user User) *User { + if len(user.PrivateAttributeNames) == 0 && len(uf.globalPrivateAttributes) == 0 && !uf.allAttributesPrivate { + return &user + } + + isPrivate := map[string]bool{} + for _, n := range uf.globalPrivateAttributes { + isPrivate[n] = true + } + for _, n := range user.PrivateAttributeNames { + isPrivate[n] = true + } + user.PrivateAttributeNames = nil // this property is not used in the output schema for events + user.PrivateAttributes = nil // see below + // Because we're only resetting these properties if we're going to proceed with the scrubbing logic, it is + // possible to pass an already-scrubbed user to this function (with, potentially, some attribute names in + // PrivateAttributes) and get back the same object, as long as the configuration does not have private + // attributes enabled. This allows us to reuse the event processor code in ld-relay, where we may have to + // reprocess events that have already been through the scrubbing process. + + if user.Custom != nil { + var custom = map[string]interface{}{} + for k, v := range *user.Custom { + if uf.allAttributesPrivate || isPrivate[k] { + user.PrivateAttributes = append(user.PrivateAttributes, k) + } else { + custom[k] = v + } + } + user.Custom = &custom + } + + if !isEmpty(user.Avatar) && (uf.allAttributesPrivate || isPrivate["avatar"]) { + user.Avatar = nil + user.PrivateAttributes = append(user.PrivateAttributes, "avatar") + } + + if !isEmpty(user.Country) && (uf.allAttributesPrivate || isPrivate["country"]) { + user.Country = nil + user.PrivateAttributes = append(user.PrivateAttributes, "country") + } + + if !isEmpty(user.Ip) && (uf.allAttributesPrivate || isPrivate["ip"]) { + user.Ip = nil + user.PrivateAttributes = append(user.PrivateAttributes, "ip") + } + + if !isEmpty(user.FirstName) && (uf.allAttributesPrivate || isPrivate["firstName"]) { + user.FirstName = nil + user.PrivateAttributes = append(user.PrivateAttributes, "firstName") + } + + if !isEmpty(user.LastName) && (uf.allAttributesPrivate || isPrivate["lastName"]) { + user.LastName = nil + user.PrivateAttributes = append(user.PrivateAttributes, "lastName") + } + + if !isEmpty(user.Name) && (uf.allAttributesPrivate || isPrivate["name"]) { + user.Name = nil + user.PrivateAttributes = append(user.PrivateAttributes, "name") + } + + if !isEmpty(user.Secondary) && (uf.allAttributesPrivate || isPrivate["secondary"]) { + user.Secondary = nil + user.PrivateAttributes = append(user.PrivateAttributes, "secondary") + } + + if !isEmpty(user.Email) && (uf.allAttributesPrivate || isPrivate["email"]) { + user.Email = nil + user.PrivateAttributes = append(user.PrivateAttributes, "email") + } + + return &user +} + +func isEmpty(s *string) bool { + return s == nil || *s == "" +} diff --git a/events_test.go b/user_filter_test.go similarity index 85% rename from events_test.go rename to user_filter_test.go index b745d23..288319e 100644 --- a/events_test.go +++ b/user_filter_test.go @@ -7,23 +7,9 @@ import ( "github.com/stretchr/testify/assert" ) -var BuiltinAttributes = []string{ - "avatar", - "country", - "email", - "firstName", - "ip", - "lastName", - "name", - "secondary", -} - -func init() { - sort.Strings(BuiltinAttributes) -} - func TestScrubUser(t *testing.T) { t.Run("private built-in attributes per user", func(t *testing.T) { + filter := newUserFilter(DefaultConfig) user := User{ Key: strPtr("user-key"), FirstName: strPtr("sam"), @@ -38,7 +24,7 @@ func TestScrubUser(t *testing.T) { for _, attr := range BuiltinAttributes { user.PrivateAttributeNames = []string{attr} - scrubbedUser := scrubUser(user, false, nil) + scrubbedUser := *filter.scrubUser(user) assert.Equal(t, []string{attr}, scrubbedUser.PrivateAttributes) scrubbedUser.PrivateAttributes = nil assert.NotEqual(t, user, scrubbedUser) @@ -59,7 +45,8 @@ func TestScrubUser(t *testing.T) { } for _, attr := range BuiltinAttributes { - scrubbedUser := scrubUser(user, false, []string{attr}) + filter := newUserFilter(Config{PrivateAttributeNames: []string{attr}}) + scrubbedUser := *filter.scrubUser(user) assert.Equal(t, []string{attr}, scrubbedUser.PrivateAttributes) scrubbedUser.PrivateAttributes = nil assert.NotEqual(t, user, scrubbedUser) @@ -67,6 +54,7 @@ func TestScrubUser(t *testing.T) { }) t.Run("private custom attribute", func(t *testing.T) { + filter := newUserFilter(DefaultConfig) userKey := "userKey" user := User{ Key: &userKey, @@ -75,13 +63,14 @@ func TestScrubUser(t *testing.T) { "my-secret-attr": "my secret value", }} - scrubbedUser := scrubUser(user, false, nil) + scrubbedUser := *filter.scrubUser(user) assert.Equal(t, []string{"my-secret-attr"}, scrubbedUser.PrivateAttributes) assert.NotContains(t, *scrubbedUser.Custom, "my-secret-attr") }) t.Run("all attributes private", func(t *testing.T) { + filter := newUserFilter(Config{AllAttributesPrivate: true}) userKey := "userKey" user := User{ Key: &userKey, @@ -97,7 +86,7 @@ func TestScrubUser(t *testing.T) { "my-secret-attr": "my secret value", }} - scrubbedUser := scrubUser(user, true, nil) + scrubbedUser := *filter.scrubUser(user) sort.Strings(scrubbedUser.PrivateAttributes) expectedAttributes := append(BuiltinAttributes, "my-secret-attr") sort.Strings(expectedAttributes) @@ -110,13 +99,14 @@ func TestScrubUser(t *testing.T) { }) t.Run("anonymous attribute can't be private", func(t *testing.T) { + filter := newUserFilter(Config{AllAttributesPrivate: true}) userKey := "userKey" anon := true user := User{ Key: &userKey, Anonymous: &anon} - scrubbedUser := scrubUser(user, true, nil) + scrubbedUser := *filter.scrubUser(user) assert.Equal(t, scrubbedUser, user) }) }