Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions pkg/hive/gossip_buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright 2026 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package hive

import (
"math/rand/v2"
"sync"
"time"

"github.com/ethersphere/bee/v2/pkg/swarm"
)

const (
defaultGossipCoalesceInterval = time.Second
defaultGossipCoalesceJitter = 100 * time.Millisecond
// coalesceThreshold: gossips with fewer peers are buffered; larger
// (already-batched) messages are dispatched immediately.
coalesceThreshold = 2
)

// gossipBuffer accumulates single-peer outbound gossip per addressee so it can be
// flushed as one batched message.
type gossipBuffer struct {
mu sync.Mutex
pending map[string]*pendingGossip // addressee bytestring -> buffered peers
interval time.Duration
jitter time.Duration
maxBatch int
}

type pendingGossip struct {
addressee swarm.Address
peers map[string]swarm.Address // peer bytestring -> address (set semantics)
deadline time.Time
}

func newGossipBuffer(interval time.Duration, maxBatch int) *gossipBuffer {
if interval == 0 {
interval = defaultGossipCoalesceInterval
}
return &gossipBuffer{
pending: make(map[string]*pendingGossip),
interval: interval,
jitter: defaultGossipCoalesceJitter,
maxBatch: maxBatch,
}
}

// add buffers peers for the addressee. If the buffer reaches maxBatch it is
// removed and returned so the caller can flush it immediately.
func (b *gossipBuffer) add(now time.Time, addressee swarm.Address, peers ...swarm.Address) *pendingGossip {
b.mu.Lock()
defer b.mu.Unlock()

key := addressee.ByteString()
e, ok := b.pending[key]
if !ok {
var jitter time.Duration
if b.jitter > 0 {
jitter = time.Duration(rand.Int64N(int64(b.jitter)))
}
e = &pendingGossip{
addressee: addressee,
peers: make(map[string]swarm.Address),
deadline: now.Add(b.interval + jitter),
}
b.pending[key] = e
}
for _, p := range peers {
e.peers[p.ByteString()] = p
}

if len(e.peers) >= b.maxBatch {
delete(b.pending, key)
return e
}
return nil
}

// takeDue removes and returns all entries whose deadline has passed.
func (b *gossipBuffer) takeDue(now time.Time) []*pendingGossip {
return b.take(func(e *pendingGossip) bool { return !e.deadline.After(now) })
}

func (b *gossipBuffer) clearAddressee(addressee swarm.Address) {
b.mu.Lock()
defer b.mu.Unlock()

delete(b.pending, addressee.ByteString())
}

func (b *gossipBuffer) pendingAddressees() int {
b.mu.Lock()
defer b.mu.Unlock()

return len(b.pending)
}

func (b *gossipBuffer) take(match func(*pendingGossip) bool) []*pendingGossip {
b.mu.Lock()
defer b.mu.Unlock()

var out []*pendingGossip
for key, e := range b.pending {
if match(e) {
out = append(out, e)
delete(b.pending, key)
}
}
return out
}

func (e *pendingGossip) addresses() []swarm.Address {
out := make([]swarm.Address, 0, len(e.peers))
for _, p := range e.peers {
out = append(out, p)
}
return out
}
65 changes: 65 additions & 0 deletions pkg/hive/gossip_buffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2026 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package hive

import (
"testing"
"time"

"github.com/ethersphere/bee/v2/pkg/swarm"
)

func TestGossipBufferAddAndDue(t *testing.T) {
t.Parallel()

const interval = 100 * time.Millisecond

b := newGossipBuffer(interval, maxBatchSize)
addressee := swarm.RandAddress(t)
peer1 := swarm.RandAddress(t)
peer2 := swarm.RandAddress(t)

now := time.Now()
if full := b.add(now, addressee, peer1); full != nil {
t.Fatal("unexpected immediate flush")
}

if due := b.takeDue(now); len(due) != 0 {
t.Fatalf("want no due entries, got %d", len(due))
}

if full := b.add(now, addressee, peer2); full != nil {
t.Fatal("unexpected immediate flush")
}

afterDeadline := now.Add(interval + defaultGossipCoalesceJitter + time.Millisecond)
due := b.takeDue(afterDeadline)
if len(due) != 1 {
t.Fatalf("want 1 due entry, got %d", len(due))
}
if got := len(due[0].addresses()); got != 2 {
t.Fatalf("want 2 coalesced peers, got %d", got)
}
}

