Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion buildnumber.dat
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0
1
10 changes: 10 additions & 0 deletions network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -1191,7 +1191,17 @@ func (wn *WebsocketNetwork) maybeSendMessagesOfInterest(peer *wsPeer, messagesOf
messagesOfInterestEnc = wn.messagesOfInterestEnc
wn.messagesOfInterestMu.Unlock()
}

if messagesOfInterestEnc != nil {
// Filter VP tag for peers lacking stateful compression support
// older nodes (<= v4.3) treat unknown tags as protocol violations and disconnect.
if !peer.vpackStatefulCompressionSupported() {
tags, err := unmarshallMessageOfInterest(messagesOfInterestEnc)
if err == nil && tags[protocol.VotePackedTag] {
delete(tags, protocol.VotePackedTag)
messagesOfInterestEnc = marshallMessageOfInterestMap(tags)
}
}
peer.sendMessagesOfInterest(messagesOfInterestGeneration, messagesOfInterestEnc)
} else {
wn.log.Infof("msgOfInterest Enc=nil, MOIGen=%d", messagesOfInterestGeneration)
Expand Down
123 changes: 123 additions & 0 deletions network/wsNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"encoding/json"
"fmt"
"io"
"maps"
"math/rand"
"net"
"net/http"
Expand Down Expand Up @@ -4773,3 +4774,125 @@ func TestPeerComparisonInBroadcast(t *testing.T) {
require.Equal(t, 1, len(testPeer.sendBufferBulk))
require.Equal(t, 0, len(exceptPeer.sendBufferBulk))
}

func TestMaybeSendMessagesOfInterestLegacyPeer(t *testing.T) {
partitiontest.PartitionTest(t)
t.Parallel()

makePeer := func(wn *WebsocketNetwork, features peerFeatureFlag) (*wsPeer, chan sendMessage) {
ch := make(chan sendMessage, 1)
return &wsPeer{
wsPeerCore: makePeerCore(wn.ctx, wn, wn.log, nil, "test-addr", nil, ""),
features: features,
sendBufferHighPrio: ch,
sendBufferBulk: make(chan sendMessage, 1),
closing: make(chan struct{}),
processed: make(chan struct{}, 1),
}, ch
}

newTestNetwork := func(tags map[protocol.Tag]bool) *WebsocketNetwork {
wn := &WebsocketNetwork{
log: logging.TestingLog(t),
}
wn.ctx = context.Background()
cloned := maps.Clone(tags)
wn.messagesOfInterest = cloned
wn.messagesOfInterestEnc = marshallMessageOfInterestMap(cloned)
wn.messagesOfInterestGeneration.Store(1)
return wn
}

t.Run("filters VP for peers without stateful support", func(t *testing.T) {
wn := newTestNetwork(map[protocol.Tag]bool{
protocol.AgreementVoteTag: true,
protocol.VotePackedTag: true,
})

peer, ch := makePeer(wn, pfCompressedProposal|pfCompressedVoteVpack)
wn.maybeSendMessagesOfInterest(peer, nil)

select {
case msg := <-ch:
require.Equal(t, protocol.MsgOfInterestTag, protocol.Tag(msg.data[:2]))

decoded, err := unmarshallMessageOfInterest(msg.data[2:])
require.NoError(t, err)

require.Contains(t, decoded, protocol.AgreementVoteTag)
require.True(t, decoded[protocol.AgreementVoteTag])
_, hasVP := decoded[protocol.VotePackedTag]
require.False(t, hasVP, "VP tag should be filtered for legacy peers")
default:
t.Fatal("expected MOI message for legacy peer")
}
})

t.Run("retains VP for peers with stateful support", func(t *testing.T) {
wn := newTestNetwork(map[protocol.Tag]bool{
protocol.AgreementVoteTag: true,
protocol.VotePackedTag: true,
})

peer, ch := makePeer(wn, pfCompressedProposal|pfCompressedVoteVpack|pfCompressedVoteVpackStateful256)

wn.maybeSendMessagesOfInterest(peer, nil)

select {
case msg := <-ch:
require.Equal(t, protocol.MsgOfInterestTag, protocol.Tag(msg.data[:2]))

decoded, err := unmarshallMessageOfInterest(msg.data[2:])
require.NoError(t, err)

require.Contains(t, decoded, protocol.AgreementVoteTag)
require.True(t, decoded[protocol.AgreementVoteTag])
require.Contains(t, decoded, protocol.VotePackedTag)
require.True(t, decoded[protocol.VotePackedTag], "expected VP tag for peer with stateful support")
default:
t.Fatal("expected MOI message for stateful peer")
}
})

t.Run("gracefully handles configuration without VP tag", func(t *testing.T) {
wn := newTestNetwork(map[protocol.Tag]bool{
protocol.AgreementVoteTag: true,
})

peer, ch := makePeer(wn, pfCompressedProposal|pfCompressedVoteVpack)
wn.maybeSendMessagesOfInterest(peer, nil)

select {
case msg := <-ch:
require.Equal(t, protocol.MsgOfInterestTag, protocol.Tag(msg.data[:2]))

decoded, err := unmarshallMessageOfInterest(msg.data[2:])
require.NoError(t, err)

require.Contains(t, decoded, protocol.AgreementVoteTag)
require.True(t, decoded[protocol.AgreementVoteTag])
_, hasVP := decoded[protocol.VotePackedTag]
require.False(t, hasVP)
default:
t.Fatal("expected MOI message when VP is absent from configuration")
}
})

t.Run("skips sending when peer generation matches", func(t *testing.T) {
wn := newTestNetwork(map[protocol.Tag]bool{
protocol.AgreementVoteTag: true,
protocol.VotePackedTag: true,
})

peer, ch := makePeer(wn, pfCompressedProposal|pfCompressedVoteVpack)
peer.messagesOfInterestGeneration.Store(wn.messagesOfInterestGeneration.Load())

wn.maybeSendMessagesOfInterest(peer, nil)

select {
case <-ch:
t.Fatal("did not expect MOI message when generations already match")
default:
}
})
}