Skip to content

Commit

Permalink
std: add write caching store
Browse files Browse the repository at this point in the history
  • Loading branch information
pulsejet committed Feb 11, 2025
1 parent 4bb53e8 commit 561b71f
Show file tree
Hide file tree
Showing 2 changed files with 211 additions and 1 deletion.
8 changes: 7 additions & 1 deletion std/examples/svs/alo-history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,17 @@ func main() {

// History snapshot works best with persistent storage
ident := strings.ReplaceAll(name.String(), "/", "-")
store, err = object.NewBoltStore(fmt.Sprintf("chat%s.db", ident))
bstore, err := object.NewBoltStore(fmt.Sprintf("chat%s.db", ident))
if err != nil {
log.Error(nil, "Unable to create object store", "err", err)
return
}
defer bstore.Close()

// Use caching layer to reduce the number of writes to the underlying store
cstore := object.NewCachingStore(bstore)
defer cstore.Close()
store = cstore

// Create object client
client := object.NewClient(app, store, nil)
Expand Down
204 changes: 204 additions & 0 deletions std/object/store_caching.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package object

import (
"sync"
"time"

enc "github.com/named-data/ndnd/std/encoding"
"github.com/named-data/ndnd/std/log"
"github.com/named-data/ndnd/std/ndn"
)

// CachingStore is a store that caches objects from another store.
type CachingStore struct {
// store is the underlying store.
store ndn.Store
// cache is the cache.
cache *MemoryStore
// journal is the journal.
journal []journalEntry
// jmutex is the journal mutex.
jmutex sync.Mutex
// txp is the active transaction parent
txp *CachingStore
// flushTimer is the flush timer.
flushTimer *time.Timer
}

type journalEntry struct {
// name is the name of the object.
name enc.Name
// version is the version of the object.
version uint64
// wire is the wire of the object.
wire []byte
// action is the action.
action journalAction
}

type journalAction uint8

const (
journalActionPut = iota
journalActionRemove
)

// NewCachingStore creates a new caching store.
func NewCachingStore(store ndn.Store) *CachingStore {
return &CachingStore{
store: store,
cache: NewMemoryStore(),
journal: make([]journalEntry, 0),
}
}

func (s *CachingStore) Close() error {
return s.Flush()
}

func (s *CachingStore) String() string {
return "caching-store"
}

func (s *CachingStore) Get(name enc.Name, prefix bool) ([]byte, error) {
wire, err := s.cache.Get(name, prefix)
if wire != nil || err != nil {
return wire, err
}
return s.store.Get(name, prefix)
}

func (s *CachingStore) Put(name enc.Name, version uint64, wire []byte) error {
if err := s.cache.Put(name, version, wire); err != nil {
return err
}

s.jmutex.Lock()
defer s.jmutex.Unlock()

s.journal = append(s.journal, journalEntry{
name: name,
version: version,
wire: wire,
action: journalActionPut,
})

s.scheduleFlush()

return nil
}

func (s *CachingStore) Remove(name enc.Name, prefix bool) error {
if err := s.cache.Remove(name, prefix); err != nil {
return err
}

s.jmutex.Lock()
defer s.jmutex.Unlock()

s.journal = append(s.journal, journalEntry{
name: name,
action: journalActionRemove,
})

s.scheduleFlush()

return nil
}

func (s *CachingStore) Begin() (ndn.Store, error) {
cacheTxn, err := s.cache.Begin()
if err != nil {
return nil, err
}

return &CachingStore{
store: s.store,
cache: cacheTxn.(*MemoryStore),
journal: make([]journalEntry, 0),
txp: s,
}, nil
}

func (s *CachingStore) Commit() error {
if err := s.cache.Commit(); err != nil {
return err
}

s.txp.jmutex.Lock()
defer s.txp.jmutex.Unlock()

s.txp.journal = append(s.txp.journal, s.journal...)
s.txp.scheduleFlush()

return nil
}

func (s *CachingStore) Rollback() error {
return s.cache.Rollback()
}

// Flush flushes the cache.
func (s *CachingStore) Flush() error {
s.jmutex.Lock()
journal := s.journal
s.journal = s.journal[len(journal):]
s.jmutex.Unlock()

if len(journal) == 0 {
return nil
}

if err := func() error {
tx, err := s.store.Begin()
if err != nil {
return err
}
defer tx.Commit()

for _, entry := range journal {
switch entry.action {
case journalActionPut:
if err := tx.Put(entry.name, entry.version, entry.wire); err != nil {
return err
}
case journalActionRemove:
if err := tx.Remove(entry.name, false); err != nil {
return err
}
}
}

return nil
}(); err != nil {
return err
}

// Evict cache entries from PUT journal entries.
// TODO: this is incorrect if a new PUT entry is added during the flush.
for _, entry := range journal {
if entry.action == journalActionPut {
s.cache.Remove(entry.name, false)
}
}

return nil
}

func (s *CachingStore) scheduleFlush() {
if s.flushTimer != nil || s.txp != nil || len(s.journal) == 0 {
return
}

s.flushTimer = time.AfterFunc(100*time.Millisecond, func() {
if err := s.Flush(); err != nil {
log.Error(s, "Failed to flush cache", "err", err)
}

s.jmutex.Lock()
defer s.jmutex.Unlock()

s.flushTimer = nil
s.scheduleFlush()
})
}

0 comments on commit 561b71f

Please sign in to comment.