func TestGossipBufferMaxBatchFlush(t *testing.T) {
t.Parallel()

b := newGossipBuffer(time.Second, 2)
addressee := swarm.RandAddress(t)
now := time.Now()

b.add(now, addressee, swarm.RandAddress(t))
full := b.add(now, addressee, swarm.RandAddress(t))
if full == nil {
t.Fatal("want immediate flush at maxBatch")
}
if got := len(full.addresses()); got != 2 {
t.Fatalf("want 2 peers in full batch, got %d", got)
}
if due := b.takeDue(now.Add(time.Second)); len(due) != 0 {
t.Fatalf("want empty buffer after maxBatch flush, got %d due", len(due))
}
}
124 changes: 119 additions & 5 deletions pkg/hive/hive.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ var (
ErrRateLimitExceeded = errors.New("rate limit exceeded")
)

const (
coalesceFlushReasonTimer = "timer"
coalesceFlushReasonMaxBatch = "max_batch"
)

// Options configures hive.Service at construction. Chequebook fields are
// optional: a nil ChequebookVerifier disables the verification gate (and
// records without a chequebook are accepted); a nil ChequebookStorer means
Expand All @@ -66,6 +71,8 @@ type Options struct {
AllowPrivateCIDRs bool
ChequebookVerifier chequebook.Verifier
ChequebookStorer ChequebookStorer

GossipCoalesceInterval time.Duration
}

