Skip to content

Commit eec1e41

Browse files
committed
boot: add DHT service with namespace rendezvous bootstrapping
Implement a new DHT service that uses namespace-based peer discovery: - Add DHT service that implements the Service interface - Use protocol namespace as rendezvous point for peer discovery - Implement continuous peer discovery and advertisement cycle - Add comprehensive tests verifying peer discovery and bootstrapping - Document test coverage and peer discovery methodology This enables peers to discover each other through the DHT using a shared namespace as a rendezvous point, improving the robustness of the peer discovery mechanism.
1 parent 0c3b294 commit eec1e41

File tree

5 files changed

+299
-2
lines changed

5 files changed

+299
-2
lines changed

boot/dht.go

+102
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package boot
2+
3+
import (
4+
"context"
5+
"math/rand"
6+
"time"
7+
8+
"github.com/libp2p/go-libp2p/core/discovery"
9+
"github.com/libp2p/go-libp2p/core/peerstore"
10+
"github.com/libp2p/go-libp2p/p2p/discovery/routing"
11+
"github.com/lthibault/jitterbug/v2"
12+
"github.com/wetware/go/system"
13+
)
14+
15+
// DHT implements a peer discovery service using the dual DHT.
16+
type DHT struct {
17+
Env *system.Env
18+
}
19+
20+
func (d DHT) String() string {
21+
return "dht"
22+
}
23+
24+
// Serve starts the DHT service and begins discovering peers.
25+
// It creates a RoutingDiscovery from the dual DHT and continuously
26+
// searches for peers in the given namespace.
27+
func (d DHT) Serve(ctx context.Context) error {
28+
// Create a RoutingDiscovery from the dual DHT
29+
disc := routing.NewRoutingDiscovery(d.Env.DHT)
30+
31+
// Get the namespace from the protocol path
32+
ns := system.Proto.String()
33+
34+
// Start discovering peers
35+
d.Env.Log().DebugContext(ctx, "service started",
36+
"service", d.String(),
37+
"namespace", ns)
38+
39+
jitter := jitterbug.Uniform{
40+
Source: rand.New(rand.NewSource(time.Now().UnixNano())),
41+
Min: peerstore.AddressTTL / 2,
42+
}
43+
44+
for {
45+
select {
46+
case <-ctx.Done():
47+
return ctx.Err()
48+
default:
49+
}
50+
51+
// Advertise ourselves in the namespace with 1 hour TTL
52+
ttl, err := disc.Advertise(ctx, ns, discovery.TTL(peerstore.AddressTTL))
53+
if err != nil {
54+
if ctx.Err() != nil {
55+
return ctx.Err()
56+
}
57+
d.Env.Log().WarnContext(ctx, "failed to advertise in DHT",
58+
"error", err,
59+
"service", d.String())
60+
continue
61+
}
62+
63+
d.Env.Log().DebugContext(ctx, "advertised in DHT",
64+
"service", d.String(),
65+
"namespace", ns,
66+
"ttl", ttl)
67+
68+
// Find peers in the namespace
69+
peerChan, err := disc.FindPeers(ctx, ns)
70+
if err != nil {
71+
if ctx.Err() != nil {
72+
return ctx.Err()
73+
}
74+
d.Env.Log().WarnContext(ctx, "failed to find peers",
75+
"error", err,
76+
"service", d.String())
77+
continue
78+
}
79+
80+
// Handle discovered peers
81+
for {
82+
select {
83+
case <-ctx.Done():
84+
return ctx.Err()
85+
case peer, ok := <-peerChan:
86+
if !ok {
87+
goto NEXT_ITERATION
88+
}
89+
if peer.ID != d.Env.Host.ID() { // skip self
90+
d.Env.HandlePeerFound(peer)
91+
}
92+
}
93+
}
94+
95+
NEXT_ITERATION:
96+
select {
97+
case <-ctx.Done():
98+
return ctx.Err()
99+
case <-time.After(jitter.Jitter(ttl)):
100+
}
101+
}
102+
}

boot/dht_test.go

