From 684cfb3cd7c3e52123f5f3e0e513ad1a915130af Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Sun, 19 Jan 2025 20:07:55 -0300 Subject: [PATCH] mmm: fixes and more tests. --- mmm/delete.go | 6 +- mmm/mmmm_test.go | 211 +++++++++++++++++++++++++++++++++++++++++------ mmm/replace.go | 4 +- mmm/save.go | 72 ++++++++-------- 4 files changed, 230 insertions(+), 63 deletions(-) diff --git a/mmm/delete.go b/mmm/delete.go index f860805..aaa1b8c 100644 --- a/mmm/delete.go +++ b/mmm/delete.go @@ -24,7 +24,7 @@ func (il *IndexingLayer) delete(mmmtxn *lmdb.Txn, iltxn *lmdb.Txn, evt *nostr.Ev zeroRefs := false b := il.mmmm - b.Logger.Debug().Str("layer", il.name).Uint16("id", il.id).Msg("deleting") + b.Logger.Debug().Str("layer", il.name).Uint16("il", il.id).Msg("deleting") // first in the mmmm txn we check if we have the event still idPrefix8, _ := hex.DecodeString(evt.ID[0 : 8*2]) @@ -70,7 +70,9 @@ func (il *IndexingLayer) delete(mmmtxn *lmdb.Txn, iltxn *lmdb.Txn, evt *nostr.Ev // if there are no more refs we delete the event from the id index and mmap if zeroRefs { - b.purge(mmmtxn, idPrefix8, pos) + if err := b.purge(mmmtxn, idPrefix8, pos); err != nil { + panic(err) + } } return nil diff --git a/mmm/mmmm_test.go b/mmm/mmmm_test.go index fee5fee..ccc2019 100644 --- a/mmm/mmmm_test.go +++ b/mmm/mmmm_test.go @@ -2,6 +2,7 @@ package mmm import ( "context" + "fmt" "os" "path/filepath" "testing" @@ -19,9 +20,10 @@ func TestMultiLayerIndexing(t *testing.T) { logger := zerolog.New(zerolog.ConsoleWriter{Out: os.Stderr}) - // initialize MMM with two layers: + // initialize MMM with three layers: // 1. odd timestamps layer // 2. even timestamps layer + // 3. all events layer mmm := &MultiMmapManager{ Dir: tmpDir, Logger: &logger, @@ -36,6 +38,8 @@ func TestMultiLayerIndexing(t *testing.T) { return evt.CreatedAt%2 == 1 case "even": return evt.CreatedAt%2 == 0 + case "all": + return true } return false }, @@ -47,13 +51,13 @@ func TestMultiLayerIndexing(t *testing.T) { require.NoError(t, err) defer mmm.Close() - // create odd timestamps layer + // create layers err = mmm.CreateLayer("odd") require.NoError(t, err) - - // create even timestamps layer err = mmm.CreateLayer("even") require.NoError(t, err) + err = mmm.CreateLayer("all") + require.NoError(t, err) // create test events ctx := context.Background() @@ -74,31 +78,188 @@ func TestMultiLayerIndexing(t *testing.T) { require.True(t, stored) } - // query odd layer - oddResults, err := mmm.layers[0].QueryEvents(ctx, nostr.Filter{ - Kinds: []int{1}, - Since: &baseTime, - }) - require.NoError(t, err) + { + // query odd layer + oddResults, err := mmm.layers[0].QueryEvents(ctx, nostr.Filter{ + Kinds: []int{1}, + }) + require.NoError(t, err) - oddCount := 0 - for evt := range oddResults { - require.Equal(t, evt.CreatedAt%2, nostr.Timestamp(1)) - oddCount++ + oddCount := 0 + for evt := range oddResults { + require.Equal(t, evt.CreatedAt%2, nostr.Timestamp(1)) + oddCount++ + } + require.Equal(t, 5, oddCount) + } + + { + // query even layer + evenResults, err := mmm.layers[1].QueryEvents(ctx, nostr.Filter{ + Kinds: []int{1}, + }) + require.NoError(t, err) + + evenCount := 0 + for evt := range evenResults { + require.Equal(t, evt.CreatedAt%2, nostr.Timestamp(0)) + evenCount++ + } + require.Equal(t, 5, evenCount) + } + + { + // query all layer + allResults, err := mmm.layers[2].QueryEvents(ctx, nostr.Filter{ + Kinds: []int{1}, + }) + require.NoError(t, err) + + allCount := 0 + for range allResults { + allCount++ + } + require.Equal(t, 10, allCount) } - require.Equal(t, 5, oddCount) - // query even layer - evenResults, err := mmm.layers[1].QueryEvents(ctx, nostr.Filter{ - Kinds: []int{1}, - Since: &baseTime, - }) + // delete some events + err = mmm.layers[0].DeleteEvent(ctx, events[1]) // odd timestamp require.NoError(t, err) + err = mmm.layers[1].DeleteEvent(ctx, events[2]) // even timestamp + require.NoError(t, err) + + // verify deletions + { + oddResults, err := mmm.layers[0].QueryEvents(ctx, nostr.Filter{ + Kinds: []int{1}, + }) + require.NoError(t, err) + oddCount := 0 + for range oddResults { + oddCount++ + } + require.Equal(t, 4, oddCount) + } + + { + evenResults, err := mmm.layers[1].QueryEvents(ctx, nostr.Filter{ + Kinds: []int{1}, + }) + require.NoError(t, err) + evenCount := 0 + for range evenResults { + evenCount++ + } + require.Equal(t, 4, evenCount) + } - evenCount := 0 - for evt := range evenResults { - require.Equal(t, evt.CreatedAt%2, nostr.Timestamp(0)) - evenCount++ + { + allResults, err := mmm.layers[2].QueryEvents(ctx, nostr.Filter{ + Kinds: []int{1}, + }) + require.NoError(t, err) + allCount := 0 + for range allResults { + allCount++ + } + require.Equal(t, 10, allCount) + } + + // save events directly to layers regardless of timestamp + { + oddEvent := &nostr.Event{ + CreatedAt: baseTime + 100, // even timestamp + Kind: 1, + Content: "forced odd", + } + oddEvent.Sign(sk) + err = mmm.layers[0].SaveEvent(ctx, oddEvent) // save even timestamp to odd layer + require.NoError(t, err) + + // it is added to the odd il + oddResults, err := mmm.layers[0].QueryEvents(ctx, nostr.Filter{ + Kinds: []int{1}, + }) + require.NoError(t, err) + oddCount := 0 + for range oddResults { + oddCount++ + } + require.Equal(t, 5, oddCount) + + // it doesn't affect the event il + evenResults, err := mmm.layers[1].QueryEvents(ctx, nostr.Filter{ + Kinds: []int{1}, + }) + require.NoError(t, err) + evenCount := 0 + for range evenResults { + evenCount++ + } + require.Equal(t, 4, evenCount) + } + + // test replaceable events + for _, layer := range mmm.layers { + replaceable := &nostr.Event{ + CreatedAt: baseTime + 0, + Kind: 0, + Content: fmt.Sprintf("first"), + } + replaceable.Sign(sk) + err := layer.ReplaceEvent(ctx, replaceable) + require.NoError(t, err) + } + + // replace events alternating between layers + for i := range mmm.layers { + content := fmt.Sprintf("last %d", i) + + newEvt := &nostr.Event{ + CreatedAt: baseTime + 1000, + Kind: 0, + Content: content, + } + newEvt.Sign(sk) + + layer := mmm.layers[i] + err = layer.ReplaceEvent(ctx, newEvt) + require.NoError(t, err) + + // verify replacement in the layer that did it + results, err := layer.QueryEvents(ctx, nostr.Filter{ + Kinds: []int{0}, + }) + require.NoError(t, err) + + count := 0 + for evt := range results { + require.Equal(t, content, evt.Content) + count++ + } + require.Equal(t, 1, count) + + // verify other layers still have old version + for j := 0; j < 3; j++ { + if mmm.layers[j] == layer { + continue + } + results, err := mmm.layers[j].QueryEvents(ctx, nostr.Filter{ + Kinds: []int{0}, + }) + require.NoError(t, err) + + count := 0 + for evt := range results { + if i < j { + require.Equal(t, "first", evt.Content) + } else { + require.Equal(t, evt.Content, fmt.Sprintf("last %d", j)) + } + count++ + } + + require.Equal(t, 1, count, "%d/%d", i, j) + } } - require.Equal(t, 5, evenCount) } diff --git a/mmm/replace.go b/mmm/replace.go index c8a4935..720d041 100644 --- a/mmm/replace.go +++ b/mmm/replace.go @@ -27,13 +27,13 @@ func (il *IndexingLayer) ReplaceEvent(ctx context.Context, evt *nostr.Event) err return il.lmdbEnv.Update(func(iltxn *lmdb.Txn) error { // now we fetch the past events, whatever they are, delete them and then save the new - results, err := il.query(iltxn, filter, 10) // in theory limit could be just 1 and this should work + prevResults, err := il.query(iltxn, filter, 10) // in theory limit could be just 1 and this should work if err != nil { return fmt.Errorf("failed to query past events with %s: %w", filter, err) } shouldStore := true - for _, previous := range results { + for _, previous := range prevResults { if internal.IsOlder(previous.Event, evt) { if err := il.delete(mmmtxn, iltxn, previous.Event); err != nil { return fmt.Errorf("failed to delete event %s for replacing: %w", previous.Event.ID, err) diff --git a/mmm/save.go b/mmm/save.go index 459f3e3..6f7dcf6 100644 --- a/mmm/save.go +++ b/mmm/save.go @@ -117,33 +117,29 @@ func (b *MultiMmapManager) storeOn(mmmtxn *lmdb.Txn, ils []*IndexingLayer, iltxn idPrefix8, _ := hex.DecodeString(evt.ID[0 : 8*2]) val, err := mmmtxn.Get(b.indexId, idPrefix8) if err == nil { - // we found the event, which means it is already indexed by every layer who wanted to index it - return false, nil - } - - // we will only proceed if we get a NotFound -- for anything else we will error - if !lmdb.IsNotFound(err) { - return false, fmt.Errorf("error storing: %w", err) - } - - // prepare value to be saved in the id index - // val: [posb][layerIdRefs...] - val = make([]byte, 12, 12+2*len(b.layers)) - - // these are the bytes we must fill in with the position once we have it - reservedResults := make([][]byte, 0, len(b.layers)) - - for i, il := range ils { - iltxn := iltxns[i] - // start a txn here and close it at the end only as we will have to use the valReserve arrays later - for k := range il.getIndexKeysForEvent(evt) { - valReserve, err := iltxn.PutReserve(k.dbi, k.key, 12, 0) - if err != nil { - b.Logger.Warn().Str("name", il.name).Msg("failed to index event on layer") + // we found the event, now check if it is already indexed by the layers that want to store it + for i := len(ils) - 1; i >= 0; i-- { + for s := 12; s < len(val); s += 2 { + ilid := binary.BigEndian.Uint16(val[s : s+2]) + if ils[i].id == ilid { + // swap delete this il, but keep the deleted ones at the end + // (so the caller can successfully finalize the transactions) + ils[i], ils[len(ils)-1] = ils[len(ils)-1], ils[i] + ils = ils[0 : len(ils)-1] + iltxns[i], iltxns[len(iltxns)-1] = iltxns[len(iltxns)-1], iltxns[i] + iltxns = iltxns[0 : len(iltxns)-1] + break + } } - reservedResults = append(reservedResults, valReserve) } - val = binary.BigEndian.AppendUint16(val, il.id) + } else if !lmdb.IsNotFound(err) { + // now if we got an error from lmdb we will only proceed if we get a NotFound -- for anything else we will error + return false, fmt.Errorf("error checking existence: %w", err) + } + + // if all ils already have this event indexed (or no il was given) we can end here + if len(ils) == 0 { + return false, nil } // find a suitable place for this to be stored in @@ -189,17 +185,25 @@ func (b *MultiMmapManager) storeOn(mmmtxn *lmdb.Txn, ils []*IndexingLayer, iltxn // write to the mmap betterbinary.Marshal(*evt, b.mmapf[pos.start:]) - // this is what we will write in the indexes - posb := make([]byte, 12) - binary.BigEndian.PutUint32(posb[0:4], pos.size) - binary.BigEndian.PutUint64(posb[4:12], pos.start) + // prepare value to be saved in the id index (if we didn't have it already) + // val: [posb][layerIdRefs...] + if val == nil { + val = make([]byte, 12, 12+2*len(b.layers)) + binary.BigEndian.PutUint32(val[0:4], pos.size) + binary.BigEndian.PutUint64(val[4:12], pos.start) + } + + // each index that was reserved above for the different layers + for i, il := range ils { + iltxn := iltxns[i] - // the id index (it already had the refcounts, but was missing the pos) - copy(val[0:12], posb) + for k := range il.getIndexKeysForEvent(evt) { + if err := iltxn.Put(k.dbi, k.key, val[0:12] /* pos */, 0); err != nil { + b.Logger.Warn().Str("name", il.name).Msg("failed to index event on layer") + } + } - // each index that was reserved above for the different Layers - for _, valReserve := range reservedResults { - copy(valReserve, posb) + val = binary.BigEndian.AppendUint16(val, il.id) } // store the id index with the refcounts