Skip to content

Commit 358b674

Browse files
committedJan 3, 2025·
dv: switch prefix table to object client
1 parent 73ec1ea commit 358b674

File tree

7 files changed

+114
-122
lines changed

7 files changed

+114
-122
lines changed
 

‎dv/dv/prefix_sync.go

+31-42
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,8 @@ import (
77
"github.com/named-data/ndnd/dv/tlv"
88
enc "github.com/named-data/ndnd/std/encoding"
99
"github.com/named-data/ndnd/std/log"
10-
"github.com/named-data/ndnd/std/ndn"
10+
"github.com/named-data/ndnd/std/object"
1111
ndn_sync "github.com/named-data/ndnd/std/sync"
12-
"github.com/named-data/ndnd/std/utils"
1312
)
1413

1514
// Fetch all required prefix data
@@ -60,31 +59,26 @@ func (dv *Router) prefixDataFetch(nodeId enc.Name) {
6059
// Fetch the prefix data
6160
log.Debugf("prefixDataFetch: fetching prefix data for %s [%d => %d]", nodeId, router.Known, router.Latest)
6261

63-
cfg := &ndn.InterestConfig{
64-
MustBeFresh: true,
65-
Lifetime: utils.IdPtr(4 * time.Second),
66-
Nonce: utils.ConvertNonce(dv.engine.Timer().Nonce()),
67-
}
68-
69-
isSnap := router.Latest-router.Known > 100
62+
// Prefix object for other router
7063
name := append(nodeId,
7164
enc.NewStringComponent(enc.TypeKeywordNameComponent, "DV"),
7265
enc.NewStringComponent(enc.TypeKeywordNameComponent, "PFX"),
7366
)
74-
if isSnap {
75-
name = append(name, enc.NewStringComponent(enc.TypeKeywordNameComponent, "SNAP"))
76-
cfg.CanBePrefix = true
67+
if router.Latest-router.Known > table.PrefixTableSnapThreshold {
68+
// no version - discover the latest snapshot
69+
name = append(name, table.PrefixTableSnap)
7770
} else {
78-
name = append(name, enc.NewSequenceNumComponent(router.Known+1))
71+
name = append(name,
72+
enc.NewSequenceNumComponent(router.Known+1),
73+
enc.NewVersionComponent(0), // immutable
74+
)
7975
}
8076

81-
interest, err := dv.engine.Spec().MakeInterest(name, cfg, nil, nil)
82-
if err != nil {
83-
log.Warnf("prefixDataFetch: failed to make Interest: %+v", err)
84-
return
85-
}
77+
dv.client.Consume(name, func(state *object.ConsumeState) bool {
78+
if !state.IsComplete() {
79+
return true
80+
}
8681

87-
err = dv.engine.Express(interest, func(args ndn.ExpressCallbackArgs) {
8882
go func() {
8983
// Done fetching, restart if needed
9084
defer func() {
@@ -95,30 +89,22 @@ func (dv *Router) prefixDataFetch(nodeId enc.Name) {
9589
go dv.prefixDataFetch(nodeId) // recheck
9690
}()
9791

98-
// Sleep this goroutine if no data was received
99-
if args.Result != ndn.InterestResultData {
100-
log.Warnf("prefixDataFetch: failed to fetch prefix data %s: %d", interest.FinalName, args.Result)
101-
102-
// see advertDataFetch
103-
if args.Result != ndn.InterestResultTimeout {
104-
time.Sleep(2 * time.Second)
105-
} else {
106-
time.Sleep(100 * time.Millisecond)
107-
}
92+
// Wait before retry if there was a failure
93+
if err := state.Error(); err != nil {
94+
log.Warnf("prefixDataFetch: failed to fetch prefix data %s: %+v", name, err)
95+
time.Sleep(1 * time.Second)
10896
return
10997
}
11098

111-
dv.processPrefixData(args.Data, router)
99+
dv.processPrefixData(state.Name(), state.Content(), router)
112100
}()
101+
102+
return true
113103
})
114-
if err != nil {
115-
log.Warnf("prefixDataFetch: failed to express Interest: %+v", err)
116-
return
117-
}
118104
}
119105

120-
func (dv *Router) processPrefixData(data ndn.Data, router *table.PrefixTableRouter) {
121-
ops, err := tlv.ParsePrefixOpList(enc.NewWireReader(data.Content()), true)
106+
func (dv *Router) processPrefixData(name enc.Name, data []byte, router *table.PrefixTableRouter) {
107+
ops, err := tlv.ParsePrefixOpList(enc.NewBufferReader(data), true)
122108
if err != nil {
123109
log.Warnf("prefixDataFetch: failed to parse PrefixOpList: %+v", err)
124110
return
@@ -127,18 +113,21 @@ func (dv *Router) processPrefixData(data ndn.Data, router *table.PrefixTableRout
127113
dv.mutex.Lock()
128114
defer dv.mutex.Unlock()
129115

130-
// Update sequence number
131-
dataName := data.Name()
132-
seqNo := dataName[len(dataName)-1]
133-
if seqNo.Typ != enc.TypeSequenceNumNameComponent {
116+
// Get sequence number from name
117+
seqNo := name[len(name)-2]
118+
if seqNo.Equal(table.PrefixTableSnap) && name[len(name)-1].Typ == enc.TypeVersionNameComponent {
119+
// version is sequence number for snapshot
120+
seqNo = name[len(name)-1]
121+
} else if seqNo.Typ != enc.TypeSequenceNumNameComponent {
122+
// version is immutable, sequence number is in name
134123
log.Warnf("prefixDataFetch: unexpected sequence number type: %s", seqNo.Typ)
135124
return
136125
}
137126

138127
// Update the prefix table
139128
router.Known = seqNo.NumberVal()
140129
if dv.pfx.Apply(ops) {
141-
// Update the local fib if prefix table changed (very expensive)
142-
go dv.fibUpdate()
130+
// Update the local fib if prefix table changed
131+
go dv.fibUpdate() // very expensive
143132
}
144133
}

‎dv/dv/router.go

+9-10
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/named-data/ndnd/std/log"
1212
"github.com/named-data/ndnd/std/ndn"
1313
mgmt "github.com/named-data/ndnd/std/ndn/mgmt_2022"
14+
"github.com/named-data/ndnd/std/object"
1415
ndn_sync "github.com/named-data/ndnd/std/sync"
1516
"github.com/named-data/ndnd/std/utils"
1617
)
@@ -20,6 +21,8 @@ type Router struct {
2021
engine ndn.Engine
2122
// config for this router
2223
config *config.Config
24+
// object client
25+
client *object.Client
2326
// nfd management thread
2427
nfdc *nfdc.NfdMgmtThread
2528
// single mutex for all operations
@@ -59,6 +62,7 @@ func NewRouter(config *config.Config, engine ndn.Engine) (*Router, error) {
5962
dv := &Router{
6063
engine: engine,
6164
config: config,
65+
client: object.NewClient(engine, object.NewMemoryStore()),
6266
nfdc: nfdc.NewNfdMgmtThread(engine),
6367
mutex: sync.Mutex{},
6468
}
@@ -74,7 +78,7 @@ func NewRouter(config *config.Config, engine ndn.Engine) (*Router, error) {
7478
// Create tables
7579
dv.neighbors = table.NewNeighborTable(config, dv.nfdc)
7680
dv.rib = table.NewRib(config)
77-
dv.pfx = table.NewPrefixTable(config, engine, dv.pfxSvs)
81+
dv.pfx = table.NewPrefixTable(config, dv.client, dv.pfxSvs)
7882
dv.fib = table.NewFib(config, dv.nfdc)
7983

8084
return dv, nil
@@ -94,6 +98,10 @@ func (dv *Router) Start() (err error) {
9498
defer dv.heartbeat.Stop()
9599
defer dv.deadcheck.Stop()
96100

101+
// Start object client
102+
dv.client.Start()
103+
defer dv.client.Stop()
104+
97105
// Start management thread
98106
go dv.nfdc.Start()
99107
defer dv.nfdc.Stop()
@@ -179,15 +187,6 @@ func (dv *Router) register() (err error) {
179187
return err
180188
}
181189

182-
// Prefix Data
183-
err = dv.engine.AttachHandler(dv.config.PrefixTableDataPrefix(),
184-
func(args ndn.InterestHandlerArgs) {
185-
go dv.pfx.OnDataInterest(args)
186-
})
187-
if err != nil {
188-
return err
189-
}
190-
191190
// Readvertise Data
192191
err = dv.engine.AttachHandler(dv.config.ReadvertisePrefix(),
193192
func(args ndn.InterestHandlerArgs) {

‎dv/scripts/add_routes.sh

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
#!/bin/bash
2+
3+
# This test script adds and removes routes from a local DV forwarder.
4+
# This can be used to test a large number of changes to the RIB.
5+
6+
MAX_RIB_SIZE=3
7+
8+
ITER=0
9+
while true; do
10+
ITER=$((ITER+1))
11+
12+
RIB_SIZE=0
13+
while [ $RIB_SIZE -lt $MAX_RIB_SIZE ]; do
14+
RIB_SIZE=$((RIB_SIZE+1))
15+
ndnd fw route add prefix=/my/route/$RIB_SIZE/$ITER origin=65 face=1
16+
sleep 0.05
17+
done
18+
19+
sleep 0.1
20+
21+
RIB_SIZE=0
22+
while [ $RIB_SIZE -lt $MAX_RIB_SIZE ]; do
23+
RIB_SIZE=$((RIB_SIZE+1))
24+
ndnd fw route remove prefix=/my/route/$RIB_SIZE/$ITER origin=65 face=1
25+
sleep 0.05
26+
done
27+
done

‎dv/table/prefix_table.go

+26-69
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,27 @@
11
package table
22

33
import (
4-
"sync"
5-
"time"
6-
74
"github.com/named-data/ndnd/dv/config"
85
"github.com/named-data/ndnd/dv/tlv"
96
enc "github.com/named-data/ndnd/std/encoding"
107
"github.com/named-data/ndnd/std/log"
11-
"github.com/named-data/ndnd/std/ndn"
12-
"github.com/named-data/ndnd/std/security"
8+
"github.com/named-data/ndnd/std/object"
139
ndn_sync "github.com/named-data/ndnd/std/sync"
1410
"github.com/named-data/ndnd/std/utils"
1511
)
1612

13+
const PrefixTableSnapThreshold = 100
14+
15+
var PrefixTableSnap = enc.NewStringComponent(enc.TypeKeywordNameComponent, "SNAP")
16+
1717
type PrefixTable struct {
1818
config *config.Config
19-
engine ndn.Engine
19+
client *object.Client
2020
svs *ndn_sync.SvSync
2121

2222
routers map[uint64]*PrefixTableRouter
2323
me *PrefixTableRouter
2424

25-
repo map[uint64][]byte
26-
repoMutex sync.RWMutex
2725
snapshotAt uint64
2826
}
2927

@@ -41,19 +39,16 @@ type PrefixEntry struct {
4139

4240
func NewPrefixTable(
4341
config *config.Config,
44-
engine ndn.Engine,
42+
client *object.Client,
4543
svs *ndn_sync.SvSync,
4644
) *PrefixTable {
4745
pt := &PrefixTable{
4846
config: config,
49-
engine: engine,
47+
client: client,
5048
svs: svs,
5149

5250
routers: make(map[uint64]*PrefixTableRouter),
5351
me: nil,
54-
55-
repo: make(map[uint64][]byte),
56-
repoMutex: sync.RWMutex{},
5752
}
5853

5954
pt.me = pt.GetRouter(config.RouterName())
@@ -155,12 +150,19 @@ func (pt *PrefixTable) publishOp(content enc.Wire) {
155150
pt.me.Known = seq
156151
pt.me.Latest = seq
157152

158-
// Create the new data
159-
name := append(pt.config.PrefixTableDataPrefix(), enc.NewSequenceNumComponent(seq))
160-
pt.publish(name, content)
153+
// Produce the operation
154+
_, err := pt.client.Produce(object.ProduceArgs{
155+
Name: append(pt.config.PrefixTableDataPrefix(), enc.NewSequenceNumComponent(seq)),
156+
Content: content,
157+
Version: utils.IdPtr(uint64(0)), // immutable
158+
})
159+
if err != nil {
160+
log.Errorf("prefix-table: failed to produce op: %v", err)
161+
return
162+
}
161163

162164
// Create snapshot if needed
163-
if pt.snapshotAt-seq >= 100 {
165+
if seq-pt.snapshotAt >= PrefixTableSnapThreshold/2 {
164166
pt.publishSnap()
165167
}
166168
}
@@ -179,59 +181,14 @@ func (pt *PrefixTable) publishSnap() {
179181
})
180182
}
181183

182-
// Store snapshot in repo
183-
// TODO: this can be a segmented object
184-
pt.snapshotAt = pt.me.Latest
185-
snapPfx := append(pt.config.PrefixTableDataPrefix(),
186-
enc.NewStringComponent(enc.TypeKeywordNameComponent, "SNAP"))
187-
snapName := append(snapPfx, enc.NewSequenceNumComponent(pt.snapshotAt))
188-
pt.publish(snapName, snap.Encode())
189-
190-
// Point prefix to the snapshot
191-
pt.repoMutex.Lock()
192-
defer pt.repoMutex.Unlock()
193-
pt.repo[snapPfx.Hash()] = pt.repo[snapName.Hash()]
194-
}
195-
196-
func (pt *PrefixTable) publish(name enc.Name, content enc.Wire) {
197-
// TODO: sign the prefix table data
198-
signer := security.NewSha256Signer()
199-
200-
data, err := pt.engine.Spec().MakeData(
201-
name,
202-
&ndn.DataConfig{
203-
ContentType: utils.IdPtr(ndn.ContentTypeBlob),
204-
Freshness: utils.IdPtr(1 * time.Second),
205-
},
206-
content,
207-
signer)
184+
_, err := pt.client.Produce(object.ProduceArgs{
185+
Name: append(pt.config.PrefixTableDataPrefix(), PrefixTableSnap),
186+
Content: snap.Encode(),
187+
Version: utils.IdPtr(pt.me.Latest),
188+
})
208189
if err != nil {
209-
log.Warnf("prefix-table: publish failed to make data: %+v", err)
210-
return
211-
}
212-
213-
// Store the data packet in our mem repo
214-
pt.repoMutex.Lock()
215-
defer pt.repoMutex.Unlock()
216-
pt.repo[name.Hash()] = data.Wire.Join()
217-
}
218-
219-
// Received prefix data Interest
220-
func (pt *PrefixTable) OnDataInterest(args ndn.InterestHandlerArgs) {
221-
// TODO: remove old entries from repo
222-
223-
pt.repoMutex.RLock()
224-
defer pt.repoMutex.RUnlock()
225-
226-
// Find exact match in repo
227-
name := args.Interest.Name()
228-
if data := pt.repo[name.Hash()]; data != nil {
229-
err := args.Reply(enc.Wire{data})
230-
if err != nil {
231-
log.Warnf("prefix-table: failed to reply: %+v", err)
232-
}
233-
return
190+
log.Errorf("prefix-table: failed to produce snap: %v", err)
234191
}
235192

236-
log.Warnf("prefix-table: repo failed to find data for for %s", name)
193+
pt.snapshotAt = pt.me.Latest
237194
}

‎std/object/client.go

+5
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@ func (c *Client) Engine() ndn.Engine {
6767
return c.engine
6868
}
6969

70+
// Get the underlying store
71+
func (c *Client) Store() ndn.Store {
72+
return c.store
73+
}
74+
7075
// Main goroutine for all client processing
7176
func (c *Client) run() {
7277
for {

‎std/object/client_consume.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ type ConsumeState struct {
4242

4343
// returns the name of the object being consumed
4444
func (a *ConsumeState) Name() enc.Name {
45-
return a.name
45+
return a.fetchName
4646
}
4747

4848
// returns the error that occurred during fetching

‎std/object/store_memory.go

+15
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,14 @@ func (s *MemoryStore) Rollback() error {
9090
return nil
9191
}
9292

93+
func (s *MemoryStore) MemSize() int {
94+
s.mutex.RLock()
95+
defer s.mutex.RUnlock()
96+
size := 0
97+
s.root.walk(func(n *memoryStoreNode) { size += len(n.wire) })
98+
return size
99+
}
100+
93101
func (n *memoryStoreNode) find(name enc.Name) *memoryStoreNode {
94102
if len(name) == 0 {
95103
return n
@@ -183,3 +191,10 @@ func (n *memoryStoreNode) merge(tx *memoryStoreNode) {
183191
}
184192
}
185193
}
194+
195+
func (n *memoryStoreNode) walk(f func(*memoryStoreNode)) {
196+
f(n)
197+
for _, child := range n.children {
198+
child.walk(f)
199+
}
200+
}

0 commit comments

Comments
 (0)
Please sign in to comment.