Skip to content

Commit 0173c02

Browse files
authored
Improve test coverage of chain exchange package (#931)
Add tests that assert: * pubsub validation correctness * swarm chain discovery with bound cache size * eviction of instances * notification completeness Part of #869
1 parent bfa43d9 commit 0173c02

File tree

1 file changed

+274
-8
lines changed

1 file changed

+274
-8
lines changed

chainexchange/pubsub_test.go

+274-8
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@ import (
99

1010
"github.com/filecoin-project/go-f3/chainexchange"
1111
"github.com/filecoin-project/go-f3/gpbft"
12+
"github.com/filecoin-project/go-f3/internal/clock"
1213
"github.com/libp2p/go-libp2p"
1314
pubsub "github.com/libp2p/go-libp2p-pubsub"
15+
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
1416
"github.com/stretchr/testify/require"
1517
)
1618

@@ -108,6 +110,278 @@ func TestPubSubChainExchange_Broadcast(t *testing.T) {
108110
require.Equal(t, instance, notifications[0].instance)
109111
require.EqualExportedValues(t, ecChain, notifications[0].chain)
110112

113+
// Assert instance is no longer found once removed.
114+
require.NoError(t, subject.RemoveChainsByInstance(ctx, instance+1))
115+
chain, found = subject.GetChainByInstance(ctx, instance, key)
116+
require.False(t, found)
117+
require.Nil(t, chain)
118+
119+
require.NoError(t, subject.Shutdown(ctx))
120+
})
121+
}
122+
}
123+
124+
func TestSwarm(t *testing.T) {
125+
const (
126+
topicName = "fish"
127+
swarmSize = 100
128+
)
129+
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
130+
t.Cleanup(cancel)
131+
132+
currentInstance := gpbft.InstanceProgress{
133+
Input: &gpbft.ECChain{
134+
TipSets: []*gpbft.TipSet{
135+
{Epoch: 0, Key: []byte("lobster"), PowerTable: gpbft.MakeCid([]byte("pt"))},
136+
},
137+
},
138+
}
139+
140+
mnet := mocknet.New()
141+
swarm := make([]*chainexchange.PubSubChainExchange, swarmSize)
142+
for i := range swarm {
143+
host, err := mnet.GenPeer()
144+
require.NoError(t, err)
145+
ps, err := pubsub.NewGossipSub(ctx, host, pubsub.WithFloodPublish(true), pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign))
146+
require.NoError(t, err)
147+
subject, err := chainexchange.NewPubSubChainExchange(
148+
chainexchange.WithProgress(func() (instant gpbft.InstanceProgress) {
149+
return currentInstance
150+
}),
151+
chainexchange.WithPubSub(ps),
152+
chainexchange.WithTopicName(topicName),
153+
chainexchange.WithTopicScoreParams(nil),
154+
chainexchange.WithMaxTimestampAge(time.Minute),
155+
chainexchange.WithCompression(true),
156+
chainexchange.WithMaxWantedChainsPerInstance(6),
157+
chainexchange.WithMaxDiscoveredChainsPerInstance(6),
158+
)
159+
require.NoError(t, err)
160+
require.NotNil(t, subject)
161+
require.NoError(t, subject.Start(ctx))
162+
swarm[i] = subject
163+
}
164+
165+
require.NoError(t, mnet.LinkAll())
166+
require.NoError(t, mnet.ConnectAllButSelf())
167+
168+
someChain := currentInstance.Input.BaseChain().Append(
169+
&gpbft.TipSet{Epoch: 1, Key: []byte("lobster"), PowerTable: gpbft.MakeCid([]byte("pt"))},
170+
&gpbft.TipSet{Epoch: 2, Key: []byte("lobster"), PowerTable: gpbft.MakeCid([]byte("pt"))},
171+
&gpbft.TipSet{Epoch: 3, Key: []byte("lobster"), PowerTable: gpbft.MakeCid([]byte("pt"))},
172+
&gpbft.TipSet{Epoch: 4, Key: []byte("lobster"), PowerTable: gpbft.MakeCid([]byte("pt"))},
173+
&gpbft.TipSet{Epoch: 5, Key: []byte("lobster"), PowerTable: gpbft.MakeCid([]byte("pt"))},
174+
)
175+
176+
// Attempt to get the chain from all the nodes in the swarm to mark it as wanted.
177+
for _, cx := range swarm {
178+
_, found := cx.GetChainByInstance(ctx, currentInstance.ID, someChain.Key())
179+
require.False(t, found)
180+
}
181+
182+
require.NoError(t, swarm[0].Broadcast(ctx, chainexchange.Message{
183+
Instance: currentInstance.ID,
184+
Chain: someChain,
185+
Timestamp: time.Now().UnixMilli(),
186+
}))
187+
188+
require.Eventually(t, func() bool {
189+
for _, cx := range swarm {
190+
_, found := cx.GetChainByInstance(ctx, currentInstance.ID, someChain.Key())
191+
if !found {
192+
return false
193+
}
194+
}
195+
return true
196+
}, 5*time.Second, 100*time.Millisecond)
197+
}
198+
199+
func TestValidation(t *testing.T) {
200+
201+
var (
202+
validBaseChain = &gpbft.ECChain{
203+
TipSets: []*gpbft.TipSet{
204+
{Epoch: 0, Key: []byte("lobster"), PowerTable: gpbft.MakeCid([]byte("pt"))},
205+
},
206+
}
207+
anotherValidChain = &gpbft.ECChain{
208+
TipSets: []*gpbft.TipSet{
209+
{Epoch: 1, Key: []byte("barreleye"), PowerTable: gpbft.MakeCid([]byte("pt"))},
210+
},
211+
}
212+
validChain = validBaseChain.Append(anotherValidChain.TipSets...)
213+
invalidChain = &gpbft.ECChain{
214+
TipSets: []*gpbft.TipSet{
215+
{Epoch: 1, Key: []byte("lobster"), PowerTable: gpbft.MakeCid([]byte("pt"))},
216+
{Epoch: 0, Key: nil, PowerTable: gpbft.MakeCid([]byte("pt"))},
217+
},
218+
}
219+
)
220+
221+
for _, test := range []struct {
222+
name string
223+
messageAt func(clock *clock.Mock) chainexchange.Message
224+
givenProgress gpbft.InstanceProgress
225+
wantErr string
226+
}{
227+
{
228+
name: "instance too old",
229+
messageAt: func(clock *clock.Mock) chainexchange.Message {
230+
return chainexchange.Message{
231+
Instance: 10,
232+
Chain: validChain,
233+
Timestamp: clock.Now().UnixMilli(),
234+
}
235+
},
236+
givenProgress: gpbft.InstanceProgress{
237+
Instant: gpbft.Instant{
238+
ID: 11,
239+
},
240+
Input: validChain,
241+
},
242+
wantErr: pubsub.RejectValidationIgnored,
243+
},
244+
{
245+
name: "timestamp too old",
246+
messageAt: func(clock *clock.Mock) chainexchange.Message {
247+
ts := clock.Now().UnixMilli()
248+
clock.Add(time.Hour)
249+
return chainexchange.Message{
250+
Instance: 10,
251+
Chain: validChain,
252+
Timestamp: ts,
253+
}
254+
},
255+
givenProgress: gpbft.InstanceProgress{
256+
Instant: gpbft.Instant{
257+
ID: 10,
258+
},
259+
Input: validChain,
260+
},
261+
wantErr: pubsub.RejectValidationIgnored,
262+
},
263+
{
264+
name: "invalid chain",
265+
messageAt: func(clock *clock.Mock) chainexchange.Message {
266+
return chainexchange.Message{
267+
Instance: 10,
268+
Chain: invalidChain,
269+
Timestamp: clock.Now().UnixMilli(),
270+
}
271+
},
272+
givenProgress: gpbft.InstanceProgress{
273+
Instant: gpbft.Instant{
274+
ID: 10,
275+
},
276+
Input: validChain,
277+
},
278+
wantErr: pubsub.RejectValidationFailed,
279+
},
280+
{
281+
name: "unexpected base chain",
282+
messageAt: func(clock *clock.Mock) chainexchange.Message {
283+
return chainexchange.Message{
284+
Instance: 10,
285+
Chain: anotherValidChain,
286+
Timestamp: clock.Now().UnixMilli(),
287+
}
288+
},
289+
givenProgress: gpbft.InstanceProgress{
290+
Instant: gpbft.Instant{
291+
ID: 10,
292+
},
293+
Input: validChain,
294+
},
295+
wantErr: pubsub.RejectValidationFailed,
296+
},
297+
{
298+
name: "fresh enough timestamp",
299+
messageAt: func(clock *clock.Mock) chainexchange.Message {
300+
ts := clock.Now().UnixMilli()
301+
clock.Add(5 * time.Second)
302+
return chainexchange.Message{
303+
Instance: 10,
304+
Chain: validChain,
305+
Timestamp: ts,
306+
}
307+
},
308+
givenProgress: gpbft.InstanceProgress{
309+
Instant: gpbft.Instant{
310+
ID: 10,
311+
},
312+
Input: validChain,
313+
},
314+
},
315+
{
316+
name: "future instance",
317+
messageAt: func(clock *clock.Mock) chainexchange.Message {
318+
return chainexchange.Message{
319+
Instance: 11,
320+
Chain: anotherValidChain,
321+
Timestamp: clock.Now().UnixMilli(),
322+
}
323+
},
324+
givenProgress: gpbft.InstanceProgress{
325+
Instant: gpbft.Instant{
326+
ID: 10,
327+
},
328+
Input: validChain,
329+
},
330+
},
331+
{
332+
name: "too far in future instance",
333+
messageAt: func(clock *clock.Mock) chainexchange.Message {
334+
return chainexchange.Message{
335+
Instance: 110,
336+
Chain: anotherValidChain,
337+
Timestamp: clock.Now().UnixMilli(),
338+
}
339+
},
340+
givenProgress: gpbft.InstanceProgress{
341+
Instant: gpbft.Instant{
342+
ID: 10,
343+
},
344+
Input: validChain,
345+
},
346+
wantErr: pubsub.RejectValidationIgnored,
347+
},
348+
} {
349+
t.Run(test.name, func(t *testing.T) {
350+
const topicName = "fish"
351+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
352+
clck := clock.NewMock()
353+
host, err := libp2p.New()
354+
require.NoError(t, err)
355+
t.Cleanup(func() {
356+
cancel()
357+
require.NoError(t, host.Close())
358+
})
359+
360+
ps, err := pubsub.NewGossipSub(ctx, host, pubsub.WithFloodPublish(true))
361+
require.NoError(t, err)
362+
363+
subject, err := chainexchange.NewPubSubChainExchange(
364+
chainexchange.WithProgress(func() (instant gpbft.InstanceProgress) { return test.givenProgress }),
365+
chainexchange.WithPubSub(ps),
366+
chainexchange.WithTopicName(topicName),
367+
chainexchange.WithTopicScoreParams(nil),
368+
chainexchange.WithMaxTimestampAge(10*time.Second),
369+
chainexchange.WithCompression(true),
370+
chainexchange.WithClock(clck),
371+
)
372+
require.NoError(t, err)
373+
require.NotNil(t, subject)
374+
375+
err = subject.Start(ctx)
376+
require.NoError(t, err)
377+
378+
err = subject.Broadcast(ctx, test.messageAt(clck))
379+
if test.wantErr != "" {
380+
require.ErrorContains(t, err, test.wantErr)
381+
} else {
382+
require.NoError(t, err)
383+
}
384+
111385
require.NoError(t, subject.Shutdown(ctx))
112386
})
113387
}
@@ -133,11 +407,3 @@ func (l *listener) getNotifications() []notification {
133407
defer l.mu.Unlock()
134408
return slices.Clone(l.notifications)
135409
}
136-
137-
// TODO: Add more tests, specifically:
138-
// - validation
139-
// - discovery through other chainexchange instance
140-
// - cache eviction/fixed memory footprint.
141-
// - fulfilment of chain from discovery to wanted in any order.
142-
// - spam
143-
// - fuzz

0 commit comments

Comments
 (0)