Skip to content

Commit da20eba

Browse files
ccealgorandskiyjannotti
authored
network: add VP message type for stateful vote compression (#6466)
This follows #6351 by integrating it with the WebsocketNetwork implementation, using a new VP message tag. Co-authored-by: Pavel Zbitskiy <[email protected]> Co-authored-by: John Jannotti <[email protected]>
1 parent 1352391 commit da20eba

File tree

19 files changed

+999
-101
lines changed

19 files changed

+999
-101
lines changed

config/config_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -861,6 +861,10 @@ func (l tLogger) Infof(fmts string, args ...interface{}) {
861861
l.t.Logf(fmts, args...)
862862
}
863863

864+
func (l tLogger) Warnf(fmts string, args ...interface{}) {
865+
l.t.Logf(fmts, args...)
866+
}
867+
864868
// TestEnsureAndResolveGenesisDirs confirms that paths provided in the config are resolved to absolute paths and are created if relevant
865869
func TestEnsureAndResolveGenesisDirs(t *testing.T) {
866870
partitiontest.PartitionTest(t)

config/localTemplate.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -646,6 +646,15 @@ type Local struct {
646646
// EnableVoteCompression controls whether vote compression is enabled for websocket networks
647647
EnableVoteCompression bool `version[36]:"true"`
648648

649+
// StatefulVoteCompressionTableSize controls the size of the per-peer tables used for vote compression.
650+
// If 0, stateful vote compression is disabled (but stateless vote compression will still be used if
651+
// EnableVoteCompression is true). This value should be a power of 2 between 16 and 2048, inclusive.
652+
// The per-peer overhead for stateful compression in one direction (from peer A => B) is 224 bytes times
653+
// this value, plus 800 bytes of fixed overhead; it is twice that if votes are also being sent from B => A.
654+
// So the default value of 2048 requires 459,552 bytes of memory per peer for stateful vote compression
655+
// in one direction, or 919,104 bytes if both directions are used.
656+
StatefulVoteCompressionTableSize uint `version[37]:"2048"`
657+
649658
// EnableBatchVerification controls whether ed25519 batch verification is enabled
650659
EnableBatchVerification bool `version[37]:"true"`
651660
}
@@ -871,6 +880,7 @@ func (cfg *Local) ResolveLogPaths(rootDir string) (liveLog, archive string) {
871880

872881
type logger interface {
873882
Infof(format string, args ...interface{})
883+
Warnf(format string, args ...interface{})
874884
}
875885

876886
// EnsureAndResolveGenesisDirs will resolve the supplied config paths to absolute paths, and will create the genesis directories of each
@@ -1059,3 +1069,35 @@ func (cfg *Local) TracksCatchpoints() bool {
10591069
}
10601070
return false
10611071
}
1072+
1073+
// NormalizedVoteCompressionTableSize validates and normalizes the StatefulVoteCompressionTableSize config value.
1074+
// Supported values are powers of 2 in the range [16, 2048].
1075+
// Values >= 2048 clamp to 2048.
1076+
// Values 1-15 are below the minimum and return 0 (disabled).
1077+
// Values between supported powers of 2 round down to the nearest supported value.
1078+
// Logs a message if the configured value is adjusted.
1079+
// Returns the normalized size.
1080+
func (cfg Local) NormalizedVoteCompressionTableSize(log logger) uint {
1081+
configured := cfg.StatefulVoteCompressionTableSize
1082+
if configured == 0 {
1083+
return 0
1084+
}
1085+
if configured < 16 {
1086+
log.Warnf("StatefulVoteCompressionTableSize configured as %d is invalid (minimum 16). Stateful vote compression disabled.", configured)
1087+
return 0
1088+
}
1089+
// Round down to nearest power of 2 within supported range [16, 2048]
1090+
supportedSizes := []uint{2048, 1024, 512, 256, 128, 64, 32, 16}
1091+
for _, size := range supportedSizes {
1092+
if configured >= size {
1093+
if configured != size {
1094+
log.Infof("StatefulVoteCompressionTableSize configured as %d, using nearest supported value: %d", configured, size)
1095+
}
1096+
return size
1097+
}
1098+
}
1099+
1100+
// Should never reach here given the checks above
1101+
log.Warnf("StatefulVoteCompressionTableSize configured as %d is invalid. Stateful vote compression disabled.", configured)
1102+
return 0
1103+
}

config/local_defaults.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ var defaultLocal = Local{
140140
RestReadTimeoutSeconds: 15,
141141
RestWriteTimeoutSeconds: 120,
142142
RunHosted: false,
143+
StatefulVoteCompressionTableSize: 2048,
143144
StateproofDir: "",
144145
StorageEngine: "sqlite",
145146
SuggestedFeeBlockHistory: 3,

installer/config.json.example

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@
119119
"RestReadTimeoutSeconds": 15,
120120
"RestWriteTimeoutSeconds": 120,
121121
"RunHosted": false,
122+
"StatefulVoteCompressionTableSize": 2048,
122123
"StateproofDir": "",
123124
"StorageEngine": "sqlite",
124125
"SuggestedFeeBlockHistory": 3,

network/metrics.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,15 @@ var networkP2PGossipSubReceivedBytesTotal = metrics.MakeCounter(metrics.MetricNa
117117

118118
// var networkP2PGossipSubSentMsgs = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_p2p_gs_message_sent", Description: "Number of complete messages that were sent to the network through gossipsub"})
119119

120+
var networkVoteBroadcastCompressedBytes = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_vote_compressed_bytes_broadcast_total", Description: "Total AV message bytes broadcast after applying stateless compression"})
121+
var networkVoteBroadcastUncompressedBytes = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_vote_uncompressed_bytes_broadcast_total", Description: "Total AV message bytes broadcast before applying stateless compression"})
122+
var networkVPCompressionErrors = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_vpack_compression_errors_total", Description: "Total number of stateful vote compression errors"})
123+
var networkVPDecompressionErrors = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_vpack_decompression_errors_total", Description: "Total number of stateful vote decompression errors"})
124+
var networkVPAbortMessagesSent = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_vpack_abort_messages_sent_total", Description: "Total number of vpack abort messages sent to peers"})
125+
var networkVPAbortMessagesReceived = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_vpack_abort_messages_received_total", Description: "Total number of vpack abort messages received from peers"})
126+
var networkVPCompressedBytesSent = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_vpack_compressed_bytes_sent_total", Description: "Total VP message bytes sent, after compressing AV to VP messages"})
127+
var networkVPUncompressedBytesSent = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_vpack_uncompressed_bytes_sent_total", Description: "Total VP message bytes sent, before compressing AV to VP messages"})
128+
120129
var _ = pubsub.RawTracer(pubsubMetricsTracer{})
121130

122131
// pubsubMetricsTracer is a tracer for pubsub events used to track metrics.

network/msgCompressor.go

Lines changed: 152 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"bytes"
2121
"fmt"
2222
"io"
23+
"sync/atomic"
2324

2425
"github.com/DataDog/zstd"
2526

@@ -32,6 +33,19 @@ var zstdCompressionMagic = [4]byte{0x28, 0xb5, 0x2f, 0xfd}
3233

3334
const zstdCompressionLevel = zstd.BestSpeed
3435

36+
// voteCompressionAbortMessage is a single-byte payload sent with a VP tag to signal
37+
// that stateful compression should be disabled for this connection.
38+
// When either encoder or decoder encounters an error, it sends VP+0xFF to notify
39+
// the peer, then both sides disable stateful compression and fall back to AV messages.
40+
const voteCompressionAbortMessage byte = 0xFF
41+
42+
// voteCompressionError wraps errors from stateful vote compression/decompression.
43+
// This error type signals that an abort message should be sent to the peer.
44+
type voteCompressionError struct{ err error }
45+
46+
func (e *voteCompressionError) Error() string { return e.err.Error() }
47+
func (e *voteCompressionError) Unwrap() error { return e.err }
48+
3549
// zstdCompressMsg returns a concatenation of a tag and compressed data
3650
func zstdCompressMsg(tbytes []byte, d []byte) ([]byte, string) {
3751
bound := max(zstd.CompressBound(len(d)),
@@ -74,16 +88,30 @@ func vpackCompressVote(tbytes []byte, d []byte) ([]byte, string) {
7488
// and should be larger.
7589
const MaxDecompressedMessageSize = 20 * 1024 * 1024 // some large enough value
7690

77-
// wsPeerMsgDataDecoder performs optional incoming messages conversion.
78-
// At the moment it only supports zstd decompression for payload proposal,
79-
// and vpack decompression for votes.
80-
type wsPeerMsgDataDecoder struct {
91+
// wsPeerMsgCodec performs optional message compression/decompression for certain
92+
// types of messages. It handles:
93+
// - zstd compression for PP proposals (outgoing not implemented)
94+
// - stateless vpack compression for AV votes (outgoing not implemented)
95+
// - stateful vpack compression for VP votes (both directions)
96+
type wsPeerMsgCodec struct {
8197
log logging.Logger
8298
origin string
8399

84-
// actual converter(s)
100+
// decompressors
85101
ppdec zstdProposalDecompressor
86102
avdec vpackVoteDecompressor
103+
104+
// stateful vote compression (if enabled).
105+
// If either side encounters an error, or if we receive an abort, we disable
106+
// stateful compression entirely and fall back to stateless AV traffic.
107+
statefulVoteEnabled atomic.Bool
108+
statefulVoteTableSize uint
109+
statefulVoteEnc *vpack.StatefulEncoder
110+
statefulVoteDec *vpack.StatefulDecoder
111+
}
112+
113+
func (c *wsPeerMsgCodec) switchOffStatefulVoteCompression() {
114+
c.statefulVoteEnabled.Store(false)
87115
}
88116

89117
type zstdProposalDecompressor struct{}
@@ -124,8 +152,58 @@ func (dec zstdProposalDecompressor) convert(data []byte) ([]byte, error) {
124152
}
125153
}
126154

127-
func (c *wsPeerMsgDataDecoder) convert(tag protocol.Tag, data []byte) ([]byte, error) {
128-
if tag == protocol.ProposalPayloadTag {
155+
// compress attempts to compress an outgoing message.
156+
// Currently only supports stateful vote compression.
157+
// Returns compressed data and nil error if compression succeeds,
158+
// (nil, nil) if compression is not applicable,
159+
// (nil, vpError) if stateful compression fails (caller should send abort message).
160+
func (c *wsPeerMsgCodec) compress(tag protocol.Tag, data []byte) ([]byte, error) {
161+
if tag == protocol.AgreementVoteTag && c.statefulVoteEnabled.Load() {
162+
// Skip the tag bytes (first 2 bytes are the AV tag)
163+
if len(data) < 2 {
164+
return nil, nil
165+
}
166+
// Input data is AV+stateless-compressed from broadcast
167+
// We only need to apply stateful compression on top
168+
statelessCompressed := data[2:]
169+
170+
// initialize stateful encoder on first use
171+
if c.statefulVoteEnc == nil {
172+
enc, err := vpack.NewStatefulEncoder(c.statefulVoteTableSize)
173+
if err != nil {
174+
c.log.Warnf("failed to initialize stateful vote encoder for peer %s, disabling: %v", c.origin, err)
175+
networkVPCompressionErrors.Inc(nil)
176+
c.switchOffStatefulVoteCompression()
177+
return nil, &voteCompressionError{err: err}
178+
}
179+
c.statefulVoteEnc = enc
180+
c.log.Debugf("stateful vote encoder initialized for peer %s (table size %d)", c.origin, c.statefulVoteTableSize)
181+
}
182+
183+
tagLen := len(protocol.VotePackedTag)
184+
result := make([]byte, tagLen+vpack.MaxCompressedVoteSize)
185+
copy(result, protocol.VotePackedTag)
186+
// apply stateful compression to stateless-compressed data
187+
compressed, err := c.statefulVoteEnc.Compress(result[tagLen:], statelessCompressed)
188+
if err != nil {
189+
c.log.Warnf("stateful vote compression failed for peer %s, disabling: %v", c.origin, err)
190+
networkVPCompressionErrors.Inc(nil)
191+
c.switchOffStatefulVoteCompression()
192+
return nil, &voteCompressionError{err: err}
193+
}
194+
finalResult := result[:tagLen+len(compressed)]
195+
// Track stateful compression layer only: stateless-compressed input → VP output
196+
networkVPUncompressedBytesSent.AddUint64(uint64(len(statelessCompressed)), nil)
197+
networkVPCompressedBytesSent.AddUint64(uint64(len(compressed)), nil)
198+
return finalResult, nil
199+
}
200+
return nil, nil
201+
}
202+
203+
// decompress handles incoming message decompression based on tag type
204+
func (c *wsPeerMsgCodec) decompress(tag protocol.Tag, data []byte) ([]byte, error) {
205+
switch tag {
206+
case protocol.ProposalPayloadTag:
129207
// sender might support compressed payload but fail to compress for whatever reason,
130208
// in this case it sends non-compressed payload - the receiver decompress only if it is compressed.
131209
if c.ppdec.accept(data) {
@@ -136,7 +214,8 @@ func (c *wsPeerMsgDataDecoder) convert(tag protocol.Tag, data []byte) ([]byte, e
136214
return res, nil
137215
}
138216
c.log.Warnf("peer %s supported zstd but sent non-compressed data", c.origin)
139-
} else if tag == protocol.AgreementVoteTag {
217+
218+
case protocol.AgreementVoteTag:
140219
if c.avdec.enabled {
141220
res, err := c.avdec.convert(data)
142221
if err != nil {
@@ -146,12 +225,59 @@ func (c *wsPeerMsgDataDecoder) convert(tag protocol.Tag, data []byte) ([]byte, e
146225
}
147226
return res, nil
148227
}
228+
229+
case protocol.VotePackedTag:
230+
// Check for abort message first
231+
if len(data) == 1 && data[0] == voteCompressionAbortMessage {
232+
c.log.Infof("Received VP abort message from peer %s, disabling stateful encoding", c.origin)
233+
networkVPAbortMessagesReceived.Inc(nil)
234+
// Peer signalled stateful compression should stop; disable both encode and decode paths.
235+
c.switchOffStatefulVoteCompression()
236+
// Drop this message silently (it's just a control signal)
237+
return nil, nil
238+
}
239+
240+
if !c.statefulVoteEnabled.Load() {
241+
c.log.Debugf("dropping VP message from %s: stateful decompression disabled", c.origin)
242+
return nil, nil
243+
}
244+
if c.statefulVoteDec == nil {
245+
dec, err := vpack.NewStatefulDecoder(c.statefulVoteTableSize)
246+
if err != nil {
247+
c.log.Warnf("failed to initialize stateful vote decoder for peer %s, disabling: %v", c.origin, err)
248+
networkVPDecompressionErrors.Inc(nil)
249+
c.switchOffStatefulVoteCompression()
250+
return nil, &voteCompressionError{err: err}
251+
}
252+
c.statefulVoteDec = dec
253+
c.log.Debugf("stateful vote decoder initialized for peer %s (table size %d)", c.origin, c.statefulVoteTableSize)
254+
}
255+
// StatefulDecoder decompresses to "stateless-compressed" format
256+
statelessCompressed, err := c.statefulVoteDec.Decompress(make([]byte, 0, vpack.MaxCompressedVoteSize), data)
257+
if err != nil {
258+
c.log.Warnf("stateful vote decompression failed for peer %s, disabling: %v", c.origin, err)
259+
networkVPDecompressionErrors.Inc(nil)
260+
c.switchOffStatefulVoteCompression()
261+
return nil, &voteCompressionError{err: err}
262+
}
263+
264+
var statelessDec vpack.StatelessDecoder
265+
voteBody, err := statelessDec.DecompressVote(make([]byte, 0, vpack.MaxMsgpackVoteSize), statelessCompressed)
266+
if err != nil {
267+
c.log.Warnf("stateless vote decompression failed after stateful for peer %s, disabling: %v", c.origin, err)
268+
networkVPDecompressionErrors.Inc(nil)
269+
c.switchOffStatefulVoteCompression()
270+
return nil, &voteCompressionError{err: err}
271+
}
272+
273+
return voteBody, nil
149274
}
275+
150276
return data, nil
151277
}
152278

153-
func makeWsPeerMsgDataDecoder(wp *wsPeer) *wsPeerMsgDataDecoder {
154-
c := wsPeerMsgDataDecoder{
279+
func makeWsPeerMsgCodec(wp *wsPeer) *wsPeerMsgCodec {
280+
c := wsPeerMsgCodec{
155281
log: wp.log,
156282
origin: wp.originAddress,
157283
}
@@ -164,5 +290,21 @@ func makeWsPeerMsgDataDecoder(wp *wsPeer) *wsPeerMsgDataDecoder {
164290
dec: vpack.NewStatelessDecoder(),
165291
}
166292
}
293+
294+
// Initialize stateful compression negotiation details if both nodes support it
295+
// Stateful compression requires stateless compression to be available since VP messages
296+
// decompress in two stages: VP → stateless-compressed → raw vote
297+
if wp.enableVoteCompression && // this node's configuration allows vote compression
298+
wp.voteCompressionTableSize > 0 && // this node's configuration allows stateful vote compression
299+
wp.vpackVoteCompressionSupported() && // the other side has advertised vote compression
300+
wp.vpackStatefulCompressionSupported() { // the other side has advertised stateful vote compression
301+
tableSize := wp.getBestVpackTableSize()
302+
if tableSize > 0 {
303+
c.statefulVoteEnabled.Store(true)
304+
c.statefulVoteTableSize = tableSize
305+
wp.log.Debugf("Stateful compression negotiated with table size %d (our max: %d)", tableSize, wp.voteCompressionTableSize)
306+
}
307+
}
308+
167309
return &c
168310
}

0 commit comments

Comments
 (0)