Skip to content

Commit

Permalink
mmm: EnsureLayer() and saner layer management.
Browse files Browse the repository at this point in the history
  • Loading branch information
fiatjaf committed Feb 14, 2025
1 parent 6738efa commit 2898e57
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 104 deletions.
8 changes: 5 additions & 3 deletions mmm/indexinglayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/binary"
"fmt"
"os"
"path/filepath"

"github.com/PowerDNS/lmdb-go/lmdb"
"github.com/fiatjaf/eventstore"
Expand All @@ -24,7 +25,6 @@ type IndexingLayer struct {
// this is stored in the knownLayers db as a value, and used to keep track of which layer owns each event
id uint16

dbpath string
lmdbEnv *lmdb.Env

indexCreatedAt lmdb.DBI
Expand All @@ -40,6 +40,8 @@ type IndexingLayer struct {
const multiIndexCreationFlags uint = lmdb.Create | lmdb.DupSort

func (il *IndexingLayer) Init() error {
path := filepath.Join(il.mmmm.Dir, il.name)

if il.MaxLimit == 0 {
il.MaxLimit = 500
}
Expand All @@ -55,11 +57,11 @@ func (il *IndexingLayer) Init() error {
env.SetMapSize(1 << 38) // ~273GB

// create directory if it doesn't exist and open it
if err := os.MkdirAll(il.dbpath, 0755); err != nil {
if err := os.MkdirAll(path, 0755); err != nil {
return err
}

err = env.Open(il.dbpath, lmdb.NoTLS, 0644)
err = env.Open(path, lmdb.NoTLS, 0644)
if err != nil {
return err
}
Expand Down
124 changes: 62 additions & 62 deletions mmm/mmmm.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@ type mmap []byte
// String returns a fixed placeholder label rather than anything derived from
// the underlying bytes, so the result is the same for every mmap value.
func (mmap) String() string {
	const placeholder = "<memory-mapped file>"
	return placeholder
}

type MultiMmapManager struct {
Dir string
Logger *zerolog.Logger
LayerBuilder func(name string, b *MultiMmapManager) *IndexingLayer
Dir string
Logger *zerolog.Logger

layers []*IndexingLayer
lastId uint16

mmapfPath string
mmapf mmap
Expand Down Expand Up @@ -54,7 +52,7 @@ func (b *MultiMmapManager) Init() error {
// create directory if it doesn't exist
dbpath := filepath.Join(b.Dir, "mmmm")
if err := os.MkdirAll(dbpath, 0755); err != nil {
return err
return fmt.Errorf("failed to create directory %s: %w", dbpath, err)
}

// open a huge mmapped file
Expand Down Expand Up @@ -134,48 +132,6 @@ func (b *MultiMmapManager) Init() error {
logOp.Msg("loaded free ranges")
}

// initialize layers from what we have stored
{
logOp := b.Logger.Debug()

cursor, err := txn.OpenCursor(b.knownLayers)
if err != nil && !lmdb.IsNotFound(err) {
return fmt.Errorf("on layers: %w", err)
}

b.layers = make([]*IndexingLayer, 0, 20)
for k, v, err := cursor.Get(nil, nil, lmdb.First); err == nil; k, v, err = cursor.Get(nil, nil, lmdb.Next) {
name := string(k)
id := binary.BigEndian.Uint16(v)
logOp.Str("name", name).Msg("loaded layer")

il := b.LayerBuilder(name, b)
if il == nil {
logOp.Str("name", name).Msg("layer was deleted")
if err := txn.Del(b.knownLayers, k, nil); err != nil {
return fmt.Errorf("on delete '%s': %w", name, err)
}

// remove all events that were only referenced by this layer
if err := b.removeAllReferencesFromLayer(txn, binary.BigEndian.Uint16(v)); err != nil {
return fmt.Errorf("on removing references: %w", err)
}

continue
}

il.name = name
il.id = id

if b.lastId < id {
b.lastId = id
}

il.Init()
b.layers = append(b.layers, il)
}
}

return nil
}); err != nil {
return fmt.Errorf("failed to open and load db data: %w", err)
Expand All @@ -184,27 +140,40 @@ func (b *MultiMmapManager) Init() error {
return nil
}

func (b *MultiMmapManager) CreateLayer(name string) error {
il := b.LayerBuilder(name, b)
if il == nil {
return fmt.Errorf("tried to create layer %s, but got a nil", name)
}

func (b *MultiMmapManager) EnsureLayer(name string, il *IndexingLayer) error {
il.mmmm = b
il.name = name

b.lastId++
if b.lastId == 0 {
return fmt.Errorf("reached end of available layer ids, mmm needs a refactor to be able to reuse ids or something like that")
}
il.id = b.lastId
err := b.lmdbEnv.Update(func(txn *lmdb.Txn) error {
txn.RawRead = true

il.Init()
nameb := []byte(name)
if idv, err := txn.Get(b.knownLayers, nameb); lmdb.IsNotFound(err) {
if id, err := b.getNextAvailableLayerId(txn); err != nil {
return fmt.Errorf("failed to reserve a layer id for %s: %w", name, err)
} else {
il.id = id
}

err := b.lmdbEnv.Update(func(txn *lmdb.Txn) error {
if err := il.runThroughEvents(txn); err != nil {
if err := il.Init(); err != nil {
return fmt.Errorf("failed to init new layer %s: %w", name, err)
}

if err := il.runThroughEvents(txn); err != nil {
return fmt.Errorf("failed to run %s through events: %w", name, err)
}
return txn.Put(b.knownLayers, []byte(name), binary.LittleEndian.AppendUint16(nil, il.id), 0)
} else if err == nil {
il.id = binary.LittleEndian.Uint16(idv)

if err := il.Init(); err != nil {
return fmt.Errorf("failed to init old layer %s: %w", name, err)
}

return nil
} else {
return err
}
return txn.Put(b.knownLayers, []byte(name), nil, 0)
})
if err != nil {
return err
Expand All @@ -214,6 +183,37 @@ func (b *MultiMmapManager) CreateLayer(name string) error {
return nil
}

// getNextAvailableLayerId walks every entry of the knownLayers database inside
// txn and returns the lowest layer id that is not assigned to any entry.
//
// Each stored value is decoded as a little-endian uint16 id. An error is
// returned when the cursor cannot be opened, when iteration stops for any
// reason other than reaching the end of the database, when a stored value is
// too short to hold an id, or when every one of the 65536 possible ids is
// already taken.
func (b *MultiMmapManager) getNextAvailableLayerId(txn *lmdb.Txn) (uint16, error) {
	cursor, err := txn.OpenCursor(b.knownLayers)
	if err != nil {
		return 0, fmt.Errorf("failed to open cursor: %w", err)
	}
	// release the cursor as soon as the scan is done; the caller keeps using txn
	defer cursor.Close()

	// one flag per possible uint16 id
	var used [1 << 16]bool
	_, val, err := cursor.Get(nil, nil, lmdb.First)
	for err == nil {
		// guard against malformed values: Uint16 would panic on a short slice
		if len(val) < 2 {
			return 0, fmt.Errorf("malformed layer id value: expected 2 bytes, got %d", len(val))
		}
		used[binary.LittleEndian.Uint16(val)] = true
		_, val, err = cursor.Get(nil, nil, lmdb.Next)
	}
	if !lmdb.IsNotFound(err) {
		// a real error, not just the end of the database
		return 0, err
	}

	// the scan finished cleanly, pick the first vacant id
	for num := range used {
		if !used[num] {
			return uint16(num), nil
		}
	}

	// every id is taken: fail instead of silently handing out id 0 again
	return 0, fmt.Errorf("no layer ids available: all %d are in use", len(used))
}

func (b *MultiMmapManager) Close() {
b.lmdbEnv.Close()
for _, il := range b.layers {
Expand Down
77 changes: 38 additions & 39 deletions mmm/mmmm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/hex"
"fmt"
"os"
"path/filepath"
"testing"

"github.com/PowerDNS/lmdb-go/lmdb"
Expand All @@ -17,9 +16,8 @@ import (

func TestMultiLayerIndexing(t *testing.T) {
// Create a temporary directory for the test
tmpDir, err := os.MkdirTemp("", "mmm-test-*")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)
tmpDir := "/tmp/eventstore-mmm-test"
os.RemoveAll(tmpDir)

logger := zerolog.New(zerolog.ConsoleWriter{Out: os.Stderr})

Expand All @@ -30,36 +28,33 @@ func TestMultiLayerIndexing(t *testing.T) {
mmm := &MultiMmapManager{
Dir: tmpDir,
Logger: &logger,
LayerBuilder: func(name string, b *MultiMmapManager) *IndexingLayer {
return &IndexingLayer{
dbpath: filepath.Join(tmpDir, name),
mmmm: b,
MaxLimit: 100,
ShouldIndex: func(ctx context.Context, evt *nostr.Event) bool {
switch name {
case "odd":
return evt.CreatedAt%2 == 1
case "even":
return evt.CreatedAt%2 == 0
case "all":
return true
}
return false
},
}
},
}

err = mmm.Init()
err := mmm.Init()
require.NoError(t, err)
defer mmm.Close()

// create layers
err = mmm.CreateLayer("odd")
err = mmm.EnsureLayer("odd", &IndexingLayer{
MaxLimit: 100,
ShouldIndex: func(ctx context.Context, evt *nostr.Event) bool {
return evt.CreatedAt%2 == 1
},
})
require.NoError(t, err)
err = mmm.CreateLayer("even")
err = mmm.EnsureLayer("even", &IndexingLayer{
MaxLimit: 100,
ShouldIndex: func(ctx context.Context, evt *nostr.Event) bool {
return evt.CreatedAt%2 == 0
},
})
require.NoError(t, err)
err = mmm.CreateLayer("all")
err = mmm.EnsureLayer("all", &IndexingLayer{
MaxLimit: 100,
ShouldIndex: func(ctx context.Context, evt *nostr.Event) bool {
return true
},
})
require.NoError(t, err)

// create test events
Expand Down Expand Up @@ -242,7 +237,7 @@ func TestMultiLayerIndexing(t *testing.T) {
}
require.Equal(t, 1, count)

// verify other layers still have old version
// verify other layers still have the old version
for j := 0; j < 3; j++ {
if mmm.layers[j] == layer {
continue
Expand Down Expand Up @@ -279,28 +274,32 @@ func TestLayerReferenceTracking(t *testing.T) {
mmm := &MultiMmapManager{
Dir: tmpDir,
Logger: &logger,
LayerBuilder: func(name string, b *MultiMmapManager) *IndexingLayer {
return &IndexingLayer{
dbpath: filepath.Join(tmpDir, name),
mmmm: b,
MaxLimit: 100,
ShouldIndex: func(ctx context.Context, evt *nostr.Event) bool { return true },
}
},
}

err = mmm.Init()
require.NoError(t, err)
defer mmm.Close()

// create three layers
err = mmm.CreateLayer("layer1")
err = mmm.EnsureLayer("layer1", &IndexingLayer{
MaxLimit: 100,
ShouldIndex: func(ctx context.Context, evt *nostr.Event) bool { return true },
})
require.NoError(t, err)
err = mmm.CreateLayer("layer2")
err = mmm.EnsureLayer("layer2", &IndexingLayer{
MaxLimit: 100,
ShouldIndex: func(ctx context.Context, evt *nostr.Event) bool { return true },
})
require.NoError(t, err)
err = mmm.CreateLayer("layer3")
err = mmm.EnsureLayer("layer3", &IndexingLayer{
MaxLimit: 100,
ShouldIndex: func(ctx context.Context, evt *nostr.Event) bool { return true },
})
require.NoError(t, err)
err = mmm.CreateLayer("layer4")
err = mmm.EnsureLayer("layer4", &IndexingLayer{
MaxLimit: 100,
ShouldIndex: func(ctx context.Context, evt *nostr.Event) bool { return true },
})
require.NoError(t, err)

// create test events
Expand Down

0 comments on commit 2898e57

Please sign in to comment.