Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.

Commit 6445258

Browse files
kortatuacud
authored andcommitted
network: Reconnect to the same peers on startup (#1844)
* network: stored current connected peers on shutdown so at startup first node will try to connect to the same peers on restart
1 parent cdcbd0a commit 6445258

File tree

2 files changed

+205
-21
lines changed

2 files changed

+205
-21
lines changed

network/hive.go

Lines changed: 64 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ import (
2929
"github.com/ethersphere/swarm/state"
3030
)
3131

32+
const connectionsKey = "conns"
33+
const addressesKey = "peers"
34+
3235
/*
3336
Hive is the logistic manager of the swarm
3437
@@ -135,32 +138,35 @@ func (h *Hive) Stop() error {
135138
// at each iteration, ask the overlay driver to suggest the most preferred peer to connect to
136139
// as well as advertises saturation depth if needed
137140
func (h *Hive) connect() {
138-
loop:
139141
for {
140142
select {
141143
case <-h.ticker.C:
142-
addr, depth, changed := h.SuggestPeer()
143-
if h.Discovery && changed {
144-
NotifyDepth(uint8(depth), h.Kademlia)
145-
}
146-
if addr == nil {
147-
continue loop
148-
}
149-
150-
log.Trace(fmt.Sprintf("%08x hive connect() suggested %08x", h.BaseAddr()[:4], addr.Address()[:4]))
151-
under, err := enode.ParseV4(string(addr.Under()))
152-
if err != nil {
153-
log.Warn(fmt.Sprintf("%08x unable to connect to bee %08x: invalid node URL: %v", h.BaseAddr()[:4], addr.Address()[:4], err))
154-
continue loop
155-
}
156-
log.Trace(fmt.Sprintf("%08x attempt to connect to bee %08x", h.BaseAddr()[:4], addr.Address()[:4]))
157-
h.addPeer(under)
144+
h.tickHive()
158145
case <-h.done:
159-
break loop
146+
return
160147
}
161148
}
162149
}
163150

151+
func (h *Hive) tickHive() {
152+
addr, depth, changed := h.SuggestPeer()
153+
if h.Discovery && changed {
154+
NotifyDepth(uint8(depth), h.Kademlia)
155+
}
156+
if addr != nil {
157+
log.Trace(fmt.Sprintf("%08x hive connect() suggested %08x", h.BaseAddr()[:4], addr.Address()[:4]))
158+
underA := addr.Under()
159+
s := string(underA)
160+
under, err := enode.ParseV4(s)
161+
if err != nil {
162+
log.Warn(fmt.Sprintf("%08x unable to connect to bee %08x: invalid node URL: %v", h.BaseAddr()[:4], addr.Address()[:4], err))
163+
return
164+
}
165+
log.Trace(fmt.Sprintf("%08x attempt to connect to bee %08x", h.BaseAddr()[:4], addr.Address()[:4]))
166+
h.addPeer(under)
167+
}
168+
}
169+
164170
// Run protocol run function
165171
func (h *Hive) Run(p *BzzPeer) error {
166172
h.trackPeer(p)
@@ -232,7 +238,7 @@ func (h *Hive) Peer(id enode.ID) *BzzPeer {
232238
// loadPeers, savePeer implement persistence callback/
233239
func (h *Hive) loadPeers() error {
234240
var as []*BzzAddr
235-
err := h.Store.Get("peers", &as)
241+
err := h.Store.Get(addressesKey, &as)
236242
if err != nil {
237243
if err == state.ErrNotFound {
238244
log.Info(fmt.Sprintf("hive %08x: no persisted peers found", h.BaseAddr()[:4]))
@@ -249,13 +255,40 @@ func (h *Hive) loadPeers() error {
249255
}
250256
}
251257
log.Info(fmt.Sprintf("hive %08x: peers loaded", h.BaseAddr()[:4]))
258+
errRegistering := h.Register(as...)
259+
var conns []*BzzAddr
260+
err = h.Store.Get(connectionsKey, &conns)
261+
if err != nil {
262+
if err == state.ErrNotFound {
263+
log.Info(fmt.Sprintf("hive %08x: no persisted peer connections found", h.BaseAddr()[:4]))
264+
} else {
265+
log.Warn(fmt.Sprintf("hive %08x: error loading connections: %v", h.BaseAddr()[:4], err))
266+
}
252267

253-
return h.Register(as...)
268+
} else {
269+
go h.connectInitialPeers(conns)
270+
}
271+
return errRegistering
272+
}
273+
274+
func (h *Hive) connectInitialPeers(conns []*BzzAddr) {
275+
log.Info(fmt.Sprintf("%08x hive connectInitialPeers() With %v saved connections", h.BaseAddr()[:4], len(conns)))
276+
for _, addr := range conns {
277+
log.Trace(fmt.Sprintf("%08x hive connect() suggested initial %08x", h.BaseAddr()[:4], addr.Address()[:4]))
278+
under, err := enode.ParseV4(string(addr.Under()))
279+
if err != nil {
280+
log.Warn(fmt.Sprintf("%08x unable to connect to bee %08x: invalid node URL: %v", h.BaseAddr()[:4], addr.Address()[:4], err))
281+
continue
282+
}
283+
log.Trace(fmt.Sprintf("%08x attempt to connect to bee %08x", h.BaseAddr()[:4], addr.Address()[:4]))
284+
h.addPeer(under)
285+
}
254286
}
255287

256288
// savePeers, savePeer implement persistence callback/
257289
func (h *Hive) savePeers() error {
258290
var peers []*BzzAddr
291+
var conns []*BzzAddr
259292
h.Kademlia.EachAddr(nil, 256, func(pa *BzzAddr, i int) bool {
260293
if pa == nil {
261294
log.Warn(fmt.Sprintf("empty addr: %v", i))
@@ -265,8 +298,18 @@ func (h *Hive) savePeers() error {
265298
peers = append(peers, pa)
266299
return true
267300
})
268-
if err := h.Store.Put("peers", peers); err != nil {
301+
302+
h.Kademlia.EachConn(nil, 256, func(p *Peer, i int) bool {
303+
log.Trace("saving connected peer", "OAddr", hexutil.Encode(p.OAddr), "UAddr", p.UAddr)
304+
conns = append(conns, p.BzzAddr)
305+
return true
306+
})
307+
if err := h.Store.Put(addressesKey, peers); err != nil {
269308
return fmt.Errorf("could not save peers: %v", err)
270309
}
310+
311+
if err := h.Store.Put(connectionsKey, conns); err != nil {
312+
return fmt.Errorf("could not save peer connections: %v", err)
313+
}
271314
return nil
272315
}

network/hive_test.go

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,14 @@ import (
2222
"testing"
2323
"time"
2424

25+
"github.com/ethereum/go-ethereum/common"
26+
"github.com/ethereum/go-ethereum/common/hexutil"
2527
"github.com/ethereum/go-ethereum/crypto"
2628
"github.com/ethereum/go-ethereum/p2p"
29+
"github.com/ethereum/go-ethereum/p2p/enode"
30+
"github.com/ethersphere/swarm/log"
2731
p2ptest "github.com/ethersphere/swarm/p2p/testing"
32+
"github.com/ethersphere/swarm/pot"
2833
"github.com/ethersphere/swarm/state"
2934
)
3035

@@ -175,3 +180,139 @@ func TestHiveStatePersistence(t *testing.T) {
175180
t.Fatalf("%d peers left over: %v", len(peers), peers)
176181
}
177182
}
183+
184+
// TestHiveStateConnections connect the node to some peers and then after cleanup/save in store those peers
185+
// are retrieved and used as suggested peer initially.
186+
func TestHiveStateConnections(t *testing.T) {
187+
dir, err := ioutil.TempDir("", "hive_test_store")
188+
if err != nil {
189+
t.Fatal(err)
190+
}
191+
defer os.RemoveAll(dir)
192+
193+
const peersCount = 5
194+
195+
nodeIdToBzzAddr := make(map[string]*BzzAddr)
196+
addedChan := make(chan struct{}, 5)
197+
startHive := func(t *testing.T, dir string) (h *Hive, cleanupFunc func()) {
198+
store, err := state.NewDBStore(dir)
199+
if err != nil {
200+
t.Fatal(err)
201+
}
202+
203+
params := NewHiveParams()
204+
params.Discovery = false
205+
206+
prvkey, err := crypto.GenerateKey()
207+
if err != nil {
208+
t.Fatal(err)
209+
}
210+
211+
h = NewHive(params, NewKademlia(PrivateKeyToBzzKey(prvkey), NewKadParams()), store)
212+
s := p2ptest.NewProtocolTester(prvkey, 0, func(p *p2p.Peer, rw p2p.MsgReadWriter) error { return nil })
213+
214+
if err := h.Start(s.Server); err != nil {
215+
t.Fatal(err)
216+
}
217+
//Close ticker to avoid interference with initial peer suggestion
218+
h.ticker.Stop()
219+
//Overwrite addPeer so the Node is added as a peer automatically.
220+
// The related Overlay address is retrieved from nodeIdToBzzAddr where it has been saved before
221+
h.addPeer = func(node *enode.Node) {
222+
bzzAddr := nodeIdToBzzAddr[encodeId(node.ID())]
223+
if bzzAddr == nil {
224+
t.Fatalf("Enode [%v] not found in saved peers!", encodeId(node.ID()))
225+
}
226+
bzzPeer := newConnPeerLocal(bzzAddr.Address(), h.Kademlia)
227+
h.On(bzzPeer)
228+
addedChan <- struct{}{}
229+
}
230+
231+
cleanupFunc = func() {
232+
err := h.Stop()
233+
if err != nil {
234+
t.Fatal(err)
235+
}
236+
237+
s.Stop()
238+
}
239+
return h, cleanupFunc
240+
}
241+
242+
h1, cleanup1 := startHive(t, dir)
243+
peers := make(map[string]bool)
244+
for i := 0; i < peersCount; i++ {
245+
raddr := RandomBzzAddr()
246+
h1.Register(raddr)
247+
peers[raddr.String()] = true
248+
}
249+
const initialPeers = 5
250+
for i := 0; i < initialPeers; i++ {
251+
suggestedPeer, _, _ := h1.SuggestPeer()
252+
if suggestedPeer != nil {
253+
testAddPeer(suggestedPeer, h1, nodeIdToBzzAddr)
254+
}
255+
256+
}
257+
numConns := h1.conns.Size()
258+
connAddresses := make(map[string]string)
259+
h1.EachConn(h1.base, 255, func(peer *Peer, i int) bool {
260+
key := hexutil.Encode(peer.Address())
261+
connAddresses[key] = key
262+
return true
263+
})
264+
log.Warn("After 5 suggestions", "numConns", numConns)
265+
cleanup1()
266+
267+
// start the hive and check that we suggest previous connected peers
268+
h2, _ := startHive(t, dir)
269+
// there should be at some point 5 conns
270+
connsAfterLoading := 0
271+
iterations := 0
272+
connsAfterLoading = h2.conns.Size()
273+
for connsAfterLoading != numConns && iterations < 5 {
274+
select {
275+
case <-addedChan:
276+
connsAfterLoading = h2.conns.Size()
277+
case <-time.After(1 * time.Second):
278+
iterations++
279+
}
280+
log.Trace("Iteration waiting for initial connections", "numConns", connsAfterLoading, "iterations", iterations)
281+
}
282+
if connsAfterLoading != numConns {
283+
t.Errorf("Expected 5 peer connecteds from previous execution but got %v", connsAfterLoading)
284+
}
285+
h2.EachConn(h2.base, 255, func(peer *Peer, i int) bool {
286+
key := hexutil.Encode(peer.Address())
287+
if connAddresses[key] != key {
288+
t.Errorf("Expected address %v to be in connections as it was a previous peer connected", key)
289+
} else {
290+
log.Warn("Previous peer connected again", "addr", key)
291+
}
292+
return true
293+
})
294+
}
295+
296+
// Create a Peer with the suggested address and store the relationshsip enode -> BzzAddr for later retrieval
297+
func testAddPeer(suggestedPeer *BzzAddr, h1 *Hive, nodeIdToBzzAddr map[string]*BzzAddr) {
298+
byteAddresses := suggestedPeer.Address()
299+
bzzPeer := newConnPeerLocal(byteAddresses, h1.Kademlia)
300+
nodeIdToBzzAddr[encodeId(bzzPeer.ID())] = bzzPeer.BzzAddr
301+
bzzPeer.kad = h1.Kademlia
302+
h1.On(bzzPeer)
303+
}
304+
305+
func encodeId(id enode.ID) string {
306+
addr := id[:]
307+
return hexutil.Encode(addr)
308+
}
309+
310+
// We create a test Peer with underlay address to localhost and using overlay address provided
311+
func newConnPeerLocal(addr []byte, kademlia *Kademlia) *Peer {
312+
hash := [common.HashLength]byte{}
313+
copy(hash[:], addr)
314+
potAddress := pot.Address(hash)
315+
peer := newDiscPeer(potAddress)
316+
peer.kad = kademlia
317+
return peer
318+
}

0 commit comments

Comments
 (0)