Skip to content
This repository has been archived by the owner on Oct 11, 2019. It is now read-only.

Commit

Permalink
Merge pull request #89 from launchdarkly/eb/ch18901/4xx-errors
Browse files Browse the repository at this point in the history
treat most 4xx errors as unrecoverable
  • Loading branch information
eli-darkly authored Jun 26, 2018
2 parents 4edfb40 + 0b7f81b commit aaee8b0
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 55 deletions.
7 changes: 3 additions & 4 deletions event_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,8 @@ func (ed *eventDispatcher) isDisabled() bool {
func (ed *eventDispatcher) handleResponse(resp *http.Response) {
err := checkStatusCode(resp.StatusCode, resp.Request.URL.String())
if err != nil {
ed.config.Logger.Printf("Unexpected status code when sending events: %+v", err)
if err != nil && err.Code == 401 {
ed.config.Logger.Printf("Received 401 error, no further events will be posted since SDK key is invalid")
ed.config.Logger.Println(httpErrorMessage(err.Code, "posting events", "some events were dropped"))
if err != nil && !isHTTPErrorRecoverable(err.Code) {
ed.stateLock.Lock()
defer ed.stateLock.Unlock()
ed.disabled = true
Expand Down Expand Up @@ -422,7 +421,7 @@ func (t *sendEventsTask) postEvents(outputEvents []interface{}) *http.Response {
if respErr != nil {
t.logger.Printf("Unexpected error while sending events: %+v", respErr)
continue
} else if resp.StatusCode >= 500 {
} else if resp.StatusCode >= 400 && isHTTPErrorRecoverable(resp.StatusCode) {
t.logger.Printf("Received error status %d when sending events", resp.StatusCode)
continue
} else {
Expand Down
64 changes: 47 additions & 17 deletions event_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ldclient

import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
Expand Down Expand Up @@ -502,23 +503,52 @@ func TestUserAgentIsSent(t *testing.T) {
assert.Equal(t, config.UserAgent, msg.Header.Get("User-Agent"))
}

func TestFlushIsRetriedOnceAfter5xxError(t *testing.T) {
ep, st := createEventProcessor(epDefaultConfig)
defer ep.Close()

st.statusCode = 503

ie := NewIdentifyEvent(epDefaultUser)
ep.SendEvent(ie)
ep.Flush()
ep.waitUntilInactive()

msg := st.getNextRequest()
assert.NotNil(t, msg)
msg = st.getNextRequest()
assert.NotNil(t, msg)
msg = st.getNextRequest()
assert.Nil(t, msg)
var httpErrorTests = []struct {
status int
recoverable bool
}{
{401, false},
{403, false},
{408, true},
{429, true},
{500, true},
{503, true},
}

func TestHTTPErrorHandling(t *testing.T) {
for _, tt := range httpErrorTests {
t.Run(fmt.Sprintf("%d error, recoverable: %v", tt.status, tt.recoverable), func(t *testing.T) {
ep, st := createEventProcessor(epDefaultConfig)
defer ep.Close()

st.statusCode = tt.status

ie := NewIdentifyEvent(epDefaultUser)
ep.SendEvent(ie)
ep.Flush()
ep.waitUntilInactive()

msg := st.getNextRequest()
assert.NotNil(t, msg)

if tt.recoverable {
msg = st.getNextRequest() // 2nd request is a retry of the 1st
assert.NotNil(t, msg)
msg = st.getNextRequest()
assert.Nil(t, msg)
} else {
msg = st.getNextRequest()
assert.Nil(t, msg)

ep.SendEvent(ie)
ep.Flush()
ep.waitUntilInactive()

msg = st.getNextRequest()
assert.Nil(t, msg)
}
})
}
}

func jsonMap(o interface{}) map[string]interface{} {
Expand Down
4 changes: 2 additions & 2 deletions polling.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ func (pp *pollingProcessor) Start(closeWhenReady chan<- struct{}) {
if err := pp.poll(); err != nil {
pp.config.Logger.Printf("ERROR: Error when requesting feature updates: %+v", err)
if hse, ok := err.(*HttpStatusError); ok {
if hse.Code == 401 {
pp.config.Logger.Printf("ERROR: Received 401 error, no further polling requests will be made since SDK key is invalid")
pp.config.Logger.Printf("ERROR: %s", httpErrorMessage(hse.Code, "polling request", "will retry"))
if !isHTTPErrorRecoverable(hse.Code) {
notifyReady()
return
}
Expand Down
15 changes: 8 additions & 7 deletions streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (sp *streamProcessor) events(closeWhenReady chan<- struct{}) {
}
if err != io.EOF {
sp.config.Logger.Printf("ERROR: Error encountered processing stream: %+v", err)
if sp.checkUnauthorized(err) {
if sp.checkIfPermanentFailure(err) {
sp.closeOnce.Do(func() {
sp.config.Logger.Printf("Closing event stream.")
// TODO: enable this when we trust stream.Close() never to panic (see https://github.com/donovanhide/eventsource/pull/33)
Expand Down Expand Up @@ -215,8 +215,7 @@ func (sp *streamProcessor) subscribe(closeWhenReady chan<- struct{}) {
sp.config.Logger.Printf("Connecting to LaunchDarkly stream using URL: %s", req.URL.String())

if stream, err := es.SubscribeWithRequest("", req); err != nil {
sp.config.Logger.Printf("ERROR: Error subscribing to stream: %+v using URL: %s", err, req.URL.String())
if sp.checkUnauthorized(err) {
if sp.checkIfPermanentFailure(err) {
close(closeWhenReady)
return
}
Expand All @@ -239,10 +238,10 @@ func (sp *streamProcessor) subscribe(closeWhenReady chan<- struct{}) {
}
}

func (sp *streamProcessor) checkUnauthorized(err error) bool {
func (sp *streamProcessor) checkIfPermanentFailure(err error) bool {
if se, ok := err.(es.SubscriptionError); ok {
if se.Code == 401 {
sp.config.Logger.Printf("ERROR: Received 401 error, no further streaming connection will be made since SDK key is invalid")
sp.config.Logger.Printf("ERROR: %s", httpErrorMessage(se.Code, "streaming connection", "will retry"))
if !isHTTPErrorRecoverable(se.Code) {
return true
}
}
Expand All @@ -253,7 +252,9 @@ func (sp *streamProcessor) checkUnauthorized(err error) bool {
func (sp *streamProcessor) Close() error {
sp.closeOnce.Do(func() {
sp.config.Logger.Printf("Closing event stream.")
sp.stream.Close()
if sp.stream != nil {
sp.stream.Close()
}
close(sp.halt)
})
return nil
Expand Down
109 changes: 84 additions & 25 deletions streaming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,6 @@ import (
"github.com/stretchr/testify/assert"
)

func TestStreamProcessor_401ShouldNotBlock(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusUnauthorized)
}))
defer ts.Close()

cfg := Config{
StreamUri: ts.URL,
Logger: log.New(ioutil.Discard, "", 0),
}

sp := newStreamProcessor("sdkKey", cfg, nil)

closeWhenReady := make(chan struct{})

sp.Start(closeWhenReady)

select {
case <-closeWhenReady:
case <-time.After(time.Second):
assert.Fail(t, "Receiving 401 shouldn't block")
}
}

type testEvent struct {
id, event, data string
}
Expand Down Expand Up @@ -88,6 +64,7 @@ func runStreamingTest(t *testing.T, initialEvent eventsource.Event, test func(ev

requestor := newRequestor("sdkKey", cfg)
sp := newStreamProcessor("sdkKey", cfg, requestor)
defer sp.Close()

closeWhenReady := make(chan struct{})

Expand All @@ -108,7 +85,7 @@ func TestStreamProcessor(t *testing.T) {
initialPutEvent := &testEvent{
event: putEvent,
data: `{"path": "/", "data": {
"flags": {"my-flag": {"key": "my-flag", "version": 2}},
"flags": {"my-flag": {"key": "my-flag", "version": 2}},
"segments": {"my-segment": {"key": "my-segment", "version": 5}}
}}`,
}
Expand Down Expand Up @@ -207,3 +184,85 @@ func waitForDelete(t *testing.T, store FeatureStore, kind VersionedDataKind, key
assert.NoError(t, err)
assert.Nil(t, item)
}

func TestStreamProcessorFailsImmediatelyOn401(t *testing.T) {
testStreamProcessorUnrecoverableError(t, 401)
}

func TestStreamProcessorFailsImmediatelyOn403(t *testing.T) {
testStreamProcessorUnrecoverableError(t, 403)
}

func TestStreamProcessorDoesNotFailImmediatelyOn500(t *testing.T) {
testStreamProcessorRecoverableError(t, 500)
}

func testStreamProcessorUnrecoverableError(t *testing.T, statusCode int) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(statusCode)
}))
defer ts.Close()

cfg := Config{
StreamUri: ts.URL,
FeatureStore: NewInMemoryFeatureStore(log.New(ioutil.Discard, "", 0)),
Logger: log.New(ioutil.Discard, "", 0),
}

sp := newStreamProcessor("sdkKey", cfg, nil)
defer sp.Close()

closeWhenReady := make(chan struct{})

sp.Start(closeWhenReady)

select {
case <-closeWhenReady:
assert.False(t, sp.Initialized())
case <-time.After(time.Second * 3):
assert.Fail(t, "Initialization shouldn't block after this error")
}
}

func testStreamProcessorRecoverableError(t *testing.T, statusCode int) {
initialPutEvent := &testEvent{
event: putEvent,
data: `{"path": "/", "data": {
"flags": {"my-flag": {"key": "my-flag", "version": 2}},
"segments": {"my-segment": {"key": "my-segment", "version": 5}}
}}`,
}
esserver := eventsource.NewServer()
esserver.ReplayAll = true
esserver.Register("test", &testRepo{initialEvent: initialPutEvent})

attempt := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if attempt == 0 {
w.WriteHeader(statusCode)
} else {
esserver.Handler("test").ServeHTTP(w, r)
}
attempt++
}))
defer ts.Close()

cfg := Config{
StreamUri: ts.URL,
FeatureStore: NewInMemoryFeatureStore(log.New(ioutil.Discard, "", 0)),
Logger: log.New(ioutil.Discard, "", 0),
}

sp := newStreamProcessor("sdkKey", cfg, nil)
defer sp.Close()

closeWhenReady := make(chan struct{})
sp.Start(closeWhenReady)

select {
case <-closeWhenReady:
assert.True(t, sp.Initialized())
case <-time.After(time.Second * 3):
assert.Fail(t, "Should have successfully retried before now")
}
}
28 changes: 28 additions & 0 deletions util.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,31 @@ func MakeAllVersionedDataMap(
}
return allData
}

// Tests whether an HTTP error status represents a condition that might resolve on its own if we retry.
func isHTTPErrorRecoverable(statusCode int) bool {
if statusCode >= 400 && statusCode < 500 {
switch statusCode {
case 408: // request timeout
return true
case 429: // too many requests
return true
default:
return false // all other 4xx errors are unrecoverable
}
}
return true
}

func httpErrorMessage(statusCode int, context string, recoverableMessage string) string {
statusDesc := ""
if statusCode == 401 {
statusDesc = " (invalid SDK key)"
}
resultMessage := recoverableMessage
if !isHTTPErrorRecoverable(statusCode) {
resultMessage = "giving up permanently"
}
return fmt.Sprintf("Received HTTP error %d%s for %s - %s",
statusCode, statusDesc, context, resultMessage)
}

0 comments on commit aaee8b0

Please sign in to comment.