diff --git a/go.mod b/go.mod index 2041182..2165a45 100644 --- a/go.mod +++ b/go.mod @@ -54,6 +54,7 @@ require ( github.com/dustin/go-humanize v1.0.1 // indirect github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect github.com/elastic/go-elasticsearch/v7 v7.17.10 // indirect + github.com/erigontech/mdbx-go v0.38.5 // indirect github.com/fatih/structs v1.1.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect @@ -63,6 +64,7 @@ require ( github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/flatbuffers v24.3.25+incompatible // indirect + github.com/ianlancetaylor/cgosymbolizer v0.0.0-20240503222823-736c933a666d // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.11 // indirect diff --git a/go.sum b/go.sum index 92e9cf1..d4f3298 100644 --- a/go.sum +++ b/go.sum @@ -98,6 +98,8 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/erigontech/mdbx-go v0.38.5 h1:WJ7xNtwwHjxqyuq/iG+gQFssC+bz/9V8+Ja7kN8n6Yw= +github.com/erigontech/mdbx-go v0.38.5/go.mod h1:lkqHAZqXtFaIPlvTaGAx3VUDuGYZcuhve1l4JVVN1Z0= github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/fergusstrange/embedded-postgres v1.28.0 h1:Atixd24HCuBHBavnG4eiZAjRizOViwUahKGSjJdz1SU= @@ -151,6 +153,8 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/ianlancetaylor/cgosymbolizer v0.0.0-20240503222823-736c933a666d h1:Azx2B59D4+zpVVtuYb8Oe3uOLi/ift4xfwKdhBX0Cy0= +github.com/ianlancetaylor/cgosymbolizer v0.0.0-20240503222823-736c933a666d/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/influxdata/influxdb v1.7.6/go.mod h1:qZna6X/4elxqT3yI9iZYdZrWWdeFOOprn86kgg4+IzY= github.com/jgroeneveld/schema v1.0.0 h1:J0E10CrOkiSEsw6dfb1IfrDJD14pf6QLVJ3tRPl/syI= diff --git a/mdbx/count.go b/mdbx/count.go new file mode 100644 index 0000000..e650fcb --- /dev/null +++ b/mdbx/count.go @@ -0,0 +1,270 @@ +package mdbx + +import ( + "bytes" + "context" + "encoding/binary" + "encoding/hex" + + "github.com/erigontech/mdbx-go/mdbx" + bin "github.com/fiatjaf/eventstore/internal/binary" + "github.com/nbd-wtf/go-nostr" + "github.com/nbd-wtf/go-nostr/nip45" + "github.com/nbd-wtf/go-nostr/nip45/hyperloglog" + "golang.org/x/exp/slices" +) + +func (b *MDBXBackend) CountEvents(ctx context.Context, filter nostr.Filter) (int64, error) { + var count int64 = 0 + + queries, extraAuthors, extraKinds, extraTagKey, extraTagValues, since, err := b.prepareQueries(filter) + if err != nil { + return 0, err + } + + err = b.mdbxEnv.View(func(txn *mdbx.Txn) 
error {
+		// actually iterate
+		for _, q := range queries {
+			cursor, err := txn.OpenCursor(q.dbi)
+			if err != nil {
+				continue
+			}
+
+			var k []byte
+			var idx []byte
+			var iterr error
+
+			if _, _, errsr := cursor.Get(q.startingPoint, nil, mdbx.SetRange); errsr != nil {
+				if operr, ok := errsr.(*mdbx.OpError); !ok || operr.Errno != mdbx.NotFound {
+					// in this case it's really an error
+					panic(errsr)
+				} else {
+					// we're at the end and we just want notes before this,
+					// so we just need to set the cursor to the last key, this is not a real error
+					k, idx, iterr = cursor.Get(nil, nil, mdbx.Last)
+				}
+			} else {
+				// move one back as the first step
+				k, idx, iterr = cursor.Get(nil, nil, mdbx.Prev)
+			}
+
+			for {
+				// we already have a k and a v and an err from the cursor setup, so check and use these
+				if iterr != nil ||
+					len(k) != q.keySize ||
+					!bytes.HasPrefix(k, q.prefix) {
+					// either iteration has errored or we reached the end of this prefix
+					break // stop this cursor and move to the next one
+				}
+
+				// "id" indexes don't contain a timestamp
+				if q.timestampSize == 4 {
+					createdAt := binary.BigEndian.Uint32(k[len(k)-4:])
+					if createdAt < since {
+						break
+					}
+				}
+
+				if extraAuthors == nil && extraKinds == nil && extraTagValues == nil {
+					count++
+				} else {
+					// fetch actual event
+					val, err := txn.Get(b.rawEventStore, idx)
+					if err != nil {
+						panic(err)
+					}
+
+					// check it against pubkeys without decoding the entire thing
+					// (the nil check matters: slices.Contains on a nil list would discard everything)
+					if extraAuthors != nil && !slices.Contains(extraAuthors, [32]byte(val[32:64])) {
+						goto loopend
+					}
+
+					// check it against kinds without decoding the entire thing
+					if extraKinds != nil && !slices.Contains(extraKinds, [2]byte(val[132:134])) {
+						goto loopend
+					}
+
+					evt := &nostr.Event{}
+					if err := bin.Unmarshal(val, evt); err != nil {
+						goto loopend
+					}
+
+					// if there is still a tag to be checked, do it now
+					if extraTagValues != nil && !evt.Tags.ContainsAny(extraTagKey, extraTagValues) {
+						goto loopend
+					}
+
+					count++
+				}
+
+				// move one back (we'll look into k and v and err in the next iteration)
+			loopend:
+				k, idx, iterr = cursor.Get(nil, nil, mdbx.Prev)
+			}
+		}
+
+		return nil
+	})
+
+	return count, err
+}
+
+// CountEventsHLL is like CountEvents, but it will build a hyperloglog value while iterating through results, following NIP-45
+func (b *MDBXBackend) CountEventsHLL(ctx context.Context, filter nostr.Filter, offset int) (int64, *hyperloglog.HyperLogLog, error) {
+	// EnableHLLCacheFor is optional, so it must be nil-checked before calling
+	if b.EnableHLLCacheFor != nil {
+		if useCache, _ := b.EnableHLLCacheFor(filter.Kinds[0]); useCache {
+			return b.countEventsHLLCached(filter)
+		}
+	}
+
+	var count int64 = 0
+
+	// this is different than CountEvents because some of these extra checks are not applicable in HLL-valid filters
+	queries, _, extraKinds, extraTagKey, extraTagValues, since, err := b.prepareQueries(filter)
+	if err != nil {
+		return 0, nil, err
+	}
+
+	hll := hyperloglog.New(offset)
+
+	err = b.mdbxEnv.View(func(txn *mdbx.Txn) error {
+		// actually iterate
+		for _, q := range queries {
+			cursor, err := txn.OpenCursor(q.dbi)
+			if err != nil {
+				continue
+			}
+
+			var k []byte
+			var idx []byte
+			var iterr error
+
+			if _, _, errsr := cursor.Get(q.startingPoint, nil, mdbx.SetRange); errsr != nil {
+				if operr, ok := errsr.(*mdbx.OpError); !ok || operr.Errno != mdbx.NotFound {
+					// in this case it's really an error
+					panic(errsr)
+				} else {
+					// we're at the end and we just want notes before this,
+					// so we just need to set the cursor to the last key, this is not a real error
+					k, idx, iterr = cursor.Get(nil, nil, mdbx.Last)
+				}
+			} else {
+				// move one back as the first step
+				k, idx, iterr = cursor.Get(nil, nil, mdbx.Prev)
+			}
+
+			for {
+				// we already have a k and a v and an err from the cursor setup, so check and use these
+				if iterr != nil ||
+					len(k) != q.keySize ||
+					!bytes.HasPrefix(k, q.prefix) {
+					// either iteration has errored or we reached the end of this prefix
+					break // stop this cursor and move to the next one
+				}
+
+				// "id" indexes don't contain a timestamp
+				if q.timestampSize == 4 {
+					createdAt := binary.BigEndian.Uint32(k[len(k)-4:])
+					if createdAt < since {
+						break
+					}
+				}
+
+				// fetch actual event (we need it regardless because we need the pubkey for the hll)
+				val, err := txn.Get(b.rawEventStore, idx)
+				if err != nil {
+					panic(err)
+				}
+
+				if extraKinds == nil && extraTagValues == nil {
+					// nothing extra to check
+					count++
+					hll.AddBytes(val[32:64])
+				} else {
+					// check it against kinds without decoding the entire thing
+					// (the nil check matters: slices.Contains on a nil list would discard everything)
+					if extraKinds != nil && !slices.Contains(extraKinds, [2]byte(val[132:134])) {
+						goto loopend
+					}
+
+					evt := &nostr.Event{}
+					if err := bin.Unmarshal(val, evt); err != nil {
+						goto loopend
+					}
+
+					// if there is still a tag to be checked, do it now
+					if extraTagValues != nil && !evt.Tags.ContainsAny(extraTagKey, extraTagValues) {
+						goto loopend
+					}
+
+					count++
+					hll.Add(evt.PubKey)
+				}
+
+				// move one back (we'll look into k and v and err in the next iteration)
+			loopend:
+				k, idx, iterr = cursor.Get(nil, nil, mdbx.Prev)
+			}
+		}
+
+		return nil
+	})
+
+	return count, hll, err
+}
+
+// countEventsHLLCached will just return a cached value from disk (and presumably we don't even have the events required to compute this anymore).
+func (b *MDBXBackend) countEventsHLLCached(filter nostr.Filter) (int64, *hyperloglog.HyperLogLog, error) {
+	cacheKey := make([]byte, 2+8)
+	binary.BigEndian.PutUint16(cacheKey[0:2], uint16(filter.Kinds[0]))
+	switch filter.Kinds[0] {
+	case 3:
+		hex.Decode(cacheKey[2:2+8], []byte(filter.Tags["p"][0][0:8*2]))
+	case 7:
+		hex.Decode(cacheKey[2:2+8], []byte(filter.Tags["e"][0][0:8*2]))
+	}
+
+	var count int64
+	var hll *hyperloglog.HyperLogLog
+
+	err := b.mdbxEnv.View(func(txn *mdbx.Txn) error {
+		val, err := txn.Get(b.hllCache, cacheKey)
+		if err != nil {
+			if mdbx.IsNotFound(err) {
+				return nil
+			}
+			return err
+		}
+		hll = hyperloglog.NewWithRegisters(val, 0) // offset doesn't matter here
+		count = int64(hll.Count())
+		return nil
+	})
+
+	return count, hll, err
+}
+
+func (b *MDBXBackend) updateHyperLogLogCachedValues(txn *mdbx.Txn, evt *nostr.Event) error {
+	cacheKey := make([]byte, 2+8)
+	binary.BigEndian.PutUint16(cacheKey[0:2], uint16(evt.Kind))
+
+	for ref, offset := range nip45.HyperLogLogEventPubkeyOffsetsAndReferencesForEvent(evt) {
+		// setup cache key (reusing buffer)
+		hex.Decode(cacheKey[2:2+8], []byte(ref[0:8*2]))
+
+		// fetch hll value from cache db
+		hll := hyperloglog.New(offset)
+		val, err := txn.Get(b.hllCache, cacheKey)
+		if err == nil {
+			hll.SetRegisters(val)
+		} else if !mdbx.IsNotFound(err) {
+			return err
+		}
+
+		// add this event
+		hll.Add(evt.PubKey)
+
+		// save values back again
+		if err := txn.Put(b.hllCache, cacheKey, hll.GetRegisters(), 0); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
diff --git a/mdbx/delete.go b/mdbx/delete.go
new file mode 100644
index 0000000..5914109
--- /dev/null
+++ b/mdbx/delete.go
@@ -0,0 +1,43 @@
+package mdbx
+
+import (
+	"context"
+	"encoding/hex"
+	"fmt"
+
+	"github.com/erigontech/mdbx-go/mdbx"
+	"github.com/nbd-wtf/go-nostr"
+)
+
+func (b *MDBXBackend) DeleteEvent(ctx context.Context, evt *nostr.Event) error {
+	return b.mdbxEnv.Update(func(txn *mdbx.Txn) error {
+		return b.delete(txn, evt)
+	})
+}
+
+func (b *MDBXBackend) delete(txn
*mdbx.Txn, evt *nostr.Event) error { + idPrefix8, _ := hex.DecodeString(evt.ID[0 : 8*2]) + idx, err := txn.Get(b.indexId, idPrefix8) + if mdbx.IsNotFound(err) { + // we already do not have this + return nil + } + if err != nil { + return fmt.Errorf("failed to get current idx for deleting %x: %w", evt.ID[0:8*2], err) + } + + // calculate all index keys we have for this event and delete them + for k := range b.getIndexKeysForEvent(evt) { + err := txn.Del(k.dbi, k.key, idx) + if err != nil { + return fmt.Errorf("failed to delete index entry %s for %x: %w", b.keyName(k), evt.ID[0:8*2], err) + } + } + + // delete the raw event + if err := txn.Del(b.rawEventStore, idx, nil); err != nil { + return fmt.Errorf("failed to delete raw event %x (idx %x): %w", evt.ID[0:8*2], idx, err) + } + + return nil +} diff --git a/mdbx/fuzz_test.go b/mdbx/fuzz_test.go new file mode 100644 index 0000000..46c418f --- /dev/null +++ b/mdbx/fuzz_test.go @@ -0,0 +1,137 @@ +package mdbx + +import ( + "cmp" + "context" + "encoding/binary" + "encoding/hex" + "fmt" + "os" + "testing" + "time" + + "github.com/erigontech/mdbx-go/mdbx" + "github.com/fiatjaf/eventstore" + "github.com/nbd-wtf/go-nostr" + "github.com/stretchr/testify/require" + "golang.org/x/exp/slices" +) + +func FuzzQuery(f *testing.F) { + ctx := context.Background() + + f.Add(uint(200), uint(50), uint(13), uint(2), uint(2), uint(0), uint(1)) + f.Fuzz(func(t *testing.T, total, limit, authors, timestampAuthorFactor, seedFactor, kinds, kindFactor uint) { + total++ + authors++ + seedFactor++ + kindFactor++ + if kinds == 1 { + kinds++ + } + if limit == 0 { + return + } + + // ~ setup db + if err := os.RemoveAll("/tmp/mdbxtest"); err != nil { + t.Fatal(err) + return + } + db := &MDBXBackend{} + db.Path = "/tmp/mdbxtest" + db.extraFlags = mdbx.TxNoSync + db.MaxLimit = 500 + if err := db.Init(); err != nil { + t.Fatal(err) + return + } + defer db.Close() + + // ~ start actual test + + filter := nostr.Filter{ + Authors: make([]string, authors), + Limit: int(limit), + } + maxKind := 1 + if kinds > 0 { + filter.Kinds = make([]int, kinds) + for i := range filter.Kinds { + filter.Kinds[i] = int(kindFactor) * i + } + maxKind = filter.Kinds[len(filter.Kinds)-1] + } + + for i := 0; i < int(authors); i++ { + sk := make([]byte, 32) + binary.BigEndian.PutUint32(sk, uint32(i%int(authors*seedFactor))+1) + pk, _ := nostr.GetPublicKey(hex.EncodeToString(sk)) + filter.Authors[i] = pk + } + + expected := make([]*nostr.Event, 0, total) + for i := 0; i < int(total); i++ { + skseed := uint32(i%int(authors*seedFactor)) + 1 + sk := make([]byte, 32) + binary.BigEndian.PutUint32(sk, skseed) + + evt := &nostr.Event{ + CreatedAt: nostr.Timestamp(skseed)*nostr.Timestamp(timestampAuthorFactor) + nostr.Timestamp(i), + Content: fmt.Sprintf("unbalanced %d", i), + Tags: nostr.Tags{}, + Kind: i % maxKind, + } + err := evt.Sign(hex.EncodeToString(sk)) + require.NoError(t, err) + + err = db.SaveEvent(ctx, evt) + require.NoError(t, err) + + if filter.Matches(evt) { + expected = append(expected, evt) + } + } + + slices.SortFunc(expected, nostr.CompareEventPtrReverse) + if len(expected) > int(limit) { + expected = expected[0:limit] + } + + w := eventstore.RelayWrapper{Store: db} + + start := time.Now() + + res, err := w.QuerySync(ctx, filter) + end := time.Now() + + require.NoError(t, err) + require.Equal(t, len(expected), len(res), "number of results is different than expected") + + require.Less(t, end.Sub(start).Milliseconds(), int64(1500), "query took too long") + nresults := len(expected) + + 
getTimestamps := func(events []*nostr.Event) []nostr.Timestamp {
+			res := make([]nostr.Timestamp, len(events))
+			for i, evt := range events {
+				res[i] = evt.CreatedAt
+			}
+			return res
+		}
+
+		fmt.Println("  expected        result")
+		for i := range expected {
+			fmt.Println("   ", expected[i].CreatedAt, expected[i].ID[0:8], " ", res[i].CreatedAt, res[i].ID[0:8], " ", i)
+		}
+
+		require.Equal(t, expected[0].CreatedAt, res[0].CreatedAt, "first result is wrong")
+		require.Equal(t, expected[nresults-1].CreatedAt, res[nresults-1].CreatedAt, "last result (%d) is wrong", nresults-1)
+		require.Equal(t, getTimestamps(expected), getTimestamps(res))
+
+		for _, evt := range res {
+			require.True(t, filter.Matches(evt), "event %s doesn't match filter %s", evt, filter)
+		}
+
+		require.True(t, slices.IsSortedFunc(res, func(a, b *nostr.Event) int { return cmp.Compare(b.CreatedAt, a.CreatedAt) }), "results are not sorted")
+	})
+}
diff --git a/mdbx/helpers.go b/mdbx/helpers.go
new file mode 100644
index 0000000..1c3bdbd
--- /dev/null
+++ b/mdbx/helpers.go
@@ -0,0 +1,213 @@
+package mdbx
+
+import (
+	"crypto/md5"
+	"encoding/binary"
+	"encoding/hex"
+	"fmt"
+	"iter"
+	"strconv"
+	"strings"
+
+	"github.com/erigontech/mdbx-go/mdbx"
+	"github.com/nbd-wtf/go-nostr"
+	"golang.org/x/exp/slices"
+)
+
+// this iterator always goes backwards
+type iterator struct {
+	cursor *mdbx.Cursor
+	key    []byte
+	valIdx []byte
+	err    error
+}
+
+func (it *iterator) seek(key []byte) {
+	if _, _, errsr := it.cursor.Get(key, nil, mdbx.SetRange); errsr != nil {
+		if operr, ok := errsr.(*mdbx.OpError); !ok || operr.Errno != mdbx.NotFound {
+			// in this case it's really an error
+			panic(errsr)
+		} else {
+			// we're at the end and we just want notes before this,
+			// so we just need to set the cursor to the last key, this is not a real error
+			it.key, it.valIdx, it.err = it.cursor.Get(nil, nil, mdbx.Last)
+		}
+	} else {
+		// move one back as the first step
+		it.key, it.valIdx, it.err = it.cursor.Get(nil, nil, mdbx.Prev)
+	}
+}
+
+func (it *iterator) next() {
+	// move one back (we'll look into k and v and err in the next iteration)
+	it.key, it.valIdx, it.err = it.cursor.Get(nil, nil, mdbx.Prev)
+}
+
+type key struct {
+	dbi mdbx.DBI
+	key []byte
+}
+
+func (b *MDBXBackend) keyName(key key) string {
+	return fmt.Sprintf("<%s key=%x>", b.dbiName(key.dbi), key.key)
+}
+
+func (b *MDBXBackend) getIndexKeysForEvent(evt *nostr.Event) iter.Seq[key] {
+	return func(yield func(key) bool) {
+		{
+			// ~ by id
+			k := make([]byte, 8)
+			hex.Decode(k[0:8], []byte(evt.ID[0:8*2]))
+			if !yield(key{dbi: b.indexId, key: k[0:8]}) {
+				return
+			}
+		}
+
+		{
+			// ~ by pubkey+date
+			k := make([]byte, 8+4)
+			hex.Decode(k[0:8], []byte(evt.PubKey[0:8*2]))
+			binary.BigEndian.PutUint32(k[8:8+4], uint32(evt.CreatedAt))
+			if !yield(key{dbi: b.indexPubkey, key: k[0 : 8+4]}) {
+				return
+			}
+		}
+
+		{
+			// ~ by kind+date
+			k := make([]byte, 2+4)
+			binary.BigEndian.PutUint16(k[0:2], uint16(evt.Kind))
+			binary.BigEndian.PutUint32(k[2:2+4], uint32(evt.CreatedAt))
+			if !yield(key{dbi: b.indexKind, key: k[0 : 2+4]}) {
+				return
+			}
+		}
+
+		{
+			// ~ by pubkey+kind+date
+			k := make([]byte, 8+2+4)
+			hex.Decode(k[0:8], []byte(evt.PubKey[0:8*2]))
+			binary.BigEndian.PutUint16(k[8:8+2], uint16(evt.Kind))
+			binary.BigEndian.PutUint32(k[8+2:8+2+4], uint32(evt.CreatedAt))
+			if !yield(key{dbi: b.indexPubkeyKind, key: k[0 : 8+2+4]}) {
+				return
+			}
+		}
+
+		// ~ by tagvalue+date
+		// ~ by p-tag+kind+date
+		for i, tag := range evt.Tags {
+			if len(tag) < 2 || len(tag[0]) != 1 || len(tag[1]) == 0 || len(tag[1]) > 100 {
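+				// (an index entry is only worth creating for single-letter
+				// tag keys with values of 1 to 100 characters)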
+				// not indexable
+				continue
+			}
+			firstIndex := slices.IndexFunc(evt.Tags, func(t nostr.Tag) bool { return len(t) >= 2 && t[1] == tag[1] })
+			if firstIndex != i {
+				// duplicate
+				continue
+			}
+
+			// get key prefix (with full length) and offset where to write the created_at
+			dbi, k, offset := b.getTagIndexPrefix(tag[1])
+			binary.BigEndian.PutUint32(k[offset:], uint32(evt.CreatedAt))
+			if !yield(key{dbi: dbi, key: k}) {
+				return
+			}
+
+			// now the p-tag+kind+date
+			if dbi == b.indexTag32 && tag[0] == "p" {
+				k := make([]byte, 8+2+4)
+				hex.Decode(k[0:8], []byte(tag[1][0:8*2]))
+				binary.BigEndian.PutUint16(k[8:8+2], uint16(evt.Kind))
+				binary.BigEndian.PutUint32(k[8+2:8+2+4], uint32(evt.CreatedAt))
+				dbi := b.indexPTagKind
+				if !yield(key{dbi: dbi, key: k[0 : 8+2+4]}) {
+					return
+				}
+			}
+		}
+
+		{
+			// ~ by date only
+			k := make([]byte, 4)
+			binary.BigEndian.PutUint32(k[0:4], uint32(evt.CreatedAt))
+			if !yield(key{dbi: b.indexCreatedAt, key: k[0:4]}) {
+				return
+			}
+		}
+	}
+}
+
+func (b *MDBXBackend) getTagIndexPrefix(tagValue string) (mdbx.DBI, []byte, int) {
+	var k []byte   // the key with full length for created_at and idx at the end, but not filled with these
+	var offset int // the offset -- i.e. where the prefix ends and the created_at and idx would start
+	var dbi mdbx.DBI
+
+	// if it's 32 bytes as hex, save it as bytes
+	if len(tagValue) == 64 {
+		// but we actually only use the first 8 bytes
+		k = make([]byte, 8+4)
+		if _, err := hex.Decode(k[0:8], []byte(tagValue[0:8*2])); err == nil {
+			offset = 8
+			dbi = b.indexTag32
+			return dbi, k[0 : 8+4], offset
+		}
+	}
+
+	// if it looks like an "a" tag, index it in this special format
+	spl := strings.Split(tagValue, ":")
+	if len(spl) == 3 && len(spl[1]) == 64 {
+		// the buffer holds the kind, the pubkey prefix, up to 30 bytes of the
+		// "d" identifier and the trailing created_at
+		k = make([]byte, 2+8+30+4)
+		// decode the pubkey part (spl[1]), not the raw "kind:pubkey:d" string
+		if _, err := hex.Decode(k[2:2+8], []byte(spl[1][0:8*2])); err == nil {
+			if kind, err := strconv.ParseUint(spl[0], 10, 16); err == nil {
+				k[0] = byte(kind >> 8)
+				k[1] = byte(kind)
+				// limit "d" identifier to 30 bytes (so we don't have to grow our byte slice)
+				n := copy(k[2+8:2+8+30], spl[2])
+				offset = 2 + 8 + n
+				dbi = b.indexTagAddr
+				return dbi, k[0 : offset+4], offset
+			}
+		}
+	}
+
+	// index whatever else as a md5 hash of the contents
+	h := md5.New()
+	h.Write([]byte(tagValue))
+	k = make([]byte, 0, 16+4)
+	k = h.Sum(k)
+	offset = 16
+	dbi = b.indexTag
+
+	return dbi, k[0 : 16+4], offset
+}
+
+func (b *MDBXBackend) dbiName(dbi mdbx.DBI) string {
+	switch dbi {
+	case b.hllCache:
+		return "hllCache"
+	case b.settingsStore:
+		return "settingsStore"
+	case b.rawEventStore:
+		return "rawEventStore"
+	case b.indexCreatedAt:
+		return "indexCreatedAt"
+	case b.indexId:
+		return "indexId"
+	case b.indexKind:
+		return "indexKind"
+	case b.indexPubkey:
+		return "indexPubkey"
+	case b.indexPubkeyKind:
+		return "indexPubkeyKind"
+	case b.indexTag:
+		return "indexTag"
+	case b.indexTag32:
+		return "indexTag32"
+	case b.indexTagAddr:
+		return "indexTagAddr"
+	case b.indexPTagKind:
+		return "indexPTagKind"
+	default:
+		return ""
+	}
+}
diff --git a/mdbx/lib.go b/mdbx/lib.go
new file mode 100644
index 0000000..20a319f
--- /dev/null
+++ b/mdbx/lib.go
@@ -0,0 +1,179 @@
+package mdbx
+
+import (
+	"encoding/binary"
+	"os"
+	"sync/atomic"
+
+	"github.com/erigontech/mdbx-go/mdbx"
+	"github.com/fiatjaf/eventstore"
+)
+
+var _ eventstore.Store = (*MDBXBackend)(nil)
+
+type MDBXBackend struct {
+	Path               string
+	MaxLimit           int
+	MaxLimitNegentropy int
+	MapSize            int64
+
+	mdbxEnv    *mdbx.Env
+	extraFlags uint // (for debugging and testing)
+
+	settingsStore mdbx.DBI
+	rawEventStore mdbx.DBI
+
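+	// index DBIs: every key below ends with the event's created_at as a
+	// big-endian uint32 (except indexId), and every value is the 4-byte
+	// serial idx of the event in rawEventStore:
+	//   indexId:         id[0:8]
+	//   indexKind:       kind (2 bytes) + created_at
+	//   indexPubkey:     pubkey[0:8] + created_at
+	//   indexPubkeyKind: pubkey[0:8] + kind + created_at
+	//   indexTag:        md5(tag value) + created_at
+	//   indexTag32:      hex-decoded tag value[0:8] + created_at
+	//   indexTagAddr:    kind + pubkey[0:8] + "d" identifier (up to 30 bytes) + created_at
+	//   indexPTagKind:   "p" tag value[0:8] + kind + created_at
+	//   indexCreatedAt:  created_at alone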
+	indexCreatedAt  mdbx.DBI
+	indexId         mdbx.DBI
+	indexKind       mdbx.DBI
+	indexPubkey     mdbx.DBI
+	indexPubkeyKind mdbx.DBI
+	indexTag        mdbx.DBI
+	indexTag32      mdbx.DBI
+	indexTagAddr    mdbx.DBI
+	indexPTagKind   mdbx.DBI
+
+	hllCache          mdbx.DBI
+	EnableHLLCacheFor func(kind int) (useCache bool, skipSavingActualEvent bool)
+
+	lastId atomic.Uint32
+}
+
+func (b *MDBXBackend) Init() error {
+	if b.MaxLimit != 0 {
+		b.MaxLimitNegentropy = b.MaxLimit
+	} else {
+		b.MaxLimit = 500
+		if b.MaxLimitNegentropy == 0 {
+			b.MaxLimitNegentropy = 16777216
+		}
+	}
+
+	// create directory if it doesn't exist and open it
+	if err := os.MkdirAll(b.Path, 0755); err != nil {
+		return err
+	}
+
+	return b.initialize()
+}
+
+func (b *MDBXBackend) Close() {
+	b.mdbxEnv.Close()
+}
+
+func (b *MDBXBackend) Serial() []byte {
+	v := b.lastId.Add(1)
+	vb := make([]byte, 4)
+	binary.BigEndian.PutUint32(vb[:], uint32(v))
+	return vb
+}
+
+func (b *MDBXBackend) initialize() error {
+	env, err := mdbx.NewEnv()
+	if err != nil {
+		return err
+	}
+
+	b.mdbxEnv = env
+	// we open exactly 12 named DBIs below
+	if err := b.mdbxEnv.SetOption(mdbx.OptMaxDB, 12); err != nil {
+		return err
+	}
+	if err := b.mdbxEnv.SetOption(mdbx.OptMaxReaders, 1000); err != nil {
+		return err
+	}
+
+	if err := env.Open(b.Path, mdbx.NoTLS|mdbx.WriteMap|b.extraFlags, 0644); err != nil {
+		return err
+	}
+
+	var multiIndexCreationFlags uint = mdbx.Create | mdbx.DupSort | mdbx.DupFixed
+
+	// open each db
+	if err := b.mdbxEnv.Update(func(txn *mdbx.Txn) error {
+		if dbi, err := txn.OpenDBISimple("settings", mdbx.Create); err != nil {
+			return err
+		} else {
+			b.settingsStore = dbi
+		}
+		if dbi, err := txn.OpenDBISimple("raw", mdbx.Create); err != nil {
+			return err
+		} else {
+			b.rawEventStore = dbi
+		}
+		if dbi, err := txn.OpenDBISimple("created_at", multiIndexCreationFlags); err != nil {
+			return err
+		} else {
+			b.indexCreatedAt = dbi
+		}
+		if dbi, err := txn.OpenDBISimple("id", mdbx.Create); err != nil {
+			return err
+		} else {
+			b.indexId = dbi
+		}
+		if dbi, err := txn.OpenDBISimple("kind", multiIndexCreationFlags); err != nil {
+			return err
+		} else {
+			b.indexKind = dbi
+		}
+		if dbi, err := txn.OpenDBISimple("pubkey", multiIndexCreationFlags); err != nil {
+			return err
+		} else {
+			b.indexPubkey = dbi
+		}
+		if dbi, err := txn.OpenDBISimple("pubkeyKind", multiIndexCreationFlags); err != nil {
+			return err
+		} else {
+			b.indexPubkeyKind = dbi
+		}
+		if dbi, err := txn.OpenDBISimple("tag", multiIndexCreationFlags); err != nil {
+			return err
+		} else {
+			b.indexTag = dbi
+		}
+		if dbi, err := txn.OpenDBISimple("tag32", multiIndexCreationFlags); err != nil {
+			return err
+		} else {
+			b.indexTag32 = dbi
+		}
+		if dbi, err := txn.OpenDBISimple("tagaddr", multiIndexCreationFlags); err != nil {
+			return err
+		} else {
+			b.indexTagAddr = dbi
+		}
+		if dbi, err := txn.OpenDBISimple("ptagKind", multiIndexCreationFlags); err != nil {
+			return err
+		} else {
+			b.indexPTagKind = dbi
+		}
+		if dbi, err := txn.OpenDBISimple("hllCache", mdbx.Create); err != nil {
+			return err
+		} else {
+			b.hllCache = dbi
+		}
+		return nil
+	}); err != nil {
+		return err
+	}
+
+	// get lastId
+	if err := b.mdbxEnv.View(func(txn *mdbx.Txn) error {
+		cursor, err := txn.OpenCursor(b.rawEventStore)
+		if err != nil {
+			return err
+		}
+		defer cursor.Close()
+		k, _, err := cursor.Get(nil, nil, mdbx.Last)
+		if operr, ok := err.(*mdbx.OpError); ok && operr.Errno == mdbx.NotFound {
+			// nothing found, so we're at zero
+			return nil
+		}
+		if err != nil {
+			return err
+		}
+		b.lastId.Store(binary.BigEndian.Uint32(k))
+
+		return nil
+	}); err != nil {
+		return err
+	}
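+	// (the serial counter resumes from the highest idx already present in
+	// the raw event store, so Serial() keeps handing out fresh keys across
+	// restarts)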
+
+	return b.runMigrations()
+}
diff --git a/mdbx/migration.go b/mdbx/migration.go
new file mode 100644
index 0000000..c7a4d19
--- /dev/null
+++ b/mdbx/migration.go
@@ -0,0 +1,63 @@
+package mdbx
+
+import (
+	"encoding/binary"
+	"fmt"
+
+	"github.com/erigontech/mdbx-go/mdbx"
+)
+
+const (
+	DB_VERSION byte = 'v'
+)
+
+func (b *MDBXBackend) runMigrations() error {
+	return b.mdbxEnv.Update(func(txn *mdbx.Txn) error {
+		var version uint16
+		v, err := txn.Get(b.settingsStore, []byte{DB_VERSION})
+		if err != nil {
+			if mdbxErr, ok := err.(*mdbx.OpError); ok && mdbxErr.Errno == mdbx.NotFound {
+				version = 0
+			} else {
+				return fmt.Errorf("failed to read database version: %w", err)
+			}
+		} else {
+			version = binary.BigEndian.Uint16(v)
+		}
+
+		if version == 0 {
+			// fresh or pre-versioning database: if it's empty we just stamp it
+			// with the current version and skip everything else
+			cursor, err := txn.OpenCursor(b.rawEventStore)
+			if err != nil {
+				return fmt.Errorf("failed to open cursor in migration: %w", err)
+			}
+			defer cursor.Close()
+
+			hasAnyEntries := false
+			if _, _, err := cursor.Get(nil, nil, mdbx.First); err == nil {
+				hasAnyEntries = true
+			}
+
+			if !hasAnyEntries {
+				return b.setVersion(txn, 0)
+			}
+		}
+
+		// do the migrations in increasing steps (there is no rollback);
+		// none exist yet, they will be added here as `if version < N { ... }`
+		// blocks when the schema changes
+
+		return nil
+	})
+}
+
+func (b *MDBXBackend) setVersion(txn *mdbx.Txn, version uint16) error {
+	buf, err := txn.PutReserve(b.settingsStore, []byte{DB_VERSION}, 4, 0)
+	if err != nil {
+		return err
+	}
+	binary.BigEndian.PutUint16(buf, version)
+	return nil
+}
diff --git a/mdbx/query.go b/mdbx/query.go
new file mode 100644
index 0000000..eff5253
--- /dev/null
+++ b/mdbx/query.go
@@ -0,0 +1,409 @@
+package mdbx
+
+import (
+	"bytes"
+	"context"
+	"encoding/binary"
+	"fmt"
+	"log"
+	"slices"
+
+	"github.com/erigontech/mdbx-go/mdbx"
+	"github.com/fiatjaf/eventstore"
+	"github.com/fiatjaf/eventstore/internal"
+	bin "github.com/fiatjaf/eventstore/internal/binary"
+	"github.com/nbd-wtf/go-nostr"
+)
+
+func (b *MDBXBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) {
+	ch := make(chan *nostr.Event)
+
+	if filter.Search != "" {
+		close(ch)
+		return ch, nil
+	}
+
+	// max number of events we'll return
+	maxLimit := b.MaxLimit
+	var limit int
+	if eventstore.IsNegentropySession(ctx) {
+		maxLimit = b.MaxLimitNegentropy
+		limit = maxLimit
+	} else {
+		limit = maxLimit / 4
+	}
+	if filter.Limit > 0 && filter.Limit <= maxLimit {
+		limit = filter.Limit
+	}
+	if tlimit := nostr.GetTheoreticalLimit(filter); tlimit == 0 {
+		close(ch)
+		return ch, nil
+	} else if tlimit > 0 {
+		limit = tlimit
+	}
+
+	go b.mdbxEnv.View(func(txn *mdbx.Txn) error {
+		defer close(ch)
+		results, err := b.query(txn, filter, limit)
+
+		for _, ie := range results {
+			ch <- ie.Event
+		}
+
+		return err
+	})
+
+	return ch, nil
+}
+
+func (b *MDBXBackend) query(txn *mdbx.Txn, filter nostr.Filter, limit int) ([]internal.IterEvent, error) {
+	queries, extraAuthors, extraKinds, extraTagKey, extraTagValues, since, err := b.prepareQueries(filter)
+	if err != nil {
+		return nil, err
+	}
+
+	iterators := make([]*iterator, len(queries))
+	exhausted := make([]bool, len(queries)) // indicates that a query won't be used anymore
+	results := make([][]internal.IterEvent, len(queries))
+	pulledPerQuery := make([]int, len(queries))
+
+	// these are kept updated so we never pull from the iterator that is at further distance
+	// (i.e.
the one that has the oldest event among all) + // we will continue to pull from it as soon as some other iterator takes the position + oldest := internal.IterEvent{Q: -1} + + secondPhase := false // after we have gathered enough events we will change the way we iterate + secondBatch := make([][]internal.IterEvent, 0, len(queries)+1) + secondPhaseParticipants := make([]int, 0, len(queries)+1) + + // while merging results in the second phase we will alternate between these two lists + // to avoid having to create new lists all the time + var secondPhaseResultsA []internal.IterEvent + var secondPhaseResultsB []internal.IterEvent + var secondPhaseResultsToggle bool // this is just a dummy thing we use to keep track of the alternating + var secondPhaseHasResultsPending bool + + remainingUnexhausted := len(queries) // when all queries are exhausted we can finally end this thing + batchSizePerQuery := internal.BatchSizePerNumberOfQueries(limit, remainingUnexhausted) + firstPhaseTotalPulled := 0 + + exhaust := func(q int) { + exhausted[q] = true + remainingUnexhausted-- + if q == oldest.Q { + oldest = internal.IterEvent{Q: -1} + } + } + + var firstPhaseResults []internal.IterEvent + + for q := range queries { + cursor, err := txn.OpenCursor(queries[q].dbi) + if err != nil { + return nil, err + } + iterators[q] = &iterator{cursor: cursor} + defer cursor.Close() + iterators[q].seek(queries[q].startingPoint) + results[q] = make([]internal.IterEvent, 0, batchSizePerQuery*2) + } + + // fmt.Println("queries", len(queries)) + + for c := 0; ; c++ { + batchSizePerQuery = internal.BatchSizePerNumberOfQueries(limit, remainingUnexhausted) + + // fmt.Println(" iteration", c, "remaining", remainingUnexhausted, "batchsize", batchSizePerQuery) + // we will go through all the iterators in batches until we have pulled all the required results + for q, query := range queries { + if exhausted[q] { + continue + } + if oldest.Q == q && remainingUnexhausted > 1 { + continue + } + // fmt.Println(" query", q, unsafe.Pointer(&results[q]), hex.EncodeToString(query.prefix), len(results[q])) + + it := iterators[q] + pulledThisIteration := 0 + + for { + // we already have a k and a v and an err from the cursor setup, so check and use these + if it.err != nil || + len(it.key) != query.keySize || + !bytes.HasPrefix(it.key, query.prefix) { + // either iteration has errored or we reached the end of this prefix + // fmt.Println(" reached end", it.key, query.keySize, query.prefix) + exhaust(q) + break + } + + // "id" indexes don't contain a timestamp + if query.timestampSize == 4 { + createdAt := binary.BigEndian.Uint32(it.key[len(it.key)-4:]) + if createdAt < since { + // fmt.Println(" reached since", createdAt, "<", since) + exhaust(q) + break + } + } + + // fetch actual event + val, err := txn.Get(b.rawEventStore, it.valIdx) + if err != nil { + log.Printf( + "mdbx: failed to get %x based on prefix %x, index key %x from raw event store: %s\n", + it.valIdx, query.prefix, it.key, err) + return nil, fmt.Errorf("iteration error: %w", err) + } + + // check it against pubkeys without decoding the entire thing + if extraAuthors != nil && !slices.Contains(extraAuthors, [32]byte(val[32:64])) { + it.next() + continue + } + + // check it against kinds without decoding the entire thing + if extraKinds != nil && !slices.Contains(extraKinds, [2]byte(val[132:134])) { + it.next() + continue + } + + // decode the entire thing + event := &nostr.Event{} + if err := bin.Unmarshal(val, event); err != nil { + log.Printf("mdbx: value read error (id %x) 
on query prefix %x sp %x dbi %d: %s\n", val[0:32], + query.prefix, query.startingPoint, query.dbi, err) + return nil, fmt.Errorf("event read error: %w", err) + } + + // fmt.Println(" event", hex.EncodeToString(val[0:4]), "kind", binary.BigEndian.Uint16(val[132:134]), "author", hex.EncodeToString(val[32:36]), "ts", nostr.Timestamp(binary.BigEndian.Uint32(val[128:132])), hex.EncodeToString(it.key), it.valIdx) + + // if there is still a tag to be checked, do it now + if extraTagValues != nil && !event.Tags.ContainsAny(extraTagKey, extraTagValues) { + it.next() + continue + } + + // this event is good to be used + evt := internal.IterEvent{Event: event, Q: q} + // + // + if secondPhase { + // do the process described below at HIWAWVRTP. + // if we've reached here this means we've already passed the `since` check. + // now we have to eliminate the event currently at the `since` threshold. + nextThreshold := firstPhaseResults[len(firstPhaseResults)-2] + if oldest.Event == nil { + // fmt.Println(" b1", evt.ID[0:8]) + // BRANCH WHEN WE DON'T HAVE THE OLDEST EVENT (BWWDHTOE) + // when we don't have the oldest set, we will keep the results + // and not change the cutting point -- it's bad, but hopefully not that bad. + results[q] = append(results[q], evt) + secondPhaseHasResultsPending = true + } else if nextThreshold.CreatedAt > oldest.CreatedAt { + // fmt.Println(" b2", nextThreshold.CreatedAt, ">", oldest.CreatedAt, evt.ID[0:8]) + // one of the events we have stored is the actual next threshold + // eliminate last, update since with oldest + firstPhaseResults = firstPhaseResults[0 : len(firstPhaseResults)-1] + since = uint32(oldest.CreatedAt) + // fmt.Println(" new since", since, evt.ID[0:8]) + // we null the oldest Event as we can't rely on it anymore + // (we'll fall under BWWDHTOE above) until we have a new oldest set. + oldest = internal.IterEvent{Q: -1} + // anything we got that would be above this won't trigger an update to + // the oldest anyway, because it will be discarded as being after the limit. 
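+					// a concrete example: if firstPhaseResults timestamps are
+					// [50, 40, 30] and the pending oldest is 35, then 40 > 35,
+					// so 30 is dropped and `since` is raised to 35 -- nothing
+					// older than 35 can make it into the final page anymore.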
+ // + // finally + // add this to the results to be merged later + results[q] = append(results[q], evt) + secondPhaseHasResultsPending = true + } else if nextThreshold.CreatedAt < evt.CreatedAt { + // the next last event in the firstPhaseResults is the next threshold + // fmt.Println(" b3", nextThreshold.CreatedAt, "<", oldest.CreatedAt, evt.ID[0:8]) + // eliminate last, update since with the antelast + firstPhaseResults = firstPhaseResults[0 : len(firstPhaseResults)-1] + since = uint32(nextThreshold.CreatedAt) + // fmt.Println(" new since", since) + // add this to the results to be merged later + results[q] = append(results[q], evt) + secondPhaseHasResultsPending = true + // update the oldest event + if evt.CreatedAt < oldest.CreatedAt { + oldest = evt + } + } else { + // fmt.Println(" b4", evt.ID[0:8]) + // oops, _we_ are the next `since` threshold + firstPhaseResults[len(firstPhaseResults)-1] = evt + since = uint32(evt.CreatedAt) + // fmt.Println(" new since", since) + // do not add us to the results to be merged later + // as we're already inhabiting the firstPhaseResults slice + } + } else { + results[q] = append(results[q], evt) + firstPhaseTotalPulled++ + + // update the oldest event + if oldest.Event == nil || evt.CreatedAt < oldest.CreatedAt { + oldest = evt + } + } + + pulledPerQuery[q]++ + pulledThisIteration++ + if pulledThisIteration > batchSizePerQuery { + // batch filled + it.next() + // fmt.Println(" filled", hex.EncodeToString(it.key), it.valIdx) + break + } + if pulledPerQuery[q] >= limit { + // batch filled + reached limit for this query (which is the global limit) + exhaust(q) + it.next() + break + } + + it.next() + } + } + + // we will do this check if we don't accumulated the requested number of events yet + // fmt.Println("oldest", oldest.Event, "from iter", oldest.Q) + if secondPhase && secondPhaseHasResultsPending && (oldest.Event == nil || remainingUnexhausted == 0) { + // fmt.Println("second phase aggregation!") + // when we are in the second phase we will aggressively aggregate results on every iteration + // + secondBatch = secondBatch[:0] + for s := 0; s < len(secondPhaseParticipants); s++ { + q := secondPhaseParticipants[s] + + if len(results[q]) > 0 { + secondBatch = append(secondBatch, results[q]) + } + + if exhausted[q] { + secondPhaseParticipants = internal.SwapDelete(secondPhaseParticipants, s) + s-- + } + } + + // every time we get here we will alternate between these A and B lists + // combining everything we have into a new partial results list. + // after we've done that we can again set the oldest. 
+ // fmt.Println(" xxx", secondPhaseResultsToggle) + if secondPhaseResultsToggle { + secondBatch = append(secondBatch, secondPhaseResultsB) + secondPhaseResultsA = internal.MergeSortMultiple(secondBatch, limit, secondPhaseResultsA) + oldest = secondPhaseResultsA[len(secondPhaseResultsA)-1] + // fmt.Println(" new aggregated a", len(secondPhaseResultsB)) + } else { + secondBatch = append(secondBatch, secondPhaseResultsA) + secondPhaseResultsB = internal.MergeSortMultiple(secondBatch, limit, secondPhaseResultsB) + oldest = secondPhaseResultsB[len(secondPhaseResultsB)-1] + // fmt.Println(" new aggregated b", len(secondPhaseResultsB)) + } + secondPhaseResultsToggle = !secondPhaseResultsToggle + + since = uint32(oldest.CreatedAt) + // fmt.Println(" new since", since) + + // reset the `results` list so we can keep using it + results = results[:len(queries)] + for _, q := range secondPhaseParticipants { + results[q] = results[q][:0] + } + } else if !secondPhase && firstPhaseTotalPulled >= limit && remainingUnexhausted > 0 { + // fmt.Println("have enough!", firstPhaseTotalPulled, "/", limit, "remaining", remainingUnexhausted) + + // we will exclude this oldest number as it is not relevant anymore + // (we now want to keep track only of the oldest among the remaining iterators) + oldest = internal.IterEvent{Q: -1} + + // HOW IT WORKS AFTER WE'VE REACHED THIS POINT (HIWAWVRTP) + // now we can combine the results we have and check what is our current oldest event. + // we also discard anything that is after the current cutting point (`limit`). + // so if we have [1,2,3], [10, 15, 20] and [7, 21, 49] but we only want 6 total + // we can just keep [1,2,3,7,10,15] and discard [20, 21, 49], + // and also adjust our `since` parameter to `15`, discarding anything we get after it + // and immediately declaring that iterator exhausted. + // also every time we get result that is more recent than this updated `since` we can + // keep it but also discard the previous since, moving the needle one back -- for example, + // if we get an `8` we can keep it and move the `since` parameter to `10`, discarding `15` + // in the process. + all := make([][]internal.IterEvent, len(results)) + copy(all, results) // we have to use this otherwise internal.MergeSortMultiple will scramble our results slice + firstPhaseResults = internal.MergeSortMultiple(all, limit, nil) + oldest = firstPhaseResults[limit-1] + since = uint32(oldest.CreatedAt) + // fmt.Println("new since", since) + + for q := range queries { + if exhausted[q] { + continue + } + + // we also automatically exhaust any of the iterators that have already passed the + // cutting point (`since`) + if results[q][len(results[q])-1].CreatedAt < oldest.CreatedAt { + exhausted[q] = true + remainingUnexhausted-- + continue + } + + // for all the remaining iterators, + // since we have merged all the events in this `firstPhaseResults` slice, we can empty the + // current `results` slices and reuse them. 
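+				// (truncating to length zero keeps the backing arrays around
+				// for the second phase, avoiding reallocation)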
+ results[q] = results[q][:0] + + // build this index of indexes with everybody who remains + secondPhaseParticipants = append(secondPhaseParticipants, q) + } + + // we create these two lists and alternate between them so we don't have to create a + // a new one every time + secondPhaseResultsA = make([]internal.IterEvent, 0, limit*2) + secondPhaseResultsB = make([]internal.IterEvent, 0, limit*2) + + // from now on we won't run this block anymore + secondPhase = true + } + + // fmt.Println("remaining", remainingUnexhausted) + if remainingUnexhausted == 0 { + break + } + } + + // fmt.Println("is secondPhase?", secondPhase) + + var combinedResults []internal.IterEvent + + if secondPhase { + // fmt.Println("ending second phase") + // when we reach this point either secondPhaseResultsA or secondPhaseResultsB will be full of stuff, + // the other will be empty + var secondPhaseResults []internal.IterEvent + // fmt.Println("xxx", secondPhaseResultsToggle, len(secondPhaseResultsA), len(secondPhaseResultsB)) + if secondPhaseResultsToggle { + secondPhaseResults = secondPhaseResultsB + combinedResults = secondPhaseResultsA[0:limit] // reuse this + // fmt.Println(" using b", len(secondPhaseResultsA)) + } else { + secondPhaseResults = secondPhaseResultsA + combinedResults = secondPhaseResultsB[0:limit] // reuse this + // fmt.Println(" using a", len(secondPhaseResultsA)) + } + + all := [][]internal.IterEvent{firstPhaseResults, secondPhaseResults} + combinedResults = internal.MergeSortMultiple(all, limit, combinedResults) + // fmt.Println("final combinedResults", len(combinedResults), cap(combinedResults), limit) + } else { + combinedResults = make([]internal.IterEvent, limit) + combinedResults = internal.MergeSortMultiple(results, limit, combinedResults) + } + + return combinedResults, nil +} diff --git a/mdbx/query_planner.go b/mdbx/query_planner.go new file mode 100644 index 0000000..d7c95da --- /dev/null +++ b/mdbx/query_planner.go @@ -0,0 +1,218 @@ +package mdbx + +import ( + "encoding/binary" + "encoding/hex" + "fmt" + + "github.com/erigontech/mdbx-go/mdbx" + "github.com/fiatjaf/eventstore/internal" + "github.com/nbd-wtf/go-nostr" +) + +type query struct { + i int + dbi mdbx.DBI + prefix []byte + results chan *nostr.Event + keySize int + timestampSize int + startingPoint []byte +} + +func (b *MDBXBackend) prepareQueries(filter nostr.Filter) ( + queries []query, + extraAuthors [][32]byte, + extraKinds [][2]byte, + extraTagKey string, + extraTagValues []string, + since uint32, + err error, +) { + // we will apply this to every query we return + defer func() { + if queries == nil { + return + } + + var until uint32 = 4294967295 + if filter.Until != nil { + if fu := uint32(*filter.Until); fu < until { + until = fu + 1 + } + } + for i, q := range queries { + sp := make([]byte, len(q.prefix)) + sp = sp[0:len(q.prefix)] + copy(sp, q.prefix) + queries[i].startingPoint = binary.BigEndian.AppendUint32(sp, uint32(until)) + queries[i].results = make(chan *nostr.Event, 12) + } + }() + + if filter.IDs != nil { + // when there are ids we ignore everything else + queries = make([]query, len(filter.IDs)) + for i, idHex := range filter.IDs { + if len(idHex) != 64 { + return nil, nil, nil, "", nil, 0, fmt.Errorf("invalid id '%s'", idHex) + } + prefix := make([]byte, 8) + if _, err := hex.Decode(prefix[0:8], []byte(idHex[0:8*2])); err != nil { + return nil, nil, nil, "", nil, 0, fmt.Errorf("invalid id '%s'", idHex) + } + queries[i] = query{i: i, dbi: b.indexId, prefix: prefix[0:8], keySize: 8, timestampSize: 0} + } + 
return queries, nil, nil, "", nil, 0, nil
+	}
+
+	// this is where we'll end the iteration
+	if filter.Since != nil {
+		if fs := uint32(*filter.Since); fs > since {
+			since = fs
+		}
+	}
+
+	if len(filter.Tags) > 0 {
+		// we will select ONE tag to query for and ONE extra tag to do further narrowing, if available
+		tagKey, tagValues, goodness := internal.ChooseNarrowestTag(filter)
+
+		// we won't use a tag index for this as long as we have something else to match with
+		if goodness < 2 && (len(filter.Authors) > 0 || len(filter.Kinds) > 0) {
+			goto pubkeyMatching
+		}
+
+		// only "p" tag has a goodness of 2, so
+		if goodness == 2 {
+			// this means we got a "p" tag, so we will use the ptag-kind index
+			i := 0
+			if filter.Kinds != nil {
+				queries = make([]query, len(tagValues)*len(filter.Kinds))
+				for _, value := range tagValues {
+					if len(value) != 64 {
+						return nil, nil, nil, "", nil, 0, fmt.Errorf("invalid 'p' tag '%s'", value)
+					}
+
+					for _, kind := range filter.Kinds {
+						k := make([]byte, 8+2)
+						if _, err := hex.Decode(k[0:8], []byte(value[0:8*2])); err != nil {
+							return nil, nil, nil, "", nil, 0, fmt.Errorf("invalid 'p' tag '%s'", value)
+						}
+						binary.BigEndian.PutUint16(k[8:8+2], uint16(kind))
+						queries[i] = query{i: i, dbi: b.indexPTagKind, prefix: k[0 : 8+2], keySize: 8 + 2 + 4, timestampSize: 4}
+						i++
+					}
+				}
+			} else {
+				// even if there are no kinds, in that case we will just return any kind and not care
+				queries = make([]query, len(tagValues))
+				for i, value := range tagValues {
+					if len(value) != 64 {
+						return nil, nil, nil, "", nil, 0, fmt.Errorf("invalid 'p' tag '%s'", value)
+					}
+
+					k := make([]byte, 8)
+					if _, err := hex.Decode(k[0:8], []byte(value[0:8*2])); err != nil {
+						return nil, nil, nil, "", nil, 0, fmt.Errorf("invalid 'p' tag '%s'", value)
+					}
+					queries[i] = query{i: i, dbi: b.indexPTagKind, prefix: k[0:8], keySize: 8 + 2 + 4, timestampSize: 4}
+				}
+			}
+		} else {
+			// otherwise we will use a plain tag index
+			queries = make([]query, len(tagValues))
+			for i, value := range tagValues {
+				// get key prefix (with full length) and offset where to write the created_at
+				dbi, k, offset := b.getTagIndexPrefix(value)
+				// drop the created_at part to get just the prefix we want here
+				prefix := k[0:offset]
+				queries[i] = query{i: i, dbi: dbi, prefix: prefix, keySize: len(prefix) + 4, timestampSize: 4}
+			}
+
+			// add an extra kind filter if available (only do this on plain tag index, not on ptag-kind index)
+			if filter.Kinds != nil {
+				extraKinds = make([][2]byte, len(filter.Kinds))
+				for i, kind := range filter.Kinds {
+					binary.BigEndian.PutUint16(extraKinds[i][0:2], uint16(kind))
+				}
+			}
+		}
+
+		// add an extra author search if possible
+		if filter.Authors != nil {
+			extraAuthors = make([][32]byte, len(filter.Authors))
+			for i, pk := range filter.Authors {
+				hex.Decode(extraAuthors[i][:], []byte(pk))
+			}
+		}
+
+		// add an extra useless tag if available
+		filter.Tags = internal.CopyMapWithoutKey(filter.Tags, tagKey)
+		if len(filter.Tags) > 0 {
+			extraTagKey, extraTagValues, _ = internal.ChooseNarrowestTag(filter)
+		}
+
+		return queries, extraAuthors, extraKinds, extraTagKey, extraTagValues, since, nil
+	}
+
+pubkeyMatching:
+	if len(filter.Authors) > 0 {
+		if len(filter.Kinds) == 0 {
+			// will use pubkey index
+			queries = make([]query, len(filter.Authors))
+			for i, pubkeyHex := range filter.Authors {
+				if len(pubkeyHex) != 64 {
+					return nil, nil, nil, "", nil, 0, fmt.Errorf("invalid author '%s'", pubkeyHex)
+				}
+				prefix := make([]byte, 8)
+				if _, err :=
hex.Decode(prefix[0:8], []byte(pubkeyHex[0:8*2])); err != nil { + return nil, nil, nil, "", nil, 0, fmt.Errorf("invalid author '%s'", pubkeyHex) + } + queries[i] = query{i: i, dbi: b.indexPubkey, prefix: prefix[0:8], keySize: 8 + 4, timestampSize: 4} + } + } else { + // will use pubkeyKind index + queries = make([]query, len(filter.Authors)*len(filter.Kinds)) + i := 0 + for _, pubkeyHex := range filter.Authors { + for _, kind := range filter.Kinds { + if len(pubkeyHex) != 64 { + return nil, nil, nil, "", nil, 0, fmt.Errorf("invalid author '%s'", pubkeyHex) + } + prefix := make([]byte, 8+2) + if _, err := hex.Decode(prefix[0:8], []byte(pubkeyHex[0:8*2])); err != nil { + return nil, nil, nil, "", nil, 0, fmt.Errorf("invalid author '%s'", pubkeyHex) + } + binary.BigEndian.PutUint16(prefix[8:8+2], uint16(kind)) + queries[i] = query{i: i, dbi: b.indexPubkeyKind, prefix: prefix[0 : 8+2], keySize: 10 + 4, timestampSize: 4} + i++ + } + } + } + + // potentially with an extra useless tag filtering + extraTagKey, extraTagValues, _ = internal.ChooseNarrowestTag(filter) + return queries, nil, nil, extraTagKey, extraTagValues, since, nil + } + + if len(filter.Kinds) > 0 { + // will use a kind index + queries = make([]query, len(filter.Kinds)) + for i, kind := range filter.Kinds { + prefix := make([]byte, 2) + binary.BigEndian.PutUint16(prefix[0:2], uint16(kind)) + queries[i] = query{i: i, dbi: b.indexKind, prefix: prefix[0:2], keySize: 2 + 4, timestampSize: 4} + } + + // potentially with an extra useless tag filtering + tagKey, tagValues, _ := internal.ChooseNarrowestTag(filter) + return queries, nil, nil, tagKey, tagValues, since, nil + } + + // if we got here our query will have nothing to filter with + queries = make([]query, 1) + prefix := make([]byte, 0) + queries[0] = query{i: 0, dbi: b.indexCreatedAt, prefix: prefix, keySize: 0 + 4, timestampSize: 4} + return queries, nil, nil, "", nil, since, nil +} diff --git a/mdbx/replace.go b/mdbx/replace.go new file mode 100644 index 0000000..2fa5d7d --- /dev/null +++ b/mdbx/replace.go @@ -0,0 +1,49 @@ +package mdbx + +import ( + "context" + "fmt" + "math" + + "github.com/erigontech/mdbx-go/mdbx" + "github.com/fiatjaf/eventstore/internal" + "github.com/nbd-wtf/go-nostr" +) + +func (b *MDBXBackend) ReplaceEvent(ctx context.Context, evt *nostr.Event) error { + // sanity checking + if evt.CreatedAt > math.MaxUint32 || evt.Kind > math.MaxUint16 { + return fmt.Errorf("event with values out of expected boundaries") + } + + return b.mdbxEnv.Update(func(txn *mdbx.Txn) error { + filter := nostr.Filter{Limit: 1, Kinds: []int{evt.Kind}, Authors: []string{evt.PubKey}} + if nostr.IsAddressableKind(evt.Kind) { + // when addressable, add the "d" tag to the filter + filter.Tags = nostr.TagMap{"d": []string{evt.Tags.GetD()}} + } + + // now we fetch the past events, whatever they are, delete them and then save the new + results, err := b.query(txn, filter, 10) // in theory limit could be just 1 and this should work + if err != nil { + return fmt.Errorf("failed to query past events with %s: %w", filter, err) + } + + shouldStore := true + for _, previous := range results { + if internal.IsOlder(previous.Event, evt) { + if err := b.delete(txn, previous.Event); err != nil { + return fmt.Errorf("failed to delete event %s for replacing: %w", previous.Event.ID, err) + } + } else { + // there is a newer event already stored, so we won't store this + shouldStore = false + } + } + if shouldStore { + return b.save(txn, evt) + } + + return nil + }) +} diff --git a/mdbx/save.go 
b/mdbx/save.go
new file mode 100644
index 0000000..09c28c7
--- /dev/null
+++ b/mdbx/save.go
@@ -0,0 +1,69 @@
+package mdbx
+
+import (
+	"context"
+	"encoding/hex"
+	"fmt"
+	"math"
+
+	"github.com/erigontech/mdbx-go/mdbx"
+	"github.com/fiatjaf/eventstore"
+	bin "github.com/fiatjaf/eventstore/internal/binary"
+	"github.com/nbd-wtf/go-nostr"
+)
+
+func (b *MDBXBackend) SaveEvent(ctx context.Context, evt *nostr.Event) error {
+	// sanity checking
+	if evt.CreatedAt > math.MaxUint32 || evt.Kind > math.MaxUint16 {
+		return fmt.Errorf("event with values out of expected boundaries")
+	}
+
+	return b.mdbxEnv.Update(func(txn *mdbx.Txn) error {
+		// modify hyperloglog caches relative to this (the callback is
+		// optional, so it must be nil-checked before calling)
+		if b.EnableHLLCacheFor != nil {
+			useCache, skipSaving := b.EnableHLLCacheFor(evt.Kind)
+
+			if useCache {
+				err := b.updateHyperLogLogCachedValues(txn, evt)
+				if err != nil {
+					return fmt.Errorf("failed to update hll cache: %w", err)
+				}
+				if skipSaving {
+					return nil
+				}
+			}
+		}
+
+		// check if we already have this id (the index stores only its first 8 bytes)
+		id, _ := hex.DecodeString(evt.ID[0 : 8*2])
+		_, err := txn.Get(b.indexId, id)
+		if operr, ok := err.(*mdbx.OpError); !ok || operr.Errno != mdbx.NotFound {
+			// we will only proceed if we get a NotFound
+			return eventstore.ErrDupEvent
+		}
+
+		return b.save(txn, evt)
+	})
+}
+
+func (b *MDBXBackend) save(txn *mdbx.Txn, evt *nostr.Event) error {
+	// encode to binary form so we'll save it
+	encoded, err := bin.Marshal(evt)
+	if err != nil {
+		return err
+	}
+
+	idx := b.Serial()
+	// raw event store
+	if err := txn.Put(b.rawEventStore, idx, encoded, 0); err != nil {
+		return err
+	}
+
+	// put indexes
+	for k := range b.getIndexKeysForEvent(evt) {
+		err := txn.Put(k.dbi, k.key, idx, 0)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
diff --git a/mdbx/testdata/fuzz/FuzzQuery b/mdbx/testdata/fuzz/FuzzQuery
new file mode 120000
index 0000000..eed0ba0
--- /dev/null
+++ b/mdbx/testdata/fuzz/FuzzQuery
@@ -0,0 +1 @@
+../../../internal/testdata/fuzz/FuzzQuery
\ No newline at end of file
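
For reference, a minimal usage sketch of the new backend. The database path and the event contents are made up for illustration; the calls themselves (Init, Close, SaveEvent, and the eventstore.RelayWrapper/QuerySync pair) mirror what the fuzz test above already exercises:

package main

import (
	"context"
	"fmt"

	"github.com/fiatjaf/eventstore"
	"github.com/fiatjaf/eventstore/mdbx"
	"github.com/nbd-wtf/go-nostr"
)

func main() {
	// open (or create) the database directory; MaxLimit etc. get defaults
	db := &mdbx.MDBXBackend{Path: "/tmp/example-mdbx"} // illustrative path
	if err := db.Init(); err != nil {
		panic(err)
	}
	defer db.Close()

	ctx := context.Background()

	// save a signed event
	evt := nostr.Event{
		CreatedAt: nostr.Now(),
		Kind:      1,
		Tags:      nostr.Tags{},
		Content:   "hello from mdbx",
	}
	if err := evt.Sign(nostr.GeneratePrivateKey()); err != nil {
		panic(err)
	}
	if err := db.SaveEvent(ctx, &evt); err != nil {
		panic(err)
	}

	// query it back through the synchronous wrapper used in the fuzz test
	w := eventstore.RelayWrapper{Store: db}
	results, err := w.QuerySync(ctx, nostr.Filter{Kinds: []int{1}, Limit: 10})
	if err != nil {
		panic(err)
	}
	for _, e := range results {
		fmt.Println(e.CreatedAt, e.ID)
	}
}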