Skip to content

Commit

Permalink
dv: move to svs alo
Browse files Browse the repository at this point in the history
  • Loading branch information
pulsejet committed Feb 6, 2025
1 parent 6e3b82d commit 6fd6712
Show file tree
Hide file tree
Showing 10 changed files with 118 additions and 273 deletions.
Binary file modified dv/config/schema.tlv
Binary file not shown.
4 changes: 1 addition & 3 deletions dv/config/schema.trust
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
// Prefix table Sync
#pfx_svs: #network/"32=DV"/"32=PFS" <= #router_cert
// Prefix table data
#pfx_data: #router/"32=DV"/"32=PFX"/_/_ <= #router_cert
// Prefix table snapshot
#pfx_snap: #router/"32=DV"/"32=PFX"/_/"32=SNAP" <= #router_cert
#pfx_data: #router/#pfx_svs/_/_ <= #router_cert

// Certificate definitions
#network_cert: #network/#KEY
Expand Down
25 changes: 11 additions & 14 deletions dv/dv/advert_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ func (a *advertModule) generate() {
}

func (a *advertModule) dataFetch(nName enc.Name, bootTime uint64, seqNo uint64) {
// debounce; wait before fetching, then check if this is still the latest
// sequence number known for this neighbor
time.Sleep(10 * time.Millisecond)
a.dv.mutex.Lock()
defer a.dv.mutex.Unlock()

if ns := a.dv.neighbors.Get(nName); ns == nil || ns.AdvertBoot != bootTime || ns.AdvertSeq != seqNo {
return
}
Expand All @@ -52,24 +52,21 @@ func (a *advertModule) dataFetch(nName enc.Name, bootTime uint64, seqNo uint64)
WithVersion(seqNo)

a.dv.client.Consume(advName, func(state ndn.ConsumeState) {
go func() {
fetchErr := state.Error()
if fetchErr != nil {
log.Warn(a, "Failed to fetch advertisement", "name", state.Name(), "err", fetchErr)
time.Sleep(1 * time.Second) // wait on error
if err := state.Error(); err != nil {
log.Warn(a, "Failed to fetch advertisement", "name", state.Name(), "err", err)
time.AfterFunc(1*time.Second, func() {
a.dataFetch(nName, bootTime, seqNo)
return
}
})
return
}

// Process the advertisement
a.dataHandler(nName, seqNo, state.Content())
}()
// Process the advertisement
go a.dataHandler(nName, seqNo, state.Content())
})
}

