Skip to content

Commit d0e9d0b

Browse files
authored
report received block height (#4670)
* report received block height * filter future blocks * fix unit tests
1 parent 76036da commit d0e9d0b

File tree

4 files changed

+57
-8
lines changed

4 files changed

+57
-8
lines changed

chainservice/chainservice.go

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/libp2p/go-libp2p/core/peer"
1414
"github.com/pkg/errors"
1515
"github.com/prometheus/client_golang/prometheus"
16+
"go.uber.org/zap"
1617
"google.golang.org/protobuf/proto"
1718

1819
"github.com/iotexproject/go-pkgs/hash"
@@ -127,9 +128,39 @@ func (cs *ChainService) Pause(pause bool) {
127128
cs.paused.Store(pause)
128129
}
129130

131+
func (cs *ChainService) Filter(messageType iotexrpc.MessageType, msg proto.Message, cap int) bool {
132+
// Filter out messages that are not relevant to the chain service
133+
if messageType != iotexrpc.MessageType_BLOCK {
134+
return true
135+
}
136+
blk, ok := msg.(*iotextypes.Block)
137+
if !ok || blk == nil {
138+
return false
139+
}
140+
if blk.Header.Core.Height > atomic.LoadUint64(&cs.lastReceivedBlockHeight) {
141+
atomic.StoreUint64(&cs.lastReceivedBlockHeight, blk.Header.Core.Height)
142+
}
143+
tip, err := cs.blockdao.Height()
144+
if err != nil {
145+
log.L().Error("failed to get tip height from blockdao", zap.Error(err))
146+
return true
147+
}
148+
return blk.Header.Core.Height < tip+uint64(cap)
149+
}
150+
130151
// ReportFullness switch on or off block sync
131-
func (cs *ChainService) ReportFullness(_ context.Context, messageType iotexrpc.MessageType, fullness float32) {
152+
func (cs *ChainService) ReportFullness(_ context.Context, messageType iotexrpc.MessageType, msg proto.Message, fullness float32) {
132153
_blockchainFullnessMtc.WithLabelValues(iotexrpc.MessageType_name[int32(messageType)]).Set(float64(fullness))
154+
switch messageType {
155+
case iotexrpc.MessageType_BLOCK:
156+
blk, ok := msg.(*iotextypes.Block)
157+
if !ok || blk == nil {
158+
return
159+
}
160+
if blk.Header.Core.Height > atomic.LoadUint64(&cs.lastReceivedBlockHeight) {
161+
atomic.StoreUint64(&cs.lastReceivedBlockHeight, blk.Header.Core.Height)
162+
}
163+
}
133164
}
134165

135166
// HandleAction handles incoming action request.
@@ -186,9 +217,6 @@ func (cs *ChainService) HandleBlock(ctx context.Context, peer string, pbBlock *i
186217
if err != nil {
187218
return err
188219
}
189-
if atomic.LoadUint64(&cs.lastReceivedBlockHeight) < blk.Height() {
190-
atomic.StoreUint64(&cs.lastReceivedBlockHeight, blk.Height())
191-
}
192220
return cs.blocksync.ProcessBlock(ctx, peer, blk)
193221
}
194222

dispatcher/dispatcher.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,16 @@ func (d *IotxDispatcher) ValidateMessage(pMsg proto.Message) (bool, error) {
232232
}
233233

234234
func (d *IotxDispatcher) queueMessage(msg *message) {
235+
subscriber := d.subscriber(msg.chainID)
236+
if subscriber == nil {
237+
log.L().Warn("chainID has not been registered in dispatcher.", zap.Uint32("chainID", msg.chainID))
238+
return
239+
}
235240
queue := d.queueMgr.Queue(msg)
241+
if !subscriber.Filter(msg.msgType, msg.msg, cap(queue)) {
242+
log.L().Debug("Message filtered by subscriber.", zap.Uint32("chainID", msg.chainID), zap.String("msgType", msg.msgType.String()))
243+
return
244+
}
236245
select {
237246
case queue <- msg:
238247
default:
@@ -309,7 +318,7 @@ func (d *IotxDispatcher) updateMetrics(msg *message, queue chan *message) {
309318
d.updateEventAudit(msg.msgType)
310319
subscriber := d.subscriber(msg.chainID)
311320
if subscriber != nil {
312-
subscriber.ReportFullness(msg.ctx, msg.msgType, float32(len(queue))/float32(cap(queue)))
321+
subscriber.ReportFullness(msg.ctx, msg.msgType, msg.msg, float32(len(queue))/float32(cap(queue)))
313322
}
314323
}
315324

dispatcher/dispatcher_test.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,12 @@ func dispatcherIsClean(dsp *IotxDispatcher) bool {
181181

182182
type dummySubscriber struct{}
183183

184-
func (ds *dummySubscriber) ReportFullness(context.Context, iotexrpc.MessageType, float32) {}
184+
func (ds *dummySubscriber) Filter(iotexrpc.MessageType, proto.Message, int) bool {
185+
return true
186+
}
187+
188+
func (ds *dummySubscriber) ReportFullness(context.Context, iotexrpc.MessageType, proto.Message, float32) {
189+
}
185190

186191
func (ds *dummySubscriber) HandleBlock(context.Context, string, *iotextypes.Block) error { return nil }
187192

@@ -220,7 +225,12 @@ type counterSubscriber struct {
220225
actionHash atomic.Int32
221226
}
222227

223-
func (cs *counterSubscriber) ReportFullness(context.Context, iotexrpc.MessageType, float32) {}
228+
func (cs *counterSubscriber) Filter(iotexrpc.MessageType, proto.Message, int) bool {
229+
return true
230+
}
231+
232+
func (cs *counterSubscriber) ReportFullness(context.Context, iotexrpc.MessageType, proto.Message, float32) {
233+
}
224234

225235
func (cs *counterSubscriber) HandleBlock(context.Context, string, *iotextypes.Block) error {
226236
cs.block.Inc()

dispatcher/subscriber.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@ import (
77
"github.com/iotexproject/iotex-proto/golang/iotexrpc"
88
"github.com/iotexproject/iotex-proto/golang/iotextypes"
99
"github.com/libp2p/go-libp2p/core/peer"
10+
"google.golang.org/protobuf/proto"
1011
)
1112

1213
// Subscriber is the dispatcher subscriber interface
1314
type Subscriber interface {
14-
ReportFullness(context.Context, iotexrpc.MessageType, float32)
15+
Filter(iotexrpc.MessageType, proto.Message, int) bool
16+
ReportFullness(context.Context, iotexrpc.MessageType, proto.Message, float32)
1517
HandleAction(context.Context, *iotextypes.Action) error
1618
HandleBlock(context.Context, string, *iotextypes.Block) error
1719
HandleSyncRequest(context.Context, peer.AddrInfo, *iotexrpc.BlockSync) error

0 commit comments

Comments
 (0)