type Service struct {
Expand All @@ -90,6 +97,7 @@ type Service struct {
// chequebook are dropped.
chequebookVerifier chequebook.Verifier
chequebookStorer ChequebookStorer
gossipBuf *gossipBuffer
}

func New(streamer p2p.Streamer, addressbook addressbook.GetPutter, networkID uint64, overlay swarm.Address, logger log.Logger, o Options) *Service {
Expand All @@ -112,9 +120,12 @@ func New(streamer p2p.Streamer, addressbook addressbook.GetPutter, networkID uin
chequebookStorer: o.ChequebookStorer,
}

svc.gossipBuf = newGossipBuffer(o.GossipCoalesceInterval, maxBatchSize)

if !o.BootnodeMode {
svc.startCheckPeersHandler()
}
svc.startGossipCoalescer()

return svc
}
Expand All @@ -136,35 +147,82 @@ func (s *Service) Protocol() p2p.ProtocolSpec {

var ErrShutdownInProgress = errors.New("shutdown in progress")

// BroadcastPeers sends peer gossip to the addressee. Calls with fewer than
// coalesceThreshold peers are buffered and flushed asynchronously; errors
// during deferred dispatch are logged but not returned to the caller.
// Calls with coalesceThreshold or more peers are sent immediately.
func (s *Service) BroadcastPeers(ctx context.Context, addressee swarm.Address, peers ...swarm.Address) error {
maxSize := maxBatchSize
if len(peers) == 0 {
return nil
}

s.metrics.BroadcastPeers.Inc()
s.metrics.BroadcastPeersPeers.Add(float64(len(peers)))

// Already-batched messages go out immediately; single-peer gossips are coalesced.
if len(peers) >= coalesceThreshold {
s.metrics.GossipCoalesceImmediatePeers.Add(float64(len(peers)))
s.logger.Debug("gossip immediate send", "addressee", addressee, "peer_count", len(peers))
_, err := s.broadcastNow(ctx, addressee, peers...)
return err
}

select {
case <-s.quit:
return ErrShutdownInProgress
default:
}

s.metrics.GossipCoalesceBufferedPeers.Add(float64(len(peers)))
s.logger.Debug("gossip buffered", "addressee", addressee, "peer_count", len(peers))

// Buffer; if it just filled up, flush it synchronously while still in the call
if full := s.gossipBuf.add(s.now(), addressee, peers...); full != nil {
flushPeers := full.addresses()
s.recordCoalesceFlush(coalesceFlushReasonMaxBatch, addressee, flushPeers)
s.setCoalesceBufferGauge()
sent, err := s.broadcastNow(ctx, addressee, flushPeers...)
if dropped := len(flushPeers) - sent; dropped > 0 {
s.metrics.GossipCoalesceDropped.Add(float64(dropped))
}
return err
}
s.setCoalesceBufferGauge()
return nil
}

// broadcastNow performs the synchronous, rate-limited, batched send.
// It returns the number of peers successfully sent.
func (s *Service) broadcastNow(ctx context.Context, addressee swarm.Address, peers ...swarm.Address) (sent int, err error) {
maxSize := maxBatchSize
total := len(peers)

for len(peers) > 0 {
if maxSize > len(peers) {
maxSize = len(peers)
}

// If broadcasting limit is exceeded, return early
if !s.outLimiter.Allow(addressee.ByteString(), maxSize) {
return nil
return total - len(peers), nil
}

select {
case <-ctx.Done():
return total - len(peers), ctx.Err()
case <-s.quit:
return ErrShutdownInProgress
return total - len(peers), ErrShutdownInProgress
default:
}

if err := s.sendPeers(ctx, addressee, peers[:maxSize]); err != nil {
return err
return total - len(peers), err
}

peers = peers[maxSize:]
}

return nil
return total, nil
}

func (s *Service) SetAddPeersHandler(h func(addr ...swarm.Address)) {
Expand Down Expand Up @@ -296,9 +354,65 @@ func (s *Service) peersHandler(ctx context.Context, peer p2p.Peer, stream p2p.St
func (s *Service) disconnect(peer p2p.Peer) error {
s.inLimiter.Clear(peer.Address.ByteString())
s.outLimiter.Clear(peer.Address.ByteString())
s.gossipBuf.clearAddressee(peer.Address)
s.setCoalesceBufferGauge()
return nil
}

func (s *Service) startGossipCoalescer() {
tick := s.gossipBuf.interval / 2
if tick <= 0 {
tick = s.gossipBuf.interval
}

s.wg.Go(func() {
ticker := time.NewTicker(tick)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.flushGossipEntries(s.gossipBuf.takeDue(s.now()), coalesceFlushReasonTimer)
case <-s.quit:
return
}
}
})
}

func (s *Service) flushGossipEntries(entries []*pendingGossip, reason string) {
s.setCoalesceBufferGauge()

for _, e := range entries {
peers := e.addresses()
s.recordCoalesceFlush(reason, e.addressee, peers)

ctx, cancel := context.WithTimeout(context.Background(), messageTimeout)
sent, err := s.broadcastNow(ctx, e.addressee, peers...)
if dropped := len(peers) - sent; dropped > 0 {
s.metrics.GossipCoalesceDropped.Add(float64(dropped))
}
if err != nil {
s.logger.Debug("coalesced gossip flush failed", "addressee", e.addressee, "reason", reason, "batch_size", len(peers), "error", err)
}
cancel()
}
}

func (s *Service) recordCoalesceFlush(reason string, addressee swarm.Address, peers []swarm.Address) {
batchSize := len(peers)
if batchSize == 0 {
return
}

s.metrics.GossipCoalesceFlushTotal.WithLabelValues(reason).Inc()
s.metrics.GossipCoalesceFlushPeers.Add(float64(batchSize))
s.logger.Debug("coalesced gossip flush", "addressee", addressee, "reason", reason, "batch_size", batchSize)
}

func (s *Service) setCoalesceBufferGauge() {
s.metrics.GossipCoalesceBufferSize.Set(float64(s.gossipBuf.pendingAddressees()))
}

func (s *Service) startCheckPeersHandler() {
ctx, cancel := context.WithCancel(context.Background())
s.wg.Go(func() {
Expand Down
Loading
Loading