Skip to content
This repository was archived by the owner on Dec 4, 2024. It is now read-only.

Commit 3851f98

Browse files
committed
custom blocktracker
1 parent 4fbe4ba commit 3851f98

File tree

6 files changed

+616
-6
lines changed

6 files changed

+616
-6
lines changed

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,3 +231,5 @@ require (
231231
gotest.tools/v3 v3.0.2 // indirect
232232
inet.af/netaddr v0.0.0-20230525184311-b8eac61e914a // indirect
233233
)
234+
235+
replace github.com/umbracle/ethgo => github.com/igorcrevar/ethgo v0.1.1000

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,8 @@ github.com/huin/goupnp v1.1.0 h1:gEe0Dp/lZmPZiDFzJJaOfUpOvv2MKUkoBX8lDrn9vKU=
355355
github.com/huin/goupnp v1.1.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8=
356356
github.com/huin/goutil v0.0.0-20170803182201-1ca381bf3150/go.mod h1:PpLOETDnJ0o3iZrZfqZzyLl6l7F3c6L1oWn7OICBi6o=
357357
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
358+
github.com/igorcrevar/ethgo v0.1.1000 h1:YOneC/vjuXYPoNNFi1gy48TRSuOpy3X1apLhHy+amYs=
359+
github.com/igorcrevar/ethgo v0.1.1000/go.mod h1:J+OZNfRCtbaYW3AEc0m47GhwAzlNJjcr9vO86nzOr6E=
358360
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
359361
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
360362
github.com/ipfs/boxo v0.8.1 h1:3DkKBCK+3rdEB5t77WDShUXXhktYwH99mkAsgajsKrU=
@@ -705,8 +707,6 @@ github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
705707
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
706708
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
707709
github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU=
708-
github.com/umbracle/ethgo v0.1.4-0.20230810113823-c9c19bcd8a1e h1:CPUqjupBwC5EpV/eY/K+E65ZvFZkcXYgWf6KzypBQDY=
709-
github.com/umbracle/ethgo v0.1.4-0.20230810113823-c9c19bcd8a1e/go.mod h1:J+OZNfRCtbaYW3AEc0m47GhwAzlNJjcr9vO86nzOr6E=
710710
github.com/umbracle/fastrlp v0.1.1-0.20230504065717-58a1b8a9929d h1:HAg1Kpr9buwRxEiC2UXU9oT2AU8uCU7o3/WTH+Lt5wo=
711711
github.com/umbracle/fastrlp v0.1.1-0.20230504065717-58a1b8a9929d/go.mod h1:5RHgqiFjd4vLJESMWagP/E7su+5Gzk0iqqmrotR8WdA=
712712
github.com/umbracle/go-eth-bn256 v0.0.0-20230125114011-47cb310d9b0b h1:5/xofhZiOG0I9DQXqDSPxqYObk6QI7mBGMJI+ngyIgc=

tracker/blocktracker.go

