Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,12 @@ func (e *ServerRequest) setError(err types.Error, errMsg string) {
}
}

// HasResponse reports whether the request currently holds a response.
// It holds e.mux for the duration of the check and delegates the actual
// test to hasResponse.
func (e *ServerRequest) HasResponse() bool {
	e.mux.Lock()
	defer e.mux.Unlock()

	responded := e.hasResponse()
	return responded
}

// hasResponse reports whether e.response is non-nil. It performs no locking
// of its own; HasResponse is the variant that takes e.mux around this check.
func (e *ServerRequest) hasResponse() bool {
	resp := e.response
	return resp != nil
}
Expand Down
10 changes: 10 additions & 0 deletions core/capabilities/remote/executable/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,16 @@ func (r *server) Receive(ctx context.Context, msg *types.MessageBody) {
r.lggr.Warnw("received messages with the same id and different payloads", "messageID", messageID, "lenRequestIDs", len(requestIDs))
}

// If a previous request with this ID already completed (success or error),
// remove it so the retry creates a fresh ServerRequest. This handles the
// case where an upstream timeout (e.g. OCR consensus) resolves the request
// before the server's own expiry ticker cleans it up.
if existing, ok := r.requestIDToRequest[requestID]; ok && existing.request.HasResponse() {
r.lggr.Infow("replacing completed request to allow retry",
"requestID", requestID, "messageID", messageID)
delete(r.requestIDToRequest, requestID)
}

if _, ok := r.requestIDToRequest[requestID]; !ok {
callingDon, ok := cfg.workflowDONs[msg.CallerDonId]
if !ok {
Expand Down
68 changes: 68 additions & 0 deletions core/capabilities/remote/executable/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -1000,3 +1001,70 @@ func Test_Server_Execute_WithConcurrentSetConfig(t *testing.T) {
expectedResponses := numWorkflowPeers * numExecuteCalls
require.Equal(t, expectedResponses, successCount)
}

// Test_Server_Execute_RetryAfterCapabilityError checks that a request which
// completed with a capability error can be retried under the same workflow
// execution ID and then succeed.
func Test_Server_Execute_RetryAfterCapabilityError(t *testing.T) {
	ctx := testutils.Context(t)
	numCapabilityPeers := 4

	// The capability errors on its first numCapabilityPeers Execute calls and
	// responds successfully on every call after that.
	capability := &TestFailThenSucceedCapability{failsRemaining: int64(numCapabilityPeers)}

	callers, srvcs := testRemoteExecutableCapabilityServer(ctx, t,
		&commoncap.RemoteExecutableConfig{}, capability,
		10, 9, numCapabilityPeers, 3, 10*time.Minute, nil)

	// Both attempts deliberately use identical metadata so that the retry
	// carries the same execution ID as the failed attempt.
	request := commoncap.CapabilityRequest{
		Metadata: commoncap.RequestMetadata{
			WorkflowID:          workflowID1,
			WorkflowExecutionID: workflowExecutionID1,
		},
	}

	// executeAndExpect has every caller send the request, then drains
	// numCapabilityPeers messages from each caller and checks their error code.
	executeAndExpect := func(want remotetypes.Error) {
		for _, caller := range callers {
			_, err := caller.Execute(t.Context(), request)
			require.NoError(t, err)
		}

		for _, caller := range callers {
			for range numCapabilityPeers {
				received := <-caller.receivedMessages
				assert.Equal(t, want, received.Error)
			}
		}
	}

	// First attempt — all callers send, capability returns error.
	executeAndExpect(remotetypes.Error_INTERNAL_ERROR)

	// Retry with the same execution ID — should succeed because the server
	// replaces the completed (errored) request with a fresh one.
	executeAndExpect(remotetypes.Error_OK)

	closeServices(t, srvcs)
}

// TestFailThenSucceedCapability is a test capability whose Execute method
// returns an error while failsRemaining has not been used up, and a
// successful response on every call after that.
type TestFailThenSucceedCapability struct {
abstractTestCapability
// failsRemaining is the number of Execute calls that should still fail.
// Execute decrements it with sync/atomic on every call.
failsRemaining int64
}

// Execute fails with "transient error" for as long as failsRemaining has not
// been exhausted; afterwards it returns a response whose Value is the map
// {"response": "ok"}. The ctx and request arguments are not inspected.
func (c *TestFailThenSucceedCapability) Execute(ctx context.Context, request commoncap.CapabilityRequest) (commoncap.CapabilityResponse, error) {
	// The counter is decremented atomically on every call, so the result is
	// non-negative for exactly the first failsRemaining calls.
	remaining := atomic.AddInt64(&c.failsRemaining, -1)
	if remaining >= 0 {
		return commoncap.CapabilityResponse{}, errors.New("transient error")
	}

	payload, err := values.NewMap(map[string]any{"response": "ok"})
	if err != nil {
		return commoncap.CapabilityResponse{}, err
	}

	return commoncap.CapabilityResponse{Value: payload}, nil
}
Loading