-
Notifications
You must be signed in to change notification settings - Fork 188
/
Copy pathconnect_test.go
129 lines (106 loc) · 3.54 KB
/
connect_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package integration
import (
"testing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/onflow/flow-go/consensus/hotstuff/model"
"github.com/onflow/flow-go/model/flow"
)
// Connect wires the given instances together in-process by installing mocked
// notifier handlers on each of them.
//
// For every instance, the value behind its notifier pointer is overwritten
// with a fresh mocked communicator consumer, and handlers are registered for
// "OnOwnProposal", "OnOwnVote" and "OnOwnTimeout":
//
//   - proposals are processed by the sender itself and then handed to every
//     other instance, subject to the sender's blockPropOut and each receiver's
//     blockPropIn checks;
//   - votes are pushed onto the queue of the single recipient named in the
//     callback arguments, subject to blockVoteOut / blockVoteIn unless the
//     recipient is the sender itself;
//   - timeout objects are pushed onto the queue of every other instance,
//     subject to blockTimeoutObjectOut / blockTimeoutObjectIn.
//
// Callback arguments of an unexpected type, a proposal whose parent is not in
// the sender's headers, or a vote addressed to an unknown recipient fail the
// test through t.
func Connect(t *testing.T, instances []*Instance) {
	// index every instance by its local ID so vote recipients can be resolved
	byID := make(map[flow.Identifier]*Instance, len(instances))
	for _, inst := range instances {
		byID[inst.localID] = inst
	}

	// give each instance a communicator that is wired to all the others
	for _, inst := range instances {
		sender := inst // fresh variable per iteration; the closures below capture it

		// broadcastProposal handles a proposal emitted by the sender.
		broadcastProposal := func(args mock.Arguments) {
			proposal, isProposal := args[0].(*flow.Proposal)
			require.True(t, isProposal)

			// the sender must already know the parent of what it proposes
			sender.updatingBlocks.RLock()
			_, haveParent := sender.headers[proposal.Header.ParentID]
			sender.updatingBlocks.RUnlock()
			if !haveParent {
				t.Fatalf("parent for proposal not found (sender: %x, parent: %x)", sender.localID, proposal.Header.ParentID)
			}

			// convert to the hotstuff model type before handing it around
			signed := model.SignedProposalFromFlow(proposal)

			// the sender always processes its own proposal first
			sender.ProcessBlock(signed)

			// an outgoing block on the sender side suppresses all deliveries
			if sender.blockPropOut(signed) {
				return
			}

			for _, receiver := range instances {
				// never loop back to the sender; honour the receiver's incoming check
				if receiver.localID == sender.localID || receiver.blockPropIn(signed) {
					continue
				}
				receiver.ProcessBlock(signed)
			}
		}

		// deliverVote handles a vote emitted by the sender for one recipient.
		deliverVote := func(args mock.Arguments) {
			blockID, isBlockID := args[0].(flow.Identifier)
			require.True(t, isBlockID)
			view, isView := args[1].(uint64)
			require.True(t, isView)
			sigData, isSigData := args[2].([]byte)
			require.True(t, isSigData)
			recipientID, isRecipient := args[3].(flow.Identifier)
			require.True(t, isRecipient)

			vote := model.VoteFromFlow(sender.localID, blockID, view, sigData)

			receiver, known := byID[recipientID]
			if !known {
				t.Fatalf("recipient doesn't exist (sender: %x, receiver: %x)", sender.localID, recipientID)
			}

			// A vote addressed to the sender itself (it is the next leader)
			// bypasses both checks; otherwise the outgoing check runs first and
			// the incoming check only if the outgoing one let the vote through.
			addressedToOther := recipientID != sender.localID
			if addressedToOther && (sender.blockVoteOut(vote) || receiver.blockVoteIn(vote)) {
				return
			}

			// hand the vote to the receiver's event loop
			// NOTE(review): this send waits if the queue cannot accept the value;
			// presumably the queue is buffered generously — confirm.
			receiver.queue <- vote
		}

		// broadcastTimeout handles a timeout object emitted by the sender.
		broadcastTimeout := func(args mock.Arguments) {
			timeoutObject, isTimeout := args[0].(*model.TimeoutObject)
			require.True(t, isTimeout)

			for _, receiver := range instances {
				// cases are tried top to bottom; the first match wins
				switch {
				case receiver.localID == sender.localID:
					// never deliver to ourselves
				case sender.blockTimeoutObjectOut(timeoutObject):
					// blocked on the way out of the sender
				case receiver.blockTimeoutObjectIn(timeoutObject):
					// blocked on the way into the receiver
				default:
					receiver.queue <- timeoutObject
				}
			}
		}

		// overwrite the notifier in place and register the handlers
		*sender.notifier = *NewMockedCommunicatorConsumer()
		sender.notifier.On("OnOwnProposal", mock.Anything, mock.Anything).Run(broadcastProposal)
		sender.notifier.On("OnOwnVote", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(deliverVote)
		sender.notifier.On("OnOwnTimeout", mock.Anything).Run(broadcastTimeout)
	}
}