Skip to content

Commit

Permalink
svs: refactor snap strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
pulsejet committed Feb 6, 2025
1 parent 868c19c commit bf36434
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 45 deletions.
41 changes: 27 additions & 14 deletions std/sync/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,27 @@ type Snapshot interface {
// Snapshot returns the Snapshot trait.
Snapshot() Snapshot

// initialize the strategy, and set basic parameters.
initialize(node enc.Name, group enc.Name)
// initialize the strategy, and set up ps state.
initialize(snapPsState)

// setCallback sets the callback for fetched snapshot.
// check is called when the state vector is updated.
// The strategy can decide to block fetching for the snapshot.
//
// This function call MUST NOT make the onReceive callback.
check(snapCheckArgs)
}

// snapPsState is the shared data struct between snapshot strategy
// and the SVS data fetching layer.
type snapPsState struct {
// nodePrefix is the name of the current nodePrefix.
nodePrefix enc.Name
// groupPrefix is the name of the sync groupPrefix.
groupPrefix enc.Name

// onReceive is the callback for snapshot received from a remote party.
// The snapshot strategy should call the inner function when
// a snapshot is received.
//
// The callback provides a function to update the state vector,
// and return the snapshot publication. When updating the state vector,
Expand All @@ -30,23 +47,19 @@ type Snapshot interface {
// Even if the callback returns an error, the Publication field should
// be appropriately set. This will trigger a re-fetch for the producers.
//
setCallback(snapshotCallbackWrap)

// check is called when the state vector is updated.
// The strategy can decide to block fetching for the snapshot.
//
// This function call MUST NOT make the callback.
check(snapshotOnUpdateArgs)
onReceive func(callback snapRecvCallback)
}

type snapshotOnUpdateArgs struct {
// snapRecvCallback is the callback function passed to the onReceive callback.
// This callback should update the state if needed (lock is held by the caller).
type snapRecvCallback = func(state SvMap[svsDataState]) (SvsPub, error)

// snapCheckArgs is the arguments passed to the check function.
type snapCheckArgs struct {
// state is the current state vector.
state SvMap[svsDataState]
// node is the node that is updated.
node enc.Name
// hash is the hash of the node name (optimization).
hash string
}

type snapshotCallback = func(state SvMap[svsDataState]) (SvsPub, error)
type snapshotCallbackWrap = func(callback snapshotCallback)
28 changes: 9 additions & 19 deletions std/sync/snapshot_node_latest.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,8 @@ type SnapshotNodeLatest struct {
// Threshold is the number of updates before a snapshot is taken.
Threshold uint64

// nodePrefix is the name of this instance.
nodePrefix enc.Name
// groupPrefix is the groupPrefix name.
groupPrefix enc.Name
// callback is the snapshot callback.
callback snapshotCallbackWrap

// pss is the struct from the svs layer.
pss snapPsState
// prevSeq is my last snapshot sequence number.
prevSeq uint64
}
Expand All @@ -50,21 +45,16 @@ func (s *SnapshotNodeLatest) Snapshot() Snapshot {
return s
}

func (s *SnapshotNodeLatest) initialize(node enc.Name, group enc.Name) {
func (s *SnapshotNodeLatest) initialize(comm snapPsState) {
if s.Client == nil || s.SnapMe == nil || s.Threshold == 0 {
panic("SnapshotNodeLatest: not initialized")
}

s.nodePrefix = node
s.groupPrefix = group
}

func (s *SnapshotNodeLatest) setCallback(callback snapshotCallbackWrap) {
s.callback = callback
s.pss = comm
}

// check determines if a snapshot should be taken or fetched.
func (s *SnapshotNodeLatest) check(args snapshotOnUpdateArgs) {
func (s *SnapshotNodeLatest) check(args snapCheckArgs) {
// We only care about the latest boot.
// For all other states, make sure the fetch is skipped.
entries := args.state[args.hash]
Expand All @@ -73,7 +63,7 @@ func (s *SnapshotNodeLatest) check(args snapshotOnUpdateArgs) {
last := entries[i] // note: copy
lastV := last.Value // note: copy

if args.node.Equal(s.nodePrefix) {
if args.node.Equal(s.pss.nodePrefix) {
// This is me - check if I should snapshot
// 1. I have reached the threshold
// 2. I have not taken any snapshot yet
Expand Down Expand Up @@ -105,7 +95,7 @@ func (s *SnapshotNodeLatest) check(args snapshotOnUpdateArgs) {
// snapName is the naming convention for snapshots.
func (s *SnapshotNodeLatest) snapName(node enc.Name, boot uint64) enc.Name {
return node.
Append(s.groupPrefix...).
Append(s.pss.groupPrefix...).
Append(enc.NewTimestampComponent(boot)).
Append(enc.NewKeywordComponent("SNAP"))
}
Expand All @@ -126,7 +116,7 @@ func (s *SnapshotNodeLatest) fetch(node enc.Name, boot uint64) {
}

func (s *SnapshotNodeLatest) handleSnap(node enc.Name, boot uint64, cstate ndn.ConsumeState) {
s.callback(func(state SvMap[svsDataState]) (pub SvsPub, err error) {
s.pss.onReceive(func(state SvMap[svsDataState]) (pub SvsPub, err error) {
hash := node.TlvStr()
pub.Publisher = node

Expand Down Expand Up @@ -163,7 +153,7 @@ func (s *SnapshotNodeLatest) handleSnap(node enc.Name, boot uint64, cstate ndn.C

// snap takes a snapshot of the application state.
func (s *SnapshotNodeLatest) snap(boot uint64, seq uint64) {
name := s.snapName(s.nodePrefix, boot).WithVersion(seq)
name := s.snapName(s.pss.nodePrefix, boot).WithVersion(seq)

// Request snapshot from application
wire, err := s.SnapMe(name)
Expand Down
9 changes: 2 additions & 7 deletions std/sync/snapshot_null.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package sync

import enc "github.com/named-data/ndnd/std/encoding"

// SnapshotNull is a non-snapshot strategy.
type SnapshotNull struct {
}
Expand All @@ -10,11 +8,8 @@ func (s *SnapshotNull) Snapshot() Snapshot {
return s
}

func (s *SnapshotNull) initialize(enc.Name, enc.Name) {
}

func (s *SnapshotNull) setCallback(snapshotCallbackWrap) {
func (s *SnapshotNull) initialize(snapPsState) {
}

func (s *SnapshotNull) check(snapshotOnUpdateArgs) {
func (s *SnapshotNull) check(snapCheckArgs) {
}
7 changes: 5 additions & 2 deletions std/sync/svs_alo.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,11 @@ func NewSvsALO(opts SvsAloOpts) *SvsALO {
if s.opts.Snapshot == nil {
s.opts.Snapshot = &SnapshotNull{}
} else {
s.opts.Snapshot.initialize(s.opts.Name, s.opts.Svs.GroupPrefix)
s.opts.Snapshot.setCallback(s.snapshotCallback)
s.opts.Snapshot.initialize(snapPsState{
nodePrefix: s.opts.Name,
groupPrefix: s.opts.Svs.GroupPrefix,
onReceive: s.snapshotCallback,
})
}

// Initialize the SVS instance.
Expand Down
6 changes: 3 additions & 3 deletions std/sync/svs_alo_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (s *SvsALO) produceObject(content enc.Wire) (enc.Name, error) {
})

// Inform the snapshot strategy
s.opts.Snapshot.check(snapshotOnUpdateArgs{s.state, node, hash})
s.opts.Snapshot.check(snapCheckArgs{s.state, node, hash})

// Update the state vector
if got := s.svs.IncrSeqNo(node); got != seq {
Expand All @@ -94,7 +94,7 @@ func (s *SvsALO) consumeCheck(node enc.Name, hash string) {
}

// Check with the snapshot strategy
s.opts.Snapshot.check(snapshotOnUpdateArgs{s.state, node, hash})
s.opts.Snapshot.check(snapCheckArgs{s.state, node, hash})

totalPending := uint64(0)

Expand Down Expand Up @@ -234,7 +234,7 @@ func (s *SvsALO) consumeObject(node enc.Name, boot uint64, seq uint64) {

// snapshotCallback is called by the snapshot strategy to indicate
// that a snapshot has been fetched.
func (s *SvsALO) snapshotCallback(callback snapshotCallback) {
func (s *SvsALO) snapshotCallback(callback snapRecvCallback) {
s.mutex.Lock()
defer s.mutex.Unlock()

Expand Down

0 comments on commit bf36434

Please sign in to comment.