Skip to content

Commit

Permalink
node,peers: block pex in dht and gossipsub when app pex is disabled
Browse files Browse the repository at this point in the history
- It was discovered that both gossipsub and dht external modules were
  initiating new peer connections using the host.Host directly. This
  is undesirable if PEX is disabled in the application, so both are
  now created with appropriate options to disable their own peer
  discovery mechanisms if kwild is started with p2p.pex = false.
- We were ignoring the cfg.P2P.TargetConnections field. It is now
  passed to the peer manager, where it is considered in maintainMinPeers.
  Previously a hard coded value was used.
  • Loading branch information
jchappelow authored and charithabandi committed Feb 26, 2025
1 parent 91a1f3c commit 7ba37c3
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 8 deletions.
11 changes: 9 additions & 2 deletions node/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,16 @@ import (
// Higher level logic will be needed for the aggregation, and fallback to
// next-best shapshots in the event that restore of the current best fails.

func makeDHT(ctx context.Context, h host.Host, peers []peer.AddrInfo, mode dht.ModeOpt) (*dht.IpfsDHT, error) {
func makeDHT(ctx context.Context, h host.Host, peers []peer.AddrInfo, mode dht.ModeOpt, pex bool) (*dht.IpfsDHT, error) {
// Create a DHT
kadDHT, err := dht.New(ctx, h, dht.BootstrapPeers(peers...), dht.Mode(mode))
opts := []dht.Option{
dht.BootstrapPeers(peers...),
dht.Mode(mode),
}
if !pex {
opts = append(opts, dht.DisableAutoRefresh()) // just use connected peers
}
kadDHT, err := dht.New(ctx, h, opts...)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (n *Node) Start(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

ps, err := pubsub.NewGossipSub(ctx, n.host)
ps, err := pubsub.NewGossipSub(ctx, n.host, pubsub.WithPeerExchange(n.P2PService.PEX()))
if err != nil {
return err
}
Expand Down
14 changes: 11 additions & 3 deletions node/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type P2PService struct {
dht *dht.IpfsDHT
discovery discovery.Discovery

pex bool // pex enable in peerManager

log log.Logger
}

Expand Down Expand Up @@ -60,7 +62,7 @@ func NewP2PService(ctx context.Context, cfg *P2PServiceConfig, host host.Host) (
wcg = peers.NewWhitelistGater(peerWhitelist, peers.WithLogger(logger.New("PEERFILT")))
// PeerMan adds more from address book.
}
cg := peers.ChainConnectionGaters(wcg) // pointless for one, but can be more
cg := peers.ChainConnectionGaters(wcg)

if host == nil {
ip, portStr, err := net.SplitHostPort(cfg.KwilCfg.P2P.ListenAddress)
Expand Down Expand Up @@ -100,7 +102,7 @@ func NewP2PService(ctx context.Context, cfg *P2PServiceConfig, host host.Host) (
Logger: logger.New("PEERS"),
Host: host,
ChainID: cfg.ChainID,
TargetConnections: 20,
TargetConnections: cfg.KwilCfg.P2P.TargetConnections,
ConnGater: wcg,
RequiredProtocols: RequiredStreamProtocols,
}
Expand All @@ -120,7 +122,7 @@ func NewP2PService(ctx context.Context, cfg *P2PServiceConfig, host host.Host) (
host.SetStreamHandler(pubsub.GossipSubID_v12, dummyStreamHandler)

mode := dht.ModeServer
dht, err := makeDHT(ctx, host, nil, mode)
dht, err := makeDHT(ctx, host, nil, mode, pmCfg.PEX)
if err != nil {
return nil, fmt.Errorf("failed to create DHT: %w", err)
}
Expand All @@ -132,6 +134,7 @@ func NewP2PService(ctx context.Context, cfg *P2PServiceConfig, host host.Host) (
dht: dht,
discovery: discoverer,
log: logger,
pex: cfg.KwilCfg.P2P.Pex,
}, nil
}

Expand Down Expand Up @@ -214,3 +217,8 @@ func (p *P2PService) Host() host.Host {
func (p *P2PService) Discovery() discovery.Discovery {
return p.discovery
}

// PEX indicates whether the peer manager is configured to use peer exchange (PEX).
func (p *P2PService) PEX() bool {
return p.pex
}
36 changes: 36 additions & 0 deletions node/peers/gate.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,42 @@ func WithLogger(logger log.Logger) GateOpt {
}
}

var _ connmgr.ConnectionGater = (*OutboundWhitelistGater)(nil)

// OutboundWhitelistGater is to prevent dialing out to peers that are not
// explicitly allowed by an application provided filter function. This exists in
// part to prevent other modules such as the DHT and gossipsub from dialing out
// to peers that are not explicitly allowed (e.g. already connected or added by
// the application).
type OutboundWhitelistGater struct {
AllowedOutbound func(peer.ID) bool
}

// OUTBOUND

func (g *OutboundWhitelistGater) InterceptPeerDial(p peer.ID) bool {
if g == nil || g.AllowedOutbound == nil {
return true
}
return g.AllowedOutbound(p)
}

func (g *OutboundWhitelistGater) InterceptAddrDial(p peer.ID, addr multiaddr.Multiaddr) bool {
return true
}

// INBOUND

func (g *OutboundWhitelistGater) InterceptAccept(connAddrs network.ConnMultiaddrs) bool { return true }

func (g *OutboundWhitelistGater) InterceptSecured(dir network.Direction, p peer.ID, conn network.ConnMultiaddrs) bool {
return true
}

func (g *OutboundWhitelistGater) InterceptUpgraded(conn network.Conn) (bool, control.DisconnectReason) {
return true, 0
}

func NewWhitelistGater(allowed []peer.ID, opts ...GateOpt) *WhitelistGater {
options := &gateOpts{
logger: log.DiscardLogger,
Expand Down
3 changes: 2 additions & 1 deletion node/peers/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,6 @@ func (pm *PeerMan) maintainMinPeers(ctx context.Context) {
} else {
pm.log.Infof("Connected to peer %s", peerIDStringer(pid))
added++

}
}

Expand Down Expand Up @@ -344,7 +343,9 @@ func (pm *PeerMan) startPex(ctx context.Context) {
go func() {
var count int
for peer := range peerChan {

if pm.addPeerAddrs(peer) {
pm.log.Info("Found new peer %v, connecting", peerIDStringer(peer.ID))
// TODO: connection manager, with limits
if err = pm.c.Connect(ctx, peer); err != nil {
pm.log.Warnf("Failed to connect to %s: %v", peerIDStringer(peer.ID), CompressDialError(err))
Expand Down
2 changes: 1 addition & 1 deletion node/statesync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func newTestStatesyncer(ctx context.Context, t *testing.T, mn mock.Mocknet, root
return nil, nil, nil, nil, nil, err
}

dht, err := makeDHT(ctx, h, nil, dht.ModeServer)
dht, err := makeDHT(ctx, h, nil, dht.ModeServer, true)
if err != nil {
return nil, nil, nil, nil, nil, err
}
Expand Down

0 comments on commit 7ba37c3

Please sign in to comment.