Skip to content

Commit

Permalink
mmm: support ReplaceEvent() and single IndexingLayer storing.
Browse files Browse the repository at this point in the history
  • Loading branch information
fiatjaf committed Jan 19, 2025
1 parent 4b52299 commit cf52741
Show file tree
Hide file tree
Showing 6 changed files with 608 additions and 392 deletions.
95 changes: 47 additions & 48 deletions mmm/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,67 +12,66 @@ import (
)

func (il *IndexingLayer) DeleteEvent(ctx context.Context, evt *nostr.Event) error {
return il.mmmm.lmdbEnv.Update(func(mmmtxn *lmdb.Txn) error {
mmmtxn.RawRead = true
return il.lmdbEnv.Update(func(iltxn *lmdb.Txn) error {
return il.delete(mmmtxn, iltxn, evt)
})
})
}

func (il *IndexingLayer) delete(mmmtxn *lmdb.Txn, iltxn *lmdb.Txn, evt *nostr.Event) error {
zeroRefs := false
b := il.mmmm

return b.lmdbEnv.Update(func(txn *lmdb.Txn) error {
txn.RawRead = true
b.Logger.Debug().Str("layer", il.name).Uint16("id", il.id).Msg("deleting")

b.Logger.Debug().Str("layer", il.name).Uint16("id", il.id).Msg("deleting")

// first in the mmmm txn we check if we have the event still
idPrefix8, _ := hex.DecodeString(evt.ID[0 : 8*2])
val, err := txn.Get(b.indexId, idPrefix8)
if err != nil {
if lmdb.IsNotFound(err) {
// we already do not have this anywhere
return nil
}
return fmt.Errorf("failed to check if we have the event %x: %w", idPrefix8, err)
// first in the mmmm txn we check if we have the event still
idPrefix8, _ := hex.DecodeString(evt.ID[0 : 8*2])
val, err := mmmtxn.Get(b.indexId, idPrefix8)
if err != nil {
if lmdb.IsNotFound(err) {
// we already do not have this anywhere
return nil
}
return fmt.Errorf("failed to check if we have the event %x: %w", idPrefix8, err)
}

// we have this, but do we have it in the current layer?
// val is [posb][il_idx][il_idx...]
pos := positionFromBytes(val[0:12])
// we have this, but do we have it in the current layer?
// val is [posb][il_idx][il_idx...]
pos := positionFromBytes(val[0:12])

// check references
currentLayer := binary.BigEndian.AppendUint16(nil, il.id)
for i := 12; i < len(val); i += 2 {
if slices.Equal(val[i:i+2], currentLayer) {
// we will remove the current layer if it's found
nextval := make([]byte, len(val)-2)
copy(nextval, val[0:i])
copy(nextval[i:], val[i+2:])
// check references
currentLayer := binary.BigEndian.AppendUint16(nil, il.id)
for i := 12; i < len(val); i += 2 {
if slices.Equal(val[i:i+2], currentLayer) {
// we will remove the current layer if it's found
nextval := make([]byte, len(val)-2)
copy(nextval, val[0:i])
copy(nextval[i:], val[i+2:])

if err := txn.Put(b.indexId, idPrefix8, nextval, 0); err != nil {
return fmt.Errorf("failed to update references for %x: %w", idPrefix8, err)
}
if err := mmmtxn.Put(b.indexId, idPrefix8, nextval, 0); err != nil {
return fmt.Errorf("failed to update references for %x: %w", idPrefix8, err)
}

// if there are no more layers we will delete everything later
zeroRefs = len(nextval) == 12
// if there are no more layers we will delete everything later
zeroRefs = len(nextval) == 12

break
}
break
}
}

// then in the il transaction we remove it
if err := il.lmdbEnv.Update(func(txn *lmdb.Txn) error {
// calculate all index keys we have for this event and delete them
for k := range il.getIndexKeysForEvent(evt) {
if err := txn.Del(k.dbi, k.key, val[0:12]); err != nil && !lmdb.IsNotFound(err) {
return fmt.Errorf("index entry %v/%x deletion failed: %w", k.dbi, k.key, err)
}
}
return nil
}); err != nil {
return fmt.Errorf("failed to delete indexes for %s on %d: %w", evt.ID, il.id, err)
// calculate all index keys we have for this event and delete them
for k := range il.getIndexKeysForEvent(evt) {
if err := iltxn.Del(k.dbi, k.key, val[0:12]); err != nil && !lmdb.IsNotFound(err) {
return fmt.Errorf("index entry %v/%x deletion failed: %w", k.dbi, k.key, err)
}
}

// if there are no more refs we delete the event from the id index and mmap
if zeroRefs {
b.purge(txn, idPrefix8, pos)
}
// if there are no more refs we delete the event from the id index and mmap
if zeroRefs {
b.purge(mmmtxn, idPrefix8, pos)
}

return nil
})
return nil
}
3 changes: 3 additions & 0 deletions mmm/indexinglayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ import (
"os"

"github.com/PowerDNS/lmdb-go/lmdb"
"github.com/fiatjaf/eventstore"
"github.com/nbd-wtf/go-nostr"
)

var _ eventstore.Store = (*IndexingLayer)(nil)

type IndexingLayer struct {
name string // this is set automatically internally

Expand Down
104 changes: 104 additions & 0 deletions mmm/mmmm_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package mmm

import (
"context"
"os"
"path/filepath"
"testing"

"github.com/nbd-wtf/go-nostr"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
)

func TestMultiLayerIndexing(t *testing.T) {
// Create a temporary directory for the test
tmpDir, err := os.MkdirTemp("", "mmm-test-*")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)

logger := zerolog.New(zerolog.ConsoleWriter{Out: os.Stderr})

// initialize MMM with two layers:
// 1. odd timestamps layer
// 2. even timestamps layer
mmm := &MultiMmapManager{
Dir: tmpDir,
Logger: &logger,
LayerBuilder: func(name string, b *MultiMmapManager) *IndexingLayer {
return &IndexingLayer{
dbpath: filepath.Join(tmpDir, name),
mmmm: b,
MaxLimit: 100,
ShouldIndex: func(ctx context.Context, evt *nostr.Event) bool {
switch name {
case "odd":
return evt.CreatedAt%2 == 1
case "even":
return evt.CreatedAt%2 == 0
}
return false
},
}
},
}

err = mmm.Init()
require.NoError(t, err)
defer mmm.Close()

// create odd timestamps layer
err = mmm.CreateLayer("odd")
require.NoError(t, err)

// create even timestamps layer
err = mmm.CreateLayer("even")
require.NoError(t, err)

// create test events
ctx := context.Background()
baseTime := nostr.Timestamp(0)
sk := "945e01e37662430162121b804d3645a86d97df9d256917d86735d0eb219393eb"
events := make([]*nostr.Event, 10)
for i := 0; i < 10; i++ {
evt := &nostr.Event{
CreatedAt: baseTime + nostr.Timestamp(i),
Kind: 1,
Tags: nostr.Tags{},
Content: "test content",
}
evt.Sign(sk)
events[i] = evt
stored, err := mmm.StoreGlobal(ctx, evt)
require.NoError(t, err)
require.True(t, stored)
}

// query odd layer
oddResults, err := mmm.layers[0].QueryEvents(ctx, nostr.Filter{
Kinds: []int{1},
Since: &baseTime,
})
require.NoError(t, err)

oddCount := 0
for evt := range oddResults {
require.Equal(t, evt.CreatedAt%2, nostr.Timestamp(1))
oddCount++
}
require.Equal(t, 5, oddCount)

// query even layer
evenResults, err := mmm.layers[1].QueryEvents(ctx, nostr.Filter{
Kinds: []int{1},
Since: &baseTime,
})
require.NoError(t, err)

evenCount := 0
for evt := range evenResults {
require.Equal(t, evt.CreatedAt%2, nostr.Timestamp(0))
evenCount++
}
require.Equal(t, 5, evenCount)
}
Loading

0 comments on commit cf52741

Please sign in to comment.