core/txpool/blobpool: upgrade blobpool #32146

Closed, wants to merge 8 commits
core/txpool/blobpool/blobpool.go (27 additions, 42 deletions)
@@ -549,7 +549,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6

// Included transactions blobs need to be moved to the limbo
if filled && inclusions != nil {
p.offload(addr, txs[i].nonce, txs[i].id, inclusions)
p.offload(addr, txs[i], inclusions)
}
}
delete(p.index, addr)
@@ -566,9 +566,13 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
log.Trace("Dropping filled blob transactions", "from", addr, "filled", nonces, "ids", ids)
dropFilledMeter.Mark(int64(len(ids)))
}
for _, id := range ids {
if err := p.store.Delete(id); err != nil {
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)

// If the txs were recorded in the limbo, we don't delete them.
if !(filled && inclusions != nil) {
for _, id := range ids {
if err := p.store.Delete(id); err != nil {
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
}
}
}
return
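Review note: this hunk establishes the new ownership rule for blob data. Once a dropped tx has been offloaded into the limbo, its serialized entry in the primary store is shared, so the pool must not delete it; the limbo does that at finality. A minimal sketch of the guard, using hypothetical simplified stand-ins (blobStore, dropTracked) rather than the actual geth types:

```go
package sketch

import "log"

// blobStore is a simplified stand-in for the billy datastore; illustrative only.
type blobStore map[uint64][]byte

// Delete drops an entry; the real store can fail, hence the error return.
func (s blobStore) Delete(id uint64) error {
	delete(s, id)
	return nil
}

// dropTracked mirrors the guard above: blob entries are deleted only when the
// limbo did NOT take ownership of them (i.e. the txs were not offloaded).
func dropTracked(store blobStore, ids []uint64, offloaded bool) {
	if offloaded {
		return // the limbo owns these entries now and deletes them at finality
	}
	for _, id := range ids {
		if err := store.Delete(id); err != nil {
			log.Printf("failed to delete blob transaction: id=%d err=%v", id, err)
		}
	}
}
```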
@@ -590,16 +594,19 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6

// Included transactions blobs need to be moved to the limbo
if inclusions != nil {
p.offload(addr, txs[0].nonce, txs[0].id, inclusions)
p.offload(addr, txs[0], inclusions)
}
txs = txs[1:]
}
log.Trace("Dropping overlapped blob transactions", "from", addr, "overlapped", nonces, "ids", ids, "left", len(txs))
dropOverlappedMeter.Mark(int64(len(ids)))

for _, id := range ids {
if err := p.store.Delete(id); err != nil {
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
// If the txs were recorded in the limbo, we don't delete them.
if inclusions == nil {
for _, id := range ids {
if err := p.store.Delete(id); err != nil {
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
}
}
}
p.index[addr] = txs
@@ -769,23 +776,13 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
// any of them since there's no clear error case. Some errors may be due to coding
// issues, others caused by signers mining MEV stuff or swapping transactions. In
// all cases, the pool needs to continue operating.
func (p *BlobPool) offload(addr common.Address, nonce uint64, id uint64, inclusions map[common.Hash]uint64) {
data, err := p.store.Get(id)
if err != nil {
log.Error("Blobs missing for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err)
return
}
var tx types.Transaction
if err = rlp.DecodeBytes(data, &tx); err != nil {
log.Error("Blobs corrupted for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err)
return
}
block, ok := inclusions[tx.Hash()]
func (p *BlobPool) offload(addr common.Address, blobTxMeta *blobTxMeta, inclusions map[common.Hash]uint64) {
block, ok := inclusions[blobTxMeta.hash]
if !ok {
log.Warn("Blob transaction swapped out by signer", "from", addr, "nonce", nonce, "id", id)
log.Warn("Blob transaction swapped out by signer", "from", addr, "nonce", blobTxMeta.nonce, "id", blobTxMeta.id)
return
}
if err := p.limbo.push(&tx, block); err != nil {
if err := p.limbo.push(blobTxMeta, block); err != nil {
log.Warn("Failed to offload blob tx into limbo", "err", err)
return
}
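The reworked offload no longer does a store.Get plus rlp.DecodeBytes round trip: the caller already holds the *blobTxMeta, and the limbo tracks metadata instead of a second copy of the transaction. A hedged sketch of the new shape, with simplified stand-in types (fixed byte arrays instead of common.Hash, a trimmed blobTxMeta):

```go
package sketch

import "log"

// blobTxMeta is a trimmed stand-in; geth's version carries many more fields.
type blobTxMeta struct {
	hash  [32]byte // transaction hash
	id    uint64   // datastore id of the serialized tx in the primary store
	nonce uint64
}

type limbo interface {
	push(meta *blobTxMeta, block uint64) error
}

// offload parks an included tx's metadata in the limbo. No disk read or RLP
// decode is needed, since the metadata is already held in memory.
func offload(l limbo, meta *blobTxMeta, inclusions map[[32]byte]uint64) {
	block, ok := inclusions[meta.hash]
	if !ok {
		log.Printf("tx swapped out by signer: nonce=%d id=%d", meta.nonce, meta.id)
		return
	}
	if err := l.push(meta, block); err != nil {
		log.Printf("failed to offload into limbo: %v", err) // pool keeps operating
	}
}
```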
@@ -831,8 +828,13 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) {
}
}
// Flush out any blobs from limbo that are older than the latest finality
// and also delete the txs from the store.
if p.chain.Config().IsCancun(p.head.Number, p.head.Time) {
p.limbo.finalize(p.chain.CurrentFinalBlock())
p.limbo.finalize(p.chain.CurrentFinalBlock(), func(id uint64, txHash common.Hash) {
if err := p.store.Delete(id); err != nil {
log.Error("Failed to delete blob transaction", "hash", txHash, "id", id, "err", err)
}
})
}
// Reset the price heap for the new set of basefee/blobfee pairs
var (
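Because the limbo entry no longer embeds its own serialized tx, finalization must also clean up the pool's primary store; the PR injects that deletion as a callback so the limbo stays ignorant of the pool's storage. A sketch of the pattern under assumed simplified signatures (header, finalizeFn, onNewFinality are illustrative names, not geth API):

```go
package sketch

import "log"

// header is a trimmed stand-in for types.Header.
type header struct{ number uint64 }

// finalizeFn is the eviction hook: it receives the primary-store id and tx
// hash of every entry that drops out of the limbo at finality.
type finalizeFn func(id uint64, txHash [32]byte)

type limboIndex interface {
	finalize(final *header, fn finalizeFn)
}

type store interface {
	Delete(id uint64) error
}

// onNewFinality wires the pool's store deletion into the limbo's eviction,
// mirroring the callback added to Reset above.
func onNewFinality(l limboIndex, s store, final *header) {
	l.finalize(final, func(id uint64, txHash [32]byte) {
		if err := s.Delete(id); err != nil {
			log.Printf("failed to delete blob tx: hash=%x id=%d err=%v", txHash, id, err)
		}
	})
}
```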
@@ -986,31 +988,14 @@ func (p *BlobPool) reorg(oldHead, newHead *types.Header) (map[common.Address][]*
func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
// Retrieve the associated blob from the limbo. Without the blobs, we cannot
// add the transaction back into the pool as it is not mineable.
tx, err := p.limbo.pull(txhash)
meta, err := p.limbo.pull(txhash)
if err != nil {
log.Error("Blobs unavailable, dropping reorged tx", "err", err)
return err
}
// TODO: seems like an easy optimization here would be getting the serialized tx
// from limbo instead of re-serializing it here.

// Serialize the transaction back into the primary datastore.
blob, err := rlp.EncodeToBytes(tx)
if err != nil {
log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err)
return err
}
id, err := p.store.Put(blob)
if err != nil {
log.Error("Failed to write transaction into storage", "hash", tx.Hash(), "err", err)
return err
}

// Update the indices and metrics
meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx)
if _, ok := p.index[addr]; !ok {
if err := p.reserver.Hold(addr); err != nil {
log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err)
log.Warn("Failed to reserve account for blob pool", "tx", meta.hash, "from", addr, "err", err)
return err
}
p.index[addr] = []*blobTxMeta{meta}
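This hunk also resolves the old TODO: since the serialized tx never left the primary store, reinject no longer re-encodes and re-Puts anything, it just pulls the metadata back and reindexes it. A condensed sketch, assuming trimmed stand-in types (meta, limboPuller):

```go
package sketch

// meta is a trimmed stand-in for geth's blobTxMeta.
type meta struct {
	hash [32]byte
	id   uint64 // the serialized tx's entry id in the pool's primary store
}

type limboPuller interface {
	pull(txhash [32]byte) (*meta, error)
}

// reinject restores a reorged-out tx into the pool's index. The old version
// decoded the tx, re-encoded it and wrote a fresh store entry; since the
// entry with the original id never left the primary store, only the
// in-memory index needs rebuilding now.
func reinject(l limboPuller, index map[[20]byte][]*meta, addr [20]byte, txhash [32]byte) error {
	m, err := l.pull(txhash)
	if err != nil {
		return err // blobs unavailable, cannot resurrect the tx
	}
	index[addr] = append(index[addr], m) // no re-encode, no store.Put
	return nil
}
```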
core/txpool/blobpool/limbo.go (41 additions, 57 deletions)
@@ -30,28 +30,25 @@ import (
// to which it belongs as well as the block number in which it was included for
// finality eviction.
type limboBlob struct {
TxHash common.Hash // Owner transaction's hash to support resurrecting reorged txs
Block uint64 // Block in which the blob transaction was included
Tx *types.Transaction
Block uint64 // Block in which the blob transaction was included
BlobTxMeta *blobTxMeta
}

// limbo is a light, indexed database to temporarily store recently included
// blobs until they are finalized. The purpose is to support small reorgs, which
// would require pulling back up old blobs (which aren't part of the chain).
//
// TODO(karalabe): Currently updating the inclusion block of a blob needs a full db rewrite. Can we do without?
type limbo struct {
store billy.Database // Persistent data store for limboed blobs

index map[common.Hash]uint64 // Mappings from tx hashes to datastore ids
groups map[uint64]map[uint64]common.Hash // Set of txs included in past blocks
index map[common.Hash]uint64 // Mappings from tx hashes to datastore ids
limbos map[uint64]*limboBlob // Mappings from datastore ids to limboBlobs
}

// newLimbo opens and indexes a set of limboed blob transactions.
func newLimbo(datadir string, maxBlobsPerTransaction int) (*limbo, error) {
l := &limbo{
index: make(map[common.Hash]uint64),
groups: make(map[uint64]map[uint64]common.Hash),
limbos: make(map[uint64]*limboBlob),
}
// Index all limboed blobs on disk and delete anything unprocessable
var fails []uint64
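The per-block groups map (block -> id -> hash) is replaced with a flat limbos map (id -> entry), leaving two indices that must stay in lockstep. A sketch of the invariant the new layout relies on, with simplified types:

```go
package sketch

// limboBlob and limbo are trimmed stand-ins for the new layout.
type limboBlob struct {
	Block uint64
	Hash  [32]byte // stands in for BlobTxMeta.hash
}

type limbo struct {
	index  map[[32]byte]uint64   // tx hash -> limbo datastore id
	limbos map[uint64]*limboBlob // limbo datastore id -> entry
}

// consistent checks the invariant the flat layout relies on: every indexed
// hash resolves to a live entry that points back at the same hash.
func (l *limbo) consistent() bool {
	if len(l.index) != len(l.limbos) {
		return false
	}
	for hash, id := range l.index {
		item, ok := l.limbos[id]
		if !ok || item.Hash != hash {
			return false
		}
	}
	return true
}
```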
@@ -94,56 +91,55 @@ func (l *limbo) parseBlob(id uint64, data []byte) error {
log.Error("Failed to decode blob limbo entry", "id", id, "err", err)
return err
}
if _, ok := l.index[item.TxHash]; ok {
txHash := item.BlobTxMeta.hash
if _, ok := l.index[txHash]; ok {
// This path is impossible, unless due to a programming error a blob gets
// inserted into the limbo which was already part of it. Recover gracefully
// by ignoring this data entry.
log.Error("Dropping duplicate blob limbo entry", "owner", item.TxHash, "id", id)
log.Error("Dropping duplicate blob limbo entry", "owner", txHash, "id", id)
return errors.New("duplicate blob")
}
l.index[item.TxHash] = id

if _, ok := l.groups[item.Block]; !ok {
l.groups[item.Block] = make(map[uint64]common.Hash)
}
l.groups[item.Block][id] = item.TxHash
l.index[txHash] = id
l.limbos[id] = item

return nil
}

// finalize evicts all blobs belonging to a recently finalized block or older.
func (l *limbo) finalize(final *types.Header) {
func (l *limbo) finalize(final *types.Header, fn func(id uint64, txHash common.Hash)) {
// Just in case there's no final block yet (network not yet merged, weird
// restart, sethead, etc), fail gracefully.
if final == nil {
log.Warn("Nil finalized block cannot evict old blobs")
return
}
for block, ids := range l.groups {
if block > final.Number.Uint64() {
for id, item := range l.limbos {
if item.Block > final.Number.Uint64() {
continue
}
for id, owner := range ids {
if err := l.store.Delete(id); err != nil {
log.Error("Failed to drop finalized blob", "block", block, "id", id, "err", err)
}
delete(l.index, owner)
// Delete limbo metadata.
if err := l.store.Delete(id); err != nil {
log.Error("Failed to drop finalized blob", "block", item.Block, "id", id, "err", err)
}
delete(l.index, item.BlobTxMeta.hash)
delete(l.limbos, id)
if fn != nil { // Delete blob tx if fn is not nil.
fn(item.BlobTxMeta.id, item.BlobTxMeta.hash)
}
delete(l.groups, block)
}
}

// push stores a new blob transaction into the limbo, waiting until finality for
// it to be automatically evicted.
func (l *limbo) push(tx *types.Transaction, block uint64) error {
func (l *limbo) push(metaData *blobTxMeta, block uint64) error {
// If the blobs are already tracked by the limbo, consider it a programming
// error. There's not much to do against it, but be loud.
if _, ok := l.index[tx.Hash()]; ok {
log.Error("Limbo cannot push already tracked blobs", "tx", tx.Hash())
if _, ok := l.index[metaData.hash]; ok {
log.Error("Limbo cannot push already tracked blobs", "tx", metaData.hash)
return errors.New("already tracked blob transaction")
}
if err := l.setAndIndex(tx, block); err != nil {
log.Error("Failed to set and index limboed blobs", "tx", tx.Hash(), "err", err)
if err := l.setAndIndex(metaData, block); err != nil {
log.Error("Failed to set and index limboed blobs", "tx", metaData.hash, "err", err)
return err
}
return nil
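finalize is now a linear scan over every limboed entry instead of a per-block bucket lookup; the limbo only ever holds recently included, not yet finalized txs, so the set stays small and the simpler structure is a reasonable trade. A sketch of the new eviction loop with the optional deletion hook (simplified types, and two distinct ids as in the real code: the limbo's own entry id and the tx's store id):

```go
package sketch

type entry struct {
	Block uint64
	Hash  [32]byte
	TxID  uint64 // id of the serialized tx in the pool's primary store
}

// finalize evicts every entry included at or before the finalized block and,
// if a hook was given, lets the caller delete the shared tx data as well.
// Note the two ids: the limbo's own entry id (the map key) and the tx's
// store id (entry.TxID), matching the split in the real code.
func finalize(limbos map[uint64]*entry, index map[[32]byte]uint64, final uint64, fn func(txID uint64, hash [32]byte)) {
	for id, item := range limbos {
		if item.Block > final {
			continue // not finalized yet, keep holding it
		}
		delete(index, item.Hash)
		delete(limbos, id) // deleting while ranging over a map is safe in Go
		if fn != nil {
			fn(item.TxID, item.Hash)
		}
	}
}
```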
@@ -152,7 +148,7 @@ func (l *limbo) push(tx *types.Transaction, block uint64) error {
// pull retrieves a previously pushed set of blobs back from the limbo, removing
// it at the same time. This method should be used when a previously included blob
// transaction gets reorged out.
func (l *limbo) pull(tx common.Hash) (*types.Transaction, error) {
func (l *limbo) pull(tx common.Hash) (*blobTxMeta, error) {
// If the blobs are not tracked by the limbo, there's not much to do. This
// can happen for example if a blob transaction is mined without pushing it
// into the network first.
@@ -166,7 +162,10 @@ func (l *limbo) pull(tx common.Hash) (*types.Transaction, error) {
log.Error("Failed to get and drop limboed blobs", "tx", tx, "id", id, "err", err)
return nil, err
}
return item.Tx, nil
meta := item.BlobTxMeta
meta.basefeeJumps = dynamicFeeJumps(meta.execFeeCap)
meta.blobfeeJumps = dynamicFeeJumps(meta.blobFeeCap)
return meta, nil
}
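pull rebuilds the two derived fee-jump fields before returning the metadata: they are float64 values computed from the fee caps, so they are not part of the RLP-encoded limbo entry and have to be recomputed on the way out. A sketch of the shape of that computation, assuming the 1.125 jump base of the EIP-4844 fee update rule and simplifying to float64 inputs (geth computes on uint256 caps):

```go
package sketch

import "math"

// feeJumps approximates log1.125(fee): how many 1.125x fee-update steps are
// needed to reach fee starting from 1. It is derived data, so the limbo
// recomputes it on pull instead of persisting it, e.g.
//
//	meta.basefeeJumps = feeJumps(execFeeCap)
//	meta.blobfeeJumps = feeJumps(blobFeeCap)
func feeJumps(fee float64) float64 {
	if fee <= 0 {
		return 0 // guard; real caps are positive
	}
	return math.Log(fee) / math.Log(1.125)
}
```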

// update changes the block number under which a blob transaction is tracked. This
@@ -187,7 +186,7 @@ func (l *limbo) update(txhash common.Hash, block uint64) {
}
// If there was no change in the blob's inclusion block, don't mess around
// with heavy database operations.
if _, ok := l.groups[block][id]; ok {
if item, ok := l.limbos[id]; ok && item.Block == block {
log.Trace("Blob transaction unchanged in limbo", "tx", txhash, "block", block)
return
}
@@ -198,7 +197,7 @@ func (l *limbo) update(txhash common.Hash, block uint64) {
log.Error("Failed to get and drop limboed blobs", "tx", txhash, "id", id, "err", err)
return
}
if err := l.setAndIndex(item.Tx, block); err != nil {
if err := l.setAndIndex(item.BlobTxMeta, block); err != nil {
log.Error("Failed to set and index limboed blobs", "tx", txhash, "err", err)
return
}
@@ -208,19 +207,9 @@ func (l *limbo) getAndDrop(id uint64) (*limboBlob, error) {
// getAndDrop retrieves a blob item from the limbo store and deletes it both from
// the store and indices.
func (l *limbo) getAndDrop(id uint64) (*limboBlob, error) {
data, err := l.store.Get(id)
if err != nil {
return nil, err
}
item := new(limboBlob)
if err = rlp.DecodeBytes(data, item); err != nil {
return nil, err
}
delete(l.index, item.TxHash)
delete(l.groups[item.Block], id)
if len(l.groups[item.Block]) == 0 {
delete(l.groups, item.Block)
}
item := l.limbos[id]
delete(l.index, item.BlobTxMeta.hash)
delete(l.limbos, id)
if err := l.store.Delete(id); err != nil {
return nil, err
}
@@ -229,12 +218,10 @@ func (l *limbo) setAndIndex(metaData *blobTxMeta, block uint64) error {

// setAndIndex assembles a limbo blob database entry and stores it, also updating
// the in-memory indices.
func (l *limbo) setAndIndex(tx *types.Transaction, block uint64) error {
txhash := tx.Hash()
func (l *limbo) setAndIndex(metaData *blobTxMeta, block uint64) error {
item := &limboBlob{
TxHash: txhash,
Block: block,
Tx: tx,
Block: block,
BlobTxMeta: metaData,
}
data, err := rlp.EncodeToBytes(item)
if err != nil {
Expand All @@ -244,10 +231,7 @@ func (l *limbo) setAndIndex(tx *types.Transaction, block uint64) error {
if err != nil {
return err
}
l.index[txhash] = id
if _, ok := l.groups[block]; !ok {
l.groups[block] = make(map[uint64]common.Hash)
}
l.groups[block][id] = txhash
l.index[item.BlobTxMeta.hash] = id
l.limbos[id] = item
return nil
}
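Taken together, the limbo's lifecycle after this PR is: push the metadata when a tracked tx is included, pull it back if a reorg drops the block, and finalize (with the pool's deletion hook) once the block is final. A self-contained, hedged end-to-end sketch with simplified types; the ids and their allocation are illustrative, not billy's behavior:

```go
package main

import "fmt"

// meta, item and limbo are simplified stand-ins for the PR's types.
type meta struct {
	hash [32]byte
	id   uint64 // tx entry id in the pool's primary store
}

type item struct {
	m     *meta
	block uint64
}

type limbo struct {
	index map[[32]byte]uint64
	blobs map[uint64]*item
	next  uint64 // stands in for billy's id allocation
}

func newLimbo() *limbo {
	return &limbo{index: map[[32]byte]uint64{}, blobs: map[uint64]*item{}}
}

// push tracks an included tx until finality.
func (l *limbo) push(m *meta, block uint64) {
	l.next++
	l.index[m.hash] = l.next
	l.blobs[l.next] = &item{m: m, block: block}
}

// pull resurrects a tx whose block was reorged out.
func (l *limbo) pull(hash [32]byte) *meta {
	id, ok := l.index[hash]
	if !ok {
		return nil
	}
	m := l.blobs[id].m
	delete(l.index, hash)
	delete(l.blobs, id)
	return m
}

// finalize evicts entries at or below the finalized block, handing the tx's
// store id to the hook so the caller can delete the shared data.
func (l *limbo) finalize(final uint64, fn func(txID uint64, hash [32]byte)) {
	for id, it := range l.blobs {
		if it.block > final {
			continue
		}
		delete(l.index, it.m.hash)
		delete(l.blobs, id)
		if fn != nil {
			fn(it.m.id, it.m.hash)
		}
	}
}

func main() {
	l := newLimbo()
	a := &meta{hash: [32]byte{1}, id: 100}
	b := &meta{hash: [32]byte{2}, id: 101}

	l.push(a, 10) // a included in block 10
	l.push(b, 12) // b included in block 12

	if m := l.pull(b.hash); m != nil {
		fmt.Println("reorg: reinjected tx with store id", m.id) // 101
	}
	l.finalize(11, func(txID uint64, hash [32]byte) {
		fmt.Println("finalized: deleting tx store entry", txID) // 100
	})
}
```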