Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CCIP-4420 Moving ocr2/plugins/ccip from the CCIP repo #15879

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 26 additions & 11 deletions core/services/ocr2/plugins/ccip/ccipcommit/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (rf *CommitReportingPluginFactory) UpdateDynamicReaders(ctx context.Context
}
}

destPriceRegistryReader, err := rf.config.priceRegistryProvider.NewPriceRegistryReader(ctx, cciptypes.Address(newPriceRegAddr.String()))
destPriceRegistryReader, err := rf.config.priceRegistryProvider.NewPriceRegistryReader(context.Background(), cciptypes.Address(newPriceRegAddr.String()))
if err != nil {
return fmt.Errorf("init dynamic price registry: %w", err)
}
Expand All @@ -71,11 +71,14 @@ type reportingPluginAndInfo struct {
}

// NewReportingPlugin registers a new ReportingPlugin
func (rf *CommitReportingPluginFactory) NewReportingPlugin(ctx context.Context, config types.ReportingPluginConfig) (types.ReportingPlugin, types.ReportingPluginInfo, error) {
func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.ReportingPluginConfig) (types.ReportingPlugin, types.ReportingPluginInfo, error) {
initialRetryDelay := rf.config.newReportingPluginRetryConfig.InitialDelay
maxDelay := rf.config.newReportingPluginRetryConfig.MaxDelay
maxRetries := rf.config.newReportingPluginRetryConfig.MaxRetries

pluginAndInfo, err := ccipcommon.RetryUntilSuccess(rf.NewReportingPluginFn(ctx, config), initialRetryDelay, maxDelay)
pluginAndInfo, err := ccipcommon.RetryUntilSuccess(
rf.NewReportingPluginFn(config), initialRetryDelay, maxDelay, maxRetries,
)
if err != nil {
return nil, types.ReportingPluginInfo{}, err
}
Expand All @@ -85,34 +88,36 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(ctx context.Context,
// NewReportingPluginFn implements the NewReportingPlugin logic. It is defined as a function so that it can easily be
// retried via RetryUntilSuccess. NewReportingPlugin must return successfully in order for the Commit plugin to
// function, hence why we can only keep retrying it until it succeeds.
func (rf *CommitReportingPluginFactory) NewReportingPluginFn(ctx context.Context, config types.ReportingPluginConfig) func() (reportingPluginAndInfo, error) {
return func() (reportingPluginAndInfo, error) {
func (rf *CommitReportingPluginFactory) NewReportingPluginFn(config types.ReportingPluginConfig) func() (reportingPluginAndInfo, error) {
newReportingPluginFn := func() (reportingPluginAndInfo, error) {
ctx := context.Background() // todo: consider adding some timeout

destPriceReg, err := rf.config.commitStore.ChangeConfig(ctx, config.OnchainConfig, config.OffchainConfig)
if err != nil {
return reportingPluginAndInfo{}, err
return reportingPluginAndInfo{}, fmt.Errorf("commitStore.ChangeConfig error: %w", err)
}

priceRegEvmAddr, err := ccipcalc.GenericAddrToEvm(destPriceReg)
if err != nil {
return reportingPluginAndInfo{}, err
return reportingPluginAndInfo{}, fmt.Errorf("GenericAddrToEvm error: %w", err)
}
if err = rf.UpdateDynamicReaders(ctx, priceRegEvmAddr); err != nil {
return reportingPluginAndInfo{}, err
return reportingPluginAndInfo{}, fmt.Errorf("UpdateDynamicReaders error: %w", err)
}

pluginOffChainConfig, err := rf.config.commitStore.OffchainConfig(ctx)
if err != nil {
return reportingPluginAndInfo{}, err
return reportingPluginAndInfo{}, fmt.Errorf("commitStore.OffchainConfig error: %w", err)
}

gasPriceEstimator, err := rf.config.commitStore.GasPriceEstimator(ctx)
if err != nil {
return reportingPluginAndInfo{}, err
return reportingPluginAndInfo{}, fmt.Errorf("commitStore.GasPriceEstimator error: %w", err)
}

err = rf.config.priceService.UpdateDynamicConfig(ctx, gasPriceEstimator, rf.destPriceRegReader)
if err != nil {
return reportingPluginAndInfo{}, err
return reportingPluginAndInfo{}, fmt.Errorf("priceService.UpdateDynamicConfig error: %w", err)
}

lggr := rf.config.lggr.Named("CommitReportingPlugin")
Expand Down Expand Up @@ -145,4 +150,14 @@ func (rf *CommitReportingPluginFactory) NewReportingPluginFn(ctx context.Context

return reportingPluginAndInfo{plugin, pluginInfo}, nil
}

return func() (reportingPluginAndInfo, error) {
result, err := newReportingPluginFn()
if err != nil {
rf.config.lggr.Errorw("NewReportingPlugin failed", "err", err)
rf.config.metricsCollector.NewReportingPluginError()
}

return result, err
}
}
11 changes: 5 additions & 6 deletions core/services/ocr2/plugins/ccip/ccipcommit/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@ import (
"testing"
"time"

"github.com/smartcontractkit/libocr/offchainreporting2plus/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-common/pkg/types/ccip"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"

"github.com/smartcontractkit/chainlink/v2/core/logger"
ccip2 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata"
ccipdataprovidermocks "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/ccipdataprovider/mocks"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks"
Expand All @@ -26,8 +24,9 @@ import (
// first call to each of these functions to fail, then all subsequent calls succeed. We assert that NewReportingPlugin
// retries a sufficient number of times to get through the transient errors and eventually succeed.
func TestNewReportingPluginRetriesUntilSuccess(t *testing.T) {
ctx := tests.Context(t)
commitConfig := CommitPluginStaticConfig{}
commitConfig.lggr = logger.TestLogger(t)
commitConfig.metricsCollector = ccip2.NoopMetricsCollector

// For this unit test, ensure that there is no delay between retries
commitConfig.newReportingPluginRetryConfig = ccipdata.RetryConfig{
Expand Down Expand Up @@ -99,6 +98,6 @@ func TestNewReportingPluginRetriesUntilSuccess(t *testing.T) {
reportingConfig.OffchainConfig = []byte{1, 2, 3}

// Assert that NewReportingPlugin succeeds despite many transient internal failures (mocked out above)
_, _, err := factory.NewReportingPlugin(ctx, reportingConfig)
_, _, err := factory.NewReportingPlugin(reportingConfig)
assert.Equal(t, nil, err)
}
57 changes: 9 additions & 48 deletions core/services/ocr2/plugins/ccip/ccipcommit/initializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,9 @@ package ccipcommit
import (
"context"
"encoding/json"
"fmt"
"math/big"
"strings"
"time"

"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/pricegetter"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/rpclib"

"github.com/Masterminds/semver/v3"
"github.com/ethereum/go-ethereum/common"
libocr2 "github.com/smartcontractkit/libocr/offchainreporting2plus"
Expand All @@ -27,7 +22,6 @@ import (
db "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdb"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
"github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip"
Expand All @@ -40,9 +34,15 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
)

var defaultNewReportingPluginRetryConfig = ccipdata.RetryConfig{InitialDelay: time.Second, MaxDelay: 5 * time.Minute}
var defaultNewReportingPluginRetryConfig = ccipdata.RetryConfig{
InitialDelay: time.Second,
MaxDelay: 10 * time.Minute,
// Retry for approximately 4hrs (MaxDelay of 10m = 6 times per hour, times 4 hours, plus 10 because the first
// 10 retries only take 20 minutes due to an initial retry of 1s and exponential backoff)
MaxRetries: (6 * 4) + 10,
}

func NewCommitServices(ctx context.Context, ds sqlutil.DataSource, srcProvider commontypes.CCIPCommitProvider, dstProvider commontypes.CCIPCommitProvider, chainSet legacyevm.LegacyChainContainer, jb job.Job, lggr logger.Logger, pr pipeline.Runner, argsNoPlugin libocr2.OCR2OracleArgs, new bool, sourceChainID int64, destChainID int64, logError func(string)) ([]job.ServiceCtx, error) {
func NewCommitServices(ctx context.Context, ds sqlutil.DataSource, srcProvider commontypes.CCIPCommitProvider, dstProvider commontypes.CCIPCommitProvider, priceGetter ccip.AllTokensPriceGetter, jb job.Job, lggr logger.Logger, pr pipeline.Runner, argsNoPlugin libocr2.OCR2OracleArgs, new bool, sourceChainID int64, destChainID int64, logError func(string)) ([]job.ServiceCtx, error) {
spec := jb.OCR2OracleSpec

var pluginConfig ccipconfig.CommitPluginJobSpecConfig
Expand All @@ -69,45 +69,6 @@ func NewCommitServices(ctx context.Context, ds sqlutil.DataSource, srcProvider c
commitStoreReader = ccip.NewProviderProxyCommitStoreReader(srcCommitStore, dstCommitStore)
commitLggr := lggr.Named("CCIPCommit").With("sourceChain", sourceChainID, "destChain", destChainID)

var priceGetter pricegetter.AllTokensPriceGetter
withPipeline := strings.Trim(pluginConfig.TokenPricesUSDPipeline, "\n\t ") != ""
if withPipeline {
priceGetter, err = pricegetter.NewPipelineGetter(pluginConfig.TokenPricesUSDPipeline, pr, jb.ID, jb.ExternalJobID, jb.Name.ValueOrZero(), lggr)
if err != nil {
return nil, fmt.Errorf("creating pipeline price getter: %w", err)
}
} else {
// Use dynamic price getter.
if pluginConfig.PriceGetterConfig == nil {
return nil, fmt.Errorf("priceGetterConfig is nil")
}

// Build price getter clients for all chains specified in the aggregator configurations.
// Some lanes (e.g. Wemix/Kroma) requires other clients than source and destination, since they use feeds from other chains.
priceGetterClients := map[uint64]pricegetter.DynamicPriceGetterClient{}
for _, aggCfg := range pluginConfig.PriceGetterConfig.AggregatorPrices {
chainID := aggCfg.ChainID
// Retrieve the chain.
chain, _, err2 := ccipconfig.GetChainByChainID(chainSet, chainID)
if err2 != nil {
return nil, fmt.Errorf("retrieving chain for chainID %d: %w", chainID, err2)
}
caller := rpclib.NewDynamicLimitedBatchCaller(
lggr,
chain.Client(),
rpclib.DefaultRpcBatchSizeLimit,
rpclib.DefaultRpcBatchBackOffMultiplier,
rpclib.DefaultMaxParallelRpcCalls,
)
priceGetterClients[chainID] = pricegetter.NewDynamicPriceGetterClient(caller)
}

priceGetter, err = pricegetter.NewDynamicPriceGetter(*pluginConfig.PriceGetterConfig, priceGetterClients)
if err != nil {
return nil, fmt.Errorf("creating dynamic price getter: %w", err)
}
}

offRampReader, err := dstProvider.NewOffRampReader(ctx, pluginConfig.OffRamp)
if err != nil {
return nil, err
Expand Down Expand Up @@ -156,7 +117,7 @@ func NewCommitServices(ctx context.Context, ds sqlutil.DataSource, srcProvider c
onRampAddress,
)

orm, err := cciporm.NewObservedORM(ds, lggr)
orm, err := cciporm.NewORM(ds, lggr)
if err != nil {
return nil, err
}
Expand Down
86 changes: 61 additions & 25 deletions core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,32 +453,50 @@ func (r *CommitReportingPlugin) selectPriceUpdates(ctx context.Context, now time
return nil, nil, err
}

return r.calculatePriceUpdates(ctx, gasPriceObs, tokenPriceObs, latestGasPrice, latestTokenPrices)
return r.calculatePriceUpdates(gasPriceObs, tokenPriceObs, latestGasPrice, latestTokenPrices)
}

// Note priceUpdates must be deterministic.
// The provided gasPriceObs and tokenPriceObs should not contain nil values.
// The returned latestGasPrice and latestTokenPrices should not contain nil values.
func (r *CommitReportingPlugin) calculatePriceUpdates(ctx context.Context, gasPriceObs map[uint64][]*big.Int, tokenPriceObs map[cciptypes.Address][]*big.Int, latestGasPrice map[uint64]update, latestTokenPrices map[cciptypes.Address]update) ([]cciptypes.GasPrice, []cciptypes.TokenPrice, error) {
func (r *CommitReportingPlugin) calculatePriceUpdates(gasPriceObs map[uint64][]*big.Int, tokenPriceObs map[cciptypes.Address][]*big.Int, latestGasPrice map[uint64]update, latestTokenPrices map[cciptypes.Address]update) ([]cciptypes.GasPrice, []cciptypes.TokenPrice, error) {
var tokenPriceUpdates []cciptypes.TokenPrice
// Token prices are mostly heartbeat driven. To maximize heartbeat batching, the price inclusion rule is as follows:
// If any token requires heartbeat update, include all token prices in the report.
// Otherwise, only include token prices that exceed deviation threshold.
needTokenHeartbeat := false
for token := range tokenPriceObs {
latestTokenPrice, exists := latestTokenPrices[token]
if !exists || time.Since(latestTokenPrice.timestamp) >= r.offchainConfig.TokenPriceHeartBeat {
r.lggr.Infow("Token requires heartbeat update", "token", token)
needTokenHeartbeat = true
break
}
}

for token, tokenPriceObservations := range tokenPriceObs {
medianPrice := ccipcalc.BigIntSortedMiddle(tokenPriceObservations)

if needTokenHeartbeat {
r.lggr.Debugw("Token price update included due to heartbeat", "token", token, "newPrice", medianPrice)
tokenPriceUpdates = append(tokenPriceUpdates, cciptypes.TokenPrice{
Token: token,
Value: medianPrice,
})
continue
}

latestTokenPrice, exists := latestTokenPrices[token]
if exists {
tokenPriceUpdatedRecently := time.Since(latestTokenPrice.timestamp) < r.offchainConfig.TokenPriceHeartBeat
tokenPriceNotChanged := !ccipcalc.Deviates(medianPrice, latestTokenPrice.value, int64(r.offchainConfig.TokenPriceDeviationPPB))
if tokenPriceUpdatedRecently && tokenPriceNotChanged {
r.lggr.Debugw("token price was updated recently, skipping the update",
if ccipcalc.Deviates(medianPrice, latestTokenPrice.value, int64(r.offchainConfig.TokenPriceDeviationPPB)) {
r.lggr.Debugw("Token price update included due to deviation",
"token", token, "newPrice", medianPrice, "existingPrice", latestTokenPrice.value)
continue // skip the update if we recently had a price update close to the new value
tokenPriceUpdates = append(tokenPriceUpdates, cciptypes.TokenPrice{
Token: token,
Value: medianPrice,
})
}
}

tokenPriceUpdates = append(tokenPriceUpdates, cciptypes.TokenPrice{
Token: token,
Value: medianPrice,
})
}

// Determinism required.
Expand All @@ -487,31 +505,49 @@ func (r *CommitReportingPlugin) calculatePriceUpdates(ctx context.Context, gasPr
})

var gasPriceUpdate []cciptypes.GasPrice
// Gas prices are mostly heartbeat driven. To maximize heartbeat batching, the price inclusion rule is as follows:
// If any source chain gas price requires heartbeat update, include all gas prices in the report.
// Otherwise, only include gas prices that exceed deviation threshold.
needGasHeartbeat := false
for chainSelector := range gasPriceObs {
latestGasPrice, exists := latestGasPrice[chainSelector]
if !exists || latestGasPrice.value == nil || time.Since(latestGasPrice.timestamp) >= r.offchainConfig.GasPriceHeartBeat {
r.lggr.Infow("Chain gas price requires heartbeat update", "chainSelector", chainSelector)
needGasHeartbeat = true
break
}
}

for chainSelector, gasPriceObservations := range gasPriceObs {
newGasPrice, err := r.gasPriceEstimator.Median(ctx, gasPriceObservations) // Compute the median price
newGasPrice, err := r.gasPriceEstimator.Median(gasPriceObservations) // Compute the median price
if err != nil {
return nil, nil, fmt.Errorf("failed to calculate median gas price for chain selector %d: %w", chainSelector, err)
}

// Default to updating so that we update if there are no prior updates.
if needGasHeartbeat {
r.lggr.Debugw("Gas price update included due to heartbeat", "chainSelector", chainSelector)
gasPriceUpdate = append(gasPriceUpdate, cciptypes.GasPrice{
DestChainSelector: chainSelector,
Value: newGasPrice,
})
continue
}

latestGasPrice, exists := latestGasPrice[chainSelector]
if exists && latestGasPrice.value != nil {
gasPriceUpdatedRecently := time.Since(latestGasPrice.timestamp) < r.offchainConfig.GasPriceHeartBeat
gasPriceDeviated, err := r.gasPriceEstimator.Deviates(ctx, newGasPrice, latestGasPrice.value)
gasPriceDeviated, err := r.gasPriceEstimator.Deviates(newGasPrice, latestGasPrice.value)
if err != nil {
return nil, nil, err
}
if gasPriceUpdatedRecently && !gasPriceDeviated {
r.lggr.Debugw("gas price was updated recently and not deviated sufficiently, skipping the update",
if gasPriceDeviated {
r.lggr.Debugw("Gas price update included due to deviation",
"chainSelector", chainSelector, "newPrice", newGasPrice, "existingPrice", latestGasPrice.value)
continue
gasPriceUpdate = append(gasPriceUpdate, cciptypes.GasPrice{
DestChainSelector: chainSelector,
Value: newGasPrice,
})
}
}

gasPriceUpdate = append(gasPriceUpdate, cciptypes.GasPrice{
DestChainSelector: chainSelector,
Value: newGasPrice,
})
}

sort.Slice(gasPriceUpdate, func(i, j int) bool {
Expand Down Expand Up @@ -708,7 +744,7 @@ func (r *CommitReportingPlugin) isStaleGasPrice(ctx context.Context, lggr logger
return false
}

gasPriceDeviated, err := r.gasPriceEstimator.Deviates(ctx, gasPriceUpdate.Value, latestUpdate.value)
gasPriceDeviated, err := r.gasPriceEstimator.Deviates(gasPriceUpdate.Value, latestUpdate.value)
if err != nil {
lggr.Errorw("Gas price is stale because deviation check failed", "err", err)
return true
Expand Down
Loading
Loading