Skip to content

Commit

Permalink
mmm: fixes and more tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
fiatjaf committed Jan 19, 2025
1 parent cf52741 commit 684cfb3
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 63 deletions.
6 changes: 4 additions & 2 deletions mmm/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (il *IndexingLayer) delete(mmmtxn *lmdb.Txn, iltxn *lmdb.Txn, evt *nostr.Ev
zeroRefs := false
b := il.mmmm

b.Logger.Debug().Str("layer", il.name).Uint16("id", il.id).Msg("deleting")
b.Logger.Debug().Str("layer", il.name).Uint16("il", il.id).Msg("deleting")

// first in the mmmm txn we check if we have the event still
idPrefix8, _ := hex.DecodeString(evt.ID[0 : 8*2])
Expand Down Expand Up @@ -70,7 +70,9 @@ func (il *IndexingLayer) delete(mmmtxn *lmdb.Txn, iltxn *lmdb.Txn, evt *nostr.Ev

// if there are no more refs we delete the event from the id index and mmap
if zeroRefs {
b.purge(mmmtxn, idPrefix8, pos)
if err := b.purge(mmmtxn, idPrefix8, pos); err != nil {
panic(err)
}
}

return nil
Expand Down
211 changes: 186 additions & 25 deletions mmm/mmmm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mmm

import (
"context"
"fmt"
"os"
"path/filepath"
"testing"
Expand All @@ -19,9 +20,10 @@ func TestMultiLayerIndexing(t *testing.T) {

logger := zerolog.New(zerolog.ConsoleWriter{Out: os.Stderr})

// initialize MMM with two layers:
// initialize MMM with three layers:
// 1. odd timestamps layer
// 2. even timestamps layer
// 3. all events layer
mmm := &MultiMmapManager{
Dir: tmpDir,
Logger: &logger,
Expand All @@ -36,6 +38,8 @@ func TestMultiLayerIndexing(t *testing.T) {
return evt.CreatedAt%2 == 1
case "even":
return evt.CreatedAt%2 == 0
case "all":
return true
}
return false
},
Expand All @@ -47,13 +51,13 @@ func TestMultiLayerIndexing(t *testing.T) {
require.NoError(t, err)
defer mmm.Close()

// create odd timestamps layer
// create layers
err = mmm.CreateLayer("odd")
require.NoError(t, err)

// create even timestamps layer
err = mmm.CreateLayer("even")
require.NoError(t, err)
err = mmm.CreateLayer("all")
require.NoError(t, err)

// create test events
ctx := context.Background()
Expand All @@ -74,31 +78,188 @@ func TestMultiLayerIndexing(t *testing.T) {
require.True(t, stored)
}

// query odd layer
oddResults, err := mmm.layers[0].QueryEvents(ctx, nostr.Filter{
Kinds: []int{1},
Since: &baseTime,
})
require.NoError(t, err)
{
// query odd layer
oddResults, err := mmm.layers[0].QueryEvents(ctx, nostr.Filter{
Kinds: []int{1},
})
require.NoError(t, err)

oddCount := 0
for evt := range oddResults {
require.Equal(t, evt.CreatedAt%2, nostr.Timestamp(1))
oddCount++
oddCount := 0
for evt := range oddResults {
require.Equal(t, evt.CreatedAt%2, nostr.Timestamp(1))
oddCount++
}
require.Equal(t, 5, oddCount)
}

{
// query even layer
evenResults, err := mmm.layers[1].QueryEvents(ctx, nostr.Filter{
Kinds: []int{1},
})
require.NoError(t, err)

evenCount := 0
for evt := range evenResults {
require.Equal(t, evt.CreatedAt%2, nostr.Timestamp(0))
evenCount++
}
require.Equal(t, 5, evenCount)
}

{
// query all layer
allResults, err := mmm.layers[2].QueryEvents(ctx, nostr.Filter{
Kinds: []int{1},
})
require.NoError(t, err)

allCount := 0
for range allResults {
allCount++
}
require.Equal(t, 10, allCount)
}
require.Equal(t, 5, oddCount)

// query even layer
evenResults, err := mmm.layers[1].QueryEvents(ctx, nostr.Filter{
Kinds: []int{1},
Since: &baseTime,
})
// delete some events
err = mmm.layers[0].DeleteEvent(ctx, events[1]) // odd timestamp
require.NoError(t, err)
err = mmm.layers[1].DeleteEvent(ctx, events[2]) // even timestamp
require.NoError(t, err)

// verify deletions
{
oddResults, err := mmm.layers[0].QueryEvents(ctx, nostr.Filter{
Kinds: []int{1},
})
require.NoError(t, err)
oddCount := 0
for range oddResults {
oddCount++
}
require.Equal(t, 4, oddCount)
}

{
evenResults, err := mmm.layers[1].QueryEvents(ctx, nostr.Filter{
Kinds: []int{1},
})
require.NoError(t, err)
evenCount := 0
for range evenResults {
evenCount++
}
require.Equal(t, 4, evenCount)
}

evenCount := 0
for evt := range evenResults {
require.Equal(t, evt.CreatedAt%2, nostr.Timestamp(0))
evenCount++
{
allResults, err := mmm.layers[2].QueryEvents(ctx, nostr.Filter{
Kinds: []int{1},
})
require.NoError(t, err)
allCount := 0
for range allResults {
allCount++
}
require.Equal(t, 10, allCount)
}

// save events directly to layers regardless of timestamp
{
oddEvent := &nostr.Event{
CreatedAt: baseTime + 100, // even timestamp
Kind: 1,
Content: "forced odd",
}
oddEvent.Sign(sk)
err = mmm.layers[0].SaveEvent(ctx, oddEvent) // save even timestamp to odd layer
require.NoError(t, err)

// it is added to the odd il
oddResults, err := mmm.layers[0].QueryEvents(ctx, nostr.Filter{
Kinds: []int{1},
})
require.NoError(t, err)
oddCount := 0
for range oddResults {
oddCount++
}
require.Equal(t, 5, oddCount)

// it doesn't affect the event il
evenResults, err := mmm.layers[1].QueryEvents(ctx, nostr.Filter{
Kinds: []int{1},
})
require.NoError(t, err)
evenCount := 0
for range evenResults {
evenCount++
}
require.Equal(t, 4, evenCount)
}

// test replaceable events
for _, layer := range mmm.layers {
replaceable := &nostr.Event{
CreatedAt: baseTime + 0,
Kind: 0,
Content: fmt.Sprintf("first"),
}
replaceable.Sign(sk)
err := layer.ReplaceEvent(ctx, replaceable)
require.NoError(t, err)
}

// replace events alternating between layers
for i := range mmm.layers {
content := fmt.Sprintf("last %d", i)

newEvt := &nostr.Event{
CreatedAt: baseTime + 1000,
Kind: 0,
Content: content,
}
newEvt.Sign(sk)

layer := mmm.layers[i]
err = layer.ReplaceEvent(ctx, newEvt)
require.NoError(t, err)

// verify replacement in the layer that did it
results, err := layer.QueryEvents(ctx, nostr.Filter{
Kinds: []int{0},
})
require.NoError(t, err)

count := 0
for evt := range results {
require.Equal(t, content, evt.Content)
count++
}
require.Equal(t, 1, count)

// verify other layers still have old version
for j := 0; j < 3; j++ {
if mmm.layers[j] == layer {
continue
}
results, err := mmm.layers[j].QueryEvents(ctx, nostr.Filter{
Kinds: []int{0},
})
require.NoError(t, err)

count := 0
for evt := range results {
if i < j {
require.Equal(t, "first", evt.Content)
} else {
require.Equal(t, evt.Content, fmt.Sprintf("last %d", j))
}
count++
}

require.Equal(t, 1, count, "%d/%d", i, j)
}
}
require.Equal(t, 5, evenCount)
}
4 changes: 2 additions & 2 deletions mmm/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ func (il *IndexingLayer) ReplaceEvent(ctx context.Context, evt *nostr.Event) err

return il.lmdbEnv.Update(func(iltxn *lmdb.Txn) error {
// now we fetch the past events, whatever they are, delete them and then save the new
results, err := il.query(iltxn, filter, 10) // in theory limit could be just 1 and this should work
prevResults, err := il.query(iltxn, filter, 10) // in theory limit could be just 1 and this should work
if err != nil {
return fmt.Errorf("failed to query past events with %s: %w", filter, err)
}

shouldStore := true
for _, previous := range results {
for _, previous := range prevResults {
if internal.IsOlder(previous.Event, evt) {
if err := il.delete(mmmtxn, iltxn, previous.Event); err != nil {
return fmt.Errorf("failed to delete event %s for replacing: %w", previous.Event.ID, err)
Expand Down
72 changes: 38 additions & 34 deletions mmm/save.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,33 +117,29 @@ func (b *MultiMmapManager) storeOn(mmmtxn *lmdb.Txn, ils []*IndexingLayer, iltxn
idPrefix8, _ := hex.DecodeString(evt.ID[0 : 8*2])
val, err := mmmtxn.Get(b.indexId, idPrefix8)
if err == nil {
// we found the event, which means it is already indexed by every layer who wanted to index it
return false, nil
}

// we will only proceed if we get a NotFound -- for anything else we will error
if !lmdb.IsNotFound(err) {
return false, fmt.Errorf("error storing: %w", err)
}

// prepare value to be saved in the id index
// val: [posb][layerIdRefs...]
val = make([]byte, 12, 12+2*len(b.layers))

// these are the bytes we must fill in with the position once we have it
reservedResults := make([][]byte, 0, len(b.layers))

for i, il := range ils {
iltxn := iltxns[i]
// start a txn here and close it at the end only as we will have to use the valReserve arrays later
for k := range il.getIndexKeysForEvent(evt) {
valReserve, err := iltxn.PutReserve(k.dbi, k.key, 12, 0)
if err != nil {
b.Logger.Warn().Str("name", il.name).Msg("failed to index event on layer")
// we found the event, now check if it is already indexed by the layers that want to store it
for i := len(ils) - 1; i >= 0; i-- {
for s := 12; s < len(val); s += 2 {
ilid := binary.BigEndian.Uint16(val[s : s+2])
if ils[i].id == ilid {
// swap delete this il, but keep the deleted ones at the end
// (so the caller can successfully finalize the transactions)
ils[i], ils[len(ils)-1] = ils[len(ils)-1], ils[i]
ils = ils[0 : len(ils)-1]
iltxns[i], iltxns[len(iltxns)-1] = iltxns[len(iltxns)-1], iltxns[i]
iltxns = iltxns[0 : len(iltxns)-1]
break
}
}
reservedResults = append(reservedResults, valReserve)
}
val = binary.BigEndian.AppendUint16(val, il.id)
} else if !lmdb.IsNotFound(err) {
// now if we got an error from lmdb we will only proceed if we get a NotFound -- for anything else we will error
return false, fmt.Errorf("error checking existence: %w", err)
}

// if all ils already have this event indexed (or no il was given) we can end here
if len(ils) == 0 {
return false, nil
}

// find a suitable place for this to be stored in
Expand Down Expand Up @@ -189,17 +185,25 @@ func (b *MultiMmapManager) storeOn(mmmtxn *lmdb.Txn, ils []*IndexingLayer, iltxn
// write to the mmap
betterbinary.Marshal(*evt, b.mmapf[pos.start:])

// this is what we will write in the indexes
posb := make([]byte, 12)
binary.BigEndian.PutUint32(posb[0:4], pos.size)
binary.BigEndian.PutUint64(posb[4:12], pos.start)
// prepare value to be saved in the id index (if we didn't have it already)
// val: [posb][layerIdRefs...]
if val == nil {
val = make([]byte, 12, 12+2*len(b.layers))
binary.BigEndian.PutUint32(val[0:4], pos.size)
binary.BigEndian.PutUint64(val[4:12], pos.start)
}

// each index that was reserved above for the different layers
for i, il := range ils {
iltxn := iltxns[i]

// the id index (it already had the refcounts, but was missing the pos)
copy(val[0:12], posb)
for k := range il.getIndexKeysForEvent(evt) {
if err := iltxn.Put(k.dbi, k.key, val[0:12] /* pos */, 0); err != nil {
b.Logger.Warn().Str("name", il.name).Msg("failed to index event on layer")
}
}

// each index that was reserved above for the different Layers
for _, valReserve := range reservedResults {
copy(valReserve, posb)
val = binary.BigEndian.AppendUint16(val, il.id)
}

// store the id index with the refcounts
Expand Down

0 comments on commit 684cfb3

Please sign in to comment.