Skip to content

[p2p] split consensus to a different topic #4548

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions chainservice/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,8 +798,9 @@ func (builder *Builder) buildBlockTimeCalculator() (err error) {
func (builder *Builder) buildConsensusComponent() error {
p2pAgent := builder.cs.p2pAgent
copts := []consensus.Option{
consensus.WithBroadcast(func(msg proto.Message) error {
return p2pAgent.BroadcastOutbound(context.Background(), msg)
consensus.WithDefaultTopic(p2pAgent.DefaultTopic()),
consensus.WithBroadcast(func(topic string, msg proto.Message) error {
return p2pAgent.BroadcastOutbound(context.Background(), topic, msg)
}),
}
if rDPoSProtocol := rolldpos.FindProtocol(builder.cs.registry); rDPoSProtocol != nil {
Expand Down
2 changes: 1 addition & 1 deletion chainservice/chainservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (cs *ChainService) NewAPIServer(cfg api.Config, archive bool) (*api.ServerV
p2pAgent := cs.p2pAgent
apiServerOptions := []api.Option{
api.WithBroadcastOutbound(func(ctx context.Context, chainID uint32, msg proto.Message) error {
return p2pAgent.BroadcastOutbound(ctx, msg)
return p2pAgent.BroadcastOutbound(ctx, p2pAgent.DefaultTopic(), msg)
}),
api.WithNativeElection(cs.electionCommittee),
api.WithAPIStats(cs.apiStats),
Expand Down
12 changes: 11 additions & 1 deletion consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type IotxConsensus struct {
}

type optionParams struct {
defaultTopic string
broadcastHandler scheme.Broadcast
pp poll.Protocol
rp *rp.Protocol
Expand All @@ -54,6 +55,14 @@ type optionParams struct {
// Option sets Consensus construction parameter.
type Option func(op *optionParams) error

// WithDefaultTopic is an option to set consensus broadcast topic
func WithDefaultTopic(topic string) Option {
return func(ops *optionParams) error {
ops.defaultTopic = topic
return nil
}
}

// WithBroadcast is an option to add broadcast callback to Consensus
func WithBroadcast(broadcastHandler scheme.Broadcast) Option {
return func(ops *optionParams) error {
Expand Down Expand Up @@ -140,6 +149,7 @@ func NewConsensus(
SetBlockDeserializer(block.NewDeserializer(bc.EvmNetworkID())).
SetClock(clock).
SetBroadcast(ops.broadcastHandler).
SetDefaultTopic(ops.defaultTopic).
SetDelegatesByEpochFunc(delegatesByEpochFunc).
SetProposersByEpochFunc(proposersByEpochFunc).
RegisterProtocol(ops.rp)
Expand Down Expand Up @@ -171,7 +181,7 @@ func NewConsensus(
}
broadcastBlockCB := func(blk *block.Block) error {
if blkPb := blk.ConvertToBlockPb(); blkPb != nil {
return ops.broadcastHandler(blkPb)
return ops.broadcastHandler(ops.defaultTopic, blkPb)
}
return nil
}
Expand Down
7 changes: 7 additions & 0 deletions consensus/scheme/rolldpos/rolldpos.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ type (
chain ChainManager
blockDeserializer *block.Deserializer
broadcastHandler scheme.Broadcast
topic string
clock clock.Clock
// TODO: explorer dependency deleted at #1085, need to add api params
rp *rolldpos.Protocol
Expand Down Expand Up @@ -385,6 +386,11 @@ func (b *Builder) SetBroadcast(broadcastHandler scheme.Broadcast) *Builder {
return b
}

func (b *Builder) SetDefaultTopic(topic string) *Builder {
b.topic = topic
return b
}

// SetClock sets the clock
func (b *Builder) SetClock(clock clock.Clock) *Builder {
b.clock = clock
Expand Down Expand Up @@ -435,6 +441,7 @@ func (b *Builder) Build() (*RollDPoS, error) {
b.blockDeserializer,
b.rp,
b.broadcastHandler,
b.topic,
b.delegatesByEpochFunc,
b.proposersByEpochFunc,
b.encodedAddr,
Expand Down
9 changes: 7 additions & 2 deletions consensus/scheme/rolldpos/rolldposctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type (
chain ChainManager
blockDeserializer *block.Deserializer
broadcastHandler scheme.Broadcast
topic string
roundCalc *roundCalculator
eManagerDB db.KVStore
toleratedOvertime time.Duration
Expand All @@ -114,6 +115,7 @@ func NewRollDPoSCtx(
blockDeserializer *block.Deserializer,
rp *rolldpos.Protocol,
broadcastHandler scheme.Broadcast,
topic string,
delegatesByEpochFunc NodesSelectionByEpochFunc,
proposersByEpochFunc NodesSelectionByEpochFunc,
encodedAddr string,
Expand Down Expand Up @@ -166,6 +168,7 @@ func NewRollDPoSCtx(
chain: chain,
blockDeserializer: blockDeserializer,
broadcastHandler: broadcastHandler,
topic: topic,
clock: clock,
roundCalc: roundCalc,
eManagerDB: eManagerDB,
Expand Down Expand Up @@ -526,7 +529,8 @@ func (ctx *rollDPoSCtx) Commit(msg interface{}) (bool, error) {
}
// Broadcast the committed block to the network
if blkProto := pendingBlock.ConvertToBlockPb(); blkProto != nil {
if err := ctx.broadcastHandler(blkProto); err != nil {
// TODO: broadcast to different topic after HF height
if err := ctx.broadcastHandler(ctx.topic, blkProto); err != nil {
ctx.logger().Error(
"error when broadcasting blkProto",
zap.Error(err),
Expand Down Expand Up @@ -571,7 +575,8 @@ func (ctx *rollDPoSCtx) Broadcast(endorsedMsg interface{}) {
ctx.loggerWithStats().Error("failed to generate protobuf message", zap.Error(err))
return
}
if err := ctx.broadcastHandler(msg); err != nil {
// TODO: broadcast to different topic after HF height
if err := ctx.broadcastHandler(ctx.topic, msg); err != nil {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check block height, after HF switch to consensus's own topic

ctx.loggerWithStats().Error("fail to broadcast", zap.Error(err))
}
}
Expand Down
6 changes: 5 additions & 1 deletion consensus/scheme/scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (
"github.com/iotexproject/iotex-proto/golang/iotextypes"
)

var (
BroadcastTopic = "broadcast+consensus"
)

// CreateBlockCB defines the callback to create a new block
type CreateBlockCB func() (*block.Block, error)

Expand All @@ -26,7 +30,7 @@ type ConsensusDoneCB func(*block.Block) error
type BroadcastCB func(*block.Block) error

// Broadcast sends a broadcast message to the whole network
type Broadcast func(msg proto.Message) error
type Broadcast func(string, proto.Message) error

// Scheme is the interface that consensus schemes should implement
type Scheme interface {
Expand Down
8 changes: 6 additions & 2 deletions nodeinfo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ import (

type (
transmitter interface {
BroadcastOutbound(context.Context, proto.Message) error
BroadcastOutbound(context.Context, string, proto.Message) error
UnicastOutbound(context.Context, peer.AddrInfo, proto.Message) error
DefaultTopic() string
Info() (peer.AddrInfo, error)
}

Expand All @@ -54,6 +55,7 @@ type (
lifecycle.Lifecycle
version string
address string
topic string
broadcastList atomic.Value // []string, whitelist to force enable broadcast
nodeMap *lru.Cache
transmitter transmitter
Expand Down Expand Up @@ -86,6 +88,7 @@ func NewInfoManager(cfg *Config, t transmitter, ch chain, privKey crypto.Private
privKey: privKey,
version: version.PackageVersion,
address: privKey.PublicKey().Address().String(),
topic: t.DefaultTopic(),
getBroadcastListFunc: broadcastListFunc,
}
dm.broadcastList.Store([]string{})
Expand Down Expand Up @@ -169,7 +172,8 @@ func (dm *InfoManager) BroadcastNodeInfo(ctx context.Context) error {
return err
}
// broadcast request meesage
if err := dm.transmitter.BroadcastOutbound(ctx, req); err != nil {
// TODO: use different topic after HF height
if err := dm.transmitter.BroadcastOutbound(ctx, dm.topic, req); err != nil {
return err
}
// manually update self node info for broadcast message to myself will be ignored
Expand Down
118 changes: 65 additions & 53 deletions p2p/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type (
ExternalHost string `yaml:"externalHost"`
ExternalPort int `yaml:"externalPort"`
BootstrapNodes []string `yaml:"bootstrapNodes"`
ExtraTopics []string `yaml:"extraTopics"`
MasterKey string `yaml:"masterKey"` // master key will be PrivateKey if not set.
// RelayType is the type of P2P network relay. By default, the value is empty, meaning disabled. Two relay types
// are supported: active, nat.
Expand All @@ -103,10 +104,12 @@ type (
Agent interface {
lifecycle.StartStopper
nodestats.StatsReporter
// BroadcastOutbound sends a broadcast message to the whole network
BroadcastOutbound(ctx context.Context, msg proto.Message) (err error)
// BroadcastOutbound sends a broadcast message to the topic
BroadcastOutbound(context.Context, string, proto.Message) (err error)
// UnicastOutbound sends a unicast message to the given address
UnicastOutbound(_ context.Context, peer peer.AddrInfo, msg proto.Message) (err error)
// DefaultTopic returns the default broadcast topic
DefaultTopic() string
// Info returns agents' peer info.
Info() (peer.AddrInfo, error)
// Self returns the self network address
Expand Down Expand Up @@ -163,14 +166,16 @@ func (*dummyAgent) Stop(context.Context) error {
return nil
}

func (*dummyAgent) BroadcastOutbound(ctx context.Context, msg proto.Message) error {
func (*dummyAgent) BroadcastOutbound(ctx context.Context, topic string, msg proto.Message) error {
return nil
}

func (*dummyAgent) UnicastOutbound(_ context.Context, peer peer.AddrInfo, msg proto.Message) error {
return nil
}

func (*dummyAgent) DefaultTopic() string { return "" }

func (*dummyAgent) Info() (peer.AddrInfo, error) {
return peer.AddrInfo{}, nil
}
Expand Down Expand Up @@ -235,61 +240,64 @@ func (p *agent) Start(ctx context.Context) error {
if err != nil {
return errors.Wrap(err, "error when instantiating Agent host")
}

if err := host.AddBroadcastPubSub(ctx, _broadcastTopic+p.topicSuffix, func(ctx context.Context, data []byte) (err error) {
// Blocking handling the broadcast message until the agent is started
<-ready
var (
peerID string
broadcast iotexrpc.BroadcastMsg
latency int64
)
skip := false
defer func() {
// Skip accounting if the broadcast message is not handled
if skip {
// register to all topics
p.cfg.ExtraTopics = append(p.cfg.ExtraTopics, p.DefaultTopic())
for _, topic := range p.cfg.ExtraTopics {
if err := host.AddBroadcastPubSub(ctx, topic, func(ctx context.Context, data []byte) (err error) {
// Blocking handling the broadcast message until the agent is started
<-ready
var (
peerID string
broadcast iotexrpc.BroadcastMsg
latency int64
)
skip := false
defer func() {
// Skip accounting if the broadcast message is not handled
if skip {
return
}
status := _successStr
if err != nil {
status = _failureStr
}
_p2pMsgCounter.WithLabelValues("broadcast", strconv.Itoa(int(broadcast.MsgType)), "in", peerID, status).Inc()
_p2pMsgLatency.WithLabelValues("broadcast", strconv.Itoa(int(broadcast.MsgType)), status).Observe(float64(latency))
}()
if err = proto.Unmarshal(data, &broadcast); err != nil {
err = errors.Wrap(err, "error when marshaling broadcast message")
return
}
status := _successStr
if err != nil {
status = _failureStr
// Skip the broadcast message if it's from the node itself
rawmsg, ok := p2p.GetBroadcastMsg(ctx)
if !ok {
err = errors.New("error when asserting broadcast msg context")
return
}
peerID = rawmsg.GetFrom().String()
if p.host.HostIdentity() == peerID {
skip = true
return
}
if broadcast.ChainId != p.chainID {
err = errors.Errorf("chain ID mismatch, received %d, expecting %d", broadcast.ChainId, p.chainID)
return
}
_p2pMsgCounter.WithLabelValues("broadcast", strconv.Itoa(int(broadcast.MsgType)), "in", peerID, status).Inc()
_p2pMsgLatency.WithLabelValues("broadcast", strconv.Itoa(int(broadcast.MsgType)), status).Observe(float64(latency))
}()
if err = proto.Unmarshal(data, &broadcast); err != nil {
err = errors.Wrap(err, "error when marshaling broadcast message")
return
}
// Skip the broadcast message if it's from the node itself
rawmsg, ok := p2p.GetBroadcastMsg(ctx)
if !ok {
err = errors.New("error when asserting broadcast msg context")
return
}
peerID = rawmsg.GetFrom().String()
if p.host.HostIdentity() == peerID {
skip = true
return
}
if broadcast.ChainId != p.chainID {
err = errors.Errorf("chain ID mismatch, received %d, expecting %d", broadcast.ChainId, p.chainID)
return
}

t := broadcast.GetTimestamp().AsTime()
latency = time.Since(t).Nanoseconds() / time.Millisecond.Nanoseconds()
t := broadcast.GetTimestamp().AsTime()
latency = time.Since(t).Nanoseconds() / time.Millisecond.Nanoseconds()

msg, err := goproto.TypifyRPCMsg(broadcast.MsgType, broadcast.MsgBody)
if err != nil {
err = errors.Wrap(err, "error when typifying broadcast message")
msg, err := goproto.TypifyRPCMsg(broadcast.MsgType, broadcast.MsgBody)
if err != nil {
err = errors.Wrap(err, "error when typifying broadcast message")
return
}
p.broadcastInboundHandler(ctx, broadcast.ChainId, peerID, msg)
p.qosMetrics.updateRecvBroadcast(time.Now())
return
}); err != nil {
return errors.Wrap(err, "error when adding broadcast pubsub")
}
p.broadcastInboundHandler(ctx, broadcast.ChainId, peerID, msg)
p.qosMetrics.updateRecvBroadcast(time.Now())
return
}); err != nil {
return errors.Wrap(err, "error when adding broadcast pubsub")
}

if err := host.AddUnicastPubSub(_unicastTopic+p.topicSuffix, func(ctx context.Context, peerInfo peer.AddrInfo, data []byte) (err error) {
Expand Down Expand Up @@ -379,7 +387,7 @@ func (p *agent) Stop(ctx context.Context) error {
return nil
}

func (p *agent) BroadcastOutbound(ctx context.Context, msg proto.Message) (err error) {
func (p *agent) BroadcastOutbound(ctx context.Context, topic string, msg proto.Message) (err error) {
_, span := tracer.NewSpan(ctx, "Agent.BroadcastOutbound")
defer span.End()

Expand Down Expand Up @@ -419,7 +427,7 @@ func (p *agent) BroadcastOutbound(ctx context.Context, msg proto.Message) (err e
return
}
t := time.Now()
if err = host.Broadcast(ctx, _broadcastTopic+p.topicSuffix, data); err != nil {
if err = host.Broadcast(ctx, topic, data); err != nil {
err = errors.Wrap(err, "error when sending broadcast message")
p.qosMetrics.updateSendBroadcast(t, false)
return
Expand Down Expand Up @@ -473,6 +481,10 @@ func (p *agent) UnicastOutbound(ctx context.Context, peer peer.AddrInfo, msg pro
return
}

func (p *agent) DefaultTopic() string {
return _broadcastTopic + p.topicSuffix
}

func (p *agent) Info() (peer.AddrInfo, error) {
if p.host == nil {
return peer.AddrInfo{}, ErrAgentNotStarted
Expand Down
3 changes: 3 additions & 0 deletions server/itx/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/iotexproject/iotex-core/v2/api"
"github.com/iotexproject/iotex-core/v2/chainservice"
"github.com/iotexproject/iotex-core/v2/config"
"github.com/iotexproject/iotex-core/v2/consensus/scheme"
"github.com/iotexproject/iotex-core/v2/dispatcher"
"github.com/iotexproject/iotex-core/v2/p2p"
"github.com/iotexproject/iotex-core/v2/pkg/ha"
Expand Down Expand Up @@ -66,6 +67,8 @@ func newServer(cfg config.Config, testing bool) (*Server, error) {
case config.StandaloneScheme:
p2pAgent = p2p.NewDummyAgent()
default:
// TODO: add different topic depends on node type
cfg.Network.ExtraTopics = append(cfg.Network.ExtraTopics, scheme.BroadcastTopic)
p2pAgent = p2p.NewAgent(cfg.Network, cfg.Chain.ID, cfg.Genesis.Hash(), dispatcher.HandleBroadcast, dispatcher.HandleTell)
}
chains := make(map[uint32]*chainservice.ChainService)
Expand Down
Loading