Skip to content

Commit

Permalink
[Backport] Retry upon coordination proposal failure (#3807)
Browse files Browse the repository at this point in the history
This pull request backports
#3804 to the
`releases/mainnet/v2.0.1` branch.
  • Loading branch information
lukasz-zimnoch authored Apr 11, 2024
2 parents 45acddd + 552f9c1 commit f958ba0
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 2 deletions.
40 changes: 39 additions & 1 deletion pkg/tbtc/coordination.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"fmt"
"math/rand"
"sort"
"strings"
"time"

"github.com/keep-network/keep-core/pkg/internal/pb"
"go.uber.org/zap"
Expand Down Expand Up @@ -578,13 +580,15 @@ func (ce *coordinationExecutor) executeLeaderRoutine(
) (CoordinationProposal, error) {
walletPublicKeyHash := ce.walletPublicKeyHash()

proposal, err := ce.proposalGenerator.Generate(
proposal, err := ce.generateProposal(
&CoordinationProposalRequest{
WalletPublicKeyHash: walletPublicKeyHash,
WalletOperators: ce.coordinatedWallet.signingGroupOperators,
ExecutingOperator: ce.operatorAddress,
ActionsChecklist: actionsChecklist,
},
2, // 2 attempts at most
1*time.Minute, // 1 minute between attempts
)
if err != nil {
return nil, fmt.Errorf("failed to generate proposal: [%v]", err)
Expand Down Expand Up @@ -615,6 +619,40 @@ func (ce *coordinationExecutor) executeLeaderRoutine(
return proposal, nil
}

// generateProposal generates a proposal for the given coordination request.
// The generator retries the proposal generation if it fails. The number of
// attempts is limited to attemptLimit. The generator waits for retryDelay
// between attempts.
func (ce *coordinationExecutor) generateProposal(
request *CoordinationProposalRequest,
attemptLimit uint,
retryDelay time.Duration,
) (CoordinationProposal, error) {
var attemptErrs []string

for attempt := uint(1); attempt <= attemptLimit; attempt++ {
if attempt > 1 {
time.Sleep(retryDelay)
}

proposal, err := ce.proposalGenerator.Generate(request)
if err != nil {
attemptErrs = append(
attemptErrs,
fmt.Sprintf("attempt [%v] error: [%v]", attempt, err),
)
continue
}

return proposal, nil
}

return nil, fmt.Errorf(
"all attempts failed: [%v]",
strings.Join(attemptErrs, "; "),
)
}

// executeFollowerRoutine executes the follower's routine for the given coordination
// window. The routine listens for the coordination message from the leader and
// validates it. If the leader's proposal is valid, it returns the received
Expand Down
103 changes: 102 additions & 1 deletion pkg/tbtc/coordination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ func TestCoordinationExecutor_Coordinate(t *testing.T) {
func(
walletPublicKeyHash [20]byte,
actionsChecklist []WalletActionType,
_ uint,
) (CoordinationProposal, error) {
for _, action := range actionsChecklist {
if walletPublicKeyHash == publicKeyHash && action == ActionRedemption {
Expand Down Expand Up @@ -692,6 +693,7 @@ func TestCoordinationExecutor_ExecuteLeaderRoutine(t *testing.T) {
func(
walletPublicKeyHash [20]byte,
actionsChecklist []WalletActionType,
_ uint,
) (
CoordinationProposal,
error,
Expand Down Expand Up @@ -791,6 +793,97 @@ func TestCoordinationExecutor_ExecuteLeaderRoutine(t *testing.T) {
}
}

func TestCoordinationExecutor_GenerateProposal(t *testing.T) {
var tests = map[string]struct {
proposalGenerator CoordinationProposalGenerator
expectedProposal CoordinationProposal
expectedError error
}{
"first attempt success": {
proposalGenerator: newMockCoordinationProposalGenerator(
func(
_ [20]byte,
_ []WalletActionType,
_ uint,
) (CoordinationProposal, error) {
return &NoopProposal{}, nil
},
),
expectedProposal: &NoopProposal{},
expectedError: nil,
},
"last attempt success": {
proposalGenerator: newMockCoordinationProposalGenerator(
func(
_ [20]byte,
_ []WalletActionType,
call uint,
) (CoordinationProposal, error) {
if call == 1 {
return nil, fmt.Errorf("unexpected error")
} else if call == 2 {
return &NoopProposal{}, nil
} else {
panic("unexpected call")
}
},
),
expectedProposal: &NoopProposal{},
expectedError: nil,
},
"all attempts failed": {
proposalGenerator: newMockCoordinationProposalGenerator(
func(
_ [20]byte,
_ []WalletActionType,
call uint,
) (CoordinationProposal, error) {
return nil, fmt.Errorf("unexpected error %v", call)
},
),
expectedProposal: nil,
expectedError: fmt.Errorf(
"all attempts failed: [attempt [1] error: [unexpected error 1]; attempt [2] error: [unexpected error 2]]",
),
},
}

for testName, test := range tests {
t.Run(testName, func(t *testing.T) {
executor := &coordinationExecutor{
// Set only relevant fields.
proposalGenerator: test.proposalGenerator,
}

proposal, err := executor.generateProposal(
&CoordinationProposalRequest{}, // request fields not relevant
2,
1*time.Second,
)

if !reflect.DeepEqual(test.expectedError, err) {
t.Errorf(
"unexpected error\n"+
"expected: %v\n"+
"actual: %v\n",
test.expectedError,
err,
)
}

if !reflect.DeepEqual(test.expectedProposal, proposal) {
t.Errorf(
"unexpected proposal\n"+
"expected: %v\n"+
"actual: %v\n",
test.expectedProposal,
proposal,
)
}
})
}
}

func TestCoordinationExecutor_ExecuteFollowerRoutine(t *testing.T) {
// Uncompressed public key corresponding to the 20-byte public key hash:
// aa768412ceed10bd423c025542ca90071f9fb62d.
Expand Down Expand Up @@ -1163,16 +1256,19 @@ func TestCoordinationExecutor_ExecuteFollowerRoutine_WithIdleLeader(t *testing.T
}

type mockCoordinationProposalGenerator struct {
calls uint
delegate func(
walletPublicKeyHash [20]byte,
actionsChecklist []WalletActionType,
call uint,
) (CoordinationProposal, error)
}

func newMockCoordinationProposalGenerator(
delegate func(
walletPublicKeyHash [20]byte,
actionsChecklist []WalletActionType,
call uint,
) (CoordinationProposal, error),
) *mockCoordinationProposalGenerator {
return &mockCoordinationProposalGenerator{
Expand All @@ -1183,5 +1279,10 @@ func newMockCoordinationProposalGenerator(
func (mcpg *mockCoordinationProposalGenerator) Generate(
request *CoordinationProposalRequest,
) (CoordinationProposal, error) {
return mcpg.delegate(request.WalletPublicKeyHash, request.ActionsChecklist)
mcpg.calls++
return mcpg.delegate(
request.WalletPublicKeyHash,
request.ActionsChecklist,
mcpg.calls,
)
}

0 comments on commit f958ba0

Please sign in to comment.