From 38010c3307507b20f36f573ed5567b25838335b5 Mon Sep 17 00:00:00 2001 From: Varun Patil Date: Thu, 6 Feb 2025 06:46:43 +0000 Subject: [PATCH] dv: restore fifo --- dv/dv/router.go | 16 +++++++++++++--- std/examples/svs-alo/main.go | 2 +- std/object/dir_memory_fifo.go | 21 +++++++++++++++++---- std/sync/snapshot_node_latest.go | 19 ++++++++++++------- 4 files changed, 43 insertions(+), 15 deletions(-) diff --git a/dv/dv/router.go b/dv/dv/router.go index d94575b6..927eabe1 100644 --- a/dv/dv/router.go +++ b/dv/dv/router.go @@ -19,6 +19,8 @@ import ( "github.com/named-data/ndnd/std/utils" ) +const PrefixSnapThreshold = 50 + type Router struct { // go-ndn app that this router is attached to engine ndn.Engine @@ -51,10 +53,13 @@ type Router struct { // advertisement module advert advertModule + // prefix table svs instance pfxSvs *ndn_sync.SvsALO // prefix table svs subscriptions pfxSubs map[uint64]enc.Name + // prefix table fifo + pfxFifo *object.MemoryFifoDir } // Create a new DV router. @@ -107,6 +112,7 @@ func NewRouter(config *config.Config, engine ndn.Engine) (*Router, error) { } // Join prefix table sync group + dv.pfxFifo = object.NewMemoryFifoDir(PrefixSnapThreshold * 3) dv.pfxSvs = ndn_sync.NewSvsALO(ndn_sync.SvsAloOpts{ Name: config.RouterDataPrefix(), Svs: ndn_sync.SvSyncOpts{ @@ -116,10 +122,12 @@ func NewRouter(config *config.Config, engine ndn.Engine) (*Router, error) { }, Snapshot: &ndn_sync.SnapshotNodeLatest{ Client: dv.client, - SnapMe: func() (enc.Wire, error) { + SnapMe: func(name enc.Name) (enc.Wire, error) { + dv.pfxFifo.Push(name) + dv.pfxFifo.Evict(dv.client) return dv.pfx.Snap(), nil }, - Threshold: 50, + Threshold: PrefixSnapThreshold, }, }) dv.pfxSubs = make(map[uint64]enc.Name) @@ -128,8 +136,10 @@ func NewRouter(config *config.Config, engine ndn.Engine) (*Router, error) { dv.neighbors = table.NewNeighborTable(config, dv.nfdc) dv.rib = table.NewRib(config) dv.pfx = table.NewPrefixTable(config, func(w enc.Wire) { - if _, err := dv.pfxSvs.Publish(w); err != nil { + if name, err := dv.pfxSvs.Publish(w); err != nil { log.Error(dv, "Failed to publish prefix table update", "err", err) + } else { + dv.pfxFifo.Push(name) } }) dv.fib = table.NewFib(config, dv.nfdc) diff --git a/std/examples/svs-alo/main.go b/std/examples/svs-alo/main.go index 04c0f776..ae6483b2 100644 --- a/std/examples/svs-alo/main.go +++ b/std/examples/svs-alo/main.go @@ -83,7 +83,7 @@ func main() { // in this case should contain the entire state of the node. Snapshot: &ndn_sync.SnapshotNodeLatest{ Client: client, - SnapMe: func() (enc.Wire, error) { + SnapMe: func(enc.Name) (enc.Wire, error) { // In this example, we will ignore the old messages // and only send a message with the total number of messages. // diff --git a/std/object/dir_memory_fifo.go b/std/object/dir_memory_fifo.go index cf49b224..289d6792 100644 --- a/std/object/dir_memory_fifo.go +++ b/std/object/dir_memory_fifo.go @@ -1,6 +1,8 @@ package object import ( + "sync" + enc "github.com/named-data/ndnd/std/encoding" "github.com/named-data/ndnd/std/ndn" ) @@ -8,20 +10,25 @@ import ( // MemoryFifoDir is a simple object directory that evicts the oldest name // when it reaches its size size. type MemoryFifoDir struct { - list []enc.Name - size int + mutex sync.Mutex + list []enc.Name + size int } // NewMemoryFifoDir creates a new directory. func NewMemoryFifoDir(size int) *MemoryFifoDir { return &MemoryFifoDir{ - list: make([]enc.Name, 0), - size: size, + mutex: sync.Mutex{}, + list: make([]enc.Name, 0), + size: size, } } // Push adds a name to the directory. func (d *MemoryFifoDir) Push(name enc.Name) { + d.mutex.Lock() + defer d.mutex.Unlock() + d.list = append(d.list, name.Clone()) } @@ -29,6 +36,9 @@ func (d *MemoryFifoDir) Push(name enc.Name) { // If the directory has not reached its size, it returns nil. // It is recommended to use Evict() instead to remove objects from a client. func (d *MemoryFifoDir) Pop() enc.Name { + d.mutex.Lock() + defer d.mutex.Unlock() + if len(d.list) < d.size { return nil } @@ -54,5 +64,8 @@ func (d *MemoryFifoDir) Evict(client ndn.Client) error { // Count returns the number of names in the directory. func (d *MemoryFifoDir) Count() int { + d.mutex.Lock() + defer d.mutex.Unlock() + return len(d.list) } diff --git a/std/sync/snapshot_node_latest.go b/std/sync/snapshot_node_latest.go index 986c2546..c86f3d80 100644 --- a/std/sync/snapshot_node_latest.go +++ b/std/sync/snapshot_node_latest.go @@ -24,12 +24,14 @@ type SnapshotNodeLatest struct { // SnapMe is the callback to get a snapshot of the application state. // - // The state should encode the entire state of the node, - // and should replace any previous publications completely. + // The state should encode the entire state of the node, and should replace + // any previous publications completely. If this snapshot is delivered to a + // node, previous publications will be ignored by the receiving node. // - // If this snapshot is delivered to a node, previous publications will - // be ignored by the receiving node. - SnapMe func() (enc.Wire, error) + // The callback is passed the name of the snapshot that will be created. + // The application may insert this name in a FIFO directory to manage storage + // and remove old publications and snapshots. + SnapMe func(enc.Name) (enc.Wire, error) // Threshold is the number of updates before a snapshot is taken. Threshold uint64 @@ -161,13 +163,16 @@ func (s *SnapshotNodeLatest) handleSnapshot(node enc.Name, boot uint64, cstate n // snap takes a snapshot of the application state. func (s *SnapshotNodeLatest) snap(boot uint64, seq uint64) { - wire, err := s.SnapMe() + name := s.snapName(s.nodePrefix, boot).WithVersion(seq) + + // Request snapshot from application + wire, err := s.SnapMe(name) if err != nil { log.Error(nil, "Failed to get snapshot", "err", err) return } - name := s.snapName(s.nodePrefix, boot).WithVersion(seq) + // Publish snapshot into our store name, err = s.Client.Produce(ndn.ProduceArgs{ Name: name, Content: wire,