From 970b3f54006f24c2caec7ae790cb859853a01678 Mon Sep 17 00:00:00 2001 From: Varun Patil Date: Sun, 9 Feb 2025 02:20:22 +0000 Subject: [PATCH] svs: more on history --- std/examples/svs/alo-history/main.go | 107 +++++++++++---- std/examples/svs/alo-latest/main.go | 4 +- std/ndn/svs/v3/definitions.go | 10 ++ std/ndn/svs/v3/zz_generated.go | 198 +++++++++++++++++++++++++++ std/object/client_local.go | 2 +- std/object/client_produce.go | 3 - std/object/store_bolt.go | 19 +-- std/sync/snapshot_node_history.go | 2 +- std/sync/svs.go | 14 +- std/sync/svs_alo.go | 44 ++++-- std/sync/svs_alo_data.go | 24 +++- std/sync/svs_pub.go | 4 +- 12 files changed, 364 insertions(+), 67 deletions(-) diff --git a/std/examples/svs/alo-history/main.go b/std/examples/svs/alo-history/main.go index 25fabf14..9e862ef3 100644 --- a/std/examples/svs/alo-history/main.go +++ b/std/examples/svs/alo-history/main.go @@ -4,15 +4,23 @@ import ( "bufio" "fmt" "os" + "strings" enc "github.com/named-data/ndnd/std/encoding" "github.com/named-data/ndnd/std/engine" "github.com/named-data/ndnd/std/log" + "github.com/named-data/ndnd/std/ndn" + spec_svs "github.com/named-data/ndnd/std/ndn/svs/v3" "github.com/named-data/ndnd/std/ndn/svs_ps" "github.com/named-data/ndnd/std/object" ndn_sync "github.com/named-data/ndnd/std/sync" ) +var group, _ = enc.NameFromStr("/ndn/svs") +var name enc.Name +var svsalo *ndn_sync.SvsALO +var store ndn.Store + func main() { // This example shows how to use the SVS ALO with the SnapshotNodeHistory. // @@ -31,8 +39,8 @@ func main() { } // Parse command line arguments - group, _ := enc.NameFromStr("/ndn/svs") - name, err := enc.NameFromStr(os.Args[1]) + var err error + name, err = enc.NameFromStr(os.Args[1]) if err != nil { log.Fatal(nil, "Invalid node ID", "name", os.Args[1]) return @@ -47,8 +55,16 @@ func main() { } defer app.Stop() + // History snapshot works best with persistent storage + ident := strings.ReplaceAll(name.String(), "/", "-") + store, err = object.NewBoltStore(fmt.Sprintf("chat%s.db", ident)) + if err != nil { + log.Error(nil, "Unable to create object store", "err", err) + return + } + // Create object client - client := object.NewClient(app, object.NewMemoryStore(), nil) + client := object.NewClient(app, store, nil) if err = client.Start(); err != nil { log.Error(nil, "Unable to start object client", "err", err) return @@ -56,10 +72,13 @@ func main() { defer client.Stop() // Create a new SVS ALO instance - svsalo := ndn_sync.NewSvsALO(ndn_sync.SvsAloOpts{ + svsalo = ndn_sync.NewSvsALO(ndn_sync.SvsAloOpts{ // Name is the name of the node Name: name, + // Initial state is the state of the node + InitialState: readState(), + // Svs is the Sync group options Svs: ndn_sync.SvSyncOpts{ Client: client, @@ -82,25 +101,27 @@ func main() { // Subscribe to all publications svsalo.SubscribePublisher(enc.Name{}, func(pub ndn_sync.SvsPub) { - // Normal publication, just print it if !pub.IsSnapshot { + // Normal publication, just print it fmt.Printf("%s: %s\n", pub.Publisher, pub.Bytes()) - return - } + } else { + // Snapshot publication, unwrap and print all messages + snapshot, err := svs_ps.ParseHistorySnap(enc.NewWireView(pub.Content), true) + if err != nil { + panic(err) // undefined behavior after this point + } - // Snapshot publication, unwrap and print all messages - snapshot, err := svs_ps.ParseHistorySnap(enc.NewWireView(pub.Content), true) - if err != nil { - log.Error(nil, "Unable to parse snapshot", "err", err) - return - } - fmt.Fprintf(os.Stderr, "*** Snapshot from %s with %d entries\n", - pub.Publisher, len(snapshot.Entries)) + fmt.Fprintf(os.Stderr, "*** Snapshot from %s with %d entries\n", + pub.Publisher, len(snapshot.Entries)) - // Print all messages in the snapshot - for _, entry := range snapshot.Entries { - fmt.Printf("%s: %s\n", pub.Publisher, entry.Content.Join()) + // Print all messages in the snapshot + for _, entry := range snapshot.Entries { + fmt.Printf("%s: %s\n", pub.Publisher, entry.Content.Join()) + } } + + // Commit the state after processing the publication + commitState(pub.State) }) // Register routes to the local forwarder @@ -126,10 +147,7 @@ func main() { fmt.Fprintln(os.Stderr) // Publish an initial message to announce our presence - _, err = svsalo.Publish(enc.Wire{[]byte("Joined chat")}) - if err != nil { - log.Error(nil, "Unable to publish message", "err", err) - } + publish([]byte("Entered the chat room")) counter := 1 reader := bufio.NewReader(os.Stdin) @@ -150,19 +168,50 @@ func main() { // Special testing function !! to send 20 messages after counter if string(line) == "!!" { for i := 0; i < 20; i++ { - _, err = svsalo.Publish(enc.Wire{[]byte(fmt.Sprintf("Message %d", counter))}) - if err != nil { - log.Error(nil, "Unable to publish message", "err", err) - } + publish([]byte(fmt.Sprintf("Message %d", counter))) counter++ } continue } - // Publish chat message - _, err = svsalo.Publish(enc.Wire{line}) + publish(line) + } +} + +func publish(content []byte) { + _, state, err := svsalo.Publish(enc.Wire{content}) + if err != nil { + log.Error(nil, "Unable to publish message", "err", err) + } + + // Commit the state after processing our own publication + commitState(state) +} + +func commitState(state *spec_svs.InstanceState) { + // Once a publication is processed, ideally the application should persist + // it's own state and the state of the Sync group *atomically*. + // + // Applications can use their own data structures to store the state. + // In this example, we use the object store to persist the state. + store.Put(group, 0, state.Encode().Join()) +} + +func readState() *spec_svs.InstanceState { + // Read the state from the object store + // See commitState for more information + stateWire, err := store.Get(group, false) + if err != nil { + log.Error(nil, "Unable to get state from object store", "err", err) + os.Exit(1) + } + if stateWire != nil { + state, err := spec_svs.ParseInstanceState(enc.NewBufferView(stateWire), true) if err != nil { - log.Error(nil, "Unable to publish message", "err", err) + log.Error(nil, "Unable to parse state from object store", "err", err) + os.Exit(1) } + return state } + return nil } diff --git a/std/examples/svs/alo-latest/main.go b/std/examples/svs/alo-latest/main.go index 019965e1..447e5a39 100644 --- a/std/examples/svs/alo-latest/main.go +++ b/std/examples/svs/alo-latest/main.go @@ -180,7 +180,7 @@ func main() { fmt.Fprintln(os.Stderr) // Publish an initial empty message to announce our presence - _, err = svsalo.Publish(enc.Wire{}) + _, _, err = svsalo.Publish(enc.Wire{}) if err != nil { log.Error(nil, "Unable to publish message", "err", err) } @@ -203,7 +203,7 @@ func main() { // Publish chat message msgCount++ msgSize += len(line) - _, err = svsalo.Publish(enc.Wire{line}) + _, _, err = svsalo.Publish(enc.Wire{line}) if err != nil { log.Error(nil, "Unable to publish message", "err", err) } diff --git a/std/ndn/svs/v3/definitions.go b/std/ndn/svs/v3/definitions.go index 1667a8b9..d1c01e30 100644 --- a/std/ndn/svs/v3/definitions.go +++ b/std/ndn/svs/v3/definitions.go @@ -28,3 +28,13 @@ type SeqNoEntry struct { //+field:natural SeqNo uint64 `tlv:"0xd6"` } + +// This actually belongs in SVS-PS but codegen doesn't support cross-package +type InstanceState struct { + //+field:name + Name enc.Name `tlv:"0x07"` + //+field:natural + BootstrapTime uint64 `tlv:"0xd4"` + //+field:struct:StateVector + StateVector *StateVector `tlv:"0xc9"` +} diff --git a/std/ndn/svs/v3/zz_generated.go b/std/ndn/svs/v3/zz_generated.go index f78a5f84..4f6af256 100644 --- a/std/ndn/svs/v3/zz_generated.go +++ b/std/ndn/svs/v3/zz_generated.go @@ -740,3 +740,201 @@ func ParseSeqNoEntry(reader enc.WireView, ignoreCritical bool) (*SeqNoEntry, err context.Init() return context.Parse(reader, ignoreCritical) } + +type InstanceStateEncoder struct { + length uint + + Name_length uint + + StateVector_encoder StateVectorEncoder +} + +type InstanceStateParsingContext struct { + StateVector_context StateVectorParsingContext +} + +func (encoder *InstanceStateEncoder) Init(value *InstanceState) { + if value.Name != nil { + encoder.Name_length = 0 + for _, c := range value.Name { + encoder.Name_length += uint(c.EncodingLength()) + } + } + + if value.StateVector != nil { + encoder.StateVector_encoder.Init(value.StateVector) + } + + l := uint(0) + if value.Name != nil { + l += 1 + l += uint(enc.TLNum(encoder.Name_length).EncodingLength()) + l += encoder.Name_length + } + l += 1 + l += uint(1 + enc.Nat(value.BootstrapTime).EncodingLength()) + if value.StateVector != nil { + l += 1 + l += uint(enc.TLNum(encoder.StateVector_encoder.length).EncodingLength()) + l += encoder.StateVector_encoder.length + } + encoder.length = l + +} + +func (context *InstanceStateParsingContext) Init() { + + context.StateVector_context.Init() +} + +func (encoder *InstanceStateEncoder) EncodeInto(value *InstanceState, buf []byte) { + + pos := uint(0) + + if value.Name != nil { + buf[pos] = byte(7) + pos += 1 + pos += uint(enc.TLNum(encoder.Name_length).EncodeInto(buf[pos:])) + for _, c := range value.Name { + pos += uint(c.EncodeInto(buf[pos:])) + } + } + buf[pos] = byte(212) + pos += 1 + + buf[pos] = byte(enc.Nat(value.BootstrapTime).EncodeInto(buf[pos+1:])) + pos += uint(1 + buf[pos]) + if value.StateVector != nil { + buf[pos] = byte(201) + pos += 1 + pos += uint(enc.TLNum(encoder.StateVector_encoder.length).EncodeInto(buf[pos:])) + if encoder.StateVector_encoder.length > 0 { + encoder.StateVector_encoder.EncodeInto(value.StateVector, buf[pos:]) + pos += encoder.StateVector_encoder.length + } + } +} + +func (encoder *InstanceStateEncoder) Encode(value *InstanceState) enc.Wire { + + wire := make(enc.Wire, 1) + wire[0] = make([]byte, encoder.length) + buf := wire[0] + encoder.EncodeInto(value, buf) + + return wire +} + +func (context *InstanceStateParsingContext) Parse(reader enc.WireView, ignoreCritical bool) (*InstanceState, error) { + + var handled_Name bool = false + var handled_BootstrapTime bool = false + var handled_StateVector bool = false + + progress := -1 + _ = progress + + value := &InstanceState{} + var err error + var startPos int + for { + startPos = reader.Pos() + if startPos >= reader.Length() { + break + } + typ := enc.TLNum(0) + l := enc.TLNum(0) + typ, err = reader.ReadTLNum() + if err != nil { + return nil, enc.ErrFailToParse{TypeNum: 0, Err: err} + } + l, err = reader.ReadTLNum() + if err != nil { + return nil, enc.ErrFailToParse{TypeNum: 0, Err: err} + } + + err = nil + if handled := false; true { + switch typ { + case 7: + if true { + handled = true + handled_Name = true + delegate := reader.Delegate(int(l)) + value.Name, err = delegate.ReadName() + } + case 212: + if true { + handled = true + handled_BootstrapTime = true + value.BootstrapTime = uint64(0) + { + for i := 0; i < int(l); i++ { + x := byte(0) + x, err = reader.ReadByte() + if err != nil { + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + break + } + value.BootstrapTime = uint64(value.BootstrapTime<<8) | uint64(x) + } + } + } + case 201: + if true { + handled = true + handled_StateVector = true + value.StateVector, err = context.StateVector_context.Parse(reader.Delegate(int(l)), ignoreCritical) + } + default: + if !ignoreCritical && ((typ <= 31) || ((typ & 1) == 1)) { + return nil, enc.ErrUnrecognizedField{TypeNum: typ} + } + handled = true + err = reader.Skip(int(l)) + } + if err == nil && !handled { + } + if err != nil { + return nil, enc.ErrFailToParse{TypeNum: typ, Err: err} + } + } + } + + startPos = reader.Pos() + err = nil + + if !handled_Name && err == nil { + value.Name = nil + } + if !handled_BootstrapTime && err == nil { + err = enc.ErrSkipRequired{Name: "BootstrapTime", TypeNum: 212} + } + if !handled_StateVector && err == nil { + value.StateVector = nil + } + + if err != nil { + return nil, err + } + + return value, nil +} + +func (value *InstanceState) Encode() enc.Wire { + encoder := InstanceStateEncoder{} + encoder.Init(value) + return encoder.Encode(value) +} + +func (value *InstanceState) Bytes() []byte { + return value.Encode().Join() +} + +func ParseInstanceState(reader enc.WireView, ignoreCritical bool) (*InstanceState, error) { + context := InstanceStateParsingContext{} + context.Init() + return context.Parse(reader, ignoreCritical) +} diff --git a/std/object/client_local.go b/std/object/client_local.go index 7a68d564..c07081af 100644 --- a/std/object/client_local.go +++ b/std/object/client_local.go @@ -44,7 +44,7 @@ func (c *Client) GetLocal(name enc.Name) (enc.Wire, error) { for i := uint64(0); i <= lastSeg; i++ { name[len(name)-1] = enc.NewSegmentComponent(i) - raw, err := c.store.Get(name, true) + raw, err := c.store.Get(name, false) if err != nil { return nil, err } diff --git a/std/object/client_produce.go b/std/object/client_produce.go index 8ee7e25f..66f12d0c 100644 --- a/std/object/client_produce.go +++ b/std/object/client_produce.go @@ -132,9 +132,6 @@ func (c *Client) Remove(name enc.Name) error { return nil } - c.store.Begin() - defer c.store.Commit() - // Remove object data err := c.store.Remove(name, true) if err != nil { diff --git a/std/object/store_bolt.go b/std/object/store_bolt.go index 5dcc6e73..f8e98b89 100644 --- a/std/object/store_bolt.go +++ b/std/object/store_bolt.go @@ -49,7 +49,7 @@ func (s *BoltStore) Close() error { } func (s *BoltStore) Get(name enc.Name, prefix bool) (wire []byte, err error) { - key := s.encodeName(name) + key := s.nameKey(name) err = s.db.View(func(tx *bolt.Tx) error { bucket := tx.Bucket(BoltBucket) if bucket == nil { @@ -92,11 +92,11 @@ func (s *BoltStore) Get(name enc.Name, prefix bool) (wire []byte, err error) { } func (s *BoltStore) Put(name enc.Name, version uint64, wire []byte) error { - key := s.encodeName(name) + key := s.nameKey(name) - buf := make([]byte, 8, 8+len(wire)) + buf := make([]byte, 8+len(wire)) binary.BigEndian.PutUint64(buf, version) - buf = append(buf, wire...) + copy(buf[8:], wire) // get lock after encoding data s.wmut.Lock() @@ -120,7 +120,7 @@ func (s *BoltStore) Put(name enc.Name, version uint64, wire []byte) error { } func (s *BoltStore) Remove(name enc.Name, prefix bool) error { - key := s.encodeName(name) + key := s.nameKey(name) return s.db.Update(func(tx *bolt.Tx) (err error) { bucket := tx.Bucket(BoltBucket) if bucket == nil { @@ -178,11 +178,6 @@ func (s *BoltStore) Rollback() error { return err } -func (s *BoltStore) encodeName(name enc.Name) []byte { - buf := make([]byte, name.EncodingLength()) - size := 0 - for _, comp := range name { - size += comp.EncodeInto(buf[size:]) - } - return buf[:size] +func (s *BoltStore) nameKey(name enc.Name) []byte { + return name.BytesInner() } diff --git a/std/sync/snapshot_node_history.go b/std/sync/snapshot_node_history.go index fb860840..a7784edc 100644 --- a/std/sync/snapshot_node_history.go +++ b/std/sync/snapshot_node_history.go @@ -218,7 +218,7 @@ func (s *SnapshotNodeHistory) takeSnap(seqNo uint64) { if len(index.SeqNos) > 0 { s.prevSeq = index.SeqNos[len(index.SeqNos)-1] } - if s.prevSeq >= seqNo-s.Threshold+1 { + if s.prevSeq > 0 && seqNo < s.prevSeq+s.Threshold-1 { log.Info(s, "Previous snapshot is still current", "prev", s.prevSeq, "seq", seqNo) return } diff --git a/std/sync/svs.go b/std/sync/svs.go index d8802f2c..47d230e3 100644 --- a/std/sync/svs.go +++ b/std/sync/svs.go @@ -39,6 +39,7 @@ type SvSyncOpts struct { GroupPrefix enc.Name OnUpdate func(SvSyncUpdate) + InitialState *spec_svs.StateVector BootTime uint64 PeriodicTimeout time.Duration SuppressionPeriod time.Duration @@ -64,6 +65,17 @@ func NewSvSync(opts SvSyncOpts) *SvSync { panic("SvSync: OnUpdate is required") } + // Use initial state if provided + initialState := NewSvMap[uint64](0) + if opts.InitialState != nil { + for _, node := range opts.InitialState.Entries { + hash := node.Name.TlvStr() + for _, entry := range node.SeqNoEntries { + initialState.Set(hash, entry.BootstrapTime, entry.SeqNo) + } + } + } + // Set default options if opts.BootTime == 0 { opts.BootTime = uint64(time.Now().Unix()) @@ -86,7 +98,7 @@ func NewSvSync(opts SvSyncOpts) *SvSync { ticker: time.NewTicker(1 * time.Second), mutex: sync.Mutex{}, - state: NewSvMap[uint64](0), + state: initialState, mtime: make(map[string]time.Time), suppress: false, diff --git a/std/sync/svs_alo.go b/std/sync/svs_alo.go index a84ecba6..ecfd1189 100644 --- a/std/sync/svs_alo.go +++ b/std/sync/svs_alo.go @@ -6,6 +6,7 @@ import ( enc "github.com/named-data/ndnd/std/encoding" "github.com/named-data/ndnd/std/log" "github.com/named-data/ndnd/std/ndn" + spec_svs "github.com/named-data/ndnd/std/ndn/svs/v3" ) // SvsALO is a Sync Transport with At Least One delivery semantics. @@ -47,6 +48,8 @@ type SvsAloOpts struct { Svs SvSyncOpts // Snapshot is the snapshot strategy. Snapshot Snapshot + // InitialState is the initial state of the instance. + InitialState *spec_svs.InstanceState // MaxPipelineSize is the number of objects to fetch // concurrently for a single publisher (default 10) @@ -83,14 +86,40 @@ func NewSvsALO(opts SvsAloOpts) *SvsALO { s.opts.MaxPipelineSize = 10 } - // Initialize the SVS instance. + // Read initial state if provided. s.opts.Svs.OnUpdate = s.onSvsUpdate + if s.opts.InitialState != nil { + if !s.opts.InitialState.Name.Equal(s.opts.Name) { + panic("Name mismatch in provided initial state") + } + s.opts.Svs.BootTime = s.opts.InitialState.BootstrapTime + s.opts.Svs.InitialState = s.opts.InitialState.StateVector + + for _, entry := range s.opts.InitialState.StateVector.Entries { + hash := entry.Name.TlvStr() + for _, seqEntry := range entry.SeqNoEntries { + s.state.Set(hash, seqEntry.BootstrapTime, svsDataState{ + Known: seqEntry.SeqNo, + Latest: seqEntry.SeqNo, + Pending: seqEntry.SeqNo, + }) + } + } + } + + // Initialize the underlying SVS instance s.svs = NewSvSync(s.opts.Svs) - // Get instance state + // Initialize the state vector with our own state. + // If initial state is provided, this should be equal. seqNo := s.svs.GetSeqNo(s.opts.Name) + s.state.Set(s.opts.Name.TlvStr(), s.BootTime(), svsDataState{ + Known: seqNo, + Latest: seqNo, + Pending: seqNo, + }) - // Use null snapshot strategy by default + // Configure the snapshot strategy. if s.opts.Snapshot == nil { s.opts.Snapshot = &SnapshotNull{} } else { @@ -102,13 +131,6 @@ func NewSvsALO(opts SvsAloOpts) *SvsALO { }) } - // Initialize the state vector with our own state. - s.state.Set(s.opts.Name.TlvStr(), s.BootTime(), svsDataState{ - Known: seqNo, - Latest: seqNo, - Pending: seqNo, - }) - return s } @@ -170,7 +192,7 @@ func (s *SvsALO) SetOnPublisher(callback func(enc.Name)) { } // Publish sends a message to the group -func (s *SvsALO) Publish(content enc.Wire) (enc.Name, error) { +func (s *SvsALO) Publish(content enc.Wire) (enc.Name, *spec_svs.InstanceState, error) { s.mutex.Lock() defer s.mutex.Unlock() diff --git a/std/sync/svs_alo_data.go b/std/sync/svs_alo_data.go index 3cc2551c..e19e2b9f 100644 --- a/std/sync/svs_alo_data.go +++ b/std/sync/svs_alo_data.go @@ -6,6 +6,7 @@ import ( enc "github.com/named-data/ndnd/std/encoding" "github.com/named-data/ndnd/std/ndn" + spec_svs "github.com/named-data/ndnd/std/ndn/svs/v3" ) type svsDataState struct { @@ -33,7 +34,7 @@ func (s *SvsALO) objectName(node enc.Name, boot uint64, seq uint64) enc.Name { WithVersion(enc.VersionImmutable) } -func (s *SvsALO) produceObject(content enc.Wire) (enc.Name, error) { +func (s *SvsALO) produceObject(content enc.Wire) (enc.Name, *spec_svs.InstanceState, error) { // This instance owns the underlying SVS instance. // So we can be sure that the sequence number does not // change while we hold the lock on this instance. @@ -46,7 +47,7 @@ func (s *SvsALO) produceObject(content enc.Wire) (enc.Name, error) { Content: content, }) if err != nil { - return nil, err + return nil, nil, err } // We don't get notified of changes to our own state. @@ -65,7 +66,7 @@ func (s *SvsALO) produceObject(content enc.Wire) (enc.Name, error) { panic("[BUG] sequence number mismatch - who changed it?") } - return name, nil + return name, s.instanceState(), nil } // consumeCheck looks for new objects to fetch and queues them. @@ -223,11 +224,24 @@ func (s *SvsALO) queuePub(pub SvsPub) { pub.subcribers = slices.Collect(s.nodePs.Subs(pub.Publisher)) } - pub.State = s.state.Encode(func(state svsDataState) uint64 { + if pub.State == nil { + pub.State = s.instanceState() + } + + s.outpipe <- pub +} + +// instanceState returns the current state of the instance. +func (s *SvsALO) instanceState() *spec_svs.InstanceState { + stateVector := s.state.Encode(func(state svsDataState) uint64 { return state.Known }) - s.outpipe <- pub + return &spec_svs.InstanceState{ + Name: s.opts.Name, + BootstrapTime: s.BootTime(), + StateVector: stateVector, + } } // queueError queues an error to the application. diff --git a/std/sync/svs_pub.go b/std/sync/svs_pub.go index 513a758d..fc9e66aa 100644 --- a/std/sync/svs_pub.go +++ b/std/sync/svs_pub.go @@ -19,8 +19,8 @@ type SvsPub struct { SeqNum uint64 // IsSnapshot is true if this is a snapshot. IsSnapshot bool - // State is the state vector after this publication is applied. - State *spec_svs.StateVector + // State is the state after this publication is applied. + State *spec_svs.InstanceState // subcribers is the list of subscribers. subcribers []func(SvsPub)