diff --git a/dv/config/schema.tlv b/dv/config/schema.tlv index ebab26fe..37707c49 100644 Binary files a/dv/config/schema.tlv and b/dv/config/schema.tlv differ diff --git a/dv/config/schema.trust b/dv/config/schema.trust index 2b8b44a5..4a46c61d 100644 --- a/dv/config/schema.trust +++ b/dv/config/schema.trust @@ -13,9 +13,7 @@ // Prefix table Sync #pfx_svs: #network/"32=DV"/"32=PFS" <= #router_cert // Prefix table data -#pfx_data: #router/"32=DV"/"32=PFX"/_/_ <= #router_cert -// Prefix table snapshot -#pfx_snap: #router/"32=DV"/"32=PFX"/_/"32=SNAP" <= #router_cert +#pfx_data: #router/#pfx_svs/_/_ <= #router_cert // Certificate definitions #network_cert: #network/#KEY diff --git a/dv/dv/advert_data.go b/dv/dv/advert_data.go index c1a06f70..e7851411 100644 --- a/dv/dv/advert_data.go +++ b/dv/dv/advert_data.go @@ -36,9 +36,9 @@ func (a *advertModule) generate() { } func (a *advertModule) dataFetch(nName enc.Name, bootTime uint64, seqNo uint64) { - // debounce; wait before fetching, then check if this is still the latest - // sequence number known for this neighbor - time.Sleep(10 * time.Millisecond) + a.dv.mutex.Lock() + defer a.dv.mutex.Unlock() + if ns := a.dv.neighbors.Get(nName); ns == nil || ns.AdvertBoot != bootTime || ns.AdvertSeq != seqNo { return } @@ -52,24 +52,21 @@ func (a *advertModule) dataFetch(nName enc.Name, bootTime uint64, seqNo uint64) WithVersion(seqNo) a.dv.client.Consume(advName, func(state ndn.ConsumeState) { - go func() { - fetchErr := state.Error() - if fetchErr != nil { - log.Warn(a, "Failed to fetch advertisement", "name", state.Name(), "err", fetchErr) - time.Sleep(1 * time.Second) // wait on error + if err := state.Error(); err != nil { + log.Warn(a, "Failed to fetch advertisement", "name", state.Name(), "err", err) + time.AfterFunc(1*time.Second, func() { a.dataFetch(nName, bootTime, seqNo) - return - } + }) + return + } - // Process the advertisement - a.dataHandler(nName, seqNo, state.Content()) - }() + // Process the advertisement + go a.dataHandler(nName, seqNo, state.Content()) }) } // Received advertisement Data func (a *advertModule) dataHandler(nName enc.Name, seqNo uint64, data enc.Wire) { - // Lock DV state a.dv.mutex.Lock() defer a.dv.mutex.Unlock() diff --git a/dv/dv/advert_sync.go b/dv/dv/advert_sync.go index 6ed9753b..6a657d83 100644 --- a/dv/dv/advert_sync.go +++ b/dv/dv/advert_sync.go @@ -191,7 +191,9 @@ func (a *advertModule) onStateVector(sv *spec_svs.StateVector, faceId uint64, ac ns.AdvertBoot = entry.BootstrapTime ns.AdvertSeq = entry.SeqNo - go a.dataFetch(node.Name, entry.BootstrapTime, entry.SeqNo) + time.AfterFunc(10*time.Millisecond, func() { // debounce + a.dataFetch(node.Name, entry.BootstrapTime, entry.SeqNo) + }) } // Update FIB if needed diff --git a/dv/dv/prefix_sync.go b/dv/dv/prefix_sync.go deleted file mode 100644 index 64c371bd..00000000 --- a/dv/dv/prefix_sync.go +++ /dev/null @@ -1,90 +0,0 @@ -package dv - -import ( - "time" - - enc "github.com/named-data/ndnd/std/encoding" - "github.com/named-data/ndnd/std/log" - "github.com/named-data/ndnd/std/ndn" - ndn_sync "github.com/named-data/ndnd/std/sync" -) - -// Fetch all required prefix data -func (dv *Router) prefixDataFetchAll() { - dv.mutex.Lock() - defer dv.mutex.Unlock() - - for _, e := range dv.rib.Entries() { - router := dv.pfx.GetRouter(e.Name()) - if router != nil && router.Known < router.Latest { - dv.prefixDataFetch(e.Name()) - } - } -} - -// Received prefix sync update -func (dv *Router) onPfxSyncUpdate(ssu ndn_sync.SvSyncUpdate) { - dv.mutex.Lock() - defer dv.mutex.Unlock() - - // Update the prefix table - r := dv.pfx.GetRouter(ssu.Name) - if ssu.Boot > r.BootTime { - r.BootTime = ssu.Boot - r.Known = 0 // new boot - } else if ssu.Boot < r.BootTime { - return // old update - } - r.Latest = ssu.High - - // Start a fetching thread (if needed) - dv.prefixDataFetch(ssu.Name) -} - -// Fetch prefix data (call with lock held) -func (dv *Router) prefixDataFetch(nName enc.Name) { - // Check if the RIB has this destination - if !dv.rib.Has(nName) { - return - } - - // At any given time, there is only one thread fetching - // prefix data for a node. This thread recursively calls itself. - router := dv.pfx.GetRouter(nName) - if router == nil || router.Fetching || router.Known >= router.Latest { - return - } - router.Fetching = true - - // Fetch the prefix data object - log.Debug(dv.pfx, "Fetching prefix data", "router", nName, "known", router.Known, "latest", router.Latest) - - // Get the versioned name of the next object to fetch - // This can be either a snapshot or a normal data depending on - // the state of the router in the prefix table - name := router.GetNextDataName() - - // Fetch the object - dv.client.Consume(name, func(state ndn.ConsumeState) { - go func() { - fetchErr := state.Error() - if fetchErr != nil { - log.Warn(dv.pfx, "Failed to fetch prefix data", "name", name, "err", fetchErr) - time.Sleep(1 * time.Second) // wait on error - } - - dv.mutex.Lock() - defer dv.mutex.Unlock() - - // Process the prefix data on success - if fetchErr == nil && dv.pfx.ApplyData(state.Name(), state.Content(), router) { - // Update the local fib if prefix table changed - go dv.fibUpdate() // very expensive - } - - // Done fetching, restart if needed - router.Fetching = false - dv.prefixDataFetch(nName) - }() - }) -} diff --git a/dv/dv/router.go b/dv/dv/router.go index ab75f5c1..d94575b6 100644 --- a/dv/dv/router.go +++ b/dv/dv/router.go @@ -52,7 +52,9 @@ type Router struct { // advertisement module advert advertModule // prefix table svs instance - pfxSvs *ndn_sync.SvSync + pfxSvs *ndn_sync.SvsALO + // prefix table svs subscriptions + pfxSubs map[uint64]enc.Name } // Create a new DV router. @@ -104,18 +106,32 @@ func NewRouter(config *config.Config, engine ndn.Engine) (*Router, error) { objDir: object.NewMemoryFifoDir(32), // keep last few advertisements } - // Create sync groups - dv.pfxSvs = ndn_sync.NewSvSync(ndn_sync.SvSyncOpts{ - Client: dv.client, - GroupPrefix: config.PrefixTableSyncPrefix(), - OnUpdate: func(ssu ndn_sync.SvSyncUpdate) { go dv.onPfxSyncUpdate(ssu) }, - BootTime: dv.advert.bootTime, + // Join prefix table sync group + dv.pfxSvs = ndn_sync.NewSvsALO(ndn_sync.SvsAloOpts{ + Name: config.RouterDataPrefix(), + Svs: ndn_sync.SvSyncOpts{ + Client: dv.client, + GroupPrefix: config.PrefixTableSyncPrefix(), + BootTime: dv.advert.bootTime, + }, + Snapshot: &ndn_sync.SnapshotNodeLatest{ + Client: dv.client, + SnapMe: func() (enc.Wire, error) { + return dv.pfx.Snap(), nil + }, + Threshold: 50, + }, }) + dv.pfxSubs = make(map[uint64]enc.Name) // Create tables dv.neighbors = table.NewNeighborTable(config, dv.nfdc) dv.rib = table.NewRib(config) - dv.pfx = table.NewPrefixTable(config, dv.client, dv.pfxSvs) + dv.pfx = table.NewPrefixTable(config, func(w enc.Wire) { + if _, err := dv.pfxSvs.Publish(w); err != nil { + log.Error(dv, "Failed to publish prefix table update", "err", err) + } + }) dv.fib = table.NewFib(config, dv.nfdc) return dv, nil @@ -170,6 +186,9 @@ func (dv *Router) Start() (err error) { dv.rib.Set(dv.config.RouterName(), dv.config.RouterName(), 0) dv.advert.generate() + // Initialize prefix table + dv.pfx.Reset() + for { select { case <-dv.heartbeat.C: diff --git a/dv/dv/table_algo.go b/dv/dv/table_algo.go index 479029b8..ecf35b96 100644 --- a/dv/dv/table_algo.go +++ b/dv/dv/table_algo.go @@ -5,6 +5,7 @@ import ( "github.com/named-data/ndnd/dv/table" enc "github.com/named-data/ndnd/std/encoding" "github.com/named-data/ndnd/std/log" + "github.com/named-data/ndnd/std/sync" ) // Compute the RIB chnages for this neighbor @@ -55,7 +56,7 @@ func (dv *Router) ribUpdate(ns *table.NeighborState) { go func() { dv.fibUpdate() dv.advert.generate() - dv.prefixDataFetchAll() + dv.prefixSubsUpdate() }() } } @@ -107,14 +108,14 @@ func (dv *Router) fibUpdate() { } // Update paths to all routers from RIB - for _, router := range dv.rib.Entries() { + for hash, router := range dv.rib.Entries() { // Skip if this is us if router.Name().Equal(dv.config.RouterName()) { continue } // Get FIB entry to reach this router - fes := dv.rib.GetFibEntries(dv.neighbors, router.Name().Hash()) + fes := dv.rib.GetFibEntries(dv.neighbors, hash) // Add entry to the router itself routerPrefix := router.Name().Append(enc.NewKeywordComponent("DV")) @@ -137,3 +138,41 @@ func (dv *Router) fibUpdate() { } dv.fib.RemoveUnmarked() } + +// prefixSubsUpdate updates the prefix table subscriptions +func (dv *Router) prefixSubsUpdate() { + dv.mutex.Lock() + defer dv.mutex.Unlock() + + // Get all prefixes from the RIB + for hash, router := range dv.rib.Entries() { + if router.Name().Equal(dv.config.RouterName()) { + continue + } + + if _, ok := dv.pfxSubs[hash]; !ok { + log.Info(dv, "Router is now reachable", "name", router.Name()) + dv.pfxSubs[hash] = router.Name() + + dv.pfxSvs.SubscribePublisher(router.Name(), func(sp sync.SvsPub) { + dv.mutex.Lock() + defer dv.mutex.Unlock() + + // Both snapshots and normal data are handled the same way + if dirty := dv.pfx.Apply(sp.Content); dirty { + // Update the local fib if prefix table changed + go dv.fibUpdate() // expensive + } + }) + } + } + + // Remove dead subscriptions + for hash, name := range dv.pfxSubs { + if !dv.rib.Has(name) { + log.Info(dv, "Router is now unreachable", "name", name) + dv.pfxSvs.UnsubscribePublisher(name) + delete(dv.pfxSubs, hash) + } + } +} diff --git a/dv/table/prefix_table.go b/dv/table/prefix_table.go index 9781388f..7c9acabf 100644 --- a/dv/table/prefix_table.go +++ b/dv/table/prefix_table.go @@ -5,32 +5,16 @@ import ( "github.com/named-data/ndnd/dv/tlv" enc "github.com/named-data/ndnd/std/encoding" "github.com/named-data/ndnd/std/log" - "github.com/named-data/ndnd/std/ndn" - "github.com/named-data/ndnd/std/object" - ndn_sync "github.com/named-data/ndnd/std/sync" ) -const PrefixSnapThreshold = 100 -const PrefixSnapKeyword = "SNAP" - type PrefixTable struct { - config *config.Config - client ndn.Client - svs *ndn_sync.SvSync - + config *config.Config + publish func(enc.Wire) routers map[string]*PrefixTableRouter me *PrefixTableRouter - - snapshotAt uint64 - objDir *object.MemoryFifoDir } type PrefixTableRouter struct { - Name enc.Name - Fetching bool - BootTime uint64 - Known uint64 - Latest uint64 Prefixes map[string]*PrefixEntry } @@ -38,27 +22,14 @@ type PrefixEntry struct { Name enc.Name } -func NewPrefixTable( - config *config.Config, - client ndn.Client, - svs *ndn_sync.SvSync, -) *PrefixTable { +func NewPrefixTable(config *config.Config, publish func(enc.Wire)) *PrefixTable { pt := &PrefixTable{ - config: config, - client: client, - svs: svs, - + config: config, + publish: publish, routers: make(map[string]*PrefixTableRouter), me: nil, - - snapshotAt: 0, - objDir: object.NewMemoryFifoDir(3 * PrefixSnapThreshold), } - pt.me = pt.GetRouter(config.RouterName()) - pt.me.BootTime = svs.GetBootTime() - pt.Reset() - return pt } @@ -67,11 +38,10 @@ func (pt *PrefixTable) String() string { } func (pt *PrefixTable) GetRouter(name enc.Name) *PrefixTableRouter { - hash := name.String() + hash := name.TlvStr() router := pt.routers[hash] if router == nil { router = &PrefixTableRouter{ - Name: name, Prefixes: make(map[string]*PrefixEntry), } pt.routers[hash] = router @@ -81,22 +51,22 @@ func (pt *PrefixTable) GetRouter(name enc.Name) *PrefixTableRouter { func (pt *PrefixTable) Reset() { log.Info(pt, "Reset table") - pt.me.Prefixes = make(map[string]*PrefixEntry) + clear(pt.me.Prefixes) op := tlv.PrefixOpList{ ExitRouter: &tlv.Destination{Name: pt.config.RouterName()}, PrefixOpReset: true, } - pt.publishOp(op.Encode()) + pt.publish(op.Encode()) } func (pt *PrefixTable) Announce(name enc.Name) { log.Info(pt, "Announce prefix", "name", name) - hash := name.String() + hash := name.TlvStr() // Skip if matching entry already exists // This will also need to check that all params are equal - if entry := pt.me.Prefixes[hash]; entry != nil && entry.Name.Equal(name) { + if entry := pt.me.Prefixes[hash]; entry != nil { return } @@ -110,12 +80,12 @@ func (pt *PrefixTable) Announce(name enc.Name) { Cost: 1, }}, } - pt.publishOp(op.Encode()) + pt.publish(op.Encode()) } func (pt *PrefixTable) Withdraw(name enc.Name) { log.Info(pt, "Withdraw prefix", "name", name) - hash := name.String() + hash := name.TlvStr() // Check if entry does not exist if entry := pt.me.Prefixes[hash]; entry == nil { @@ -129,11 +99,17 @@ func (pt *PrefixTable) Withdraw(name enc.Name) { ExitRouter: &tlv.Destination{Name: pt.config.RouterName()}, PrefixOpRemoves: []*tlv.PrefixOpRemove{{Name: name}}, } - pt.publishOp(op.Encode()) + pt.publish(op.Encode()) } // Applies ops from a list. Returns if dirty. -func (pt *PrefixTable) Apply(ops *tlv.PrefixOpList) (dirty bool) { +func (pt *PrefixTable) Apply(wire enc.Wire) (dirty bool) { + ops, err := tlv.ParsePrefixOpList(enc.NewWireReader(wire), true) + if err != nil { + log.Warn(pt, "Failed to parse PrefixOpList", "err", err) + return false + } + if ops.ExitRouter == nil || len(ops.ExitRouter.Name) == 0 { log.Error(pt, "Received PrefixOpList has no ExitRouter") return false @@ -149,103 +125,20 @@ func (pt *PrefixTable) Apply(ops *tlv.PrefixOpList) (dirty bool) { for _, add := range ops.PrefixOpAdds { log.Info(pt, "Add remote prefix", "router", ops.ExitRouter.Name, "name", add.Name) - router.Prefixes[add.Name.String()] = &PrefixEntry{Name: add.Name} + router.Prefixes[add.Name.TlvStr()] = &PrefixEntry{Name: add.Name} dirty = true } for _, remove := range ops.PrefixOpRemoves { log.Info(pt, "Remove remote prefix", "router", ops.ExitRouter.Name, "name", remove.Name) - delete(router.Prefixes, remove.Name.String()) + delete(router.Prefixes, remove.Name.TlvStr()) dirty = true } return dirty } -// Get the object name to fetch the next prefix table data. -// If the difference between Known and Latest is greater than the threshold, -// fetch the latest snapshot. Otherwise, fetch the next sequence number. -func (r *PrefixTableRouter) GetNextDataName() enc.Name { - // //32=DV/32=PFX/t=/32=SNAP/v= - // //32=DV/32=PFX/t=/seq=/v=0 - prefix := r.Name. - Append(enc.NewKeywordComponent("DV")). - Append(enc.NewKeywordComponent("PFX")). - Append(enc.NewTimestampComponent(r.BootTime)) - - if r.Latest-r.Known > PrefixSnapThreshold { - return prefix. - Append(enc.NewKeywordComponent(PrefixSnapKeyword)) - } - - return prefix. - Append(enc.NewSequenceNumComponent(r.Known + 1)). - WithVersion(enc.VersionImmutable) -} - -// Process the received prefix data. Returns if dirty. -func (pt *PrefixTable) ApplyData(name enc.Name, data enc.Wire, router *PrefixTableRouter) bool { - if len(name) < 2 { - log.Warn(pt, "Unexpected name length", "len", len(name)) - return false - } - - // Get sequence number from name - // //32=DV/32=PFX/t=/32=SNAP/v= - // //32=DV/32=PFX/t=/seq=/v=0 - var seqNo uint64 - if name.At(-2).IsKeyword(PrefixSnapKeyword) { - // version is sequence number for snapshot - seqNo = name.At(-1).NumberVal() - } else if name.At(-2).IsSequenceNum() { - // version is immutable, sequence number is in name - seqNo = name.At(-2).NumberVal() - } else { - log.Warn(pt, "Unexpected prefix data name", "name", name) - return false - } - - // Parse the prefix data - ops, err := tlv.ParsePrefixOpList(enc.NewWireReader(data), true) - if err != nil { - log.Warn(pt, "Failed to parse PrefixOpList", "err", err) - return false - } - - // Update the prefix table - router.Known = seqNo - return pt.Apply(ops) -} - -func (pt *PrefixTable) publishOp(content enc.Wire) { - // Increment our sequence number - seq := pt.svs.IncrSeqNo(pt.config.RouterName()) - pt.me.Known = seq - pt.me.Latest = seq - - // Produce the operation - // //32=DV/32=PFX/t=/seq=/v=0 - name, err := pt.client.Produce(ndn.ProduceArgs{ - Name: pt.config.PrefixTableDataPrefix(). - Append(enc.NewTimestampComponent(pt.me.BootTime)). - Append(enc.NewSequenceNumComponent(seq)). - WithVersion(enc.VersionImmutable), - Content: content, - }) - if err != nil { - log.Error(pt, "Failed to produce op", "err", err) - return - } - pt.objDir.Push(name) - - // Create snapshot if needed - if seq-pt.snapshotAt >= PrefixSnapThreshold/2 { - pt.publishSnap() - } -} - -func (pt *PrefixTable) publishSnap() { - // Encode the snapshot +func (pt *PrefixTable) Snap() enc.Wire { snap := tlv.PrefixOpList{ ExitRouter: &tlv.Destination{Name: pt.config.RouterName()}, PrefixOpReset: true, @@ -259,21 +152,5 @@ func (pt *PrefixTable) publishSnap() { }) } - // Produce the snapshot - // //32=DV/32=PFX/t=/32=SNAP/v= - name, err := pt.client.Produce(ndn.ProduceArgs{ - Name: pt.config.PrefixTableDataPrefix(). - Append(enc.NewTimestampComponent(pt.me.BootTime)). - Append(enc.NewKeywordComponent(PrefixSnapKeyword)). - WithVersion(pt.me.Latest), - Content: snap.Encode(), - }) - if err != nil { - log.Error(pt, "Failed to produce snap", "err", err) - } - pt.objDir.Push(name) - pt.objDir.Evict(pt.client) - - // Mark current snapshot time for next - pt.snapshotAt = pt.me.Latest + return snap.Encode() } diff --git a/dv/table/rib.go b/dv/table/rib.go index 7d064842..8686bfe9 100644 --- a/dv/table/rib.go +++ b/dv/table/rib.go @@ -2,6 +2,7 @@ package table import ( "fmt" + "iter" "github.com/named-data/ndnd/dv/config" "github.com/named-data/ndnd/dv/tlv" @@ -102,14 +103,16 @@ func (r *Rib) Has(destName enc.Name) bool { } // Get all destinations reachable in the RIB. -func (r *Rib) Entries() []*RibEntry { - entries := make([]*RibEntry, 0, len(r.entries)) - for _, entry := range r.entries { - if entry.lowest1 < config.CostInfinity { - entries = append(entries, entry) +func (r *Rib) Entries() iter.Seq2[uint64, *RibEntry] { + return func(yield func(uint64, *RibEntry) bool) { + for hash, entry := range r.entries { + if entry.lowest1 < config.CostInfinity { + if !yield(hash, entry) { + return + } + } } } - return entries } // Remove all entries with a given next hop. diff --git a/std/sync/svs_alo.go b/std/sync/svs_alo.go index 17839af0..b143d03d 100644 --- a/std/sync/svs_alo.go +++ b/std/sync/svs_alo.go @@ -78,7 +78,7 @@ func NewSvsALO(opts SvsAloOpts) *SvsALO { publpipe: make(chan enc.Name, 16), stop: make(chan struct{}), - onError: func(err error) { log.Error(nil, err.Error()) }, + onError: func(err error) { log.Warn(nil, err.Error()) }, onPublisher: nil, }