Skip to content

Commit

Permalink
svs: more on history
Browse files Browse the repository at this point in the history
  • Loading branch information
pulsejet committed Feb 9, 2025
1 parent aeb0088 commit 970b3f5
Show file tree
Hide file tree
Showing 12 changed files with 364 additions and 67 deletions.
107 changes: 78 additions & 29 deletions std/examples/svs/alo-history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,23 @@ import (
"bufio"
"fmt"
"os"
"strings"

enc "github.com/named-data/ndnd/std/encoding"
"github.com/named-data/ndnd/std/engine"
"github.com/named-data/ndnd/std/log"
"github.com/named-data/ndnd/std/ndn"
spec_svs "github.com/named-data/ndnd/std/ndn/svs/v3"
"github.com/named-data/ndnd/std/ndn/svs_ps"
"github.com/named-data/ndnd/std/object"
ndn_sync "github.com/named-data/ndnd/std/sync"
)

var group, _ = enc.NameFromStr("/ndn/svs")
var name enc.Name
var svsalo *ndn_sync.SvsALO
var store ndn.Store

func main() {
// This example shows how to use the SVS ALO with the SnapshotNodeHistory.
//
Expand All @@ -31,8 +39,8 @@ func main() {
}

// Parse command line arguments
group, _ := enc.NameFromStr("/ndn/svs")
name, err := enc.NameFromStr(os.Args[1])
var err error
name, err = enc.NameFromStr(os.Args[1])
if err != nil {
log.Fatal(nil, "Invalid node ID", "name", os.Args[1])
return
Expand All @@ -47,19 +55,30 @@ func main() {
}
defer app.Stop()

// History snapshot works best with persistent storage
ident := strings.ReplaceAll(name.String(), "/", "-")
store, err = object.NewBoltStore(fmt.Sprintf("chat%s.db", ident))
if err != nil {
log.Error(nil, "Unable to create object store", "err", err)
return
}

// Create object client
client := object.NewClient(app, object.NewMemoryStore(), nil)
client := object.NewClient(app, store, nil)
if err = client.Start(); err != nil {
log.Error(nil, "Unable to start object client", "err", err)
return
}
defer client.Stop()

// Create a new SVS ALO instance
svsalo := ndn_sync.NewSvsALO(ndn_sync.SvsAloOpts{
svsalo = ndn_sync.NewSvsALO(ndn_sync.SvsAloOpts{
// Name is the name of the node
Name: name,

// Initial state is the state of the node
InitialState: readState(),

// Svs is the Sync group options
Svs: ndn_sync.SvSyncOpts{
Client: client,
Expand All @@ -82,25 +101,27 @@ func main() {

// Subscribe to all publications
svsalo.SubscribePublisher(enc.Name{}, func(pub ndn_sync.SvsPub) {
// Normal publication, just print it
if !pub.IsSnapshot {
// Normal publication, just print it
fmt.Printf("%s: %s\n", pub.Publisher, pub.Bytes())
return
}
} else {
// Snapshot publication, unwrap and print all messages
snapshot, err := svs_ps.ParseHistorySnap(enc.NewWireView(pub.Content), true)
if err != nil {
panic(err) // undefined behavior after this point
}

// Snapshot publication, unwrap and print all messages
snapshot, err := svs_ps.ParseHistorySnap(enc.NewWireView(pub.Content), true)
if err != nil {
log.Error(nil, "Unable to parse snapshot", "err", err)
return
}
fmt.Fprintf(os.Stderr, "*** Snapshot from %s with %d entries\n",
pub.Publisher, len(snapshot.Entries))
fmt.Fprintf(os.Stderr, "*** Snapshot from %s with %d entries\n",
pub.Publisher, len(snapshot.Entries))

// Print all messages in the snapshot
for _, entry := range snapshot.Entries {
fmt.Printf("%s: %s\n", pub.Publisher, entry.Content.Join())
// Print all messages in the snapshot
for _, entry := range snapshot.Entries {
fmt.Printf("%s: %s\n", pub.Publisher, entry.Content.Join())
}
}

// Commit the state after processing the publication
commitState(pub.State)
})

// Register routes to the local forwarder
Expand All @@ -126,10 +147,7 @@ func main() {
fmt.Fprintln(os.Stderr)

// Publish an initial message to announce our presence
_, err = svsalo.Publish(enc.Wire{[]byte("Joined chat")})
if err != nil {
log.Error(nil, "Unable to publish message", "err", err)
}
publish([]byte("Entered the chat room"))

counter := 1
reader := bufio.NewReader(os.Stdin)
Expand All @@ -150,19 +168,50 @@ func main() {
// Special testing function !! to send 20 messages after counter
if string(line) == "!!" {
for i := 0; i < 20; i++ {
_, err = svsalo.Publish(enc.Wire{[]byte(fmt.Sprintf("Message %d", counter))})
if err != nil {
log.Error(nil, "Unable to publish message", "err", err)
}
publish([]byte(fmt.Sprintf("Message %d", counter)))
counter++
}
continue
}

// Publish chat message
_, err = svsalo.Publish(enc.Wire{line})
publish(line)
}
}

func publish(content []byte) {
_, state, err := svsalo.Publish(enc.Wire{content})
if err != nil {
log.Error(nil, "Unable to publish message", "err", err)
}

// Commit the state after processing our own publication
commitState(state)
}

func commitState(state *spec_svs.InstanceState) {
// Once a publication is processed, ideally the application should persist
// it's own state and the state of the Sync group *atomically*.
//
// Applications can use their own data structures to store the state.
// In this example, we use the object store to persist the state.
store.Put(group, 0, state.Encode().Join())
}

func readState() *spec_svs.InstanceState {
// Read the state from the object store
// See commitState for more information
stateWire, err := store.Get(group, false)
if err != nil {
log.Error(nil, "Unable to get state from object store", "err", err)
os.Exit(1)
}
if stateWire != nil {
state, err := spec_svs.ParseInstanceState(enc.NewBufferView(stateWire), true)
if err != nil {
log.Error(nil, "Unable to publish message", "err", err)
log.Error(nil, "Unable to parse state from object store", "err", err)
os.Exit(1)
}
return state
}
return nil
}
4 changes: 2 additions & 2 deletions std/examples/svs/alo-latest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func main() {
fmt.Fprintln(os.Stderr)

// Publish an initial empty message to announce our presence
_, err = svsalo.Publish(enc.Wire{})
_, _, err = svsalo.Publish(enc.Wire{})
if err != nil {
log.Error(nil, "Unable to publish message", "err", err)
}
Expand All @@ -203,7 +203,7 @@ func main() {
// Publish chat message
msgCount++
msgSize += len(line)
_, err = svsalo.Publish(enc.Wire{line})
_, _, err = svsalo.Publish(enc.Wire{line})
if err != nil {
log.Error(nil, "Unable to publish message", "err", err)
}
Expand Down
10 changes: 10 additions & 0 deletions std/ndn/svs/v3/definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,13 @@ type SeqNoEntry struct {
//+field:natural
SeqNo uint64 `tlv:"0xd6"`
}

// This actually belongs in SVS-PS but codegen doesn't support cross-package
type InstanceState struct {
//+field:name
Name enc.Name `tlv:"0x07"`
//+field:natural
BootstrapTime uint64 `tlv:"0xd4"`
//+field:struct:StateVector
StateVector *StateVector `tlv:"0xc9"`
}
Loading

0 comments on commit 970b3f5

Please sign in to comment.