Skip to content

Commit 88c8459

Browse files
eth/fetcher: fix blob transaction propagation (ethereum#30125)
This PR fixes an issue with blob transaction propagation due to the blob transaction txpool rejecting transactions with gapped nonces. The specific changes are: - fetch transactions from a peer in the order they were announced to minimize nonce-gaps (which cause blob txs to be rejected) - don't wait on fetching blob transactions after announcement is received, since they are not broadcast Testing: - unit tests updated to reflect that fetch order should always match tx announcement order - unit test added to confirm blob transactions are scheduled immediately for fetching - running the PR on an eth mainnet full node without incident so far --------- Signed-off-by: Roberto Bayardo <[email protected]> Co-authored-by: Gary Rong <[email protected]>
1 parent 8f4fac7 commit 88c8459

File tree

3 files changed

+193
-83
lines changed

3 files changed

+193
-83
lines changed

cmd/devp2p/internal/ethtest/suite.go

+10-1
Original file line numberDiff line numberDiff line change
@@ -849,7 +849,16 @@ func (s *Suite) TestBlobViolations(t *utesting.T) {
849849
if code, _, err := conn.Read(); err != nil {
850850
t.Fatalf("expected disconnect on blob violation, got err: %v", err)
851851
} else if code != discMsg {
852-
t.Fatalf("expected disconnect on blob violation, got msg code: %d", code)
852+
if code == protoOffset(ethProto)+eth.NewPooledTransactionHashesMsg {
853+
// sometimes we'll get a blob transaction hashes announcement before the disconnect
854+
// because blob transactions are scheduled to be fetched right away.
855+
if code, _, err = conn.Read(); err != nil {
856+
t.Fatalf("expected disconnect on blob violation, got err on second read: %v", err)
857+
}
858+
}
859+
if code != discMsg {
860+
t.Fatalf("expected disconnect on blob violation, got msg code: %d", code)
861+
}
853862
}
854863
conn.Close()
855864
}

eth/fetcher/tx_fetcher.go

+104-73
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package fetcher
1818

1919
import (
20-
"bytes"
2120
"errors"
2221
"fmt"
2322
"math"
@@ -35,7 +34,7 @@ import (
3534
)
3635

3736
const (
38-
// maxTxAnnounces is the maximum number of unique transaction a peer
37+
// maxTxAnnounces is the maximum number of unique transactions a peer
3938
// can announce in a short time.
4039
maxTxAnnounces = 4096
4140

@@ -114,16 +113,23 @@ var errTerminated = errors.New("terminated")
114113
type txAnnounce struct {
115114
origin string // Identifier of the peer originating the notification
116115
hashes []common.Hash // Batch of transaction hashes being announced
117-
metas []*txMetadata // Batch of metadata associated with the hashes
116+
metas []txMetadata // Batch of metadata associated with the hashes
118117
}
119118

120-
// txMetadata is a set of extra data transmitted along the announcement for better
121-
// fetch scheduling.
119+
// txMetadata provides the extra data transmitted along with the announcement
120+
// for better fetch scheduling.
122121
type txMetadata struct {
123122
kind byte // Transaction consensus type
124123
size uint32 // Transaction size in bytes
125124
}
126125

126+
// txMetadataWithSeq is a wrapper of transaction metadata with an extra field
127+
// tracking the transaction sequence number.
128+
type txMetadataWithSeq struct {
129+
txMetadata
130+
seq uint64
131+
}
132+
127133
// txRequest represents an in-flight transaction retrieval request destined to
128134
// a specific peers.
129135
type txRequest struct {
@@ -159,7 +165,7 @@ type txDrop struct {
159165
// The invariants of the fetcher are:
160166
// - Each tracked transaction (hash) must only be present in one of the
161167
// three stages. This ensures that the fetcher operates akin to a finite
162-
// state automata and there's do data leak.
168+
// state automata and there's no data leak.
163169
// - Each peer that announced transactions may be scheduled retrievals, but
164170
// only ever one concurrently. This ensures we can immediately know what is
165171
// missing from a reply and reschedule it.
@@ -169,18 +175,19 @@ type TxFetcher struct {
169175
drop chan *txDrop
170176
quit chan struct{}
171177

178+
txSeq uint64 // Unique transaction sequence number
172179
underpriced *lru.Cache[common.Hash, time.Time] // Transactions discarded as too cheap (don't re-fetch)
173180

174181
// Stage 1: Waiting lists for newly discovered transactions that might be
175182
// broadcast without needing explicit request/reply round trips.
176-
waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast
177-
waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist
178-
waitslots map[string]map[common.Hash]*txMetadata // Waiting announcements grouped by peer (DoS protection)
183+
waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast
184+
waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist
185+
waitslots map[string]map[common.Hash]*txMetadataWithSeq // Waiting announcements grouped by peer (DoS protection)
179186

180187
// Stage 2: Queue of transactions that waiting to be allocated to some peer
181188
// to be retrieved directly.
182-
announces map[string]map[common.Hash]*txMetadata // Set of announced transactions, grouped by origin peer
183-
announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash
189+
announces map[string]map[common.Hash]*txMetadataWithSeq // Set of announced transactions, grouped by origin peer
190+
announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash
184191

185192
// Stage 3: Set of transactions currently being retrieved, some which may be
186193
// fulfilled and some rescheduled. Note, this step shares 'announces' from the
@@ -218,8 +225,8 @@ func NewTxFetcherForTests(
218225
quit: make(chan struct{}),
219226
waitlist: make(map[common.Hash]map[string]struct{}),
220227
waittime: make(map[common.Hash]mclock.AbsTime),
221-
waitslots: make(map[string]map[common.Hash]*txMetadata),
222-
announces: make(map[string]map[common.Hash]*txMetadata),
228+
waitslots: make(map[string]map[common.Hash]*txMetadataWithSeq),
229+
announces: make(map[string]map[common.Hash]*txMetadataWithSeq),
223230
announced: make(map[common.Hash]map[string]struct{}),
224231
fetching: make(map[common.Hash]string),
225232
requests: make(map[string]*txRequest),
@@ -247,7 +254,7 @@ func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []c
247254
// loop, so anything caught here is time saved internally.
248255
var (
249256
unknownHashes = make([]common.Hash, 0, len(hashes))
250-
unknownMetas = make([]*txMetadata, 0, len(hashes))
257+
unknownMetas = make([]txMetadata, 0, len(hashes))
251258

252259
duplicate int64
253260
underpriced int64
@@ -264,7 +271,7 @@ func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []c
264271
// Transaction metadata has been available since eth68, and all
265272
// legacy eth protocols (prior to eth68) have been deprecated.
266273
// Therefore, metadata is always expected in the announcement.
267-
unknownMetas = append(unknownMetas, &txMetadata{kind: types[i], size: sizes[i]})
274+
unknownMetas = append(unknownMetas, txMetadata{kind: types[i], size: sizes[i]})
268275
}
269276
}
270277
txAnnounceKnownMeter.Mark(duplicate)
@@ -431,9 +438,19 @@ func (f *TxFetcher) loop() {
431438
ann.metas = ann.metas[:want-maxTxAnnounces]
432439
}
433440
// All is well, schedule the remainder of the transactions
434-
idleWait := len(f.waittime) == 0
435-
_, oldPeer := f.announces[ann.origin]
436-
441+
var (
442+
idleWait = len(f.waittime) == 0
443+
_, oldPeer = f.announces[ann.origin]
444+
hasBlob bool
445+
446+
// nextSeq returns the next available sequence number for tagging
447+
// transaction announcement and also bump it internally.
448+
nextSeq = func() uint64 {
449+
seq := f.txSeq
450+
f.txSeq++
451+
return seq
452+
}
453+
)
437454
for i, hash := range ann.hashes {
438455
// If the transaction is already downloading, add it to the list
439456
// of possible alternates (in case the current retrieval fails) and
@@ -443,9 +460,17 @@ func (f *TxFetcher) loop() {
443460

444461
// Stage 2 and 3 share the set of origins per tx
445462
if announces := f.announces[ann.origin]; announces != nil {
446-
announces[hash] = ann.metas[i]
463+
announces[hash] = &txMetadataWithSeq{
464+
txMetadata: ann.metas[i],
465+
seq: nextSeq(),
466+
}
447467
} else {
448-
f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
468+
f.announces[ann.origin] = map[common.Hash]*txMetadataWithSeq{
469+
hash: {
470+
txMetadata: ann.metas[i],
471+
seq: nextSeq(),
472+
},
473+
}
449474
}
450475
continue
451476
}
@@ -456,9 +481,17 @@ func (f *TxFetcher) loop() {
456481

457482
// Stage 2 and 3 share the set of origins per tx
458483
if announces := f.announces[ann.origin]; announces != nil {
459-
announces[hash] = ann.metas[i]
484+
announces[hash] = &txMetadataWithSeq{
485+
txMetadata: ann.metas[i],
486+
seq: nextSeq(),
487+
}
460488
} else {
461-
f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
489+
f.announces[ann.origin] = map[common.Hash]*txMetadataWithSeq{
490+
hash: {
491+
txMetadata: ann.metas[i],
492+
seq: nextSeq(),
493+
},
494+
}
462495
}
463496
continue
464497
}
@@ -475,24 +508,47 @@ func (f *TxFetcher) loop() {
475508
f.waitlist[hash][ann.origin] = struct{}{}
476509

477510
if waitslots := f.waitslots[ann.origin]; waitslots != nil {
478-
waitslots[hash] = ann.metas[i]
511+
waitslots[hash] = &txMetadataWithSeq{
512+
txMetadata: ann.metas[i],
513+
seq: nextSeq(),
514+
}
479515
} else {
480-
f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
516+
f.waitslots[ann.origin] = map[common.Hash]*txMetadataWithSeq{
517+
hash: {
518+
txMetadata: ann.metas[i],
519+
seq: nextSeq(),
520+
},
521+
}
481522
}
482523
continue
483524
}
484525
// Transaction unknown to the fetcher, insert it into the waiting list
485526
f.waitlist[hash] = map[string]struct{}{ann.origin: {}}
486-
f.waittime[hash] = f.clock.Now()
487527

528+
// Assign the current timestamp as the wait time, but for blob transactions,
529+
// skip the wait time since they are only announced.
530+
if ann.metas[i].kind != types.BlobTxType {
531+
f.waittime[hash] = f.clock.Now()
532+
} else {
533+
hasBlob = true
534+
f.waittime[hash] = f.clock.Now() - mclock.AbsTime(txArriveTimeout)
535+
}
488536
if waitslots := f.waitslots[ann.origin]; waitslots != nil {
489-
waitslots[hash] = ann.metas[i]
537+
waitslots[hash] = &txMetadataWithSeq{
538+
txMetadata: ann.metas[i],
539+
seq: nextSeq(),
540+
}
490541
} else {
491-
f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
542+
f.waitslots[ann.origin] = map[common.Hash]*txMetadataWithSeq{
543+
hash: {
544+
txMetadata: ann.metas[i],
545+
seq: nextSeq(),
546+
},
547+
}
492548
}
493549
}
494550
// If a new item was added to the waitlist, schedule it into the fetcher
495-
if idleWait && len(f.waittime) > 0 {
551+
if hasBlob || (idleWait && len(f.waittime) > 0) {
496552
f.rescheduleWait(waitTimer, waitTrigger)
497553
}
498554
// If this peer is new and announced something already queued, maybe
@@ -516,7 +572,7 @@ func (f *TxFetcher) loop() {
516572
if announces := f.announces[peer]; announces != nil {
517573
announces[hash] = f.waitslots[peer][hash]
518574
} else {
519-
f.announces[peer] = map[common.Hash]*txMetadata{hash: f.waitslots[peer][hash]}
575+
f.announces[peer] = map[common.Hash]*txMetadataWithSeq{hash: f.waitslots[peer][hash]}
520576
}
521577
delete(f.waitslots[peer], hash)
522578
if len(f.waitslots[peer]) == 0 {
@@ -873,7 +929,7 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{},
873929
hashes = make([]common.Hash, 0, maxTxRetrievals)
874930
bytes uint64
875931
)
876-
f.forEachAnnounce(f.announces[peer], func(hash common.Hash, meta *txMetadata) bool {
932+
f.forEachAnnounce(f.announces[peer], func(hash common.Hash, meta txMetadata) bool {
877933
// If the transaction is already fetching, skip to the next one
878934
if _, ok := f.fetching[hash]; ok {
879935
return true
@@ -938,28 +994,26 @@ func (f *TxFetcher) forEachPeer(peers map[string]struct{}, do func(peer string))
938994
}
939995
}
940996

941-
// forEachAnnounce does a range loop over a map of announcements in production,
942-
// but during testing it does a deterministic sorted random to allow reproducing
943-
// issues.
944-
func (f *TxFetcher) forEachAnnounce(announces map[common.Hash]*txMetadata, do func(hash common.Hash, meta *txMetadata) bool) {
945-
// If we're running production, use whatever Go's map gives us
946-
if f.rand == nil {
947-
for hash, meta := range announces {
948-
if !do(hash, meta) {
949-
return
950-
}
951-
}
952-
return
997+
// forEachAnnounce loops over the given announcements in arrival order, invoking
998+
// the do function for each until it returns false. We enforce an arrival
999+
// ordering to minimize the chances of transaction nonce-gaps, which result in
1000+
// transactions being rejected by the txpool.
1001+
func (f *TxFetcher) forEachAnnounce(announces map[common.Hash]*txMetadataWithSeq, do func(hash common.Hash, meta txMetadata) bool) {
1002+
type announcement struct {
1003+
hash common.Hash
1004+
meta txMetadata
1005+
seq uint64
9531006
}
954-
// We're running the test suite, make iteration deterministic
955-
list := make([]common.Hash, 0, len(announces))
956-
for hash := range announces {
957-
list = append(list, hash)
1007+
// Process announcements by their arrival order
1008+
list := make([]announcement, 0, len(announces))
1009+
for hash, entry := range announces {
1010+
list = append(list, announcement{hash: hash, meta: entry.txMetadata, seq: entry.seq})
9581011
}
959-
sortHashes(list)
960-
rotateHashes(list, f.rand.Intn(len(list)))
961-
for _, hash := range list {
962-
if !do(hash, announces[hash]) {
1012+
sort.Slice(list, func(i, j int) bool {
1013+
return list[i].seq < list[j].seq
1014+
})
1015+
for i := range list {
1016+
if !do(list[i].hash, list[i].meta) {
9631017
return
9641018
}
9651019
}
@@ -975,26 +1029,3 @@ func rotateStrings(slice []string, n int) {
9751029
slice[i] = orig[(i+n)%len(orig)]
9761030
}
9771031
}
978-
979-
// sortHashes sorts a slice of hashes. This method is only used in tests in order
980-
// to simulate random map iteration but keep it deterministic.
981-
func sortHashes(slice []common.Hash) {
982-
for i := 0; i < len(slice); i++ {
983-
for j := i + 1; j < len(slice); j++ {
984-
if bytes.Compare(slice[i][:], slice[j][:]) > 0 {
985-
slice[i], slice[j] = slice[j], slice[i]
986-
}
987-
}
988-
}
989-
}
990-
991-
// rotateHashes rotates the contents of a slice by n steps. This method is only
992-
// used in tests to simulate random map iteration but keep it deterministic.
993-
func rotateHashes(slice []common.Hash, n int) {
994-
orig := make([]common.Hash, len(slice))
995-
copy(orig, slice)
996-
997-
for i := 0; i < len(orig); i++ {
998-
slice[i] = orig[(i+n)%len(orig)]
999-
}
1000-
}

0 commit comments

Comments
 (0)