From 692e58fe15c7ef3ac97c46b1e4a6a5542ddd8cfc Mon Sep 17 00:00:00 2001 From: Eli Bishop <35503443+eli-darkly@users.noreply.github.com> Date: Tue, 26 Jun 2018 15:08:09 -0700 Subject: [PATCH 1/3] prepare 4.2.0 release (#125) --- event_processor.go | 7 ++- event_processor_test.go | 64 ++++++++++++++++------- polling.go | 4 +- streaming.go | 15 +++--- streaming_test.go | 109 +++++++++++++++++++++++++++++++--------- util.go | 28 +++++++++++ 6 files changed, 172 insertions(+), 55 deletions(-) diff --git a/event_processor.go b/event_processor.go index 30c5f9f..35df954 100644 --- a/event_processor.go +++ b/event_processor.go @@ -306,9 +306,8 @@ func (ed *eventDispatcher) isDisabled() bool { func (ed *eventDispatcher) handleResponse(resp *http.Response) { err := checkStatusCode(resp.StatusCode, resp.Request.URL.String()) if err != nil { - ed.config.Logger.Printf("Unexpected status code when sending events: %+v", err) - if err != nil && err.Code == 401 { - ed.config.Logger.Printf("Received 401 error, no further events will be posted since SDK key is invalid") + ed.config.Logger.Println(httpErrorMessage(err.Code, "posting events", "some events were dropped")) + if err != nil && !isHTTPErrorRecoverable(err.Code) { ed.stateLock.Lock() defer ed.stateLock.Unlock() ed.disabled = true @@ -422,7 +421,7 @@ func (t *sendEventsTask) postEvents(outputEvents []interface{}) *http.Response { if respErr != nil { t.logger.Printf("Unexpected error while sending events: %+v", respErr) continue - } else if resp.StatusCode >= 500 { + } else if resp.StatusCode >= 400 && isHTTPErrorRecoverable(resp.StatusCode) { t.logger.Printf("Received error status %d when sending events", resp.StatusCode) continue } else { diff --git a/event_processor_test.go b/event_processor_test.go index e436dc9..3eeffcc 100644 --- a/event_processor_test.go +++ b/event_processor_test.go @@ -2,6 +2,7 @@ package ldclient import ( "encoding/json" + "fmt" "io/ioutil" "log" "net/http" @@ -502,23 +503,52 @@ func TestUserAgentIsSent(t *testing.T) { assert.Equal(t, config.UserAgent, msg.Header.Get("User-Agent")) } -func TestFlushIsRetriedOnceAfter5xxError(t *testing.T) { - ep, st := createEventProcessor(epDefaultConfig) - defer ep.Close() - - st.statusCode = 503 - - ie := NewIdentifyEvent(epDefaultUser) - ep.SendEvent(ie) - ep.Flush() - ep.waitUntilInactive() - - msg := st.getNextRequest() - assert.NotNil(t, msg) - msg = st.getNextRequest() - assert.NotNil(t, msg) - msg = st.getNextRequest() - assert.Nil(t, msg) +var httpErrorTests = []struct { + status int + recoverable bool +}{ + {401, false}, + {403, false}, + {408, true}, + {429, true}, + {500, true}, + {503, true}, +} + +func TestHTTPErrorHandling(t *testing.T) { + for _, tt := range httpErrorTests { + t.Run(fmt.Sprintf("%d error, recoverable: %v", tt.status, tt.recoverable), func(t *testing.T) { + ep, st := createEventProcessor(epDefaultConfig) + defer ep.Close() + + st.statusCode = tt.status + + ie := NewIdentifyEvent(epDefaultUser) + ep.SendEvent(ie) + ep.Flush() + ep.waitUntilInactive() + + msg := st.getNextRequest() + assert.NotNil(t, msg) + + if tt.recoverable { + msg = st.getNextRequest() // 2nd request is a retry of the 1st + assert.NotNil(t, msg) + msg = st.getNextRequest() + assert.Nil(t, msg) + } else { + msg = st.getNextRequest() + assert.Nil(t, msg) + + ep.SendEvent(ie) + ep.Flush() + ep.waitUntilInactive() + + msg = st.getNextRequest() + assert.Nil(t, msg) + } + }) + } } func jsonMap(o interface{}) map[string]interface{} { diff --git a/polling.go b/polling.go index d2dcbc0..990d525 100644 --- a/polling.go +++ b/polling.go @@ -51,8 +51,8 @@ func (pp *pollingProcessor) Start(closeWhenReady chan<- struct{}) { if err := pp.poll(); err != nil { pp.config.Logger.Printf("ERROR: Error when requesting feature updates: %+v", err) if hse, ok := err.(*HttpStatusError); ok { - if hse.Code == 401 { - pp.config.Logger.Printf("ERROR: Received 401 error, no further polling requests will be made since SDK key is invalid") + pp.config.Logger.Printf("ERROR: %s", httpErrorMessage(hse.Code, "polling request", "will retry")) + if !isHTTPErrorRecoverable(hse.Code) { notifyReady() return } diff --git a/streaming.go b/streaming.go index c57ce33..84ec0ec 100644 --- a/streaming.go +++ b/streaming.go @@ -179,7 +179,7 @@ func (sp *streamProcessor) events(closeWhenReady chan<- struct{}) { } if err != io.EOF { sp.config.Logger.Printf("ERROR: Error encountered processing stream: %+v", err) - if sp.checkUnauthorized(err) { + if sp.checkIfPermanentFailure(err) { sp.closeOnce.Do(func() { sp.config.Logger.Printf("Closing event stream.") // TODO: enable this when we trust stream.Close() never to panic (see https://github.com/donovanhide/eventsource/pull/33) @@ -215,8 +215,7 @@ func (sp *streamProcessor) subscribe(closeWhenReady chan<- struct{}) { sp.config.Logger.Printf("Connecting to LaunchDarkly stream using URL: %s", req.URL.String()) if stream, err := es.SubscribeWithRequest("", req); err != nil { - sp.config.Logger.Printf("ERROR: Error subscribing to stream: %+v using URL: %s", err, req.URL.String()) - if sp.checkUnauthorized(err) { + if sp.checkIfPermanentFailure(err) { close(closeWhenReady) return } @@ -239,10 +238,10 @@ func (sp *streamProcessor) subscribe(closeWhenReady chan<- struct{}) { } } -func (sp *streamProcessor) checkUnauthorized(err error) bool { +func (sp *streamProcessor) checkIfPermanentFailure(err error) bool { if se, ok := err.(es.SubscriptionError); ok { - if se.Code == 401 { - sp.config.Logger.Printf("ERROR: Received 401 error, no further streaming connection will be made since SDK key is invalid") + sp.config.Logger.Printf("ERROR: %s", httpErrorMessage(se.Code, "streaming connection", "will retry")) + if !isHTTPErrorRecoverable(se.Code) { return true } } @@ -253,7 +252,9 @@ func (sp *streamProcessor) checkUnauthorized(err error) bool { func (sp *streamProcessor) Close() error { sp.closeOnce.Do(func() { sp.config.Logger.Printf("Closing event stream.") - sp.stream.Close() + if sp.stream != nil { + sp.stream.Close() + } close(sp.halt) }) return nil diff --git a/streaming_test.go b/streaming_test.go index 620dc5f..a484e58 100644 --- a/streaming_test.go +++ b/streaming_test.go @@ -12,30 +12,6 @@ import ( "github.com/stretchr/testify/assert" ) -func TestStreamProcessor_401ShouldNotBlock(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusUnauthorized) - })) - defer ts.Close() - - cfg := Config{ - StreamUri: ts.URL, - Logger: log.New(ioutil.Discard, "", 0), - } - - sp := newStreamProcessor("sdkKey", cfg, nil) - - closeWhenReady := make(chan struct{}) - - sp.Start(closeWhenReady) - - select { - case <-closeWhenReady: - case <-time.After(time.Second): - assert.Fail(t, "Receiving 401 shouldn't block") - } -} - type testEvent struct { id, event, data string } @@ -88,6 +64,7 @@ func runStreamingTest(t *testing.T, initialEvent eventsource.Event, test func(ev requestor := newRequestor("sdkKey", cfg) sp := newStreamProcessor("sdkKey", cfg, requestor) + defer sp.Close() closeWhenReady := make(chan struct{}) @@ -108,7 +85,7 @@ func TestStreamProcessor(t *testing.T) { initialPutEvent := &testEvent{ event: putEvent, data: `{"path": "/", "data": { -"flags": {"my-flag": {"key": "my-flag", "version": 2}}, +"flags": {"my-flag": {"key": "my-flag", "version": 2}}, "segments": {"my-segment": {"key": "my-segment", "version": 5}} }}`, } @@ -207,3 +184,85 @@ func waitForDelete(t *testing.T, store FeatureStore, kind VersionedDataKind, key assert.NoError(t, err) assert.Nil(t, item) } + +func TestStreamProcessorFailsImmediatelyOn401(t *testing.T) { + testStreamProcessorUnrecoverableError(t, 401) +} + +func TestStreamProcessorFailsImmediatelyOn403(t *testing.T) { + testStreamProcessorUnrecoverableError(t, 403) +} + +func TestStreamProcessorDoesNotFailImmediatelyOn500(t *testing.T) { + testStreamProcessorRecoverableError(t, 500) +} + +func testStreamProcessorUnrecoverableError(t *testing.T, statusCode int) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(statusCode) + })) + defer ts.Close() + + cfg := Config{ + StreamUri: ts.URL, + FeatureStore: NewInMemoryFeatureStore(log.New(ioutil.Discard, "", 0)), + Logger: log.New(ioutil.Discard, "", 0), + } + + sp := newStreamProcessor("sdkKey", cfg, nil) + defer sp.Close() + + closeWhenReady := make(chan struct{}) + + sp.Start(closeWhenReady) + + select { + case <-closeWhenReady: + assert.False(t, sp.Initialized()) + case <-time.After(time.Second * 3): + assert.Fail(t, "Initialization shouldn't block after this error") + } +} + +func testStreamProcessorRecoverableError(t *testing.T, statusCode int) { + initialPutEvent := &testEvent{ + event: putEvent, + data: `{"path": "/", "data": { +"flags": {"my-flag": {"key": "my-flag", "version": 2}}, +"segments": {"my-segment": {"key": "my-segment", "version": 5}} +}}`, + } + esserver := eventsource.NewServer() + esserver.ReplayAll = true + esserver.Register("test", &testRepo{initialEvent: initialPutEvent}) + + attempt := 0 + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if attempt == 0 { + w.WriteHeader(statusCode) + } else { + esserver.Handler("test").ServeHTTP(w, r) + } + attempt++ + })) + defer ts.Close() + + cfg := Config{ + StreamUri: ts.URL, + FeatureStore: NewInMemoryFeatureStore(log.New(ioutil.Discard, "", 0)), + Logger: log.New(ioutil.Discard, "", 0), + } + + sp := newStreamProcessor("sdkKey", cfg, nil) + defer sp.Close() + + closeWhenReady := make(chan struct{}) + sp.Start(closeWhenReady) + + select { + case <-closeWhenReady: + assert.True(t, sp.Initialized()) + case <-time.After(time.Second * 3): + assert.Fail(t, "Should have successfully retried before now") + } +} diff --git a/util.go b/util.go index 095b7f8..71a4ac9 100644 --- a/util.go +++ b/util.go @@ -139,3 +139,31 @@ func MakeAllVersionedDataMap( } return allData } + +// Tests whether an HTTP error status represents a condition that might resolve on its own if we retry. +func isHTTPErrorRecoverable(statusCode int) bool { + if statusCode >= 400 && statusCode < 500 { + switch statusCode { + case 408: // request timeout + return true + case 429: // too many requests + return true + default: + return false // all other 4xx errors are unrecoverable + } + } + return true +} + +func httpErrorMessage(statusCode int, context string, recoverableMessage string) string { + statusDesc := "" + if statusCode == 401 { + statusDesc = " (invalid SDK key)" + } + resultMessage := recoverableMessage + if !isHTTPErrorRecoverable(statusCode) { + resultMessage = "giving up permanently" + } + return fmt.Sprintf("Received HTTP error %d%s for %s - %s", + statusCode, statusDesc, context, resultMessage) +} From 80095e6916cdbd59f0d68ae69270125627769303 Mon Sep 17 00:00:00 2001 From: LaunchDarklyCI Date: Tue, 26 Jun 2018 22:08:39 +0000 Subject: [PATCH 2/3] Update Changelog for release of version 4.2.0 --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7f2ea23..d53611d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ All notable changes to the LaunchDarkly Go SDK will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org). +## [4.2.0] - 2018-06-26 +### Changed: +- The client now treats most HTTP 4xx errors as unrecoverable: that is, after receiving such an error, it will not make any more HTTP requests for the lifetime of the client instance, in effect taking the client offline. This is because such errors indicate either a configuration problem (invalid SDK key) or a bug, which is not likely to resolve without a restart or an upgrade. This does not apply if the error is 400, 408, 429, or any 5xx error. + ## [4.1.0] - 2018-06-14 ### Changed From fdb2cf3b46fc1ad7bbba0b21c23953b8fdced0f9 Mon Sep 17 00:00:00 2001 From: LaunchDarklyCI Date: Tue, 26 Jun 2018 22:08:39 +0000 Subject: [PATCH 3/3] Preparing for release of version 4.2.0 --- ldclient.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ldclient.go b/ldclient.go index 91fc243..60db324 100644 --- a/ldclient.go +++ b/ldclient.go @@ -15,7 +15,7 @@ import ( ) // Version is the client version -const Version = "4.1.0" +const Version = "4.2.0" // LDClient is the LaunchDarkly client. Client instances are thread-safe. // Applications should instantiate a single instance for the lifetime