Skip to content

Merge hotfix release/2.19.0-dual-transmission to develop #15859

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 12 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ jobs:
if: needs.changes.outputs.core_changes == 'false' || needs.solana-test-image-exists.outputs.exists == 'true'

solana-smoke-tests:
if: ${{ !contains(join(github.event.pull_request.labels.*.name, ' '), 'skip-smoke-tests') }}
if: ${{ contains(join(github.event.pull_request.labels.*.name, ' '), 'skip-smoke-tests') }}
environment: integration
permissions:
checks: write
Expand Down
6 changes: 6 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ packages:
BalanceMonitor:
config:
dir: "{{ .InterfaceDir }}/../mocks"
github.com/smartcontractkit/chainlink/v2/core/chains/evm/txm:
interfaces:
Client:
TxStore:
AttemptBuilder:
Keystore:
github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr:
interfaces:
ChainConfig:
Expand Down
39 changes: 38 additions & 1 deletion common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,20 @@ type TxManager[
GetTransactionStatus(ctx context.Context, transactionID string) (state commontypes.TransactionStatus, err error)
}

type TxmV2Wrapper[
CHAIN_ID types.ID,
HEAD types.Head[BLOCK_HASH],
ADDR types.Hashable,
TX_HASH types.Hashable,
BLOCK_HASH types.Hashable,
SEQ types.Sequence,
FEE feetypes.Fee,
] interface {
services.Service
CreateTransaction(ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH]) (etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
Reset(addr ADDR, abandon bool) error
}

