-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathreplication.go
132 lines (110 loc) · 4.42 KB
/
replication.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package simplex
import (
"bytes"
"fmt"
"math"
"go.uber.org/zap"
)
type ReplicationState struct {
logger Logger
enabled bool
maxRoundWindow uint64
comm Communication
id NodeID
// latest seq requested
lastSequenceRequested uint64
// highest sequence we have received a finalization certificate for
highestFCertReceived *FinalizationCertificate
// received
receivedFinalizationCertificates map[uint64]FinalizedBlock
}
func NewReplicationState(logger Logger, comm Communication, id NodeID, maxRoundWindow uint64, enabled bool) *ReplicationState {
return &ReplicationState{
logger: logger,
enabled: enabled,
comm: comm,
id: id,
maxRoundWindow: maxRoundWindow,
receivedFinalizationCertificates: make(map[uint64]FinalizedBlock),
}
}
// isReplicationComplete returns true if the replication state has caught up to the highest finalization certificate.
// TODO: when we add notarization requests, this function should also make sure we have caught up to the highest notarization.
func (r *ReplicationState) isReplicationComplete(nextSeqToCommit uint64) bool {
return nextSeqToCommit > r.highestFCertReceived.Finalization.Seq
}
func (r *ReplicationState) collectFutureFinalizationCertificates(fCert *FinalizationCertificate, currentRound uint64, nextSeqToCommit uint64) {
if !r.enabled {
return
}
fCertSeq := fCert.Finalization.Seq
// Don't exceed the max round window
endSeq := math.Min(float64(fCertSeq), float64(r.maxRoundWindow+currentRound))
if r.highestFCertReceived == nil || fCertSeq > r.highestFCertReceived.Finalization.Seq {
r.highestFCertReceived = fCert
}
// Node is behind, but we've already sent messages to collect future fCerts
if r.lastSequenceRequested >= uint64(endSeq) {
return
}
startSeq := math.Max(float64(nextSeqToCommit), float64(r.lastSequenceRequested))
r.logger.Debug("Node is behind, requesting missing finalization certificates", zap.Uint64("seq", fCertSeq), zap.Uint64("startSeq", uint64(startSeq)), zap.Uint64("endSeq", uint64(endSeq)))
r.sendFutureCertficatesRequests(uint64(startSeq), uint64(endSeq))
}
// sendFutureCertficatesRequests sends requests for future finalization certificates for the
// range of sequences [start, end] <- inclusive
func (r *ReplicationState) sendFutureCertficatesRequests(start uint64, end uint64) {
seqs := make([]uint64, (end+1)-start)
for i := start; i <= end; i++ {
seqs[i-start] = i
}
roundRequest := &ReplicationRequest{
FinalizationCertificateRequest: &FinalizationCertificateRequest{
Sequences: seqs,
},
}
msg := &Message{ReplicationRequest: roundRequest}
requestFrom := r.requestFrom()
r.lastSequenceRequested = end
r.comm.SendMessage(msg, requestFrom)
}
// requestFrom returns a node to send a message request to
// this is used to ensure that we are not sending a message to ourselves
func (r *ReplicationState) requestFrom() NodeID {
nodes := r.comm.ListNodes()
for _, node := range nodes {
if !node.Equals(r.id) {
return node
}
}
return NodeID{}
}
// maybeCollectFutureFinalizationCertificates attempts to collect future finalization certificates if
// there are more fCerts to be collected and the round has caught up.
func (r *ReplicationState) maybeCollectFutureFinalizationCertificates(round uint64, nextSequenceToCommit uint64) {
if r.highestFCertReceived == nil {
return
}
if r.lastSequenceRequested >= r.highestFCertReceived.Finalization.Seq {
return
}
// we send out more requests once our seq has caught up to 1/2 of the maxRoundWindow
if round+r.maxRoundWindow/2 > r.lastSequenceRequested {
r.collectFutureFinalizationCertificates(r.highestFCertReceived, round, nextSequenceToCommit)
}
}
func (r *ReplicationState) StoreFinalizedBlock(data FinalizedBlock) error {
// ensure the finalization certificate we get relates to the block
blockDigest := data.Block.BlockHeader().Digest
if !bytes.Equal(blockDigest[:], data.FCert.Finalization.Digest[:]) {
return fmt.Errorf("finalization certificate does not match the block")
}
// don't store the same finalization certificate twice
if _, ok := r.receivedFinalizationCertificates[data.FCert.Finalization.Seq]; ok {
return nil
}
r.receivedFinalizationCertificates[data.FCert.Finalization.Seq] = data
return nil
}