+192
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
package boot
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/libp2p/go-libp2p"
9+
"github.com/libp2p/go-libp2p-kad-dht/dual"
10+
"github.com/libp2p/go-libp2p/core/host"
11+
"github.com/libp2p/go-libp2p/core/peer"
12+
"github.com/stretchr/testify/require"
13+
"github.com/wetware/go/system"
14+
"golang.org/x/sync/errgroup"
15+
)
16+
17+
// TestDHT verifies that the DHT service correctly handles peer discovery and shutdown.
18+
// It tests:
19+
// 1. Creation and bootstrapping of two DHT nodes
20+
// 2. Connection establishment between peers
21+
// 3. Service startup and peer discovery
22+
// 4. Clean shutdown on context cancellation
23+
//
24+
// The test creates two libp2p hosts with their own DHT instances, connects them,
25+
// and verifies they can discover each other through the DHT service. It then ensures
26+
// the services shut down cleanly when their context is cancelled.
27+
func TestDHT(t *testing.T) {
28+
t.Parallel()
29+
30+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
31+
defer cancel()
32+
33+
// Create two hosts
34+
h1, err := libp2p.New()
35+
require.NoError(t, err)
36+
defer h1.Close()
37+
38+
h2, err := libp2p.New()
39+
require.NoError(t, err)
40+
defer h2.Close()
41+
42+
// Create DHT instances for both hosts
43+
dht1, err := dual.New(ctx, h1)
44+
require.NoError(t, err)
45+
defer dht1.Close()
46+
47+
dht2, err := dual.New(ctx, h2)
48+
require.NoError(t, err)
49+
defer dht2.Close()
50+
51+
// Bootstrap the DHTs
52+
err = dht1.Bootstrap(ctx)
53+
require.NoError(t, err)
54+
55+
err = dht2.Bootstrap(ctx)
56+
require.NoError(t, err)
57+
58+
// Create environments for both hosts
59+
env1 := &system.Env{
60+
Host: h1,
61+
DHT: dht1,
62+
}
63+
64+
env2 := &system.Env{
65+
Host: h2,
66+
DHT: dht2,
67+
}
68+
69+
// Connect the hosts
70+
err = connectHosts(h1, h2)
71+
require.NoError(t, err)
72+
73+
// Create DHT services
74+
d1 := &DHT{Env: env1}
75+
d2 := &DHT{Env: env2}
76+
77+
// Create a sub-context for the services that we can cancel independently
78+
svcCtx, svcCancel := context.WithCancel(ctx)
79+
defer svcCancel() // Ensure services are cancelled even if test fails
80+
81+
// Start DHT services with errgroup
82+
g := new(errgroup.Group)
83+
84+
g.Go(func() error {
85+
return d1.Serve(svcCtx)
86+
})
87+
88+
g.Go(func() error {
89+
return d2.Serve(svcCtx)
90+
})
91+
92+
// Wait for peer discovery
93+
require.Eventually(t, func() bool {
94+
return len(h1.Network().Peers()) > 0 && len(h2.Network().Peers()) > 0
95+
}, 3*time.Second, 100*time.Millisecond, "peers should discover each other")
96+
97+
// Verify that peers are connected
98+
require.Contains(t, h1.Network().Peers(), h2.ID(), "h1 should be connected to h2")
99+
require.Contains(t, h2.Network().Peers(), h1.ID(), "h2 should be connected to h1")
100+
101+
// Cancel service context
102+
svcCancel()
103+
104+
// Wait for services to shut down with timeout
105+
done := make(chan error, 1)
106+
go func() {
107+
done <- g.Wait()
108+
}()
109+
110+
select {
111+
case err := <-done:
112+
require.ErrorIs(t, err, context.Canceled)
113+
case <-time.After(2 * time.Second):
114+
t.Fatal("timeout waiting for services to shut down")
115+
}
116+
}
117+
118+
// TestDHT_HandlePeerFound verifies that the DHT service correctly handles peer information
119+
// when a new peer is discovered. It tests:
120+
// 1. Proper storage of peer addresses in the peerstore
121+
// 2. Correct handling of multiaddress sets
122+
// 3. Address persistence after peer discovery
123+
//
124+
// The test creates a mock peer with a set of addresses and verifies that when HandlePeerFound
125+
// is called, those addresses are correctly stored in the peerstore. It uses string-based
126+
// comparison of address sets to handle variations in address ordering and representation.
127+
func TestDHT_HandlePeerFound(t *testing.T) {
128+
t.Parallel()
129+
130+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
131+
defer cancel()
132+
133+
// Create a host
134+
h, err := libp2p.New()
135+
require.NoError(t, err)
136+
defer h.Close()
137+
138+
// Create DHT instance
139+
d, err := dual.New(ctx, h)
140+
require.NoError(t, err)
141+
defer d.Close()
142+
143+
// Create environment
144+
env := &system.Env{
145+
Host: h,
146+
DHT: d,
147+
}
148+
149+
// Create a mock peer
150+
mockPeer, err := libp2p.New()
151+
require.NoError(t, err)
152+
defer mockPeer.Close()
153+
154+
// Create peer info
155+
peerInfo := peer.AddrInfo{
156+
ID: mockPeer.ID(),
157+
Addrs: mockPeer.Addrs(),
158+
}
159+
160+
// Test HandlePeerFound
161+
env.HandlePeerFound(peerInfo)
162+
163+
// Verify peer is in peerstore
164+
storedAddrs := h.Peerstore().Addrs(mockPeer.ID())
165+
require.NotEmpty(t, storedAddrs, "peer should be added to peerstore")
166+
167+
// Convert addresses to strings for easier comparison
168+
expectedAddrs := make(map[string]struct{})
169+
for _, addr := range peerInfo.Addrs {
170+
expectedAddrs[addr.String()] = struct{}{}
171+
}
172+
173+
storedAddrStrings := make(map[string]struct{})
174+
for _, addr := range storedAddrs {
175+
storedAddrStrings[addr.String()] = struct{}{}
176+
}
177+
178+
// Compare address sets
179+
require.Equal(t, expectedAddrs, storedAddrStrings, "stored addresses should match expected addresses")
180+
}
181+
182+
// connectHosts establishes a direct connection between two libp2p hosts.
183+
// It takes the source (h1) and target (h2) hosts and attempts to connect them
184+
// using h2's peer info (ID and addresses). This is a helper function used
185+
// to establish initial connectivity in tests.
186+
func connectHosts(h1, h2 host.Host) error {
187+
h2Info := peer.AddrInfo{
188+
ID: h2.ID(),
189+
Addrs: h2.Addrs(),
190+
}
191+
return h1.Connect(context.Background(), h2Info)
192+
}

