Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txpool: add support for set code transactions #31073

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/txpool/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,9 @@ var (
// input transaction of non-blob type when a blob transaction from this sender
// remains pending (and vice-versa).
ErrAlreadyReserved = errors.New("address already reserved")

// ErrAuthorityReserved is returned if a transaction has an authorization
// signed by an address which already has in-flight transactions known to the
// pool.
ErrAuthorityReserved = errors.New("authority already reserved")
)
101 changes: 89 additions & 12 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"math"
"math/big"
"slices"
"sort"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -217,12 +218,13 @@ type LegacyPool struct {
locals *accountSet // Set of local transaction to exempt from eviction rules
journal *journal // Journal of local transaction to back up to disk

reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools
pending map[common.Address]*list // All currently processable transactions
queue map[common.Address]*list // Queued but non-processable transactions
beats map[common.Address]time.Time // Last heartbeat from each known account
all *lookup // All transactions to allow lookups
priced *pricedList // All transactions sorted by price
reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools
pending map[common.Address]*list // All currently processable transactions
queue map[common.Address]*list // Queued but non-processable transactions
beats map[common.Address]time.Time // Last heartbeat from each known account
all *lookup // All transactions to allow lookups
priced *pricedList // All transactions sorted by price
auths map[common.Address][]*types.Transaction // All accounts with a pooled authorization
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any particular reason for tracking the tx object itself, rather than the tx hash?


reqResetCh chan *txpoolResetRequest
reqPromoteCh chan *accountSet
Expand Down Expand Up @@ -254,6 +256,7 @@ func New(config Config, chain BlockChain) *LegacyPool {
pending: make(map[common.Address]*list),
queue: make(map[common.Address]*list),
beats: make(map[common.Address]time.Time),
auths: make(map[common.Address][]*types.Transaction),
all: newLookup(),
reqResetCh: make(chan *txpoolResetRequest),
reqPromoteCh: make(chan *accountSet),
Expand Down Expand Up @@ -611,7 +614,8 @@ func (pool *LegacyPool) validateTxBasics(tx *types.Transaction, local bool) erro
Accept: 0 |
1<<types.LegacyTxType |
1<<types.AccessListTxType |
1<<types.DynamicFeeTxType,
1<<types.DynamicFeeTxType |
1<<types.SetCodeTxType,
MaxSize: txMaxSize,
MinTip: pool.gasTip.Load().ToBig(),
}
Expand Down Expand Up @@ -639,6 +643,10 @@ func (pool *LegacyPool) validateTx(tx *types.Transaction) error {
if list := pool.queue[addr]; list != nil {
have += list.Len()
}
if pool.currentState.GetCode(addr) != nil {
// Allow at most one in-flight tx for delegated accounts.
return have, max(0, 1-have)
}
return have, math.MaxInt
},
ExistingExpenditure: func(addr common.Address) *big.Int {
Expand All @@ -655,6 +663,22 @@ func (pool *LegacyPool) validateTx(tx *types.Transaction) error {
}
return nil
},
KnownConflicts: func(from common.Address, auths []common.Address) []common.Address {
var conflicts []common.Address
// The transaction sender cannot have an in-flight authorization.
if _, ok := pool.auths[from]; ok {
conflicts = append(conflicts, from)
}
// Authorities cannot conflict with any pending or queued transactions.
for _, addr := range auths {
if list := pool.pending[addr]; list != nil {
conflicts = append(conflicts, addr)
} else if list := pool.queue[addr]; list != nil {
conflicts = append(conflicts, addr)
}
}
return conflicts
},
}
if err := txpool.ValidateTransactionWithState(tx, pool.signer, opts); err != nil {
return err
Expand Down Expand Up @@ -786,12 +810,14 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
if old != nil {
pool.all.Remove(old.Hash())
pool.priced.Removed(1)
pool.removeAuthorities(old)
pendingReplaceMeter.Mark(1)
}
pool.all.Add(tx, isLocal)
pool.priced.Put(tx, isLocal)
pool.journalTx(from, tx)
pool.queueTxEvent(tx)
pool.addAuthorities(tx)
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())

// Successful promotion, bump the heartbeat
Expand All @@ -813,6 +839,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
localGauge.Inc(1)
}
pool.journalTx(from, tx)
pool.addAuthorities(tx)

log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
return replaced, nil
Expand Down Expand Up @@ -860,6 +887,7 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local
// Discard any previous transaction and mark this
if old != nil {
pool.all.Remove(old.Hash())
pool.removeAuthorities(old)
pool.priced.Removed(1)
queuedReplaceMeter.Mark(1)
} else {
Expand All @@ -879,6 +907,7 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local
if _, exist := pool.beats[from]; !exist {
pool.beats[from] = time.Now()
}
pool.addAuthorities(tx)
return old != nil, nil
}

