From ec215a35bdd6fa4c07e98c57c5505a028592ff6c Mon Sep 17 00:00:00 2001 From: Annirudh Prasad Date: Mon, 6 May 2024 08:12:37 -0700 Subject: [PATCH 1/6] fix --- hedge.go | 44 ++++++++++++++++++++++++++++++++++++++++++++ hedge_test.go | 1 + 2 files changed, 45 insertions(+) create mode 100644 hedge.go create mode 100644 hedge_test.go diff --git a/hedge.go b/hedge.go new file mode 100644 index 0000000..ecbd375 --- /dev/null +++ b/hedge.go @@ -0,0 +1,44 @@ +package parallel + +import ( + "context" + "time" +) + +type HedgedRequestConfig struct { + delay time.Duration // time to wait before issuing hedged requests + maxOutstandingRequests int // the maximum permitted number of outstanding requests +} + +func (h *HedgedRequestConfig) Apply(opts ...HedgedRequestOpt) { + for _, opt := range opts { + opt(h) + } +} + +type HedgedRequestOpt func(config *HedgedRequestConfig) + +func WithDelay(delay time.Duration) HedgedRequestOpt { + return func(config *HedgedRequestConfig) { + config.delay = delay + } +} + +func WithMaxOutstandingRequests(maxOutstandingRequests int) HedgedRequestOpt { + return func(config *HedgedRequestConfig) { + config.maxOutstandingRequests = maxOutstandingRequests + } +} + +func HedgedRequest[T any]( + ctx context.Context, + requester func(context.Context) (T, error), + opts ...HedgedRequestOpt, +) (T, error) { + cfg := HedgedRequestConfig{ + delay: 50 * time.Millisecond, + maxOutstandingRequests: 3, + } + cfg.Apply(opts...) + +} diff --git a/hedge_test.go b/hedge_test.go new file mode 100644 index 0000000..13c46a5 --- /dev/null +++ b/hedge_test.go @@ -0,0 +1 @@ +package parallel From 1239f672065a39515eb0afc6845df4afdd8a8ef9 Mon Sep 17 00:00:00 2001 From: Annirudh Prasad Date: Mon, 6 May 2024 11:57:45 -0700 Subject: [PATCH 2/6] fix --- hedge.go | 65 +++++++++++++++++++++++++++++++++++---- hedge_test.go | 84 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 143 insertions(+), 6 deletions(-) diff --git a/hedge.go b/hedge.go index ecbd375..2046184 100644 --- a/hedge.go +++ b/hedge.go @@ -6,8 +6,8 @@ import ( ) type HedgedRequestConfig struct { - delay time.Duration // time to wait before issuing hedged requests - maxOutstandingRequests int // the maximum permitted number of outstanding requests + delay time.Duration // time to wait before issuing hedged requests + numHedgedRequests int // the maximum permitted number of outstanding hedged requests } func (h *HedgedRequestConfig) Apply(opts ...HedgedRequestOpt) { @@ -24,9 +24,9 @@ func WithDelay(delay time.Duration) HedgedRequestOpt { } } -func WithMaxOutstandingRequests(maxOutstandingRequests int) HedgedRequestOpt { +func WithNumHedgedRequests(numHedgedRequests int) HedgedRequestOpt { return func(config *HedgedRequestConfig) { - config.maxOutstandingRequests = maxOutstandingRequests + config.numHedgedRequests = numHedgedRequests } } @@ -36,9 +36,62 @@ func HedgedRequest[T any]( opts ...HedgedRequestOpt, ) (T, error) { cfg := HedgedRequestConfig{ - delay: 50 * time.Millisecond, - maxOutstandingRequests: 3, + delay: 50 * time.Millisecond, + numHedgedRequests: 2, } cfg.Apply(opts...) + hedgeSignal := make(chan struct{}) // will be closed when hedged requests should be fire. + responses := make(chan T) // unbuffered, we only expect one response + ctx, cancel := context.WithCancelCause(ctx) + + group := ErrGroup(Limited(ctx, cfg.numHedgedRequests+1)) + for i := 0; i < cfg.numHedgedRequests+1; i++ { + i := i + group.Go(func(ctx context.Context) (rerr error) { + defer func() { + cancel(rerr) + }() + + if i == 0 { + // Initial request case: if this does not complete within the hedge delay, we signal the + // hedge requests to fire off. + time.AfterFunc(cfg.delay, func() { + close(hedgeSignal) + }) + } else { + // Hedged request case: wait for the go-ahead for hedged requests first. + select { + case <-ctx.Done(): + return context.Cause(ctx) + case <-hedgeSignal: + // good to proceed + } + } + + res, err := requester(ctx) + if err != nil { + return err + } + + select { + case <-ctx.Done(): + return context.Cause(ctx) + case responses <- res: + return nil + } + }) + } + + go func() { + _ = group.Wait() + close(responses) + }() + + for response := range responses { + return response, nil + } + + var empty T + return empty, context.Cause(ctx) } diff --git a/hedge_test.go b/hedge_test.go index 13c46a5..733d312 100644 --- a/hedge_test.go +++ b/hedge_test.go @@ -1 +1,85 @@ package parallel + +import ( + "time" + "context" + "testing" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/assert" + "fmt" + "errors" +) + +func TestHedgedRequestBasic(t *testing.T) { + ctx := context.Background() + count := 0 + expectedResult := "success" + requester := func(ctx context.Context) (string, error) { + count++ + if count == 1 { + return expectedResult, nil + } else { + return "fail", fmt.Errorf("should not trigger hedged request") + } + } + + result, err := HedgedRequest[string](ctx, requester) + + require.NoError(t, err) + assert.Equal(t, expectedResult, result) +} + +func TestHedgedRequestHedgingTriggered(t *testing.T) { + ctx := context.Background() + count := 0 + delay := 50 * time.Millisecond + expectedResult := "success" + requester := func(ctx context.Context) (string, error) { + count++ + if count == 0 { + select { + case <-time.After(2 * delay): + return "fail", fmt.Errorf("original request slow") + case <-ctx.Done(): + return "fail", ctx.Err() + } + } else { + return "success", nil + } + } + + result, err := HedgedRequest[string](ctx, requester, WithDelay(delay), WithNumHedgedRequests(1)) + + require.NoError(t, err) + assert.Equal(t, expectedResult, result) +} + +func TestHedgedRequestErrorPropagation(t *testing.T) { + ctx := context.Background() + expectedError := errors.New("failure") + requester := func(ctx context.Context) (string, error) { + return "fail", expectedError + } + + _, err := HedgedRequest[string](ctx, requester) + + require.ErrorIs(t, err, expectedError) +} + +func TestHedgedRequestCancellation(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + delay := 100 * time.Millisecond + requester := func(ctx context.Context) (string, error) { + <-time.After(delay) + return "fail", fmt.Errorf("context should be canceled") + } + + go func() { + // Cancel context before the requester completes + time.Sleep(10 * time.Millisecond) + cancel() + }() + + _, err := HedgedRequest[string](ctx, requester) + require.ErrorIs(t, err, context.Canceled) +} From df1173859b16bfade8ca59e0ef6d3142d33deba3 Mon Sep 17 00:00:00 2001 From: Annirudh Prasad Date: Mon, 6 May 2024 12:01:26 -0700 Subject: [PATCH 3/6] fix --- hedge.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hedge.go b/hedge.go index 2046184..eb114b6 100644 --- a/hedge.go +++ b/hedge.go @@ -41,7 +41,7 @@ func HedgedRequest[T any]( } cfg.Apply(opts...) - hedgeSignal := make(chan struct{}) // will be closed when hedged requests should be fire. + hedgeSignal := make(chan struct{}) // closed when hedged requests should fire responses := make(chan T) // unbuffered, we only expect one response ctx, cancel := context.WithCancelCause(ctx) From f4c04d9c41f7b06f4198e51822fdb17c4a0e363e Mon Sep 17 00:00:00 2001 From: Annirudh Prasad Date: Mon, 6 May 2024 12:05:16 -0700 Subject: [PATCH 4/6] linters --- hedge_test.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/hedge_test.go b/hedge_test.go index 733d312..203a620 100644 --- a/hedge_test.go +++ b/hedge_test.go @@ -1,13 +1,14 @@ package parallel import ( - "time" "context" + "errors" + "fmt" "testing" - "github.com/stretchr/testify/require" + "time" + "github.com/stretchr/testify/assert" - "fmt" - "errors" + "github.com/stretchr/testify/require" ) func TestHedgedRequestBasic(t *testing.T) { From fdd7cb11595f6eb42ef4b7e8e2502cc940f86672 Mon Sep 17 00:00:00 2001 From: Annirudh Prasad Date: Mon, 6 May 2024 13:24:54 -0700 Subject: [PATCH 5/6] fix --- hedge_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hedge_test.go b/hedge_test.go index 203a620..893b381 100644 --- a/hedge_test.go +++ b/hedge_test.go @@ -45,7 +45,7 @@ func TestHedgedRequestHedgingTriggered(t *testing.T) { return "fail", ctx.Err() } } else { - return "success", nil + return expectedResult, nil } } @@ -82,5 +82,6 @@ func TestHedgedRequestCancellation(t *testing.T) { }() _, err := HedgedRequest[string](ctx, requester) + require.ErrorIs(t, err, context.Canceled) } From 871d875b27d9f46ac0a81b7d6ce371bbd557c61d Mon Sep 17 00:00:00 2001 From: Annirudh Prasad Date: Mon, 6 May 2024 14:52:16 -0700 Subject: [PATCH 6/6] fix --- hedge_test.go | 55 ++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 44 insertions(+), 11 deletions(-) diff --git a/hedge_test.go b/hedge_test.go index 893b381..936f781 100644 --- a/hedge_test.go +++ b/hedge_test.go @@ -14,27 +14,27 @@ import ( func TestHedgedRequestBasic(t *testing.T) { ctx := context.Background() count := 0 - expectedResult := "success" + expected := "success" requester := func(ctx context.Context) (string, error) { count++ if count == 1 { - return expectedResult, nil + return expected, nil } else { return "fail", fmt.Errorf("should not trigger hedged request") } } - result, err := HedgedRequest[string](ctx, requester) + actual, err := HedgedRequest[string](ctx, requester) require.NoError(t, err) - assert.Equal(t, expectedResult, result) + assert.Equal(t, expected, actual) } func TestHedgedRequestHedgingTriggered(t *testing.T) { ctx := context.Background() count := 0 delay := 50 * time.Millisecond - expectedResult := "success" + expected := "success" requester := func(ctx context.Context) (string, error) { count++ if count == 0 { @@ -45,26 +45,59 @@ func TestHedgedRequestHedgingTriggered(t *testing.T) { return "fail", ctx.Err() } } else { - return expectedResult, nil + return expected, nil } } - result, err := HedgedRequest[string](ctx, requester, WithDelay(delay), WithNumHedgedRequests(1)) + actual, err := HedgedRequest[string](ctx, requester, WithDelay(delay), WithNumHedgedRequests(1)) require.NoError(t, err) - assert.Equal(t, expectedResult, result) + assert.Equal(t, expected, actual) +} + +func TestHedgedRequestMultipleSuccess(t *testing.T) { + ctx := context.Background() + expected := "success" + delay := 5 * time.Millisecond + done := make(chan struct{}) + requester := func(ctx context.Context) (string, error) { + // Synchronize on the done channel. The original request and + // all hedged requests will line up and block here. + select { + case <-done: + return expected, nil + case <-ctx.Done(): + return "fail", ctx.Err() + } + } + + // Wait for 2x the delay to ensure that all hedged requests have + // fired alongside the original request. Then close the done channel + // so the original and hedged requests complete almost simultaneously. + time.AfterFunc(2*delay, func() { + close(done) + }) + + actual, err := HedgedRequest[string]( + ctx, + requester, + WithDelay(delay), + ) + + require.NoError(t, err) + assert.Equal(t, expected, actual) } func TestHedgedRequestErrorPropagation(t *testing.T) { ctx := context.Background() - expectedError := errors.New("failure") + expectedErr := errors.New("failure") requester := func(ctx context.Context) (string, error) { - return "fail", expectedError + return "fail", expectedErr } _, err := HedgedRequest[string](ctx, requester) - require.ErrorIs(t, err, expectedError) + require.ErrorIs(t, err, expectedErr) } func TestHedgedRequestCancellation(t *testing.T) {