Lines changed: 335 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,335 @@
1+
package tracker
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
8+
"github.com/hashicorp/go-hclog"
9+
"github.com/umbracle/ethgo"
10+
bt "github.com/umbracle/ethgo/blocktracker"
11+
)
12+
13+
const (
14+
defaultMaxBlockBacklog = 10
15+
)
16+
17+
// BlockTracker is an interface to track new blocks on the chain
18+
type BlockTracker struct {
19+
config *Config
20+
blocks []*ethgo.Block
21+
blocksLock sync.Mutex
22+
subscriber bt.BlockTrackerInterface
23+
blockChs []chan *bt.BlockEvent
24+
blockChsLock sync.Mutex
25+
provider bt.BlockProvider
26+
closeCh chan struct{}
27+
once sync.Once
28+
logger hclog.Logger
29+
}
30+
31+
type Config struct {
32+
Tracker bt.BlockTrackerInterface
33+
MaxBlockBacklog uint64
34+
}
35+
36+
func DefaultConfig() *Config {
37+
return &Config{
38+
MaxBlockBacklog: defaultMaxBlockBacklog,
39+
}
40+
}
41+
42+
type ConfigOption func(*Config)
43+
44+
func WithBlockMaxBacklog(b uint64) ConfigOption {
45+
return func(c *Config) {
46+
c.MaxBlockBacklog = b
47+
}
48+
}
49+
50+
func WithTracker(b bt.BlockTrackerInterface) ConfigOption {
51+
return func(c *Config) {
52+
c.Tracker = b
53+
}
54+
}
55+
56+
func NewBlockTracker(provider bt.BlockProvider, logger hclog.Logger, opts ...ConfigOption) *BlockTracker {
57+
config := DefaultConfig()
58+
for _, opt := range opts {
59+
opt(config)
60+
}
61+
62+
tracker := config.Tracker
63+
if tracker == nil {
64+
tracker = bt.NewJSONBlockTracker(provider)
65+
}
66+
67+
return &BlockTracker{
68+
blocks: []*ethgo.Block{},
69+
blockChs: []chan *bt.BlockEvent{},
70+
config: config,
71+
subscriber: tracker,
72+
provider: provider,
73+
closeCh: make(chan struct{}),
74+
logger: logger,
75+
blocksLock: sync.Mutex{},
76+
blockChsLock: sync.Mutex{},
77+
once: sync.Once{},
78+
}
79+
}
80+
81+
func (t *BlockTracker) Subscribe() chan *bt.BlockEvent {
82+
t.blockChsLock.Lock()
83+
defer t.blockChsLock.Unlock()
84+
85+
ch := make(chan *bt.BlockEvent, 1)
86+
t.blockChs = append(t.blockChs, ch)
87+
88+
return ch
89+
}
90+
91+
func (t *BlockTracker) AcquireLock() bt.Lock {
92+
return bt.NewLock(&t.blocksLock)
93+
}
94+
95+
func (t *BlockTracker) Init() error {
96+
var (
97+
block *ethgo.Block
98+
err error
99+
i uint64
100+
)
101+
102+
t.once.Do(func() {
103+
block, err = t.provider.GetBlockByNumber(ethgo.Latest, false)
104+
if err != nil {
105+
return
106+
}
107+
108+
if block.Number == 0 {
109+
return
110+
}
111+
112+
blocks := make([]*ethgo.Block, t.config.MaxBlockBacklog)
113+
114+
for i = 0; i < t.config.MaxBlockBacklog; i++ {
115+
blocks[t.config.MaxBlockBacklog-i-1] = block
116+
if block.Number == 0 {
117+
break
118+
}
119+
120+
block, err = t.provider.GetBlockByHash(block.ParentHash, false)
121+
if err != nil {
122+
return
123+
} else if block == nil {
124+
// if block does not exist (for example reorg happened) GetBlockByHash will return nil, nil
125+
err = fmt.Errorf("block with hash %s not found", block.ParentHash)
126+
127+
return
128+
}
129+
}
130+
131+
if i != t.config.MaxBlockBacklog {
132+
// less than maxBacklog elements
133+
blocks = blocks[t.config.MaxBlockBacklog-i-1:]
134+
}
135+
136+
t.blocks = blocks
137+
})
138+
139+
return err
140+
}
141+
142+
func (t *BlockTracker) MaxBlockBacklog() uint64 {
143+
return t.config.MaxBlockBacklog
144+
}
145+
146+
func (t *BlockTracker) LastBlocked() *ethgo.Block {
147+
target := t.blocks[len(t.blocks)-1]
148+
if target == nil {
149+
return nil
150+
}
151+
152+
return target.Copy()
153+
}
154+
155+
func (t *BlockTracker) BlocksBlocked() []*ethgo.Block {
156+
res := make([]*ethgo.Block, len(t.blocks))
157+
for i, block := range t.blocks {
158+
res[i] = block.Copy()
159+
}
160+
161+
return res
162+
}
163+
164+
func (t *BlockTracker) Len() int {
165+
return len(t.blocks)
166+
}
167+
168+
func (t *BlockTracker) Close() error {
169+
close(t.closeCh)
170+
171+
return nil
172+
}
173+
174+
func (t *BlockTracker) Start() error {
175+
ctx, cancelFn := context.WithCancel(context.Background())
176+
go func() {
177+
<-t.closeCh
178+
cancelFn()
179+
}()
180+
181+
// start the polling
182+
return t.subscriber.Track(ctx, func(block *ethgo.Block) error {
183+
return t.HandleTrackedBlock(block)
184+
})
185+
}
186+
187+
func (t *BlockTracker) AddBlockLocked(block *ethgo.Block) error {
188+
if uint64(len(t.blocks)) == t.config.MaxBlockBacklog {
189+
// remove past blocks if there are more than maxReconcileBlocks
190+
t.blocks = t.blocks[1:]
191+
}
192+
193+
if len(t.blocks) != 0 {
194+
if lastNum := t.blocks[len(t.blocks)-1].Number; lastNum+1 != block.Number {
195+
return fmt.Errorf("bad number sequence. %d and %d", lastNum, block.Number)
196+
}
197+
}
198+
199+
t.blocks = append(t.blocks, block)
200+
201+
return nil
202+
}
203+
204+
func (t *BlockTracker) blockAtIndex(hash ethgo.Hash) int {
205+
for indx, b := range t.blocks {
206+
if b.Hash == hash {
207+
return indx
208+
}
209+
}
210+
211+
return -1
212+
}
213+
214+
func (t *BlockTracker) handleReconcileImpl(block *ethgo.Block) ([]*ethgo.Block, int, error) {
215+
// The state is empty
216+
if len(t.blocks) == 0 {
217+
return []*ethgo.Block{block}, -1, nil
218+
}
219+
220+
// Append to the head of the chain
221+
if t.blocks[len(t.blocks)-1].Hash == block.ParentHash {
222+
return []*ethgo.Block{block}, -1, nil
223+
}
224+
225+
// The block already exists, but if not last, remove all the following
226+
if indx := t.blockAtIndex(block.Hash); indx != -1 {
227+
return nil, indx + 1, nil
228+
}
229+
230+
// Fork in the middle of the chain
231+
if indx := t.blockAtIndex(block.ParentHash); indx != -1 {
232+
return []*ethgo.Block{block}, indx + 1, nil
233+
}
234+
235+
// Backfill. We dont know the parent of the block.
236+
// Need to query the chain until we find a known block
237+
var (
238+
added = []*ethgo.Block{block}
239+
count uint64 = 0
240+
indx int
241+
err error
242+
)
243+
244+
for ; count < t.config.MaxBlockBacklog; count++ {
245+
hash := block.ParentHash
246+
247+
block, err = t.provider.GetBlockByHash(hash, false)
248+
if err != nil {
249+
return nil, -1, fmt.Errorf("parent with hash retrieving error: %w", err)
250+
} else if block == nil {
251+
// if block does not exist (for example reorg happened) GetBlockByHash will return nil, nil
252+
return nil, -1, fmt.Errorf("parent with hash %s not found", hash)
253+
}
254+
255+
added = append(added, block)
256+
257+
if indx = t.blockAtIndex(block.ParentHash); indx != -1 {
258+
indx++
259+
260+
break
261+
}
262+
}
263+
264+
// need the blocks in reverse order
265+
for i := len(added)/2 - 1; i >= 0; i-- {
266+
opp := len(added) - 1 - i
267+
added[i], added[opp] = added[opp], added[i]
268+
}
269+
270+
if count == t.config.MaxBlockBacklog {
271+
indx = 0 // remove all from the current state
272+
273+
t.logger.Info("reconcile did not found parent for new blocks", "hash", added[0].Hash)
274+
}
275+
276+
return added, indx, nil
277+
}
278+
279+
func (t *BlockTracker) HandleBlockEvent(block *ethgo.Block) (*bt.BlockEvent, error) {
280+
t.blocksLock.Lock()
281+
defer t.blocksLock.Unlock()
282+
283+
blocks, indx, err := t.handleReconcileImpl(block)
284+
if err != nil {
285+
return nil, err
286+
}
287+
288+
if len(blocks) == 0 {
289+
return nil, nil
290+
}
291+
292+
blockEvnt := &bt.BlockEvent{}
293+
294+
// there are some blocks to remove
295+
if indx >= 0 {
296+
for i := indx; i < len(t.blocks); i++ {
297+
blockEvnt.Removed = append(blockEvnt.Removed, t.blocks[i])
298+
}
299+
300+
t.blocks = t.blocks[:indx]
301+
}
302+
303+
// include the new blocks
304+
for _, block := range blocks {
305+
blockEvnt.Added = append(blockEvnt.Added, block)
306+
307+
if err := t.AddBlockLocked(block); err != nil {
308+
return nil, err
309+
}
310+
}
311+
312+
return blockEvnt, nil
313+
}
314+
315+
func (t *BlockTracker) HandleTrackedBlock(block *ethgo.Block) error {
316+
blockEvnt, err := t.HandleBlockEvent(block)
317+
if err != nil {
318+
return err
319+
}
320+
321+
if blockEvnt != nil {
322+
for _, x := range blockEvnt.Added {
323+
fmt.Println(x.Number, x.Hash)
324+
}
325+
326+
t.blockChsLock.Lock()
327+
defer t.blockChsLock.Unlock()
328+
329+
for _, ch := range t.blockChs {
330+
ch <- blockEvnt
331+
}
332+
}
333+
334+
return nil
335+
}

0 commit comments

Comments
 (0)