Skip to content

Commit 368e16f

Browse files
authored
core, eth, ethstats: simplify chain head events (ethereum#30601)
1 parent 15bf90e commit 368e16f

16 files changed

+48
-173
lines changed

core/blockchain.go

+9-15
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,6 @@ type BlockChain struct {
224224
hc *HeaderChain
225225
rmLogsFeed event.Feed
226226
chainFeed event.Feed
227-
chainSideFeed event.Feed
228227
chainHeadFeed event.Feed
229228
logsFeed event.Feed
230229
blockProcFeed event.Feed
@@ -571,15 +570,14 @@ func (bc *BlockChain) SetHead(head uint64) error {
571570
}
572571
// Send chain head event to update the transaction pool
573572
header := bc.CurrentBlock()
574-
block := bc.GetBlock(header.Hash(), header.Number.Uint64())
575-
if block == nil {
573+
if block := bc.GetBlock(header.Hash(), header.Number.Uint64()); block == nil {
576574
// This should never happen. In practice, previously currentBlock
577575
// contained the entire block whereas now only a "marker", so there
578576
// is an ever so slight chance for a race we should handle.
579577
log.Error("Current block not found in database", "block", header.Number, "hash", header.Hash())
580578
return fmt.Errorf("current block missing: #%d [%x..]", header.Number, header.Hash().Bytes()[:4])
581579
}
582-
bc.chainHeadFeed.Send(ChainHeadEvent{Block: block})
580+
bc.chainHeadFeed.Send(ChainHeadEvent{Header: header})
583581
return nil
584582
}
585583

@@ -593,15 +591,14 @@ func (bc *BlockChain) SetHeadWithTimestamp(timestamp uint64) error {
593591
}
594592
// Send chain head event to update the transaction pool
595593
header := bc.CurrentBlock()
596-
block := bc.GetBlock(header.Hash(), header.Number.Uint64())
597-
if block == nil {
594+
if block := bc.GetBlock(header.Hash(), header.Number.Uint64()); block == nil {
598595
// This should never happen. In practice, previously currentBlock
599596
// contained the entire block whereas now only a "marker", so there
600597
// is an ever so slight chance for a race we should handle.
601598
log.Error("Current block not found in database", "block", header.Number, "hash", header.Hash())
602599
return fmt.Errorf("current block missing: #%d [%x..]", header.Number, header.Hash().Bytes()[:4])
603600
}
604-
bc.chainHeadFeed.Send(ChainHeadEvent{Block: block})
601+
bc.chainHeadFeed.Send(ChainHeadEvent{Header: header})
605602
return nil
606603
}
607604

@@ -1552,7 +1549,7 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
15521549
// Set new head.
15531550
bc.writeHeadBlock(block)
15541551

1555-
bc.chainFeed.Send(ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
1552+
bc.chainFeed.Send(ChainEvent{Header: block.Header()})
15561553
if len(logs) > 0 {
15571554
bc.logsFeed.Send(logs)
15581555
}
@@ -1562,7 +1559,7 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
15621559
// we will fire an accumulated ChainHeadEvent and disable fire
15631560
// event here.
15641561
if emitHeadEvent {
1565-
bc.chainHeadFeed.Send(ChainHeadEvent{Block: block})
1562+
bc.chainHeadFeed.Send(ChainHeadEvent{Header: block.Header()})
15661563
}
15671564
return CanonStatTy, nil
15681565
}
@@ -1627,7 +1624,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness
16271624
// Fire a single chain head event if we've progressed the chain
16281625
defer func() {
16291626
if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
1630-
bc.chainHeadFeed.Send(ChainHeadEvent{lastCanon})
1627+
bc.chainHeadFeed.Send(ChainHeadEvent{Header: lastCanon.Header()})
16311628
}
16321629
}()
16331630
// Start the parallel header verifier
@@ -2328,9 +2325,6 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error {
23282325
// Deleted logs + blocks:
23292326
var deletedLogs []*types.Log
23302327
for i := len(oldChain) - 1; i >= 0; i-- {
2331-
// Also send event for blocks removed from the canon chain.
2332-
bc.chainSideFeed.Send(ChainSideEvent{Block: oldChain[i]})
2333-
23342328
// Collect deleted logs for notification
23352329
if logs := bc.collectLogs(oldChain[i], true); len(logs) > 0 {
23362330
deletedLogs = append(deletedLogs, logs...)
@@ -2403,11 +2397,11 @@ func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) {
24032397

24042398
// Emit events
24052399
logs := bc.collectLogs(head, false)
2406-
bc.chainFeed.Send(ChainEvent{Block: head, Hash: head.Hash(), Logs: logs})
2400+
bc.chainFeed.Send(ChainEvent{Header: head.Header()})
24072401
if len(logs) > 0 {
24082402
bc.logsFeed.Send(logs)
24092403
}
2410-
bc.chainHeadFeed.Send(ChainHeadEvent{Block: head})
2404+
bc.chainHeadFeed.Send(ChainHeadEvent{Header: head.Header()})
24112405

24122406
context := []interface{}{
24132407
"number", head.Number(),

core/blockchain_reader.go

-5
Original file line numberDiff line numberDiff line change
@@ -430,11 +430,6 @@ func (bc *BlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Su
430430
return bc.scope.Track(bc.chainHeadFeed.Subscribe(ch))
431431
}
432432

433-
// SubscribeChainSideEvent registers a subscription of ChainSideEvent.
434-
func (bc *BlockChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription {
435-
return bc.scope.Track(bc.chainSideFeed.Subscribe(ch))
436-
}
437-
438433
// SubscribeLogsEvent registers a subscription of []*types.Log.
439434
func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
440435
return bc.scope.Track(bc.logsFeed.Subscribe(ch))

core/blockchain_test.go

-79
Original file line numberDiff line numberDiff line change
@@ -1332,85 +1332,6 @@ func checkLogEvents(t *testing.T, logsCh <-chan []*types.Log, rmLogsCh <-chan Re
13321332
}
13331333
}
13341334

1335-
func TestReorgSideEvent(t *testing.T) {
1336-
testReorgSideEvent(t, rawdb.HashScheme)
1337-
testReorgSideEvent(t, rawdb.PathScheme)
1338-
}
1339-
1340-
func testReorgSideEvent(t *testing.T, scheme string) {
1341-
var (
1342-
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
1343-
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
1344-
gspec = &Genesis{
1345-
Config: params.TestChainConfig,
1346-
Alloc: types.GenesisAlloc{addr1: {Balance: big.NewInt(10000000000000000)}},
1347-
}
1348-
signer = types.LatestSigner(gspec.Config)
1349-
)
1350-
blockchain, _ := NewBlockChain(rawdb.NewMemoryDatabase(), DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil)
1351-
defer blockchain.Stop()
1352-
1353-
_, chain, _ := GenerateChainWithGenesis(gspec, ethash.NewFaker(), 3, func(i int, gen *BlockGen) {})
1354-
if _, err := blockchain.InsertChain(chain); err != nil {
1355-
t.Fatalf("failed to insert chain: %v", err)
1356-
}
1357-
1358-
_, replacementBlocks, _ := GenerateChainWithGenesis(gspec, ethash.NewFaker(), 4, func(i int, gen *BlockGen) {
1359-
tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), 1000000, gen.header.BaseFee, nil), signer, key1)
1360-
if i == 2 {
1361-
gen.OffsetTime(-9)
1362-
}
1363-
if err != nil {
1364-
t.Fatalf("failed to create tx: %v", err)
1365-
}
1366-
gen.AddTx(tx)
1367-
})
1368-
chainSideCh := make(chan ChainSideEvent, 64)
1369-
blockchain.SubscribeChainSideEvent(chainSideCh)
1370-
if _, err := blockchain.InsertChain(replacementBlocks); err != nil {
1371-
t.Fatalf("failed to insert chain: %v", err)
1372-
}
1373-
1374-
expectedSideHashes := map[common.Hash]bool{
1375-
chain[0].Hash(): true,
1376-
chain[1].Hash(): true,
1377-
chain[2].Hash(): true,
1378-
}
1379-
1380-
i := 0
1381-
1382-
const timeoutDura = 10 * time.Second
1383-
timeout := time.NewTimer(timeoutDura)
1384-
done:
1385-
for {
1386-
select {
1387-
case ev := <-chainSideCh:
1388-
block := ev.Block
1389-
if _, ok := expectedSideHashes[block.Hash()]; !ok {
1390-
t.Errorf("%d: didn't expect %x to be in side chain", i, block.Hash())
1391-
}
1392-
i++
1393-
1394-
if i == len(expectedSideHashes) {
1395-
timeout.Stop()
1396-
1397-
break done
1398-
}
1399-
timeout.Reset(timeoutDura)
1400-
1401-
case <-timeout.C:
1402-
t.Fatalf("Timeout. Possibly not all blocks were triggered for sideevent: %v", i)
1403-
}
1404-
}
1405-
1406-
// make sure no more events are fired
1407-
select {
1408-
case e := <-chainSideCh:
1409-
t.Errorf("unexpected event fired: %v", e)
1410-
case <-time.After(250 * time.Millisecond):
1411-
}
1412-
}
1413-
14141335
// Tests if the canonical block can be fetched from the database during chain insertion.
14151336
func TestCanonicalBlockRetrieval(t *testing.T) {
14161337
testCanonicalBlockRetrieval(t, rawdb.HashScheme)

core/chain_indexer.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -222,20 +222,19 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainH
222222
errc <- nil
223223
return
224224
}
225-
header := ev.Block.Header()
226-
if header.ParentHash != prevHash {
225+
if ev.Header.ParentHash != prevHash {
227226
// Reorg to the common ancestor if needed (might not exist in light sync mode, skip reorg then)
228227
// TODO(karalabe, zsfelfoldi): This seems a bit brittle, can we detect this case explicitly?
229228

230229
if rawdb.ReadCanonicalHash(c.chainDb, prevHeader.Number.Uint64()) != prevHash {
231-
if h := rawdb.FindCommonAncestor(c.chainDb, prevHeader, header); h != nil {
230+
if h := rawdb.FindCommonAncestor(c.chainDb, prevHeader, ev.Header); h != nil {
232231
c.newHead(h.Number.Uint64(), true)
233232
}
234233
}
235234
}
236-
c.newHead(header.Number.Uint64(), false)
235+
c.newHead(ev.Header.Number.Uint64(), false)
237236

238-
prevHeader, prevHash = header, header.Hash()
237+
prevHeader, prevHash = ev.Header, ev.Header.Hash()
239238
}
240239
}
241240
}

core/events.go

+3-11
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,19 @@
1717
package core
1818

1919
import (
20-
"github.com/ethereum/go-ethereum/common"
2120
"github.com/ethereum/go-ethereum/core/types"
2221
)
2322

2423
// NewTxsEvent is posted when a batch of transactions enter the transaction pool.
2524
type NewTxsEvent struct{ Txs []*types.Transaction }
2625

27-
// NewMinedBlockEvent is posted when a block has been imported.
28-
type NewMinedBlockEvent struct{ Block *types.Block }
29-
3026
// RemovedLogsEvent is posted when a reorg happens
3127
type RemovedLogsEvent struct{ Logs []*types.Log }
3228

3329
type ChainEvent struct {
34-
Block *types.Block
35-
Hash common.Hash
36-
Logs []*types.Log
30+
Header *types.Header
3731
}
3832

39-
type ChainSideEvent struct {
40-
Block *types.Block
33+
type ChainHeadEvent struct {
34+
Header *types.Header
4135
}
42-
43-
type ChainHeadEvent struct{ Block *types.Block }

core/txindexer.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -151,9 +151,9 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
151151
if done == nil {
152152
stop = make(chan struct{})
153153
done = make(chan struct{})
154-
go indexer.run(rawdb.ReadTxIndexTail(indexer.db), head.Block.NumberU64(), stop, done)
154+
go indexer.run(rawdb.ReadTxIndexTail(indexer.db), head.Header.Number.Uint64(), stop, done)
155155
}
156-
lastHead = head.Block.NumberU64()
156+
lastHead = head.Header.Number.Uint64()
157157
case <-done:
158158
stop = nil
159159
done = nil

core/txpool/txpool.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ func (p *TxPool) loop(head *types.Header, chain BlockChain) {
243243
select {
244244
case event := <-newHeadCh:
245245
// Chain moved forward, store the head for later consumption
246-
newHead = event.Block.Header()
246+
newHead = event.Header
247247

248248
case head := <-resetDone:
249249
// Previous reset finished, update the old head and allow a new reset

eth/api_backend.go

-4
Original file line numberDiff line numberDiff line change
@@ -275,10 +275,6 @@ func (b *EthAPIBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) e
275275
return b.eth.BlockChain().SubscribeChainHeadEvent(ch)
276276
}
277277

278-
func (b *EthAPIBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription {
279-
return b.eth.BlockChain().SubscribeChainSideEvent(ch)
280-
}
281-
282278
func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
283279
return b.eth.BlockChain().SubscribeLogsEvent(ch)
284280
}

eth/catalyst/simulated_beacon_test.go

+9-8
Original file line numberDiff line numberDiff line change
@@ -123,16 +123,16 @@ func TestSimulatedBeaconSendWithdrawals(t *testing.T) {
123123
timer := time.NewTimer(12 * time.Second)
124124
for {
125125
select {
126-
case evt := <-chainHeadCh:
127-
for _, includedTx := range evt.Block.Transactions() {
126+
case ev := <-chainHeadCh:
127+
block := ethService.BlockChain().GetBlock(ev.Header.Hash(), ev.Header.Number.Uint64())
128+
for _, includedTx := range block.Transactions() {
128129
includedTxs[includedTx.Hash()] = struct{}{}
129130
}
130-
for _, includedWithdrawal := range evt.Block.Withdrawals() {
131+
for _, includedWithdrawal := range block.Withdrawals() {
131132
includedWithdrawals = append(includedWithdrawals, includedWithdrawal.Index)
132133
}
133-
134134
// ensure all withdrawals/txs included. this will take two blocks b/c number of withdrawals > 10
135-
if len(includedTxs) == len(txs) && len(includedWithdrawals) == len(withdrawals) && evt.Block.Number().Cmp(big.NewInt(2)) == 0 {
135+
if len(includedTxs) == len(txs) && len(includedWithdrawals) == len(withdrawals) && ev.Header.Number.Cmp(big.NewInt(2)) == 0 {
136136
return
137137
}
138138
case <-timer.C:
@@ -186,11 +186,12 @@ func TestOnDemandSpam(t *testing.T) {
186186
)
187187
for {
188188
select {
189-
case evt := <-chainHeadCh:
190-
for _, itx := range evt.Block.Transactions() {
189+
case ev := <-chainHeadCh:
190+
block := eth.BlockChain().GetBlock(ev.Header.Hash(), ev.Header.Number.Uint64())
191+
for _, itx := range block.Transactions() {
191192
includedTxs[itx.Hash()] = struct{}{}
192193
}
193-
for _, iwx := range evt.Block.Withdrawals() {
194+
for _, iwx := range block.Withdrawals() {
194195
includedWxs = append(includedWxs, iwx.Index)
195196
}
196197
// ensure all withdrawals/txs included. this will take two blocks b/c number of withdrawals > 10

eth/filters/filter_system.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent)
391391

392392
func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) {
393393
for _, f := range filters[BlocksSubscription] {
394-
f.headers <- ev.Block.Header()
394+
f.headers <- ev.Header
395395
}
396396
}
397397

eth/filters/filter_system_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ func TestBlockSubscription(t *testing.T) {
200200
)
201201

202202
for _, blk := range chain {
203-
chainEvents = append(chainEvents, core.ChainEvent{Hash: blk.Hash(), Block: blk})
203+
chainEvents = append(chainEvents, core.ChainEvent{Header: blk.Header()})
204204
}
205205

206206
chan0 := make(chan *types.Header)
@@ -213,13 +213,13 @@ func TestBlockSubscription(t *testing.T) {
213213
for i1 != len(chainEvents) || i2 != len(chainEvents) {
214214
select {
215215
case header := <-chan0:
216-
if chainEvents[i1].Hash != header.Hash() {
217-
t.Errorf("sub0 received invalid hash on index %d, want %x, got %x", i1, chainEvents[i1].Hash, header.Hash())
216+
if chainEvents[i1].Header.Hash() != header.Hash() {
217+
t.Errorf("sub0 received invalid hash on index %d, want %x, got %x", i1, chainEvents[i1].Header.Hash(), header.Hash())
218218
}
219219
i1++
220220
case header := <-chan1:
221-
if chainEvents[i2].Hash != header.Hash() {
222-
t.Errorf("sub1 received invalid hash on index %d, want %x, got %x", i2, chainEvents[i2].Hash, header.Hash())
221+
if chainEvents[i2].Header.Hash() != header.Hash() {
222+
t.Errorf("sub1 received invalid hash on index %d, want %x, got %x", i2, chainEvents[i2].Header.Hash(), header.Hash())
223223
}
224224
i2++
225225
}

eth/gasprice/gasprice.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -124,10 +124,10 @@ func NewOracle(backend OracleBackend, params Config, startPrice *big.Int) *Oracl
124124
go func() {
125125
var lastHead common.Hash
126126
for ev := range headEvent {
127-
if ev.Block.ParentHash() != lastHead {
127+
if ev.Header.ParentHash != lastHead {
128128
cache.Purge()
129129
}
130-
lastHead = ev.Block.Hash()
130+
lastHead = ev.Header.Hash()
131131
}
132132
}()
133133

0 commit comments

Comments
 (0)