type reset struct {
// f is the function to execute between stopping/starting the
// Broadcaster and Confirmer
Expand Down Expand Up @@ -112,6 +126,7 @@ type Txm[
fwdMgr txmgrtypes.ForwarderManager[ADDR]
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
newErrorClassifier NewErrorClassifier
txmv2wrapper TxmV2Wrapper[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RegisterResumeCallback(fn ResumeCallback) {
Expand Down Expand Up @@ -147,6 +162,7 @@ func NewTxm[
tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
finalizer txmgrtypes.Finalizer[BLOCK_HASH, HEAD],
newErrorClassifierFunc NewErrorClassifier,
txmv2wrapper TxmV2Wrapper[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
) *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
b := Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{
logger: logger.Sugared(lggr),
Expand All @@ -169,6 +185,7 @@ func NewTxm[
tracker: tracker,
newErrorClassifier: newErrorClassifierFunc,
finalizer: finalizer,
txmv2wrapper: txmv2wrapper,
}

if txCfg.ResendAfterThreshold() <= 0 {
Expand Down Expand Up @@ -207,6 +224,12 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx
return fmt.Errorf("Txm: Finalizer failed to start: %w", err)
}

if b.txmv2wrapper != nil {
if err := ms.Start(ctx, b.txmv2wrapper); err != nil {
return fmt.Errorf("Txm: Txmv2 failed to start: %w", err)
}
}

b.logger.Info("Txm starting runLoop")
b.wg.Add(1)
go b.runLoop()
Expand Down Expand Up @@ -237,6 +260,11 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Reset(addr
f := func() {
if abandon {
err = b.abandon(addr)
if b.txmv2wrapper != nil {
if err2 := b.txmv2wrapper.Reset(addr, abandon); err2 != nil {
b.logger.Error("failed to abandon transactions for dual broadcast", "err", err2)
}
}
}
}

Expand Down Expand Up @@ -460,6 +488,12 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
if err != nil && (!errors.Is(err, services.ErrAlreadyStopped) || !errors.Is(err, services.ErrCannotStopUnstarted)) {
b.logger.Errorw(fmt.Sprintf("Failed to Close Finalizer: %v", err), "err", err)
}
if b.txmv2wrapper != nil {
err = b.txmv2wrapper.Close()
if err != nil && (!errors.Is(err, services.ErrAlreadyStopped) || !errors.Is(err, services.ErrCannotStopUnstarted)) {
b.logger.Errorw(fmt.Sprintf("Failed to Close Finalizer: %v", err), "err", err)
}
}
return
case <-keysChanged:
// This check prevents the weird edge-case where you can select
Expand Down Expand Up @@ -513,11 +547,14 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Trigger(ad
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTransaction(ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH]) (tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) {
// Check for existing Tx with IdempotencyKey. If found, return the Tx and do nothing
// Skipping CreateTransaction to avoid double send
if b.txmv2wrapper != nil && txRequest.Meta != nil && txRequest.Meta.DualBroadcast != nil && *txRequest.Meta.DualBroadcast {
return b.txmv2wrapper.CreateTransaction(ctx, txRequest)
}
if txRequest.IdempotencyKey != nil {
var existingTx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
existingTx, err = b.txStore.FindTxWithIdempotencyKey(ctx, *txRequest.IdempotencyKey, b.chainID)
if err != nil {
return tx, fmt.Errorf("Failed to search for transaction with IdempotencyKey: %w", err)
return tx, fmt.Errorf("failed to search for transaction with IdempotencyKey: %w", err)
}
if existingTx != nil {
b.logger.Infow("Found a Tx with IdempotencyKey. Returning existing Tx without creating a new one.", "IdempotencyKey", *txRequest.IdempotencyKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,8 @@ func makeTestEvmTxm(
lp,
keyStore,
estimator,
ht)
ht,
nil)
require.NoError(t, err, "can't create tx manager")

_, unsub := broadcaster.Subscribe(txm)
Expand Down
4 changes: 4 additions & 0 deletions core/chains/evm/config/chain_scoped.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func (e *EVMConfig) BalanceMonitor() BalanceMonitor {
return &balanceMonitorConfig{c: e.C.BalanceMonitor}
}

func (e *EVMConfig) TxmV2() TxmV2 {
return &txmv2Config{c: e.C.TxmV2}
}

func (e *EVMConfig) Transactions() Transactions {
return &transactionsConfig{c: e.C.Transactions}
}
Expand Down
25 changes: 25 additions & 0 deletions core/chains/evm/config/chain_scoped_txmv2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package config

import (
"net/url"
"time"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/config/toml"
)

type txmv2Config struct {
c toml.TxmV2
}

func (t *txmv2Config) Enabled() bool {
return *t.c.Enabled
}

func (t *txmv2Config) BlockTime() *time.Duration {
d := t.c.BlockTime.Duration()
return &d
}

func (t *txmv2Config) CustomURL() *url.URL {
return t.c.CustomURL.URL()
}
6 changes: 5 additions & 1 deletion core/chains/evm/config/chaintype/chaintype.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
ChainZkEvm ChainType = "zkevm"
ChainZkSync ChainType = "zksync"
ChainZircuit ChainType = "zircuit"
ChainDualBroadcast ChainType = "dualBroadcast"
)

// IsL2 returns true if this chain is a Layer 2 chain. Notably:
Expand All @@ -39,7 +40,7 @@ func (c ChainType) IsL2() bool {

func (c ChainType) IsValid() bool {
switch c {
case "", ChainArbitrum, ChainAstar, ChainCelo, ChainGnosis, ChainHedera, ChainKroma, ChainMantle, ChainMetis, ChainOptimismBedrock, ChainScroll, ChainWeMix, ChainXLayer, ChainZkEvm, ChainZkSync, ChainZircuit:
case "", ChainArbitrum, ChainAstar, ChainCelo, ChainGnosis, ChainHedera, ChainKroma, ChainMantle, ChainMetis, ChainOptimismBedrock, ChainScroll, ChainWeMix, ChainXLayer, ChainZkEvm, ChainZkSync, ChainZircuit, ChainDualBroadcast:
return true
}
return false
Expand Down Expand Up @@ -77,6 +78,8 @@ func FromSlug(slug string) ChainType {
return ChainZkSync
case "zircuit":
return ChainZircuit
case "dualBroadcast":
return ChainDualBroadcast
default:
return ChainType(slug)
}
Expand Down Expand Up @@ -144,4 +147,5 @@ var ErrInvalid = fmt.Errorf("must be one of %s or omitted", strings.Join([]strin
string(ChainZkEvm),
string(ChainZkSync),
string(ChainZircuit),
string(ChainDualBroadcast),
}, ", "))
7 changes: 7 additions & 0 deletions core/chains/evm/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
type EVM interface {
HeadTracker() HeadTracker
BalanceMonitor() BalanceMonitor
TxmV2() TxmV2
Transactions() Transactions
GasEstimator() GasEstimator
OCR() OCR
Expand Down Expand Up @@ -102,6 +103,12 @@ type ClientErrors interface {
TooManyResults() string
}

type TxmV2 interface {
Enabled() bool
BlockTime() *time.Duration
CustomURL() *url.URL
}

type Transactions interface {
Enabled() bool
ForwardersEnabled() bool
Expand Down
55 changes: 53 additions & 2 deletions core/chains/evm/config/toml/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/url"
"slices"
"strconv"
"time"

"github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/pelletier/go-toml/v2"
Expand Down Expand Up @@ -300,8 +301,10 @@ func (c *EVMConfig) ValidateConfig() (err error) {
is := c.ChainType.ChainType()
if is != must {
if must == "" {
err = multierr.Append(err, commonconfig.ErrInvalid{Name: "ChainType", Value: c.ChainType.ChainType(),
Msg: "must not be set with this chain id"})
if c.ChainType.ChainType() != chaintype.ChainDualBroadcast {
err = multierr.Append(err, commonconfig.ErrInvalid{Name: "ChainType", Value: c.ChainType.ChainType(),
Msg: "must not be set with this chain id"})
}
} else {
err = multierr.Append(err, commonconfig.ErrInvalid{Name: "ChainType", Value: c.ChainType.ChainType(),
Msg: fmt.Sprintf("only %q can be used with this chain id", must)})
Expand Down Expand Up @@ -387,6 +390,7 @@ type Chain struct {
FinalizedBlockOffset *uint32
NoNewFinalizedHeadsThreshold *commonconfig.Duration

TxmV2 TxmV2 `toml:",omitempty"`
Transactions Transactions `toml:",omitempty"`
BalanceMonitor BalanceMonitor `toml:",omitempty"`
GasEstimator GasEstimator `toml:",omitempty"`
Expand Down Expand Up @@ -448,6 +452,20 @@ func (c *Chain) ValidateConfig() (err error) {
err = multierr.Append(err, commonconfig.ErrInvalid{Name: "GasEstimator.BumpThreshold", Value: 0, Msg: fmt.Sprintf("cannot be 0 if Transactions.AutoPurge.MinAttempts is set for %s", chainType)})
}
}
case chaintype.ChainDualBroadcast:
if c.Transactions.AutoPurge.DetectionApiUrl == nil {
err = multierr.Append(err, commonconfig.ErrMissing{Name: "Transactions.AutoPurge.DetectionApiUrl", Msg: fmt.Sprintf("must be set for %s", chainType)})
}
if c.Transactions.AutoPurge.Threshold == nil {
err = multierr.Append(err, commonconfig.ErrMissing{Name: "Transactions.AutoPurge.Threshold", Msg: fmt.Sprintf("needs to be set if auto-purge feature is enabled for %s", chainType)})
} else if *c.Transactions.AutoPurge.Threshold == 0 {
err = multierr.Append(err, commonconfig.ErrInvalid{Name: "Transactions.AutoPurge.Threshold", Value: 0, Msg: fmt.Sprintf("cannot be 0 if auto-purge feature is enabled for %s", chainType)})
}
if c.TxmV2.Enabled != nil && *c.TxmV2.Enabled {
if c.TxmV2.CustomURL == nil {
err = multierr.Append(err, commonconfig.ErrMissing{Name: "TxmV2.CustomURL", Msg: fmt.Sprintf("must be set for %s", chainType)})
}
}
default:
// Bump Threshold is required because the stuck tx heuristic relies on a minimum number of bump attempts to exist
if c.GasEstimator.BumpThreshold == nil {
Expand All @@ -471,6 +489,39 @@ func (c *Chain) ValidateConfig() (err error) {
return
}

type TxmV2 struct {
Enabled *bool `toml:",omitempty"`
BlockTime *commonconfig.Duration `toml:",omitempty"`
CustomURL *commonconfig.URL `toml:",omitempty"`
}

func (t *TxmV2) setFrom(f *TxmV2) {
if v := f.Enabled; v != nil {
t.Enabled = f.Enabled
}

if v := f.BlockTime; v != nil {
t.BlockTime = f.BlockTime
}

if v := f.CustomURL; v != nil {
t.CustomURL = f.CustomURL
}
}

func (t *TxmV2) ValidateConfig() (err error) {
if t.Enabled != nil && *t.Enabled {
if t.BlockTime == nil {
err = multierr.Append(err, commonconfig.ErrMissing{Name: "TxmV2.BlockTime", Msg: "must be set if txmv2 feature is enabled"})
return
}
if t.BlockTime.Duration() < 2*time.Second {
err = multierr.Append(err, commonconfig.ErrInvalid{Name: "TxmV2.BlockTime", Msg: "must be equal to or greater than 2 seconds"})
}
}
return
}

type Transactions struct {
Enabled *bool
ForwardersEnabled *bool
Expand Down
1 change: 1 addition & 0 deletions core/chains/evm/config/toml/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ func (c *Chain) SetFrom(f *Chain) {
c.NoNewFinalizedHeadsThreshold = v
}

c.TxmV2.setFrom(&f.TxmV2)
c.Transactions.setFrom(&f.Transactions)
c.BalanceMonitor.setFrom(&f.BalanceMonitor)
c.GasEstimator.setFrom(&f.GasEstimator)
Expand Down
3 changes: 3 additions & 0 deletions core/chains/evm/config/toml/defaults/fallback.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ FinalizedBlockOffset = 0
NoNewFinalizedHeadsThreshold = '0'
LogBroadcasterEnabled = true

[TxmV2]
Enabled = false

[Transactions]
Enabled = true
ForwardersEnabled = false
Expand Down
1 change: 1 addition & 0 deletions core/chains/evm/keystore/eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ type Eth interface {
CheckEnabled(ctx context.Context, address common.Address, chainID *big.Int) error
EnabledAddressesForChain(ctx context.Context, chainID *big.Int) (addresses []common.Address, err error)
SignTx(ctx context.Context, fromAddress common.Address, tx *types.Transaction, chainID *big.Int) (*types.Transaction, error)
SignMessage(ctx context.Context, address common.Address, message []byte) ([]byte, error)
SubscribeToKeyChanges(ctx context.Context) (ch chan struct{}, unsub func())
}
Loading
Loading