Skip to content

Commit

Permalink
dv: restore fifo
Browse files Browse the repository at this point in the history
  • Loading branch information
pulsejet committed Feb 6, 2025
1 parent 6fd6712 commit 38010c3
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 15 deletions.
16 changes: 13 additions & 3 deletions dv/dv/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"github.com/named-data/ndnd/std/utils"
)

const PrefixSnapThreshold = 50

type Router struct {
// go-ndn app that this router is attached to
engine ndn.Engine
Expand Down Expand Up @@ -51,10 +53,13 @@ type Router struct {

// advertisement module
advert advertModule

// prefix table svs instance
pfxSvs *ndn_sync.SvsALO
// prefix table svs subscriptions
pfxSubs map[uint64]enc.Name
// prefix table fifo
pfxFifo *object.MemoryFifoDir
}

// Create a new DV router.
Expand Down Expand Up @@ -107,6 +112,7 @@ func NewRouter(config *config.Config, engine ndn.Engine) (*Router, error) {
}

// Join prefix table sync group
dv.pfxFifo = object.NewMemoryFifoDir(PrefixSnapThreshold * 3)
dv.pfxSvs = ndn_sync.NewSvsALO(ndn_sync.SvsAloOpts{
Name: config.RouterDataPrefix(),
Svs: ndn_sync.SvSyncOpts{
Expand All @@ -116,10 +122,12 @@ func NewRouter(config *config.Config, engine ndn.Engine) (*Router, error) {
},
Snapshot: &ndn_sync.SnapshotNodeLatest{
Client: dv.client,
SnapMe: func() (enc.Wire, error) {
SnapMe: func(name enc.Name) (enc.Wire, error) {
dv.pfxFifo.Push(name)
dv.pfxFifo.Evict(dv.client)
return dv.pfx.Snap(), nil
},
Threshold: 50,
Threshold: PrefixSnapThreshold,
},
})
dv.pfxSubs = make(map[uint64]enc.Name)
Expand All @@ -128,8 +136,10 @@ func NewRouter(config *config.Config, engine ndn.Engine) (*Router, error) {
dv.neighbors = table.NewNeighborTable(config, dv.nfdc)
dv.rib = table.NewRib(config)
dv.pfx = table.NewPrefixTable(config, func(w enc.Wire) {
if _, err := dv.pfxSvs.Publish(w); err != nil {
if name, err := dv.pfxSvs.Publish(w); err != nil {
log.Error(dv, "Failed to publish prefix table update", "err", err)
} else {
dv.pfxFifo.Push(name)
}
})
dv.fib = table.NewFib(config, dv.nfdc)
Expand Down
2 changes: 1 addition & 1 deletion std/examples/svs-alo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func main() {
// in this case should contain the entire state of the node.
Snapshot: &ndn_sync.SnapshotNodeLatest{
Client: client,
SnapMe: func() (enc.Wire, error) {
SnapMe: func(enc.Name) (enc.Wire, error) {
// In this example, we will ignore the old messages
// and only send a message with the total number of messages.
//
Expand Down
21 changes: 17 additions & 4 deletions std/object/dir_memory_fifo.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,44 @@
package object

import (
"sync"

enc "github.com/named-data/ndnd/std/encoding"
"github.com/named-data/ndnd/std/ndn"
)

// MemoryFifoDir is a simple object directory that evicts the oldest name
// when it reaches its size size.
type MemoryFifoDir struct {
list []enc.Name
size int
mutex sync.Mutex
list []enc.Name
size int
}

// NewMemoryFifoDir creates a new directory.
func NewMemoryFifoDir(size int) *MemoryFifoDir {
return &MemoryFifoDir{
list: make([]enc.Name, 0),
size: size,
mutex: sync.Mutex{},
list: make([]enc.Name, 0),
size: size,
}
}

// Push adds a name to the directory.
func (d *MemoryFifoDir) Push(name enc.Name) {
d.mutex.Lock()
defer d.mutex.Unlock()

d.list = append(d.list, name.Clone())
}

// Pop removes the oldest name from the directory and returns it.
// If the directory has not reached its size, it returns nil.
// It is recommended to use Evict() instead to remove objects from a client.
func (d *MemoryFifoDir) Pop() enc.Name {
d.mutex.Lock()
defer d.mutex.Unlock()

if len(d.list) < d.size {
return nil
}
Expand All @@ -54,5 +64,8 @@ func (d *MemoryFifoDir) Evict(client ndn.Client) error {

// Count returns the number of names in the directory.
func (d *MemoryFifoDir) Count() int {
d.mutex.Lock()
defer d.mutex.Unlock()

return len(d.list)
}
19 changes: 12 additions & 7 deletions std/sync/snapshot_node_latest.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ type SnapshotNodeLatest struct {

// SnapMe is the callback to get a snapshot of the application state.
//
// The state should encode the entire state of the node,
// and should replace any previous publications completely.
// The state should encode the entire state of the node, and should replace
// any previous publications completely. If this snapshot is delivered to a
// node, previous publications will be ignored by the receiving node.
//
// If this snapshot is delivered to a node, previous publications will
// be ignored by the receiving node.
SnapMe func() (enc.Wire, error)
// The callback is passed the name of the snapshot that will be created.
// The application may insert this name in a FIFO directory to manage storage
// and remove old publications and snapshots.
SnapMe func(enc.Name) (enc.Wire, error)
// Threshold is the number of updates before a snapshot is taken.
Threshold uint64

Expand Down Expand Up @@ -161,13 +163,16 @@ func (s *SnapshotNodeLatest) handleSnapshot(node enc.Name, boot uint64, cstate n

// snap takes a snapshot of the application state.
func (s *SnapshotNodeLatest) snap(boot uint64, seq uint64) {
wire, err := s.SnapMe()
name := s.snapName(s.nodePrefix, boot).WithVersion(seq)

// Request snapshot from application
wire, err := s.SnapMe(name)
if err != nil {
log.Error(nil, "Failed to get snapshot", "err", err)
return
}

name := s.snapName(s.nodePrefix, boot).WithVersion(seq)
// Publish snapshot into our store
name, err = s.Client.Produce(ndn.ProduceArgs{
Name: name,
Content: wire,
Expand Down

0 comments on commit 38010c3

Please sign in to comment.