Expand Down Expand Up @@ -909,13 +938,15 @@ func (pool *LegacyPool) promoteTx(addr common.Address, hash common.Hash, tx *typ
if !inserted {
// An older transaction was better, discard this
pool.all.Remove(hash)
pool.removeAuthorities(tx)
pool.priced.Removed(1)
pendingDiscardMeter.Mark(1)
return false
}
// Otherwise discard any previous transaction and mark this
if old != nil {
pool.all.Remove(old.Hash())
pool.removeAuthorities(old)
pool.priced.Removed(1)
pendingReplaceMeter.Mark(1)
} else {
Expand Down Expand Up @@ -1129,6 +1160,9 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo
if pool.locals.contains(addr) {
localGauge.Dec(1)
}
// Remove any authorities the pool was tracking.
pool.removeAuthorities(tx)

// Remove the transaction from the pending lists and reset the account nonce
if pending := pool.pending[addr]; pending != nil {
if removed, invalids := pending.Remove(tx); removed {
Expand Down Expand Up @@ -1162,6 +1196,43 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo
return 0
}

// addAuthorities tracks the supplied tx in relation to each authority it
// specifies.
func (pool *LegacyPool) addAuthorities(tx *types.Transaction) {
for _, addr := range tx.Authorities() {
list, ok := pool.auths[addr]
if !ok {
list = []*types.Transaction{}
}
if slices.Contains(list, tx) {
// Don't add duplicates.
continue
}
list = append(list, tx)
pool.auths[addr] = list
}
}

// removeAuthorities stops tracking the supplied tx in relation to its
// authorities.
func (pool *LegacyPool) removeAuthorities(tx *types.Transaction) {
for _, addr := range tx.Authorities() {
// Remove tx from tracker.
list := pool.auths[addr]
if i := slices.Index(list, tx); i >= 0 {
list = append(list[:i], list[i+1:]...)
} else {
log.Error("Authority with untracked tx", "addr", addr, "hash", tx.Hash())
}
if len(list) == 0 {
// If list is newly empty, delete it entirely.
delete(pool.auths, addr)
continue
}
pool.auths[addr] = list
}
}

// requestReset requests a pool reset to the new head block.
// The returned channel is closed when the reset has occurred.
func (pool *LegacyPool) requestReset(oldHead *types.Header, newHead *types.Header) chan struct{} {
Expand Down Expand Up @@ -1461,15 +1532,15 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
// Drop all transactions that are deemed too old (low nonce)
forwards := list.Forward(pool.currentState.GetNonce(addr))
for _, tx := range forwards {
hash := tx.Hash()
pool.all.Remove(hash)
pool.all.Remove(tx.Hash())
pool.removeAuthorities(tx)
}
log.Trace("Removed old queued transactions", "count", len(forwards))
// Drop all transactions that are too costly (low balance or out of gas)
drops, _ := list.Filter(pool.currentState.GetBalance(addr), gasLimit)
for _, tx := range drops {
hash := tx.Hash()
pool.all.Remove(hash)
pool.all.Remove(tx.Hash())
pool.removeAuthorities(tx)
}
log.Trace("Removed unpayable queued transactions", "count", len(drops))
queuedNofundsMeter.Mark(int64(len(drops)))
Expand All @@ -1492,6 +1563,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
for _, tx := range caps {
hash := tx.Hash()
pool.all.Remove(hash)
pool.removeAuthorities(tx)
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
}
queuedRateLimitMeter.Mark(int64(len(caps)))
Expand Down Expand Up @@ -1557,6 +1629,7 @@ func (pool *LegacyPool) truncatePending() {
// Drop the transaction from the global pools too
hash := tx.Hash()
pool.all.Remove(hash)
pool.removeAuthorities(tx)

// Update the account nonce to the dropped transaction
pool.pendingNonces.setIfLower(offenders[i], tx.Nonce())
Expand Down Expand Up @@ -1584,6 +1657,7 @@ func (pool *LegacyPool) truncatePending() {
// Drop the transaction from the global pools too
hash := tx.Hash()
pool.all.Remove(hash)
pool.removeAuthorities(tx)

// Update the account nonce to the dropped transaction
pool.pendingNonces.setIfLower(addr, tx.Nonce())
Expand Down Expand Up @@ -1664,14 +1738,16 @@ func (pool *LegacyPool) demoteUnexecutables() {
for _, tx := range olds {
hash := tx.Hash()
pool.all.Remove(hash)
pool.removeAuthorities(tx)
log.Trace("Removed old pending transaction", "hash", hash)
}
// Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
drops, invalids := list.Filter(pool.currentState.GetBalance(addr), gasLimit)
for _, tx := range drops {
hash := tx.Hash()
log.Trace("Removed unpayable pending transaction", "hash", hash)
pool.all.Remove(hash)
pool.removeAuthorities(tx)
log.Trace("Removed unpayable pending transaction", "hash", hash)
}
pendingNofundsMeter.Mark(int64(len(drops)))

Expand Down Expand Up @@ -1995,6 +2071,7 @@ func (pool *LegacyPool) Clear() {
pool.pending = make(map[common.Address]*list)
pool.queue = make(map[common.Address]*list)
pool.pendingNonces = newNoncer(pool.currentState)
pool.auths = make(map[common.Address][]*types.Transaction)

if !pool.config.NoLocals && pool.config.Journal != "" {
pool.journal = newTxJournal(pool.config.Journal)
Expand Down
Loading
Loading