From 3b181b36975a9d27d0da8856ec0482504e32ac74 Mon Sep 17 00:00:00 2001
From: Varun Patil
Date: Thu, 6 Feb 2025 21:31:27 +0000
Subject: [PATCH] svs: take snapshot on delivery thread

---
 std/sync/snapshot.go             | 11 +++++--
 std/sync/snapshot_node_latest.go | 54 +++++++++++++++++----------
 std/sync/snapshot_null.go        |  5 ++-
 std/sync/svs_alo.go              | 10 +++++-
 std/sync/svs_alo_data.go         | 35 +++++++++++++++++----
 5 files changed, 79 insertions(+), 36 deletions(-)

diff --git a/std/sync/snapshot.go b/std/sync/snapshot.go
index e8fccdb9..8c118152 100644
--- a/std/sync/snapshot.go
+++ b/std/sync/snapshot.go
@@ -9,11 +9,18 @@ type Snapshot interface {
     // initialize the strategy, and set up ps state.
     initialize(snapPsState)
 
-    // check is called when the state vector is updated.
+    // checkFetch is called when the state vector is updated for a different node.
     // The strategy can decide to block fetching for the snapshot.
+    // This function may also be called for this node's name with a different boot time.
     //
     // This function call MUST NOT make the onReceive callback.
-    check(snapCheckArgs)
+    checkFetch(snapCheckArgs)
+
+    // checkSelf is called when the state for this node is updated (for this boot).
+    // The strategy can decide to take a snapshot.
+    // This function is called from the delivery thread, so the delivery state
+    // is provided. The strategy should not block or depend on fetch state.
+    checkSelf(SvMap[uint64])
 }
 
 // snapPsState is the shared data struct between snapshot strategy
diff --git a/std/sync/snapshot_node_latest.go b/std/sync/snapshot_node_latest.go
index ae186c0a..19947ec3 100644
--- a/std/sync/snapshot_node_latest.go
+++ b/std/sync/snapshot_node_latest.go
@@ -53,32 +53,22 @@ func (s *SnapshotNodeLatest) initialize(comm snapPsState) {
     s.pss = comm
 }
 
-// check determines if a snapshot should be taken or fetched.
-func (s *SnapshotNodeLatest) check(args snapCheckArgs) {
+// checkFetch determines if a snapshot should be fetched.
+func (s *SnapshotNodeLatest) checkFetch(args snapCheckArgs) {
     // We only care about the latest boot.
     // For all other states, make sure the fetch is skipped.
     entries := args.state[args.hash]
     for i := range entries {
         if i == len(entries)-1 { // if is last entry
-            last := entries[i]  // note: copy
-            lastV := last.Value // note: copy
-
-            if args.node.Equal(s.pss.nodePrefix) {
-                // This is me - check if I should snapshot
-                // 1. I have reached the threshold
-                // 2. I have not taken any snapshot yet
-                if lastV.Latest-s.prevSeq >= s.Threshold || (s.prevSeq == 0 && lastV.Latest > 0) {
-                    s.snap(last.Boot, lastV.Latest)
-                }
-            } else {
-                // This is not me - check if I should fetch
-                // 1. Pending gap is more than 2*threshold
-                // 2. I have not fetched anything yet
-                // And, I'm not already blocked by a fetch
-                if lastV.SnapBlock == 0 && (lastV.Latest-lastV.Pending >= s.Threshold*2 || lastV.Pending == 0) {
-                    entries[i].Value.SnapBlock = 1 // released by fetch callback
-                    s.fetch(args.node, entries[i].Boot)
-                }
-            }
+            boot, value := entries[i].Boot, entries[i].Value
+
+            // Check if we should fetch a snapshot:
+            // 1. the pending gap is more than 2*threshold, or
+            // 2. nothing has been fetched yet,
+            // and no fetch is already blocking this entry.
+            if value.SnapBlock == 0 && (value.Latest-value.Pending >= s.Threshold*2 || value.Pending == 0) {
+                entries[i].Value.SnapBlock = 1 // released by fetch callback
+                s.fetch(args.node, boot)
+            }
             return
         }
@@ -92,6 +82,21 @@
     }
 }
 
+// checkSelf is called when the state for this node is updated.
+func (s *SnapshotNodeLatest) checkSelf(delivered SvMap[uint64]) {
+    // This strategy only cares about the latest boot.
+    boots := delivered[s.pss.nodePrefix.TlvStr()]
+    entry := boots[len(boots)-1]
+
+    // Check if I should take a snapshot:
+    // 1. I have reached the threshold, or
+    // 2. I have not taken any snapshot yet.
+    if entry.Value-s.prevSeq >= s.Threshold || (s.prevSeq == 0 && entry.Value > 0) {
+        s.prevSeq = entry.Value
+        s.takeSnap(entry.Boot, entry.Value)
+    }
+}
+
 // snapName is the naming convention for snapshots.
 func (s *SnapshotNodeLatest) snapName(node enc.Name, boot uint64) enc.Name {
     return node.
@@ -151,8 +156,8 @@ func (s *SnapshotNodeLatest) handleSnap(node enc.Name, boot uint64, cstate ndn.C
     })
 }
 
-// snap takes a snapshot of the application state.
-func (s *SnapshotNodeLatest) snap(boot uint64, seq uint64) {
+// takeSnap takes a snapshot of the application state.
+func (s *SnapshotNodeLatest) takeSnap(boot uint64, seq uint64) {
     name := s.snapName(s.pss.nodePrefix, boot).WithVersion(seq)
 
     // Request snapshot from application
@@ -171,7 +176,4 @@
         log.Error(nil, "Failed to publish snapshot", "err", err, "name", name)
         return
     }
-
-    // TODO: FIFO directory
-    s.prevSeq = seq
 }
diff --git a/std/sync/snapshot_null.go b/std/sync/snapshot_null.go
index 149f742d..1c9a10fd 100644
--- a/std/sync/snapshot_null.go
+++ b/std/sync/snapshot_null.go
@@ -11,5 +11,8 @@ func (s *SnapshotNull) Snapshot() Snapshot {
 func (s *SnapshotNull) initialize(snapPsState) {
 }
 
-func (s *SnapshotNull) check(snapCheckArgs) {
+func (s *SnapshotNull) checkFetch(snapCheckArgs) {
+}
+
+func (s *SnapshotNull) checkSelf(SvMap[uint64]) {
 }
diff --git a/std/sync/svs_alo.go b/std/sync/svs_alo.go
index c9343e0b..e3f14e84 100644
--- a/std/sync/svs_alo.go
+++ b/std/sync/svs_alo.go
@@ -94,7 +94,7 @@ func NewSvsALO(opts SvsAloOpts) *SvsALO {
         s.opts.Snapshot.initialize(snapPsState{
             nodePrefix:  s.opts.Name,
             groupPrefix: s.opts.Svs.GroupPrefix,
-            onReceive:   s.snapshotCallback,
+            onReceive:   s.snapRecvCallback,
         })
     }
 
@@ -252,4 +252,12 @@ func (s *SvsALO) deliver(out svsPubOut) {
             s.delivered.Set(out.hash, out.pub.BootTime, out.pub.SeqNum)
         }
     }
+
+    // If this is a publication from self, alert the snapshot strategy.
+    // It may decide to take a snapshot.
+    if !out.pub.IsSnapshot &&
+        out.pub.Publisher.Equal(s.opts.Name) &&
+        out.pub.BootTime == s.svs.GetBootTime() {
+        s.opts.Snapshot.checkSelf(s.delivered)
+    }
 }
diff --git a/std/sync/svs_alo_data.go b/std/sync/svs_alo_data.go
index bfd540a0..75edd781 100644
--- a/std/sync/svs_alo_data.go
+++ b/std/sync/svs_alo_data.go
@@ -76,8 +76,27 @@ func (s *SvsALO) produceObject(content enc.Wire) (enc.Name, error) {
         Pending: seq,
     })
 
-    // Inform the snapshot strategy
-    s.opts.Snapshot.check(snapCheckArgs{s.state, node, hash})
+    // This publication is never sent out to the application, since subs is nil.
+    // But we still need to update the delivered vector, and inform the
+    // snapshot strategy *after* making that update.
+    //
+    // This is especially important because the snapshot may depend on the
+    // entire delivered state, which is what the application will return.
+    //
+    // Unfortunately, since this happens on a different goroutine, the initial
+    // snapshot is taken only after the sequence number is already incremented.
+    // But this problem is more general anyway (what if the app dies at this point?).
+    s.outpipe <- svsPubOut{
+        hash: hash,
+        pub: SvsPub{
+            Publisher: node,
+            Content:   content,
+            DataName:  name,
+            BootTime:  boot,
+            SeqNum:    seq,
+        },
+        subs: nil,
+    }
 
     // Update the state vector
     if got := s.svs.IncrSeqNo(node); got != seq {
@@ -94,7 +113,7 @@ func (s *SvsALO) consumeCheck(node enc.Name, hash string) {
     }
 
     // Check with the snapshot strategy
-    s.opts.Snapshot.check(snapCheckArgs{s.state, node, hash})
+    s.opts.Snapshot.checkFetch(snapCheckArgs{s.state, node, hash})
 
     totalPending := uint64(0)
 
@@ -226,15 +245,19 @@ func (s *SvsALO) consumeObject(node enc.Name, boot uint64, seq uint64) {
             // we WILL deliver it, update known state
             s.state.Set(hash, boot, entry)
             for _, pub := range deliver {
-                s.outpipe <- svsPubOut{hash: hash, pub: pub, subs: subs}
+                s.outpipe <- svsPubOut{
+                    hash: hash,
+                    pub:  pub,
+                    subs: subs,
+                }
             }
         }
     })
 }
 
-// snapshotCallback is called by the snapshot strategy to indicate
+// snapRecvCallback is called by the snapshot strategy to indicate
 // that a snapshot has been fetched.
-func (s *SvsALO) snapshotCallback(callback snapRecvCallback) {
+func (s *SvsALO) snapRecvCallback(callback snapRecvCallback) {
     s.mutex.Lock()
     defer s.mutex.Unlock()
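
For illustration, a minimal sketch of a custom strategy under the new split
interface follows. This is hypothetical and not part of the patch: the
SnapshotEveryN type and its N field are invented for this example; the sketch
assumes only the snapPsState, snapCheckArgs, and SvMap[uint64] types and the
Snapshot()/initialize pattern visible in the diff above.

// SnapshotEveryN takes a snapshot every N own publications (illustration only).
type SnapshotEveryN struct {
    N    uint64      // snapshot interval (hypothetical field)
    pss  snapPsState // set by initialize
    prev uint64      // sequence number at the last snapshot
}

func (s *SnapshotEveryN) Snapshot() Snapshot { return s }

func (s *SnapshotEveryN) initialize(pss snapPsState) { s.pss = pss }

// checkFetch runs when the state vector changes for other nodes; it may
// only influence fetching and must not make the onReceive callback.
func (s *SnapshotEveryN) checkFetch(args snapCheckArgs) {}

// checkSelf runs on the delivery thread with the delivered vector, so it
// sees exactly the state the application has already observed.
func (s *SnapshotEveryN) checkSelf(delivered SvMap[uint64]) {
    boots := delivered[s.pss.nodePrefix.TlvStr()]
    if len(boots) == 0 {
        return // defensive: deliver() invokes this only after a self publication
    }
    if seq := boots[len(boots)-1].Value; seq-s.prev >= s.N {
        s.prev = seq
        // take and publish the snapshot here, as SnapshotNodeLatest.takeSnap does
    }
}

The design point of the split: checkFetch keys off the fetch-side state
vector, while checkSelf keys off the delivered vector on the delivery thread,
so a snapshot can never capture state the application has not yet been handed.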
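
Wiring a strategy into SvsALO is unchanged by this patch; only the internal
callback path moved. A sketch, assuming only the SvsAloOpts fields visible in
this diff (Name, Svs, Snapshot); name and svsOpts are app-defined placeholders:

alo := NewSvsALO(SvsAloOpts{
    Name:     name,    // this node's prefix (assumed defined by the app)
    Svs:      svsOpts, // group options carrying GroupPrefix (assumed defined)
    Snapshot: &SnapshotNodeLatest{Threshold: 100},
})
_ = alo // produceObject and deliver then drive checkSelf as shown above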