diff --git a/mmm/delete.go b/mmm/delete.go index 11b19a3..f860805 100644 --- a/mmm/delete.go +++ b/mmm/delete.go @@ -12,67 +12,66 @@ import ( ) func (il *IndexingLayer) DeleteEvent(ctx context.Context, evt *nostr.Event) error { + return il.mmmm.lmdbEnv.Update(func(mmmtxn *lmdb.Txn) error { + mmmtxn.RawRead = true + return il.lmdbEnv.Update(func(iltxn *lmdb.Txn) error { + return il.delete(mmmtxn, iltxn, evt) + }) + }) +} + +func (il *IndexingLayer) delete(mmmtxn *lmdb.Txn, iltxn *lmdb.Txn, evt *nostr.Event) error { zeroRefs := false b := il.mmmm - return b.lmdbEnv.Update(func(txn *lmdb.Txn) error { - txn.RawRead = true + b.Logger.Debug().Str("layer", il.name).Uint16("id", il.id).Msg("deleting") - b.Logger.Debug().Str("layer", il.name).Uint16("id", il.id).Msg("deleting") - - // first in the mmmm txn we check if we have the event still - idPrefix8, _ := hex.DecodeString(evt.ID[0 : 8*2]) - val, err := txn.Get(b.indexId, idPrefix8) - if err != nil { - if lmdb.IsNotFound(err) { - // we already do not have this anywhere - return nil - } - return fmt.Errorf("failed to check if we have the event %x: %w", idPrefix8, err) + // first in the mmmm txn we check if we have the event still + idPrefix8, _ := hex.DecodeString(evt.ID[0 : 8*2]) + val, err := mmmtxn.Get(b.indexId, idPrefix8) + if err != nil { + if lmdb.IsNotFound(err) { + // we already do not have this anywhere + return nil } + return fmt.Errorf("failed to check if we have the event %x: %w", idPrefix8, err) + } - // we have this, but do we have it in the current layer? - // val is [posb][il_idx][il_idx...] - pos := positionFromBytes(val[0:12]) + // we have this, but do we have it in the current layer? + // val is [posb][il_idx][il_idx...] + pos := positionFromBytes(val[0:12]) - // check references - currentLayer := binary.BigEndian.AppendUint16(nil, il.id) - for i := 12; i < len(val); i += 2 { - if slices.Equal(val[i:i+2], currentLayer) { - // we will remove the current layer if it's found - nextval := make([]byte, len(val)-2) - copy(nextval, val[0:i]) - copy(nextval[i:], val[i+2:]) + // check references + currentLayer := binary.BigEndian.AppendUint16(nil, il.id) + for i := 12; i < len(val); i += 2 { + if slices.Equal(val[i:i+2], currentLayer) { + // we will remove the current layer if it's found + nextval := make([]byte, len(val)-2) + copy(nextval, val[0:i]) + copy(nextval[i:], val[i+2:]) - if err := txn.Put(b.indexId, idPrefix8, nextval, 0); err != nil { - return fmt.Errorf("failed to update references for %x: %w", idPrefix8, err) - } + if err := mmmtxn.Put(b.indexId, idPrefix8, nextval, 0); err != nil { + return fmt.Errorf("failed to update references for %x: %w", idPrefix8, err) + } - // if there are no more layers we will delete everything later - zeroRefs = len(nextval) == 12 + // if there are no more layers we will delete everything later + zeroRefs = len(nextval) == 12 - break - } + break } + } - // then in the il transaction we remove it - if err := il.lmdbEnv.Update(func(txn *lmdb.Txn) error { - // calculate all index keys we have for this event and delete them - for k := range il.getIndexKeysForEvent(evt) { - if err := txn.Del(k.dbi, k.key, val[0:12]); err != nil && !lmdb.IsNotFound(err) { - return fmt.Errorf("index entry %v/%x deletion failed: %w", k.dbi, k.key, err) - } - } - return nil - }); err != nil { - return fmt.Errorf("failed to delete indexes for %s on %d: %w", evt.ID, il.id, err) + // calculate all index keys we have for this event and delete them + for k := range il.getIndexKeysForEvent(evt) { + if err := iltxn.Del(k.dbi, 
k.key, val[0:12]); err != nil && !lmdb.IsNotFound(err) { + return fmt.Errorf("index entry %v/%x deletion failed: %w", k.dbi, k.key, err) } + } - // if there are no more refs we delete the event from the id index and mmap - if zeroRefs { - b.purge(txn, idPrefix8, pos) - } + // if there are no more refs we delete the event from the id index and mmap + if zeroRefs { + b.purge(mmmtxn, idPrefix8, pos) + } - return nil - }) + return nil } diff --git a/mmm/indexinglayer.go b/mmm/indexinglayer.go index ac47b07..b6791fd 100644 --- a/mmm/indexinglayer.go +++ b/mmm/indexinglayer.go @@ -7,9 +7,12 @@ import ( "os" "github.com/PowerDNS/lmdb-go/lmdb" + "github.com/fiatjaf/eventstore" "github.com/nbd-wtf/go-nostr" ) +var _ eventstore.Store = (*IndexingLayer)(nil) + type IndexingLayer struct { name string // this is set automatically internally diff --git a/mmm/mmmm_test.go b/mmm/mmmm_test.go new file mode 100644 index 0000000..fee5fee --- /dev/null +++ b/mmm/mmmm_test.go @@ -0,0 +1,104 @@ +package mmm + +import ( + "context" + "os" + "path/filepath" + "testing" + + "github.com/nbd-wtf/go-nostr" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" +) + +func TestMultiLayerIndexing(t *testing.T) { + // Create a temporary directory for the test + tmpDir, err := os.MkdirTemp("", "mmm-test-*") + require.NoError(t, err) + defer os.RemoveAll(tmpDir) + + logger := zerolog.New(zerolog.ConsoleWriter{Out: os.Stderr}) + + // initialize MMM with two layers: + // 1. odd timestamps layer + // 2. even timestamps layer + mmm := &MultiMmapManager{ + Dir: tmpDir, + Logger: &logger, + LayerBuilder: func(name string, b *MultiMmapManager) *IndexingLayer { + return &IndexingLayer{ + dbpath: filepath.Join(tmpDir, name), + mmmm: b, + MaxLimit: 100, + ShouldIndex: func(ctx context.Context, evt *nostr.Event) bool { + switch name { + case "odd": + return evt.CreatedAt%2 == 1 + case "even": + return evt.CreatedAt%2 == 0 + } + return false + }, + } + }, + } + + err = mmm.Init() + require.NoError(t, err) + defer mmm.Close() + + // create odd timestamps layer + err = mmm.CreateLayer("odd") + require.NoError(t, err) + + // create even timestamps layer + err = mmm.CreateLayer("even") + require.NoError(t, err) + + // create test events + ctx := context.Background() + baseTime := nostr.Timestamp(0) + sk := "945e01e37662430162121b804d3645a86d97df9d256917d86735d0eb219393eb" + events := make([]*nostr.Event, 10) + for i := 0; i < 10; i++ { + evt := &nostr.Event{ + CreatedAt: baseTime + nostr.Timestamp(i), + Kind: 1, + Tags: nostr.Tags{}, + Content: "test content", + } + evt.Sign(sk) + events[i] = evt + stored, err := mmm.StoreGlobal(ctx, evt) + require.NoError(t, err) + require.True(t, stored) + } + + // query odd layer + oddResults, err := mmm.layers[0].QueryEvents(ctx, nostr.Filter{ + Kinds: []int{1}, + Since: &baseTime, + }) + require.NoError(t, err) + + oddCount := 0 + for evt := range oddResults { + require.Equal(t, evt.CreatedAt%2, nostr.Timestamp(1)) + oddCount++ + } + require.Equal(t, 5, oddCount) + + // query even layer + evenResults, err := mmm.layers[1].QueryEvents(ctx, nostr.Filter{ + Kinds: []int{1}, + Since: &baseTime, + }) + require.NoError(t, err) + + evenCount := 0 + for evt := range evenResults { + require.Equal(t, evt.CreatedAt%2, nostr.Timestamp(0)) + evenCount++ + } + require.Equal(t, 5, evenCount) +} diff --git a/mmm/query.go b/mmm/query.go index 0b1a3d5..7873708 100644 --- a/mmm/query.go +++ b/mmm/query.go @@ -53,11 +53,6 @@ func (il *IndexingLayer) QueryEvents(ctx context.Context, filter nostr.Filter) ( 
return ch, nil } - queries, extraAuthors, extraKinds, extraTagKey, extraTagValues, since, err := il.prepareQueries(filter) - if err != nil { - return nil, err - } - // max number of events we'll return limit := il.MaxLimit / 4 if filter.Limit > 0 && filter.Limit < il.MaxLimit { @@ -74,350 +69,361 @@ func (il *IndexingLayer) QueryEvents(ctx context.Context, filter nostr.Filter) ( txn.RawRead = true defer close(ch) - iterators := make([]*iterator, len(queries)) - exhausted := make([]bool, len(queries)) // indicates that a query won't be used anymore - results := make([][]internal.IterEvent, len(queries)) - pulledPerQuery := make([]int, len(queries)) - - // these are kept updated so we never pull from the iterator that is at further distance - // (i.e. the one that has the oldest event among all) - // we will continue to pull from it as soon as some other iterator takes the position - oldest := internal.IterEvent{Q: -1} - - secondPhase := false // after we have gathered enough events we will change the way we iterate - secondBatch := make([][]internal.IterEvent, 0, len(queries)+1) - secondPhaseParticipants := make([]int, 0, len(queries)+1) - - // while merging results in the second phase we will alternate between these two lists - // to avoid having to create new lists all the time - var secondPhaseResultsA []internal.IterEvent - var secondPhaseResultsB []internal.IterEvent - var secondPhaseResultsToggle bool // this is just a dummy thing we use to keep track of the alternating - var secondPhaseHasResultsPending bool - - remainingUnexhausted := len(queries) // when all queries are exhausted we can finally end this thing - batchSizePerQuery := internal.BatchSizePerNumberOfQueries(limit, remainingUnexhausted) - firstPhaseTotalPulled := 0 - - exhaust := func(q int) { - exhausted[q] = true - remainingUnexhausted-- - if q == oldest.Q { - oldest = internal.IterEvent{Q: -1} - } + results, err := il.query(txn, filter, limit) + + for _, ie := range results { + ch <- ie.Event } - var firstPhaseResults []internal.IterEvent + return err + }) - for q := range queries { - cursor, err := txn.OpenCursor(queries[q].dbi) - if err != nil { - return err - } - iterators[q] = &iterator{cursor: cursor} - defer cursor.Close() - iterators[q].seek(queries[q].startingPoint) - results[q] = make([]internal.IterEvent, 0, batchSizePerQuery*2) + return ch, nil +} + +func (il *IndexingLayer) query(txn *lmdb.Txn, filter nostr.Filter, limit int) ([]internal.IterEvent, error) { + queries, extraAuthors, extraKinds, extraTagKey, extraTagValues, since, err := il.prepareQueries(filter) + if err != nil { + return nil, err + } + + iterators := make([]*iterator, len(queries)) + exhausted := make([]bool, len(queries)) // indicates that a query won't be used anymore + results := make([][]internal.IterEvent, len(queries)) + pulledPerQuery := make([]int, len(queries)) + + // these are kept updated so we never pull from the iterator that is at further distance + // (i.e. 
the one that has the oldest event among all) + // we will continue to pull from it as soon as some other iterator takes the position + oldest := internal.IterEvent{Q: -1} + + secondPhase := false // after we have gathered enough events we will change the way we iterate + secondBatch := make([][]internal.IterEvent, 0, len(queries)+1) + secondPhaseParticipants := make([]int, 0, len(queries)+1) + + // while merging results in the second phase we will alternate between these two lists + // to avoid having to create new lists all the time + var secondPhaseResultsA []internal.IterEvent + var secondPhaseResultsB []internal.IterEvent + var secondPhaseResultsToggle bool // this is just a dummy thing we use to keep track of the alternating + var secondPhaseHasResultsPending bool + + remainingUnexhausted := len(queries) // when all queries are exhausted we can finally end this thing + batchSizePerQuery := internal.BatchSizePerNumberOfQueries(limit, remainingUnexhausted) + firstPhaseTotalPulled := 0 + + exhaust := func(q int) { + exhausted[q] = true + remainingUnexhausted-- + if q == oldest.Q { + oldest = internal.IterEvent{Q: -1} } + } + + var firstPhaseResults []internal.IterEvent - // fmt.Println("queries", len(queries)) + for q := range queries { + cursor, err := txn.OpenCursor(queries[q].dbi) + if err != nil { + return nil, err + } + iterators[q] = &iterator{cursor: cursor} + defer cursor.Close() + iterators[q].seek(queries[q].startingPoint) + results[q] = make([]internal.IterEvent, 0, batchSizePerQuery*2) + } - for c := 0; ; c++ { - batchSizePerQuery = internal.BatchSizePerNumberOfQueries(limit, remainingUnexhausted) + // fmt.Println("queries", len(queries)) - // fmt.Println(" iteration", c, "remaining", remainingUnexhausted, "batchsize", batchSizePerQuery) - // we will go through all the iterators in batches until we have pulled all the required results - for q, query := range queries { - if exhausted[q] { - continue - } - if oldest.Q == q && remainingUnexhausted > 1 { - continue + for c := 0; ; c++ { + batchSizePerQuery = internal.BatchSizePerNumberOfQueries(limit, remainingUnexhausted) + + // fmt.Println(" iteration", c, "remaining", remainingUnexhausted, "batchsize", batchSizePerQuery) + // we will go through all the iterators in batches until we have pulled all the required results + for q, query := range queries { + if exhausted[q] { + continue + } + if oldest.Q == q && remainingUnexhausted > 1 { + continue + } + // fmt.Println(" query", q, unsafe.Pointer(&results[q]), hex.EncodeToString(query.prefix), len(results[q])) + + it := iterators[q] + pulledThisIteration := 0 + + for { + // we already have a k and a v and an err from the cursor setup, so check and use these + if it.err != nil || + len(it.key) != query.keySize || + !bytes.HasPrefix(it.key, query.prefix) { + // either iteration has errored or we reached the end of this prefix + // fmt.Println(" reached end", it.key, query.keySize, query.prefix) + exhaust(q) + break } - // fmt.Println(" query", q, unsafe.Pointer(&results[q]), hex.EncodeToString(query.prefix), len(results[q])) - - it := iterators[q] - pulledThisIteration := 0 - - for { - // we already have a k and a v and an err from the cursor setup, so check and use these - if it.err != nil || - len(it.key) != query.keySize || - !bytes.HasPrefix(it.key, query.prefix) { - // either iteration has errored or we reached the end of this prefix - // fmt.Println(" reached end", it.key, query.keySize, query.prefix) + + // "id" indexes don't contain a timestamp + if query.timestampSize == 4 
{ + createdAt := binary.BigEndian.Uint32(it.key[len(it.key)-4:]) + if createdAt < since { + // fmt.Println(" reached since", createdAt, "<", since) exhaust(q) break } + } - // "id" indexes don't contain a timestamp - if query.timestampSize == 4 { - createdAt := binary.BigEndian.Uint32(it.key[len(it.key)-4:]) - if createdAt < since { - // fmt.Println(" reached since", createdAt, "<", since) - exhaust(q) - break - } - } - - // fetch actual event - pos := positionFromBytes(it.posb) - bin := il.mmmm.mmapf[pos.start : pos.start+uint64(pos.size)] + // fetch actual event + pos := positionFromBytes(it.posb) + bin := il.mmmm.mmapf[pos.start : pos.start+uint64(pos.size)] - // check it against pubkeys without decoding the entire thing - if extraAuthors != nil && !slices.Contains(extraAuthors, [32]byte(bin[39:71])) { - it.next() - continue - } + // check it against pubkeys without decoding the entire thing + if extraAuthors != nil && !slices.Contains(extraAuthors, [32]byte(bin[39:71])) { + it.next() + continue + } - // check it against kinds without decoding the entire thing - if extraKinds != nil && !slices.Contains(extraKinds, [2]byte(bin[1:3])) { - it.next() - continue - } + // check it against kinds without decoding the entire thing + if extraKinds != nil && !slices.Contains(extraKinds, [2]byte(bin[1:3])) { + it.next() + continue + } - // decode the entire thing (TODO: do a conditional decode while also checking the extra tag) - event := &nostr.Event{} - if err := betterbinary.Unmarshal(bin, event); err != nil { - log.Printf("mmm: value read error (id %x) on query prefix %x sp %x dbi %d: %s\n", bin[0:32], - query.prefix, query.startingPoint, query.dbi, err) - return fmt.Errorf("event read error: %w", err) - } + // decode the entire thing (TODO: do a conditional decode while also checking the extra tag) + event := &nostr.Event{} + if err := betterbinary.Unmarshal(bin, event); err != nil { + log.Printf("mmm: value read error (id %x) on query prefix %x sp %x dbi %d: %s\n", bin[0:32], + query.prefix, query.startingPoint, query.dbi, err) + return nil, fmt.Errorf("event read error: %w", err) + } - // fmt.Println(" event", hex.EncodeToString(val[0:4]), "kind", binary.BigEndian.Uint16(val[132:134]), "author", hex.EncodeToString(val[32:36]), "ts", nostr.Timestamp(binary.BigEndian.Uint32(val[128:132])), hex.EncodeToString(it.key), it.valIdx) + // fmt.Println(" event", hex.EncodeToString(val[0:4]), "kind", binary.BigEndian.Uint16(val[132:134]), "author", hex.EncodeToString(val[32:36]), "ts", nostr.Timestamp(binary.BigEndian.Uint32(val[128:132])), hex.EncodeToString(it.key), it.valIdx) - // if there is still a tag to be checked, do it now - if extraTagValues != nil && !event.Tags.ContainsAny(extraTagKey, extraTagValues) { - it.next() - continue - } + // if there is still a tag to be checked, do it now + if extraTagValues != nil && !event.Tags.ContainsAny(extraTagKey, extraTagValues) { + it.next() + continue + } - // this event is good to be used - evt := internal.IterEvent{Event: event, Q: q} - // - // - if secondPhase { - // do the process described below at HIWAWVRTP. - // if we've reached here this means we've already passed the `since` check. - // now we have to eliminate the event currently at the `since` threshold. 
- nextThreshold := firstPhaseResults[len(firstPhaseResults)-2] - if oldest.Event == nil { - // fmt.Println(" b1", evt.ID[0:8]) - // BRANCH WHEN WE DON'T HAVE THE OLDEST EVENT (BWWDHTOE) - // when we don't have the oldest set, we will keep the results - // and not change the cutting point -- it's bad, but hopefully not that bad. - results[q] = append(results[q], evt) - secondPhaseHasResultsPending = true - } else if nextThreshold.CreatedAt > oldest.CreatedAt { - // fmt.Println(" b2", nextThreshold.CreatedAt, ">", oldest.CreatedAt, evt.ID[0:8]) - // one of the events we have stored is the actual next threshold - // eliminate last, update since with oldest - firstPhaseResults = firstPhaseResults[0 : len(firstPhaseResults)-1] - since = uint32(oldest.CreatedAt) - // fmt.Println(" new since", since, evt.ID[0:8]) - // we null the oldest Event as we can't rely on it anymore - // (we'll fall under BWWDHTOE above) until we have a new oldest set. - oldest = internal.IterEvent{Q: -1} - // anything we got that would be above this won't trigger an update to - // the oldest anyway, because it will be discarded as being after the limit. - // - // finally - // add this to the results to be merged later - results[q] = append(results[q], evt) - secondPhaseHasResultsPending = true - } else if nextThreshold.CreatedAt < evt.CreatedAt { - // the next last event in the firstPhaseResults is the next threshold - // fmt.Println(" b3", nextThreshold.CreatedAt, "<", oldest.CreatedAt, evt.ID[0:8]) - // eliminate last, update since with the antelast - firstPhaseResults = firstPhaseResults[0 : len(firstPhaseResults)-1] - since = uint32(nextThreshold.CreatedAt) - // fmt.Println(" new since", since) - // add this to the results to be merged later - results[q] = append(results[q], evt) - secondPhaseHasResultsPending = true - // update the oldest event - if evt.CreatedAt < oldest.CreatedAt { - oldest = evt - } - } else { - // fmt.Println(" b4", evt.ID[0:8]) - // oops, _we_ are the next `since` threshold - firstPhaseResults[len(firstPhaseResults)-1] = evt - since = uint32(evt.CreatedAt) - // fmt.Println(" new since", since) - // do not add us to the results to be merged later - // as we're already inhabiting the firstPhaseResults slice - } - } else { + // this event is good to be used + evt := internal.IterEvent{Event: event, Q: q} + // + // + if secondPhase { + // do the process described below at HIWAWVRTP. + // if we've reached here this means we've already passed the `since` check. + // now we have to eliminate the event currently at the `since` threshold. + nextThreshold := firstPhaseResults[len(firstPhaseResults)-2] + if oldest.Event == nil { + // fmt.Println(" b1", evt.ID[0:8]) + // BRANCH WHEN WE DON'T HAVE THE OLDEST EVENT (BWWDHTOE) + // when we don't have the oldest set, we will keep the results + // and not change the cutting point -- it's bad, but hopefully not that bad. results[q] = append(results[q], evt) - firstPhaseTotalPulled++ - + secondPhaseHasResultsPending = true + } else if nextThreshold.CreatedAt > oldest.CreatedAt { + // fmt.Println(" b2", nextThreshold.CreatedAt, ">", oldest.CreatedAt, evt.ID[0:8]) + // one of the events we have stored is the actual next threshold + // eliminate last, update since with oldest + firstPhaseResults = firstPhaseResults[0 : len(firstPhaseResults)-1] + since = uint32(oldest.CreatedAt) + // fmt.Println(" new since", since, evt.ID[0:8]) + // we null the oldest Event as we can't rely on it anymore + // (we'll fall under BWWDHTOE above) until we have a new oldest set. 
+ oldest = internal.IterEvent{Q: -1} + // anything we got that would be above this won't trigger an update to + // the oldest anyway, because it will be discarded as being after the limit. + // + // finally + // add this to the results to be merged later + results[q] = append(results[q], evt) + secondPhaseHasResultsPending = true + } else if nextThreshold.CreatedAt < evt.CreatedAt { + // the next last event in the firstPhaseResults is the next threshold + // fmt.Println(" b3", nextThreshold.CreatedAt, "<", oldest.CreatedAt, evt.ID[0:8]) + // eliminate last, update since with the antelast + firstPhaseResults = firstPhaseResults[0 : len(firstPhaseResults)-1] + since = uint32(nextThreshold.CreatedAt) + // fmt.Println(" new since", since) + // add this to the results to be merged later + results[q] = append(results[q], evt) + secondPhaseHasResultsPending = true // update the oldest event - if oldest.Event == nil || evt.CreatedAt < oldest.CreatedAt { + if evt.CreatedAt < oldest.CreatedAt { oldest = evt } + } else { + // fmt.Println(" b4", evt.ID[0:8]) + // oops, _we_ are the next `since` threshold + firstPhaseResults[len(firstPhaseResults)-1] = evt + since = uint32(evt.CreatedAt) + // fmt.Println(" new since", since) + // do not add us to the results to be merged later + // as we're already inhabiting the firstPhaseResults slice } + } else { + results[q] = append(results[q], evt) + firstPhaseTotalPulled++ - pulledPerQuery[q]++ - pulledThisIteration++ - if pulledThisIteration > batchSizePerQuery { - // batch filled - it.next() - // fmt.Println(" filled", hex.EncodeToString(it.key), it.valIdx) - break - } - if pulledPerQuery[q] >= limit { - // batch filled + reached limit for this query (which is the global limit) - exhaust(q) - it.next() - break + // update the oldest event + if oldest.Event == nil || evt.CreatedAt < oldest.CreatedAt { + oldest = evt } + } + pulledPerQuery[q]++ + pulledThisIteration++ + if pulledThisIteration > batchSizePerQuery { + // batch filled it.next() + // fmt.Println(" filled", hex.EncodeToString(it.key), it.valIdx) + break + } + if pulledPerQuery[q] >= limit { + // batch filled + reached limit for this query (which is the global limit) + exhaust(q) + it.next() + break } - } - - // we will do this check if we don't accumulated the requested number of events yet - // fmt.Println("oldest", oldest.Event, "from iter", oldest.Q) - if secondPhase && secondPhaseHasResultsPending && (oldest.Event == nil || remainingUnexhausted == 0) { - // fmt.Println("second phase aggregation!") - // when we are in the second phase we will aggressively aggregate results on every iteration - // - secondBatch = secondBatch[:0] - for s := 0; s < len(secondPhaseParticipants); s++ { - q := secondPhaseParticipants[s] - if len(results[q]) > 0 { - secondBatch = append(secondBatch, results[q]) - } + it.next() + } + } - if exhausted[q] { - secondPhaseParticipants = internal.SwapDelete(secondPhaseParticipants, s) - s-- - } + // we will do this check if we don't accumulated the requested number of events yet + // fmt.Println("oldest", oldest.Event, "from iter", oldest.Q) + if secondPhase && secondPhaseHasResultsPending && (oldest.Event == nil || remainingUnexhausted == 0) { + // fmt.Println("second phase aggregation!") + // when we are in the second phase we will aggressively aggregate results on every iteration + // + secondBatch = secondBatch[:0] + for s := 0; s < len(secondPhaseParticipants); s++ { + q := secondPhaseParticipants[s] + + if len(results[q]) > 0 { + secondBatch = append(secondBatch, 
results[q]) } - // every time we get here we will alternate between these A and B lists - // combining everything we have into a new partial results list. - // after we've done that we can again set the oldest. - // fmt.Println(" xxx", secondPhaseResultsToggle) - if secondPhaseResultsToggle { - secondBatch = append(secondBatch, secondPhaseResultsB) - secondPhaseResultsA = internal.MergeSortMultiple(secondBatch, limit, secondPhaseResultsA) - oldest = secondPhaseResultsA[len(secondPhaseResultsA)-1] - // fmt.Println(" new aggregated a", len(secondPhaseResultsB)) - } else { - secondBatch = append(secondBatch, secondPhaseResultsA) - secondPhaseResultsB = internal.MergeSortMultiple(secondBatch, limit, secondPhaseResultsB) - oldest = secondPhaseResultsB[len(secondPhaseResultsB)-1] - // fmt.Println(" new aggregated b", len(secondPhaseResultsB)) + if exhausted[q] { + secondPhaseParticipants = internal.SwapDelete(secondPhaseParticipants, s) + s-- } - secondPhaseResultsToggle = !secondPhaseResultsToggle - - since = uint32(oldest.CreatedAt) - // fmt.Println(" new since", since) + } - // reset the `results` list so we can keep using it - results = results[:len(queries)] - for _, q := range secondPhaseParticipants { - results[q] = results[q][:0] - } - } else if !secondPhase && firstPhaseTotalPulled >= limit && remainingUnexhausted > 0 { - // fmt.Println("have enough!", firstPhaseTotalPulled, "/", limit, "remaining", remainingUnexhausted) - - // we will exclude this oldest number as it is not relevant anymore - // (we now want to keep track only of the oldest among the remaining iterators) - oldest = internal.IterEvent{Q: -1} - - // HOW IT WORKS AFTER WE'VE REACHED THIS POINT (HIWAWVRTP) - // now we can combine the results we have and check what is our current oldest event. - // we also discard anything that is after the current cutting point (`limit`). - // so if we have [1,2,3], [10, 15, 20] and [7, 21, 49] but we only want 6 total - // we can just keep [1,2,3,7,10,15] and discard [20, 21, 49], - // and also adjust our `since` parameter to `15`, discarding anything we get after it - // and immediately declaring that iterator exhausted. - // also every time we get result that is more recent than this updated `since` we can - // keep it but also discard the previous since, moving the needle one back -- for example, - // if we get an `8` we can keep it and move the `since` parameter to `10`, discarding `15` - // in the process. - all := make([][]internal.IterEvent, len(results)) - copy(all, results) // we have to use this otherwise internal.MergeSortMultiple will scramble our results slice - firstPhaseResults = internal.MergeSortMultiple(all, limit, nil) - oldest = firstPhaseResults[limit-1] - since = uint32(oldest.CreatedAt) - // fmt.Println("new since", since) - - for q := range queries { - if exhausted[q] { - continue - } + // every time we get here we will alternate between these A and B lists + // combining everything we have into a new partial results list. + // after we've done that we can again set the oldest. 
+ // fmt.Println(" xxx", secondPhaseResultsToggle) + if secondPhaseResultsToggle { + secondBatch = append(secondBatch, secondPhaseResultsB) + secondPhaseResultsA = internal.MergeSortMultiple(secondBatch, limit, secondPhaseResultsA) + oldest = secondPhaseResultsA[len(secondPhaseResultsA)-1] + // fmt.Println(" new aggregated a", len(secondPhaseResultsB)) + } else { + secondBatch = append(secondBatch, secondPhaseResultsA) + secondPhaseResultsB = internal.MergeSortMultiple(secondBatch, limit, secondPhaseResultsB) + oldest = secondPhaseResultsB[len(secondPhaseResultsB)-1] + // fmt.Println(" new aggregated b", len(secondPhaseResultsB)) + } + secondPhaseResultsToggle = !secondPhaseResultsToggle - // we also automatically exhaust any of the iterators that have already passed the - // cutting point (`since`) - if results[q][len(results[q])-1].CreatedAt < oldest.CreatedAt { - exhausted[q] = true - remainingUnexhausted-- - continue - } + since = uint32(oldest.CreatedAt) + // fmt.Println(" new since", since) - // for all the remaining iterators, - // since we have merged all the events in this `firstPhaseResults` slice, we can empty the - // current `results` slices and reuse them. - results[q] = results[q][:0] + // reset the `results` list so we can keep using it + results = results[:len(queries)] + for _, q := range secondPhaseParticipants { + results[q] = results[q][:0] + } + } else if !secondPhase && firstPhaseTotalPulled >= limit && remainingUnexhausted > 0 { + // fmt.Println("have enough!", firstPhaseTotalPulled, "/", limit, "remaining", remainingUnexhausted) + + // we will exclude this oldest number as it is not relevant anymore + // (we now want to keep track only of the oldest among the remaining iterators) + oldest = internal.IterEvent{Q: -1} + + // HOW IT WORKS AFTER WE'VE REACHED THIS POINT (HIWAWVRTP) + // now we can combine the results we have and check what is our current oldest event. + // we also discard anything that is after the current cutting point (`limit`). + // so if we have [1,2,3], [10, 15, 20] and [7, 21, 49] but we only want 6 total + // we can just keep [1,2,3,7,10,15] and discard [20, 21, 49], + // and also adjust our `since` parameter to `15`, discarding anything we get after it + // and immediately declaring that iterator exhausted. + // also every time we get result that is more recent than this updated `since` we can + // keep it but also discard the previous since, moving the needle one back -- for example, + // if we get an `8` we can keep it and move the `since` parameter to `10`, discarding `15` + // in the process. 
+ all := make([][]internal.IterEvent, len(results)) + copy(all, results) // we have to use this otherwise internal.MergeSortMultiple will scramble our results slice + firstPhaseResults = internal.MergeSortMultiple(all, limit, nil) + oldest = firstPhaseResults[limit-1] + since = uint32(oldest.CreatedAt) + // fmt.Println("new since", since) + + for q := range queries { + if exhausted[q] { + continue + } - // build this index of indexes with everybody who remains - secondPhaseParticipants = append(secondPhaseParticipants, q) + // we also automatically exhaust any of the iterators that have already passed the + // cutting point (`since`) + if results[q][len(results[q])-1].CreatedAt < oldest.CreatedAt { + exhausted[q] = true + remainingUnexhausted-- + continue } - // we create these two lists and alternate between them so we don't have to create a - // a new one every time - secondPhaseResultsA = make([]internal.IterEvent, 0, limit*2) - secondPhaseResultsB = make([]internal.IterEvent, 0, limit*2) + // for all the remaining iterators, + // since we have merged all the events in this `firstPhaseResults` slice, we can empty the + // current `results` slices and reuse them. + results[q] = results[q][:0] - // from now on we won't run this block anymore - secondPhase = true + // build this index of indexes with everybody who remains + secondPhaseParticipants = append(secondPhaseParticipants, q) } - // fmt.Println("remaining", remainingUnexhausted) - if remainingUnexhausted == 0 { - break - } + // we create these two lists and alternate between them so we don't have to create a + // a new one every time + secondPhaseResultsA = make([]internal.IterEvent, 0, limit*2) + secondPhaseResultsB = make([]internal.IterEvent, 0, limit*2) + + // from now on we won't run this block anymore + secondPhase = true } - // fmt.Println("is secondPhase?", secondPhase) + // fmt.Println("remaining", remainingUnexhausted) + if remainingUnexhausted == 0 { + break + } + } - var combinedResults []internal.IterEvent + // fmt.Println("is secondPhase?", secondPhase) - if secondPhase { - // fmt.Println("ending second phase") - // when we reach this point either secondPhaseResultsA or secondPhaseResultsB will be full of stuff, - // the other will be empty - var secondPhaseResults []internal.IterEvent - // fmt.Println("xxx", secondPhaseResultsToggle, len(secondPhaseResultsA), len(secondPhaseResultsB)) - if secondPhaseResultsToggle { - secondPhaseResults = secondPhaseResultsB - combinedResults = secondPhaseResultsA[0:limit] // reuse this - // fmt.Println(" using b", len(secondPhaseResultsA)) - } else { - secondPhaseResults = secondPhaseResultsA - combinedResults = secondPhaseResultsB[0:limit] // reuse this - // fmt.Println(" using a", len(secondPhaseResultsA)) - } + var combinedResults []internal.IterEvent - all := [][]internal.IterEvent{firstPhaseResults, secondPhaseResults} - combinedResults = internal.MergeSortMultiple(all, limit, combinedResults) - // fmt.Println("final combinedResults", len(combinedResults), cap(combinedResults), limit) + if secondPhase { + // fmt.Println("ending second phase") + // when we reach this point either secondPhaseResultsA or secondPhaseResultsB will be full of stuff, + // the other will be empty + var secondPhaseResults []internal.IterEvent + // fmt.Println("xxx", secondPhaseResultsToggle, len(secondPhaseResultsA), len(secondPhaseResultsB)) + if secondPhaseResultsToggle { + secondPhaseResults = secondPhaseResultsB + combinedResults = secondPhaseResultsA[0:limit] // reuse this + // fmt.Println(" using 
b", len(secondPhaseResultsA)) } else { - combinedResults = make([]internal.IterEvent, limit) - combinedResults = internal.MergeSortMultiple(results, limit, combinedResults) + secondPhaseResults = secondPhaseResultsA + combinedResults = secondPhaseResultsB[0:limit] // reuse this + // fmt.Println(" using a", len(secondPhaseResultsA)) } - for _, evt := range combinedResults { - ch <- evt.Event - } - - return nil - }) + all := [][]internal.IterEvent{firstPhaseResults, secondPhaseResults} + combinedResults = internal.MergeSortMultiple(all, limit, combinedResults) + // fmt.Println("final combinedResults", len(combinedResults), cap(combinedResults), limit) + } else { + combinedResults = make([]internal.IterEvent, limit) + combinedResults = internal.MergeSortMultiple(results, limit, combinedResults) + } - return ch, nil + return combinedResults, nil } diff --git a/mmm/replace.go b/mmm/replace.go new file mode 100644 index 0000000..c8a4935 --- /dev/null +++ b/mmm/replace.go @@ -0,0 +1,54 @@ +package mmm + +import ( + "context" + "fmt" + "math" + + "github.com/PowerDNS/lmdb-go/lmdb" + "github.com/fiatjaf/eventstore/internal" + "github.com/nbd-wtf/go-nostr" +) + +func (il *IndexingLayer) ReplaceEvent(ctx context.Context, evt *nostr.Event) error { + // sanity checking + if evt.CreatedAt > math.MaxUint32 || evt.Kind > math.MaxUint16 { + return fmt.Errorf("event with values out of expected boundaries") + } + + filter := nostr.Filter{Limit: 1, Kinds: []int{evt.Kind}, Authors: []string{evt.PubKey}} + if nostr.IsAddressableKind(evt.Kind) { + // when addressable, add the "d" tag to the filter + filter.Tags = nostr.TagMap{"d": []string{evt.Tags.GetD()}} + } + + return il.mmmm.lmdbEnv.Update(func(mmmtxn *lmdb.Txn) error { + mmmtxn.RawRead = true + + return il.lmdbEnv.Update(func(iltxn *lmdb.Txn) error { + // now we fetch the past events, whatever they are, delete them and then save the new + results, err := il.query(iltxn, filter, 10) // in theory limit could be just 1 and this should work + if err != nil { + return fmt.Errorf("failed to query past events with %s: %w", filter, err) + } + + shouldStore := true + for _, previous := range results { + if internal.IsOlder(previous.Event, evt) { + if err := il.delete(mmmtxn, iltxn, previous.Event); err != nil { + return fmt.Errorf("failed to delete event %s for replacing: %w", previous.Event.ID, err) + } + } else { + // there is a newer event already stored, so we won't store this + shouldStore = false + } + } + if shouldStore { + _, err := il.mmmm.storeOn(mmmtxn, []*IndexingLayer{il}, []*lmdb.Txn{iltxn}, evt) + return err + } + + return nil + }) + }) +} diff --git a/mmm/save.go b/mmm/save.go index a926438..459f3e3 100644 --- a/mmm/save.go +++ b/mmm/save.go @@ -16,11 +16,8 @@ import ( "github.com/nbd-wtf/go-nostr" ) -func (b *MultiMmapManager) Store(ctx context.Context, evt *nostr.Event) (stored bool, err error) { - // sanity checking - if evt.CreatedAt > maxuint32 || evt.Kind > maxuint16 { - return false, fmt.Errorf("event with values out of expected boundaries") - } +func (b *MultiMmapManager) StoreGlobal(ctx context.Context, evt *nostr.Event) (stored bool, err error) { + someoneWantsIt := false b.mutex.Lock() defer b.mutex.Unlock() @@ -28,30 +25,97 @@ func (b *MultiMmapManager) Store(ctx context.Context, evt *nostr.Event) (stored defer runtime.UnlockOSThread() // do this just so it's cleaner, we're already locking the thread and the mutex anyway - txn, err := b.lmdbEnv.BeginTxn(nil, 0) + mmmtxn, err := b.lmdbEnv.BeginTxn(nil, 0) if err != nil { return false, 
fmt.Errorf("failed to begin global transaction: %w", err) } - txn.RawRead = true + mmmtxn.RawRead = true - // this ensures we'll commit all transactions or rollback them - txnsToClose := make([]*lmdb.Txn, 1, 1+len(b.layers)) - txnsToClose[0] = txn - defer func() { - if err != nil { - for _, txn := range txnsToClose { - txn.Abort() - } - } else { - for _, txn := range txnsToClose { - txn.Commit() + iltxns := make([]*lmdb.Txn, 0, len(b.layers)) + ils := make([]*IndexingLayer, 0, len(b.layers)) + + // ask if any of the indexing layers want this + for _, il := range b.layers { + if il.ShouldIndex(ctx, evt) { + someoneWantsIt = true + + iltxn, err := il.lmdbEnv.BeginTxn(nil, 0) + if err != nil { + mmmtxn.Abort() + for _, txn := range iltxns { + txn.Abort() + } + return false, fmt.Errorf("failed to start txn on %s: %w", il.name, err) } + + ils = append(ils, il) + iltxns = append(iltxns, iltxn) + } + } + + if !someoneWantsIt { + // no one wants it + mmmtxn.Abort() + return false, fmt.Errorf("not wanted") + } + + stored, err = b.storeOn(mmmtxn, ils, iltxns, evt) + if stored { + mmmtxn.Commit() + for _, txn := range iltxns { + txn.Commit() + } + } else { + mmmtxn.Abort() + for _, txn := range iltxns { + txn.Abort() } - }() + } + + return stored, err +} + +func (il *IndexingLayer) SaveEvent(ctx context.Context, evt *nostr.Event) error { + il.mmmm.mutex.Lock() + defer il.mmmm.mutex.Unlock() + runtime.LockOSThread() + defer runtime.UnlockOSThread() + + // do this just so it's cleaner, we're already locking the thread and the mutex anyway + mmmtxn, err := il.mmmm.lmdbEnv.BeginTxn(nil, 0) + if err != nil { + return fmt.Errorf("failed to begin global transaction: %w", err) + } + mmmtxn.RawRead = true + + iltxn, err := il.lmdbEnv.BeginTxn(nil, 0) + if err != nil { + mmmtxn.Abort() + return fmt.Errorf("failed to start txn on %s: %w", il.name, err) + } + + if _, err := il.mmmm.storeOn(mmmtxn, []*IndexingLayer{il}, []*lmdb.Txn{iltxn}, evt); err != nil { + mmmtxn.Abort() + if iltxn != nil { + iltxn.Abort() + } + return err + } + + mmmtxn.Commit() + iltxn.Commit() + return nil +} + +func (b *MultiMmapManager) storeOn(mmmtxn *lmdb.Txn, ils []*IndexingLayer, iltxns []*lmdb.Txn, evt *nostr.Event) (stored bool, err error) { + // sanity checking + if evt.CreatedAt > maxuint32 || evt.Kind > maxuint16 { + return false, fmt.Errorf("event with values out of expected boundaries") + } // check if we already have this id idPrefix8, _ := hex.DecodeString(evt.ID[0 : 8*2]) - val, err := txn.Get(b.indexId, idPrefix8) + val, err := mmmtxn.Get(b.indexId, idPrefix8) if err == nil { // we found the event, which means it is already indexed by every layer who wanted to index it return false, nil @@ -69,31 +133,17 @@ func (b *MultiMmapManager) Store(ctx context.Context, evt *nostr.Event) (stored // these are the bytes we must fill in with the position once we have it reservedResults := make([][]byte, 0, len(b.layers)) - // ask if any of the indexing layers want this - someoneWantsIt := false - for _, il := range b.layers { - if il.ShouldIndex(ctx, evt) { - // start a txn here and close it at the end only as we will have to use the valReserve arrays later - iltxn, err := il.lmdbEnv.BeginTxn(nil, 0) + for i, il := range ils { + iltxn := iltxns[i] + // start a txn here and close it at the end only as we will have to use the valReserve arrays later + for k := range il.getIndexKeysForEvent(evt) { + valReserve, err := iltxn.PutReserve(k.dbi, k.key, 12, 0) if err != nil { - return false, fmt.Errorf("failed to start txn on %s: %w", il.name, 
err) + b.Logger.Warn().Str("name", il.name).Msg("failed to index event on layer") } - defer iltxn.Commit() - - for k := range il.getIndexKeysForEvent(evt) { - valReserve, err := iltxn.PutReserve(k.dbi, k.key, 12, 0) - if err != nil { - b.Logger.Warn().Str("name", il.name).Msg("failed to index event on layer") - } - reservedResults = append(reservedResults, valReserve) - } - val = binary.BigEndian.AppendUint16(val, il.id) - someoneWantsIt = true + reservedResults = append(reservedResults, valReserve) } - } - if !someoneWantsIt { - // no one wants it - return false, fmt.Errorf("not wanted") + val = binary.BigEndian.AppendUint16(val, il.id) } // find a suitable place for this to be stored in @@ -118,7 +168,7 @@ func (b *MultiMmapManager) Store(ctx context.Context, evt *nostr.Event) (stored }) } - if err := b.saveFreeRanges(txn); err != nil { + if err := b.saveFreeRanges(mmmtxn); err != nil { return false, fmt.Errorf("failed to save modified free ranges: %w", err) } @@ -153,7 +203,7 @@ func (b *MultiMmapManager) Store(ctx context.Context, evt *nostr.Event) (stored } // store the id index with the refcounts - if err := txn.Put(b.indexId, idPrefix8, val, 0); err != nil { + if err := mmmtxn.Put(b.indexId, idPrefix8, val, 0); err != nil { panic(fmt.Errorf("failed to store %x by id: %w", idPrefix8, err)) }
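
Usage sketch (not part of the patch): the refactor splits the old monolithic `Store` into `MultiMmapManager.StoreGlobal`, which fans an event out to every layer whose `ShouldIndex` accepts it under one set of coordinated LMDB transactions, and per-layer methods (`SaveEvent`, `ReplaceEvent`, `DeleteEvent`, `QueryEvents`) that let each `IndexingLayer` act as a standalone `eventstore.Store` while still sharing the global id index and mmap. The sketch below is hypothetical and assumes in-package access to the unexported `mmm.layers` slice, the same shortcut the new test takes; `exampleUsage` itself is not a function in the codebase.

package mmm

import (
	"context"

	"github.com/nbd-wtf/go-nostr"
)

// exampleUsage is a hypothetical helper illustrating the two write paths
// introduced by this patch; it is not part of the diff above.
func exampleUsage(ctx context.Context, mmm *MultiMmapManager, evt *nostr.Event) error {
	// global path: every layer whose ShouldIndex accepts the event indexes it,
	// all inside a single set of coordinated LMDB transactions
	// (StoreGlobal returns an error if no layer wants the event)
	if _, err := mmm.StoreGlobal(ctx, evt); err != nil {
		return err
	}

	// per-layer path: each IndexingLayer is also a store of its own, so an
	// event can be removed from (or saved into) one layer without touching
	// the others
	il := mmm.layers[0]
	if err := il.DeleteEvent(ctx, evt); err != nil {
		return err
	}

	// the event's bytes stay in the shared mmap as long as any other layer
	// still references it in the id index; only when the last reference is
	// gone does the zeroRefs path in delete.go call purge() to reclaim it
	return nil
}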