cmd/internal/serve/serve.go

+2
Original file line numberDiff line numberDiff line change
@@ -154,10 +154,12 @@ func serve(c *cli.Context) error {
154154

155155
// Initialize and bind system services to the supervisor:
156156
// - MDNS for local network discovery
157+
// - DHT for peer discovery over the network
157158
// - P2P for distributed communication
158159
// - HTTP API server
159160
for _, s := range []suture.Service{
160161
&boot.MDNS{Env: &env},
162+
&boot.DHT{Env: &env},
161163
&glia.P2P{Env: &env, Router: rt},
162164
// &glia.Unix{Env: env, Router: rt, Path c.String("unix")},
163165
&glia.HTTP{Env: &env, Router: rt, ListenAddr: c.String("http")},

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ require (
1313
github.com/libp2p/go-libp2p-kad-dht v0.29.0
1414
github.com/lmittmann/tint v1.0.4
1515
github.com/lthibault/go-libp2p-inproc-transport v0.4.1
16+
github.com/lthibault/jitterbug/v2 v2.2.2
1617
github.com/mr-tron/base58 v1.2.0
1718
github.com/multiformats/go-multiaddr v0.15.0
1819
github.com/pkg/errors v0.9.1

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,8 @@ github.com/lmittmann/tint v1.0.4 h1:LeYihpJ9hyGvE0w+K2okPTGUdVLfng1+nDNVR4vWISc=
421421
github.com/lmittmann/tint v1.0.4/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE=
422422
github.com/lthibault/go-libp2p-inproc-transport v0.4.1 h1:tuhLlTsQ7mtBPZYFnXu3ZELk7FVAPXLmmB6hWp0uuZA=
423423
github.com/lthibault/go-libp2p-inproc-transport v0.4.1/go.mod h1:KQxTLypXhS8rRKp8zZcbPzIwEi/O1PNR0xe9MZNPSxY=
424+
github.com/lthibault/jitterbug/v2 v2.2.2 h1:v4+0tqryaI/TlYzgYE0Vhz7ha6Jtz4yRjmBP+PcqWPQ=
425+
github.com/lthibault/jitterbug/v2 v2.2.2/go.mod h1:evaHKX+60nFbFnEvGNPybQMJ5vXay9auziApDGo47Sw=
424426
github.com/lthibault/util v0.0.12 h1:JDulMYR1TudCGBOob14p6q6ak6/zovl20uU4Pj7umuA=
425427
github.com/lthibault/util v0.0.12/go.mod h1:cGZBEILcwYh+UkEdWKgP1vEOlayHa5dh2ay8yixCJQ0=
426428
github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI=
@@ -816,8 +818,6 @@ golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
816818
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
817819
golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
818820
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
819-
golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8=
820-
golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk=
821821
golang.org/x/net v0.36.0 h1:vWF2fRbw4qslQsQzgFqZff+BItCvGFQqKzKIzx1rmoA=
822822
golang.org/x/net v0.36.0/go.mod h1:bFmbeoIPfrw4sMHNhb4J9f6+tPziuGjq7Jk/38fxi1I=
823823
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=

0 commit comments

Comments
 (0)