Skip to content

Commit

Permalink
dv: refactor prefix table sync
Browse files Browse the repository at this point in the history
  • Loading branch information
pulsejet committed Feb 6, 2025
1 parent e005807 commit 868c19c
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 55 deletions.
2 changes: 1 addition & 1 deletion dv/dv/advert_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,5 +90,5 @@ func (a *advertModule) dataHandler(nName enc.Name, seqNo uint64, data enc.Wire)

// Update the local advertisement list
ns.Advert = advert
go a.dv.ribUpdate(ns)
go a.dv.updateRib(ns)
}
2 changes: 1 addition & 1 deletion dv/dv/advert_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,6 @@ func (a *advertModule) onStateVector(sv *spec_svs.StateVector, faceId uint64, ac

// Update FIB if needed
if fibDirty {
go a.dv.fibUpdate()
go a.dv.updateFib()
}
}
83 changes: 46 additions & 37 deletions dv/dv/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,24 +43,24 @@ type Router struct {
// deadcheck for neighbors
deadcheck *time.Ticker

// neighbor table
neighbors *table.NeighborTable
// routing information base
rib *table.Rib
// prefix table
pfx *table.PrefixTable
// forwarding table
fib *table.Fib

// advertisement module
advert advertModule

// prefix table
pfx *table.PrefixTable
// prefix table svs instance
pfxSvs *ndn_sync.SvsALO
// prefix table svs subscriptions
pfxSubs map[uint64]enc.Name
// prefix table fifo
pfxFifo *object.MemoryFifoDir

// neighbor table
neighbors *table.NeighborTable
// routing information base
rib *table.Rib
// forwarding table
fib *table.Fib
}

// Create a new DV router.
Expand Down Expand Up @@ -112,37 +112,12 @@ func NewRouter(config *config.Config, engine ndn.Engine) (*Router, error) {
objDir: object.NewMemoryFifoDir(32), // keep last few advertisements
}

// Join prefix table sync group
dv.pfxFifo = object.NewMemoryFifoDir(PrefixSnapThreshold * 3)
dv.pfxSvs = ndn_sync.NewSvsALO(ndn_sync.SvsAloOpts{
Name: config.RouterDataPrefix(),
Svs: ndn_sync.SvSyncOpts{
Client: dv.client,
GroupPrefix: config.PrefixTableSyncPrefix(),
BootTime: dv.advert.bootTime,
},
Snapshot: &ndn_sync.SnapshotNodeLatest{
Client: dv.client,
SnapMe: func(name enc.Name) (enc.Wire, error) {
dv.pfxFifo.Push(name)
dv.pfxFifo.Evict(dv.client)
return dv.pfx.Snap(), nil
},
Threshold: PrefixSnapThreshold,
},
})
dv.pfxSubs = make(map[uint64]enc.Name)
// Create prefix table
dv.createPrefixTable()

// Create tables
// Create DV tables
dv.neighbors = table.NewNeighborTable(config, dv.nfdc)
dv.rib = table.NewRib(config)
dv.pfx = table.NewPrefixTable(config, func(w enc.Wire) {
if name, err := dv.pfxSvs.Publish(w); err != nil {
log.Error(dv, "Failed to publish prefix table update", "err", err)
} else {
dv.pfxFifo.Push(name)
}
})
dv.fib = table.NewFib(config, dv.nfdc)

return dv, nil
Expand Down Expand Up @@ -364,3 +339,37 @@ func (dv *Router) destroyFaces() {
}
}
}

func (dv *Router) createPrefixTable() {
// Memory FIFO and subscription list
dv.pfxFifo = object.NewMemoryFifoDir(PrefixSnapThreshold * 3)
dv.pfxSubs = make(map[uint64]enc.Name)

// SVS delivery agent
dv.pfxSvs = ndn_sync.NewSvsALO(ndn_sync.SvsAloOpts{
Name: dv.config.RouterDataPrefix(),
Svs: ndn_sync.SvSyncOpts{
Client: dv.client,
GroupPrefix: dv.config.PrefixTableSyncPrefix(),
BootTime: dv.advert.bootTime,
},
Snapshot: &ndn_sync.SnapshotNodeLatest{
Client: dv.client,
SnapMe: func(name enc.Name) (enc.Wire, error) {
dv.pfxFifo.Push(name)
dv.pfxFifo.Evict(dv.client)
return dv.pfx.Snap(), nil
},
Threshold: PrefixSnapThreshold,
},
})

// Local prefix table
dv.pfx = table.NewPrefixTable(dv.config, func(w enc.Wire) {
if name, err := dv.pfxSvs.Publish(w); err != nil {
log.Error(dv, "Failed to publish prefix table update", "err", err)
} else {
dv.pfxFifo.Push(name)
}
})
}
34 changes: 18 additions & 16 deletions dv/dv/table_algo.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,17 @@ import (
"github.com/named-data/ndnd/std/sync"
)

// Compute the RIB chnages for this neighbor
func (dv *Router) ribUpdate(ns *table.NeighborState) {
// postUpdateRib should be called after the RIB has been updated.
// It triggers a corresponding fib update and advert generation.
// Run it in a separate goroutine to avoid deadlocks.
func (dv *Router) postUpdateRib() {
dv.updateFib()
dv.advert.generate()
dv.updatePrefixSubs()
}

// updateRib computes the RIB chnages for this neighbor
func (dv *Router) updateRib(ns *table.NeighborState) {
dv.mutex.Lock()
defer dv.mutex.Unlock()

Expand Down Expand Up @@ -53,11 +62,7 @@ func (dv *Router) ribUpdate(ns *table.NeighborState) {

// If advert changed, increment sequence number
if dirty {
go func() {
dv.fibUpdate()
dv.advert.generate()
dv.prefixSubsUpdate()
}()
go dv.postUpdateRib()
}
}

Expand All @@ -82,15 +87,12 @@ func (dv *Router) checkDeadNeighbors() {
}

if dirty {
go func() {
dv.fibUpdate()
dv.advert.generate()
}()
go dv.postUpdateRib()
}
}

// Update the FIB
func (dv *Router) fibUpdate() {
// updateFib synchronizes the FIB with the RIB.
func (dv *Router) updateFib() {
log.Debug(dv, "Sychronizing updates to forwarding table")

dv.mutex.Lock()
Expand Down Expand Up @@ -139,8 +141,8 @@ func (dv *Router) fibUpdate() {
dv.fib.RemoveUnmarked()
}

// prefixSubsUpdate updates the prefix table subscriptions
func (dv *Router) prefixSubsUpdate() {
// updatePrefixSubs updates the prefix table subscriptions
func (dv *Router) updatePrefixSubs() {
dv.mutex.Lock()
defer dv.mutex.Unlock()

Expand All @@ -161,7 +163,7 @@ func (dv *Router) prefixSubsUpdate() {
// Both snapshots and normal data are handled the same way
if dirty := dv.pfx.Apply(sp.Content); dirty {
// Update the local fib if prefix table changed
go dv.fibUpdate() // expensive
go dv.updateFib() // expensive
}
})
}
Expand Down

0 comments on commit 868c19c

Please sign in to comment.