Skip to content

Commit 4c8fe4a

Browse files
committed
feat: make ChainHead api local
1 parent 5cd13d3 commit 4c8fe4a

File tree

8 files changed

+56
-54
lines changed

8 files changed

+56
-54
lines changed

api/api.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,6 @@ type Proxy interface {
5656
// between the two objects.
5757
ChainStatObj(ctx context.Context, obj cid.Cid, base cid.Cid) (api.ObjStat, error) //perm:read
5858

59-
// ChainHead returns the current head of the chain.
60-
ChainHead(context.Context) (*types.TipSet, error) //perm:read
61-
6259
// ChainGetBlock returns the block specified by the given CID.
6360
ChainGetBlock(context.Context, cid.Cid) (*types.BlockHeader, error) //perm:read
6461

@@ -385,6 +382,7 @@ type Local interface {
385382
// ChainNotify returns channel with chain head updates.
386383
// First message is guaranteed to be of len == 1, and type == 'current'.
387384
ChainNotify(context.Context) (<-chan []*api.HeadChange, error)
385+
ChainHead(context.Context) (*types.TipSet, error)
388386
}
389387

390388
// UnSupport is a subset of api.FullNode

co/chain_api.go

+10
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"context"
55
"time"
66

7+
"github.com/filecoin-project/lotus/chain/types"
8+
79
"github.com/filecoin-project/lotus/api"
810
"github.com/filecoin-project/lotus/chain/store"
911
)
@@ -73,3 +75,11 @@ func (c *Coordinator) ChainNotify(ctx context.Context) (<-chan []*api.HeadChange
7375

7476
return out, nil
7577
}
78+
79+
// ChainHead impls api.FullNode.ChainNotify
80+
func (c *Coordinator) ChainHead(in0 context.Context) (*types.TipSet, error) {
81+
c.headMu.RLock()
82+
defer c.headMu.RUnlock()
83+
84+
return c.head, nil
85+
}

co/coordinator.go