// Received advertisement Data
func (a *advertModule) dataHandler(nName enc.Name, seqNo uint64, data enc.Wire) {
// Lock DV state
a.dv.mutex.Lock()
defer a.dv.mutex.Unlock()

Expand Down
4 changes: 3 additions & 1 deletion dv/dv/advert_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,9 @@ func (a *advertModule) onStateVector(sv *spec_svs.StateVector, faceId uint64, ac
ns.AdvertBoot = entry.BootstrapTime
ns.AdvertSeq = entry.SeqNo

go a.dataFetch(node.Name, entry.BootstrapTime, entry.SeqNo)
time.AfterFunc(10*time.Millisecond, func() { // debounce
a.dataFetch(node.Name, entry.BootstrapTime, entry.SeqNo)
})
}

// Update FIB if needed
Expand Down
90 changes: 0 additions & 90 deletions dv/dv/prefix_sync.go

This file was deleted.

35 changes: 27 additions & 8 deletions dv/dv/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ type Router struct {
// advertisement module
advert advertModule
// prefix table svs instance
pfxSvs *ndn_sync.SvSync
pfxSvs *ndn_sync.SvsALO
// prefix table svs subscriptions
pfxSubs map[uint64]enc.Name
}

// Create a new DV router.
Expand Down Expand Up @@ -104,18 +106,32 @@ func NewRouter(config *config.Config, engine ndn.Engine) (*Router, error) {
objDir: object.NewMemoryFifoDir(32), // keep last few advertisements
}

// Create sync groups
dv.pfxSvs = ndn_sync.NewSvSync(ndn_sync.SvSyncOpts{
Client: dv.client,
GroupPrefix: config.PrefixTableSyncPrefix(),
OnUpdate: func(ssu ndn_sync.SvSyncUpdate) { go dv.onPfxSyncUpdate(ssu) },
BootTime: dv.advert.bootTime,
// Join prefix table sync group
dv.pfxSvs = ndn_sync.NewSvsALO(ndn_sync.SvsAloOpts{
Name: config.RouterDataPrefix(),
Svs: ndn_sync.SvSyncOpts{
Client: dv.client,
GroupPrefix: config.PrefixTableSyncPrefix(),
BootTime: dv.advert.bootTime,
},
Snapshot: &ndn_sync.SnapshotNodeLatest{
Client: dv.client,
SnapMe: func() (enc.Wire, error) {
return dv.pfx.Snap(), nil
},
Threshold: 50,
},
})
dv.pfxSubs = make(map[uint64]enc.Name)

// Create tables
dv.neighbors = table.NewNeighborTable(config, dv.nfdc)
dv.rib = table.NewRib(config)
dv.pfx = table.NewPrefixTable(config, dv.client, dv.pfxSvs)
dv.pfx = table.NewPrefixTable(config, func(w enc.Wire) {
if _, err := dv.pfxSvs.Publish(w); err != nil {
log.Error(dv, "Failed to publish prefix table update", "err", err)
}
})
dv.fib = table.NewFib(config, dv.nfdc)

return dv, nil
Expand Down Expand Up @@ -170,6 +186,9 @@ func (dv *Router) Start() (err error) {
dv.rib.Set(dv.config.RouterName(), dv.config.RouterName(), 0)
dv.advert.generate()

// Initialize prefix table
dv.pfx.Reset()

for {
select {
case <-dv.heartbeat.C:
Expand Down
45 changes: 42 additions & 3 deletions dv/dv/table_algo.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/named-data/ndnd/dv/table"
enc "github.com/named-data/ndnd/std/encoding"
"github.com/named-data/ndnd/std/log"
"github.com/named-data/ndnd/std/sync"
)

// Compute the RIB chnages for this neighbor
Expand Down Expand Up @@ -55,7 +56,7 @@ func (dv *Router) ribUpdate(ns *table.NeighborState) {
go func() {
dv.fibUpdate()
dv.advert.generate()
dv.prefixDataFetchAll()
dv.prefixSubsUpdate()
}()
}
}
Expand Down Expand Up @@ -107,14 +108,14 @@ func (dv *Router) fibUpdate() {
}

// Update paths to all routers from RIB
for _, router := range dv.rib.Entries() {
for hash, router := range dv.rib.Entries() {
// Skip if this is us
if router.Name().Equal(dv.config.RouterName()) {
continue
}

// Get FIB entry to reach this router
fes := dv.rib.GetFibEntries(dv.neighbors, router.Name().Hash())
fes := dv.rib.GetFibEntries(dv.neighbors, hash)

// Add entry to the router itself
routerPrefix := router.Name().Append(enc.NewKeywordComponent("DV"))
Expand All @@ -137,3 +138,41 @@ func (dv *Router) fibUpdate() {
}
dv.fib.RemoveUnmarked()
}

// prefixSubsUpdate updates the prefix table subscriptions
func (dv *Router) prefixSubsUpdate() {
dv.mutex.Lock()
defer dv.mutex.Unlock()

// Get all prefixes from the RIB
for hash, router := range dv.rib.Entries() {
if router.Name().Equal(dv.config.RouterName()) {
continue
}

if _, ok := dv.pfxSubs[hash]; !ok {
log.Info(dv, "Router is now reachable", "name", router.Name())
dv.pfxSubs[hash] = router.Name()

dv.pfxSvs.SubscribePublisher(router.Name(), func(sp sync.SvsPub) {
dv.mutex.Lock()
defer dv.mutex.Unlock()

// Both snapshots and normal data are handled the same way
if dirty := dv.pfx.Apply(sp.Content); dirty {
// Update the local fib if prefix table changed
go dv.fibUpdate() // expensive
}
})
}
}

// Remove dead subscriptions
for hash, name := range dv.pfxSubs {
if !dv.rib.Has(name) {
log.Info(dv, "Router is now unreachable", "name", name)
dv.pfxSvs.UnsubscribePublisher(name)
delete(dv.pfxSubs, hash)
}
}
}
Loading

0 comments on commit 6fd6712

Please sign in to comment.