svs: take snapshot on delivery thread
pulsejet committed Feb 6, 2025
1 parent bf36434 commit 3b181b3
Showing 5 changed files with 79 additions and 36 deletions.
11 changes: 9 additions & 2 deletions std/sync/snapshot.go
```diff
@@ -9,11 +9,18 @@ type Snapshot interface {
 	// initialize the strategy, and set up ps state.
 	initialize(snapPsState)
 
-	// check is called when the state vector is updated.
+	// checkFetch is called when the state vector is updated for a different node.
 	// The strategy can decide to block fetching for the snapshot.
+	// This function may also be called for this node name with a different boot time.
 	//
 	// This function call MUST NOT make the onReceive callback.
-	check(snapCheckArgs)
+	checkFetch(snapCheckArgs)
+
+	// checkSelf is called when the state for this node is updated (for this boot).
+	// The strategy can decide to take a snapshot.
+	// This function is called from the delivery thread, so the delivery state
+	// is provided. The strategy should not block or depend on fetch state.
+	checkSelf(SvMap[uint64])
 }
 
 // snapPsState is the shared data struct between snapshot strategy
```
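To make the split concrete, here is a minimal sketch of a custom strategy against the new interface. `SnapshotEveryN` is hypothetical and not part of this commit; the `SvMap` entry shape (`Boot`, `Value`) and the `nodePrefix.TlvStr()` lookup are inferred from `snapshot_node_latest.go` below, and methods outside this diff (such as the `Snapshot()` accessor visible in `snapshot_null.go`) are omitted.

```go
// Hypothetical strategy (sketch only): take a snapshot after every N
// delivered self-publications, and never block snapshot fetching.
type SnapshotEveryN struct {
	N   uint64
	pss snapPsState
}

func (s *SnapshotEveryN) initialize(pss snapPsState) { s.pss = pss }

// checkFetch: nothing to block; plain fetching is always allowed.
func (s *SnapshotEveryN) checkFetch(snapCheckArgs) {}

// checkSelf runs on the delivery thread with the delivered vector.
func (s *SnapshotEveryN) checkSelf(delivered SvMap[uint64]) {
	boots := delivered[s.pss.nodePrefix.TlvStr()]
	if len(boots) == 0 {
		return
	}
	latest := boots[len(boots)-1] // only the current boot matters here
	if latest.Value > 0 && latest.Value%s.N == 0 {
		// take and publish a snapshot here, as SnapshotNodeLatest.takeSnap does
	}
}
```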
54 changes: 28 additions & 26 deletions std/sync/snapshot_node_latest.go
```diff
@@ -53,32 +53,22 @@ func (s *SnapshotNodeLatest) initialize(comm snapPsState) {
 	s.pss = comm
 }
 
-// check determines if a snapshot should be taken or fetched.
-func (s *SnapshotNodeLatest) check(args snapCheckArgs) {
+// checkFetch determines if a snapshot should be fetched.
+func (s *SnapshotNodeLatest) checkFetch(args snapCheckArgs) {
 	// We only care about the latest boot.
 	// For all other states, make sure the fetch is skipped.
 	entries := args.state[args.hash]
 	for i := range entries {
 		if i == len(entries)-1 { // if is last entry
-			last := entries[i]  // note: copy
-			lastV := last.Value // note: copy
-
-			if args.node.Equal(s.pss.nodePrefix) {
-				// This is me - check if I should snapshot
-				// 1. I have reached the threshold
-				// 2. I have not taken any snapshot yet
-				if lastV.Latest-s.prevSeq >= s.Threshold || (s.prevSeq == 0 && lastV.Latest > 0) {
-					s.snap(last.Boot, lastV.Latest)
-				}
-			} else {
-				// This is not me - check if I should fetch
-				// 1. Pending gap is more than 2*threshold
-				// 2. I have not fetched anything yet
-				// And, I'm not already blocked by a fetch
-				if lastV.SnapBlock == 0 && (lastV.Latest-lastV.Pending >= s.Threshold*2 || lastV.Pending == 0) {
-					entries[i].Value.SnapBlock = 1 // released by fetch callback
-					s.fetch(args.node, entries[i].Boot)
-				}
-			}
+			boot, value := entries[i].Boot, entries[i].Value
+
+			// Check if we should fetch a snapshot
+			// 1. Pending gap is more than 2*threshold
+			// 2. I have not fetched anything yet
+			// And, I'm not already blocked by a fetch
+			if value.SnapBlock == 0 && (value.Latest-value.Pending >= s.Threshold*2 || value.Pending == 0) {
+				entries[i].Value.SnapBlock = 1 // released by fetch callback
+				s.fetch(args.node, boot)
+			}
 			return
 		}
@@ -92,6 +82,21 @@ func (s *SnapshotNodeLatest) check(args snapCheckArgs) {
 	}
 }
 
+// checkSelf is called when the state for this node is updated.
+func (s *SnapshotNodeLatest) checkSelf(delivered SvMap[uint64]) {
+	// This strategy only cares about the latest boot.
+	boots := delivered[s.pss.nodePrefix.TlvStr()]
+	entry := boots[len(boots)-1]
+
+	// Check if I should take a snapshot
+	// 1. I have reached the threshold
+	// 2. I have not taken any snapshot yet
+	if entry.Value-s.prevSeq >= s.Threshold || (s.prevSeq == 0 && entry.Value > 0) {
+		s.prevSeq = entry.Value
+		s.takeSnap(entry.Boot, entry.Value)
+	}
+}
+
 // snapName is the naming convention for snapshots.
 func (s *SnapshotNodeLatest) snapName(node enc.Name, boot uint64) enc.Name {
 	return node.
@@ -151,8 +156,8 @@ func (s *SnapshotNodeLatest) handleSnap(node enc.Name, boot uint64, cstate ndn.C
 	})
 }
 
-// snap takes a snapshot of the application state.
-func (s *SnapshotNodeLatest) snap(boot uint64, seq uint64) {
+// takeSnap takes a snapshot of the application state.
+func (s *SnapshotNodeLatest) takeSnap(boot uint64, seq uint64) {
 	name := s.snapName(s.pss.nodePrefix, boot).WithVersion(seq)
 
 	// Request snapshot from application
@@ -171,7 +176,4 @@
 		log.Error(nil, "Failed to publish snapshot", "err", err, "name", name)
 		return
 	}
-
-	// TODO: FIFO directory
-	s.prevSeq = seq
 }
```
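The trigger condition in `checkSelf` is worth tracing once. Below is the same logic pulled out for illustration; the `shouldSnap` helper is not part of the commit.

```go
// The checkSelf trigger from the diff above, extracted for illustration.
func shouldSnap(prevSeq, value, threshold uint64) bool {
	return value-prevSeq >= threshold || (prevSeq == 0 && value > 0)
}

// With threshold = 100:
//   shouldSnap(0, 1, 100)   == true  // first publication ever triggers a snapshot
//   shouldSnap(1, 100, 100) == false // gap of 99 is below the threshold
//   shouldSnap(1, 101, 100) == true  // gap of 100 reaches the threshold
```

One behavioral consequence visible in the diff: `prevSeq` now advances in `checkSelf` before `takeSnap` runs, whereas the old `snap` set it only after a successful publish, so a failed publish appears to no longer be retried at the next state update.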
5 changes: 4 additions & 1 deletion std/sync/snapshot_null.go
```diff
@@ -11,5 +11,8 @@ func (s *SnapshotNull) Snapshot() Snapshot {
 func (s *SnapshotNull) initialize(snapPsState) {
 }
 
-func (s *SnapshotNull) check(snapCheckArgs) {
+func (s *SnapshotNull) checkFetch(snapCheckArgs) {
 }
+
+func (s *SnapshotNull) checkSelf(SvMap[uint64]) {
+}
```
10 changes: 9 additions & 1 deletion std/sync/svs_alo.go
```diff
@@ -94,7 +94,7 @@ func NewSvsALO(opts SvsAloOpts) *SvsALO {
 		s.opts.Snapshot.initialize(snapPsState{
 			nodePrefix:  s.opts.Name,
 			groupPrefix: s.opts.Svs.GroupPrefix,
-			onReceive:   s.snapshotCallback,
+			onReceive:   s.snapRecvCallback,
 		})
 	}
 
@@ -252,4 +252,12 @@ func (s *SvsALO) deliver(out svsPubOut) {
 			s.delivered.Set(out.hash, out.pub.BootTime, out.pub.SeqNum)
 		}
 	}
+
+	// If this is a publication from self, alert the snapshot strategy.
+	// It may decide to take a snapshot.
+	if !out.pub.IsSnapshot &&
+		out.pub.Publisher.Equal(s.opts.Name) &&
+		out.pub.BootTime == s.svs.GetBootTime() {
+		s.opts.Snapshot.checkSelf(s.delivered)
+	}
 }
```
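`deliver` runs on the goroutine that drains `outpipe`; neither the channel type nor the loop appears in this diff, but a sketch of the assumed shape shows why `checkSelf` can safely read `s.delivered`:

```go
// Assumed delivery loop (not shown in this commit). Publications are
// handled one at a time on a single goroutine, so by the time checkSelf
// fires, s.delivered already reflects the publication just delivered.
go func() {
	for out := range s.outpipe {
		s.deliver(out)
	}
}()
```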
35 changes: 29 additions & 6 deletions std/sync/svs_alo_data.go
```diff
@@ -76,8 +76,27 @@ func (s *SvsALO) produceObject(content enc.Wire) (enc.Name, error) {
 		Pending: seq,
 	})
 
-	// Inform the snapshot strategy
-	s.opts.Snapshot.check(snapCheckArgs{s.state, node, hash})
+	// This is never sent out to the application (duh) since subs is null.
+	// But we still need to update the delivered vector, and inform the
+	// snapshot strategy *after* making that update.
+	//
+	// This is especially important because the snapshot may depend on the
+	// entire delivered state, which is what the application will return.
+	//
+	// Unfortunately since this happens on a different goroutine, the initial
+	// snapshot will be taken only after the sequence number is already incremented.
+	// But this problem is more generalized anyway (what if the app dies at this point?)
+	s.outpipe <- svsPubOut{
+		hash: hash,
+		pub: SvsPub{
+			Publisher: node,
+			Content:   content,
+			DataName:  name,
+			BootTime:  boot,
+			SeqNum:    seq,
+		},
+		subs: nil,
+	}
 
 	// Update the state vector
 	if got := s.svs.IncrSeqNo(node); got != seq {
@@ -94,7 +113,7 @@ func (s *SvsALO) consumeCheck(node enc.Name, hash string) {
 	}
 
 	// Check with the snapshot strategy
-	s.opts.Snapshot.check(snapCheckArgs{s.state, node, hash})
+	s.opts.Snapshot.checkFetch(snapCheckArgs{s.state, node, hash})
 
 	totalPending := uint64(0)
 
@@ -226,15 +245,19 @@ func (s *SvsALO) consumeObject(node enc.Name, boot uint64, seq uint64) {
 			// we WILL deliver it, update known state
 			s.state.Set(hash, boot, entry)
 			for _, pub := range deliver {
-				s.outpipe <- svsPubOut{hash: hash, pub: pub, subs: subs}
+				s.outpipe <- svsPubOut{
+					hash: hash,
+					pub:  pub,
+					subs: subs,
+				}
 			}
 		}
 	})
 }
 
-// snapshotCallback is called by the snapshot strategy to indicate
+// snapRecvCallback is called by the snapshot strategy to indicate
 // that a snapshot has been fetched.
-func (s *SvsALO) snapshotCallback(callback snapRecvCallback) {
+func (s *SvsALO) snapRecvCallback(callback snapRecvCallback) {
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
 
```
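Taken together: `produceObject` now queues the self-publication on `outpipe` (with `subs` nil, so the application never sees it again) before calling `IncrSeqNo`, and the delivery goroutine records it in `s.delivered` via `deliver` before invoking `checkSelf`. The snapshot strategy therefore always observes exactly the delivered state the application would report if asked to snapshot at that moment, which is the point of moving the check onto the delivery thread.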