+9-8
Original file line numberDiff line numberDiff line change
@@ -96,13 +96,6 @@ func (c *Coordinator) handleCandidate(hc *headCandidate) {
9696

9797
prev := c.head
9898
next := hc.ts
99-
headChanges, err := c.applyTipSetChange(prev, next, hc.node) // todo if network become slow
100-
if err != nil {
101-
clog.Errorf("apply tipset change: %s", err)
102-
}
103-
if headChanges == nil {
104-
return
105-
}
10699

107100
c.head = hc.ts
108101
c.weight = hc.weight
@@ -112,8 +105,16 @@ func (c *Coordinator) handleCandidate(hc *headCandidate) {
112105
c.sel.setPriority(DelayPriority, preAddrs...)
113106
c.sel.setPriority(CatchUpPriority, addr)
114107

115-
c.tspub.Pub(headChanges, tipsetChangeTopic)
108+
// reorg must after cache update
109+
headChanges, err := c.applyTipSetChange(prev, next, hc.node) // todo if network become slow
110+
if err != nil {
111+
clog.Errorf("apply tipset change: %s", err)
112+
}
113+
if headChanges == nil {
114+
return
115+
}
116116

117+
c.tspub.Pub(headChanges, tipsetChangeTopic)
117118
return
118119
}
119120

co/ctx.go

+15-7
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,8 @@ import (
1414

1515
// NewCtx constructs a Ctx instance
1616
func NewCtx(mctx helpers.MetricsCtx, lc fx.Lifecycle, nodeOpt NodeOption) (*Ctx, error) {
17-
bcache, err := newBlockHeaderCache(1 << 20)
18-
if err != nil {
19-
return nil, err
20-
}
21-
2217
return &Ctx{
2318
lc: helpers.LifecycleCtx(mctx, lc),
24-
bcache: bcache,
2519
headCh: make(chan *headCandidate, 256),
2620
errNodeCh: make(chan string, 256),
2721
nodeOpt: nodeOpt,
@@ -31,7 +25,6 @@ func NewCtx(mctx helpers.MetricsCtx, lc fx.Lifecycle, nodeOpt NodeOption) (*Ctx,
3125
// Ctx contains the shared components between different modules
3226
type Ctx struct {
3327
lc context.Context
34-
bcache *blockHeaderCache
3528
headCh chan *headCandidate
3629
errNodeCh chan string
3730

@@ -77,3 +70,18 @@ func (bc *blockHeaderCache) load(c cid.Cid) (*types.BlockHeader, bool) {
7770
blk, ok := val.(*types.BlockHeader)
7871
return blk, ok
7972
}
73+
74+
func (bc *blockHeaderCache) has(c cid.Cid) bool {
75+
_, ok := bc.cache.Peek(c)
76+
return ok
77+
}
78+
79+
func (bc *blockHeaderCache) hasKey(key types.TipSetKey) bool {
80+
for _, blkCid := range key.Cids() {
81+
has := bc.has(blkCid)
82+
if !has {
83+
return false
84+
}
85+
}
86+
return true
87+
}

co/node.go

+9-25
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ import (
55
"sync"
66
"time"
77

8-
lru "github.com/hashicorp/golang-lru"
9-
108
"github.com/filecoin-project/lotus/api/v1api"
119
"github.com/ipfs-force-community/venus-common-utils/apiinfo"
1210

@@ -73,8 +71,9 @@ type Node struct {
7371
closer jsonrpc.ClientCloser
7472
}
7573

76-
blkCache *lru.ARCCache
77-
log *zap.SugaredLogger
74+
blkCache *blockHeaderCache
75+
76+
log *zap.SugaredLogger
7877
}
7978

8079
func NewNode(cctx *Ctx, info NodeInfo) (*Node, error) {
@@ -83,11 +82,12 @@ func NewNode(cctx *Ctx, info NodeInfo) (*Node, error) {
8382
return nil, err
8483
}
8584
ctx, cancel := context.WithCancel(cctx.lc)
86-
blkCache, err := lru.NewARC(100)
85+
blkCache, err := newBlockHeaderCache(1 << 20)
8786
if err != nil {
8887
cancel()
8988
return nil, err
9089
}
90+
9191
return &Node{
9292
reListenInterval: cctx.nodeOpt.ReListenMinInterval,
9393
opt: cctx.nodeOpt,
@@ -211,7 +211,7 @@ func (n *Node) reListen() (<-chan []*api.HeadChange, error) {
211211
}
212212

213213
func (n *Node) applyChanges(lifeCtx context.Context, changes []*api.HeadChange) {
214-
n.sctx.bcache.add(changes)
214+
n.blkCache.add(changes)
215215

216216
idx := -1
217217
for i := range changes {
@@ -220,7 +220,6 @@ func (n *Node) applyChanges(lifeCtx context.Context, changes []*api.HeadChange)
220220
idx = i
221221
case store.HCApply:
222222
idx = i
223-
n.storeKey(changes[i].Val.Key())
224223
}
225224
}
226225

@@ -292,31 +291,16 @@ func (n *Node) loadTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipS
292291
}
293292

294293
func (n *Node) loadBlockHeader(ctx context.Context, c cid.Cid) (*types.BlockHeader, error) {
295-
if blk, ok := n.sctx.bcache.load(c); ok {
294+
if blk, ok := n.blkCache.load(c); ok {
296295
return blk, nil
297296
}
298297

299298
blk, err := n.upstream.full.ChainGetBlock(ctx, c)
300299
return blk, err
301300
}
302301

303-
func (n *Node) hasKey(key types.TipSetKey) bool {
304-
for _, blkCid := range key.Cids() {
305-
_, has := n.blkCache.Peek(blkCid.String())
306-
if !has {
307-
return false
308-
}
309-
}
310-
return true
311-
}
312-
313-
func (n *Node) storeKey(key types.TipSetKey) {
314-
for _, blkCid := range key.Cids() {
315-
_, has := n.blkCache.Peek(blkCid.String())
316-
if !has {
317-
n.blkCache.Add(blkCid.String(), nil)
318-
}
319-
}
302+
func (n *Node) hasTipset(key types.TipSetKey) bool {
303+
return n.blkCache.hasKey(key)
320304
}
321305

322306
const (

co/selector.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,9 @@ func (s *Selector) Select(tsk types.TipSetKey) (*Node, error) {
157157

158158
for addr, p := range s.priority {
159159
node := s.nodeProvider.GetNode(addr)
160-
if !node.hasKey(tsk) {
161-
log.Warnf("node %s not contains key %s", addr, tsk)
160+
if !tsk.IsEmpty() && node.hasTipset(tsk) && p != ErrPriority {
161+
log.Debugf("node %s has tipset %s, change to catchup node", addr, tsk.Cids())
162+
catchUpQue[addr] = s.weight[addr]
162163
continue
163164
}
164165
if p == CatchUpPriority {

proxy/local.go

+9
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,15 @@ type Local struct {
2020
}
2121

2222
// impl api.Local
23+
func (p *Local) ChainHead(in0 context.Context) (out0 *types.TipSet, err error) {
24+
cli, err := p.Select(types.EmptyTSK)
25+
if err != nil {
26+
err = fmt.Errorf("api ChainHead %v", err)
27+
return
28+
}
29+
return cli.ChainHead(in0)
30+
}
31+
2332
func (p *Local) ChainNotify(in0 context.Context) (out0 <-chan []*api1.HeadChange, err error) {
2433
cli, err := p.Select(types.EmptyTSK)
2534
if err != nil {

proxy/proxy.go

-9
Original file line numberDiff line numberDiff line change
@@ -140,15 +140,6 @@ func (p *Proxy) ChainHasObj(in0 context.Context, in1 cid.Cid) (out0 bool, err er
140140
return cli.ChainHasObj(in0, in1)
141141
}
142142

143-
func (p *Proxy) ChainHead(in0 context.Context) (out0 *types.TipSet, err error) {
144-
cli, err := p.Select(types.EmptyTSK)
145-
if err != nil {
146-
err = fmt.Errorf("api ChainHead %v", err)
147-
return
148-
}
149-
return cli.ChainHead(in0)
150-
}
151-
152143
func (p *Proxy) ChainReadObj(in0 context.Context, in1 cid.Cid) (out0 []uint8, err error) {
153144
cli, err := p.Select(types.EmptyTSK)
154145
if err != nil {

0 commit comments

Comments
 (0)