diff --git a/go.mod b/go.mod index f87ae7d..2041182 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/lib/pq v1.10.9 github.com/mailru/easyjson v0.7.7 github.com/mattn/go-sqlite3 v1.14.24 - github.com/nbd-wtf/go-nostr v0.42.3 + github.com/nbd-wtf/go-nostr v0.43.0 github.com/opensearch-project/opensearch-go/v4 v4.3.0 github.com/rs/zerolog v1.33.0 github.com/stretchr/testify v1.10.0 @@ -64,10 +64,13 @@ require ( github.com/golang/snappy v0.0.4 // indirect github.com/google/flatbuffers v24.3.25+incompatible // indirect github.com/josharian/intern v1.0.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.11 // indirect github.com/kr/text v0.2.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect + github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mschoch/smat v0.2.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/go.sum b/go.sum index 135e6c9..92e9cf1 100644 --- a/go.sum +++ b/go.sum @@ -148,6 +148,7 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= @@ -160,6 +161,8 @@ github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o= 
github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= github.com/klauspost/compress v1.15.2/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= @@ -186,10 +189,14 @@ github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBW github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= -github.com/nbd-wtf/go-nostr v0.42.3 h1:wimwmXLhF9ScrNTG4by3eSj2p7HUGkLUospX4bHjxQk= -github.com/nbd-wtf/go-nostr v0.42.3/go.mod h1:p29g9i1UiSBKdyXkNa6V8rFqE+wrIn4UY0Emabwdu6A= +github.com/nbd-wtf/go-nostr v0.43.0 h1:KJh/HXjKkhNCejRswHWVg8IgoAyLjw4iWxu/JDUZzqM= +github.com/nbd-wtf/go-nostr 
v0.43.0/go.mod h1:8YfmT9tBuRT+4nWHuMBDh+xSIZqAdZC6QIOgQfBgWxU=
 github.com/opensearch-project/opensearch-go/v4 v4.3.0 h1:gmQ+ILFJW6AJimivf+lHGVqCS2SCr/PBBf2Qr1xOCgE=
 github.com/opensearch-project/opensearch-go/v4 v4.3.0/go.mod h1:+w6KAvEX3S0fVVmZciNLN0CkXhxxem26+F6Y7DoPp04=
 github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
@@ -221,6 +228,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
 github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
 github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
diff --git a/lmdb/count.go b/lmdb/count.go
index 6e89146..e841592 100644
--- a/lmdb/count.go
+++ b/lmdb/count.go
@@ -4,10 +4,14 @@ import (
 	"bytes"
 	"context"
 	"encoding/binary"
+	"encoding/hex"
+	"fmt"
 
 	"github.com/PowerDNS/lmdb-go/lmdb"
 	bin "github.com/fiatjaf/eventstore/internal/binary"
 	"github.com/nbd-wtf/go-nostr"
+	"github.com/nbd-wtf/go-nostr/nip45"
+	"github.com/nbd-wtf/go-nostr/nip45/hyperloglog"
 	"golang.org/x/exp/slices"
 )
 
@@ -105,3 +109,217 @@ func (b *LMDBBackend) CountEvents(ctx context.Context, filter nostr.Filter) (int
 
 	return count, err
 }
+
+// CountEventsHLL is like CountEvents, but it will also build a hyperloglog value out of the pubkeys of the
+// matching events while iterating through results, following NIP-45.
+//
+// offset is handed to hyperloglog.New for the value built here. When EnableHLLCacheFor is set and reports that
+// the first kind of the filter is cached, the stored value is returned by countEventsHLLCached instead.
+//
+// It returns the number of matching events, the hyperloglog value and any error that came up while preparing
+// the queries or reading from the database (read failures are returned as errors instead of panicking).
+func (b *LMDBBackend) CountEventsHLL(ctx context.Context, filter nostr.Filter, offset int) (int64, *hyperloglog.HyperLogLog, error) {
+	// EnableHLLCacheFor is an optional field and the filter may have no kinds: calling a nil func or
+	// indexing an empty slice would panic, so in both cases we just go through the events
+	if b.EnableHLLCacheFor != nil && len(filter.Kinds) > 0 {
+		if useCache, _ := b.EnableHLLCacheFor(filter.Kinds[0]); useCache {
+			return b.countEventsHLLCached(filter)
+		}
+	}
+
+	var count int64 = 0
+
+	// this is different than CountEvents because some of these extra checks are not applicable in HLL-valid filters
+	queries, _, extraKinds, _, _, since, err := b.prepareQueries(filter)
+	if err != nil {
+		return 0, nil, err
+	}
+
+	hll := hyperloglog.New(offset)
+
+	err = b.lmdbEnv.View(func(txn *lmdb.Txn) error {
+		// actually iterate
+		for _, q := range queries {
+			// each query runs in its own func so its cursor is closed as soon as we are done with it
+			err := func() error {
+				cursor, err := txn.OpenCursor(q.dbi)
+				if err != nil {
+					// a query whose cursor can't be opened is skipped
+					return nil
+				}
+				defer cursor.Close()
+
+				var k []byte
+				var idx []byte
+				var iterr error
+
+				if _, _, errsr := cursor.Get(q.startingPoint, nil, lmdb.SetRange); errsr == nil {
+					// move one back as the first step
+					k, idx, iterr = cursor.Get(nil, nil, lmdb.Prev)
+				} else if lmdb.IsNotFound(errsr) {
+					// we're at the end and we just want notes before this,
+					// so we just need to set the cursor to the last key, this is not a real error
+					k, idx, iterr = cursor.Get(nil, nil, lmdb.Last)
+				} else {
+					// in this case it's really an error
+					return errsr
+				}
+
+				// after every pass we move one back (k, idx and iterr are checked at the top of the next pass)
+				for ; ; k, idx, iterr = cursor.Get(nil, nil, lmdb.Prev) {
+					// we already have a k and an idx and an err from the cursor setup, so check and use these
+					if iterr != nil ||
+						len(k) != q.keySize ||
+						!bytes.HasPrefix(k, q.prefix) {
+						// either iteration has errored or we reached the end of this prefix
+						return nil // stop this cursor and move to the next one
+					}
+
+					// "id" indexes don't contain a timestamp
+					if q.timestampSize == 4 {
+						createdAt := binary.BigEndian.Uint32(k[len(k)-4:])
+						if createdAt < since {
+							return nil
+						}
+					}
+
+					// fetch actual event (we need it regardless because we need the pubkey for the hll)
+					val, err := txn.Get(b.rawEventStore, idx)
+					if err != nil {
+						return err
+					}
+
+					if extraKinds == nil {
+						// nothing extra to check
+						count++
+						hll.AddBytes(val[32:64])
+						continue
+					}
+
+					// check it against kinds without decoding the entire thing
+					if !slices.Contains(extraKinds, [2]byte(val[132:134])) {
+						continue
+					}
+
+					evt := &nostr.Event{}
+					if err := bin.Unmarshal(val, evt); err != nil {
+						// events that can't be decoded are left out of the count
+						continue
+					}
+
+					count++
+					hll.Add(evt.PubKey)
+				}
+			}()
+			if err != nil {
+				return err
+			}
+		}
+
+		return nil
+	})
+
+	return count, hll, err
+}
+
+// countEventsHLLCached will just return a cached value from disk (and presumably we don't even have the events
+// required to compute this anymore).
+//
+// The cache key is the first kind of the filter (2 bytes, big-endian) followed by the first 8 bytes of the
+// reference: the first "p" tag value for kind 3, the first "e" tag value for kind 7. For any other kind the
+// reference part of the key is left zeroed.
+//
+// When there is nothing stored under that key this returns a zero count, a nil hyperloglog and a nil error,
+// so callers have to check for nil. A filter without a kind, or without a usable hex reference for kinds
+// 3 and 7, yields an error.
+func (b *LMDBBackend) countEventsHLLCached(filter nostr.Filter) (int64, *hyperloglog.HyperLogLog, error) {
+	if len(filter.Kinds) == 0 {
+		return 0, nil, fmt.Errorf("hll cache lookup needs a kind in the filter")
+	}
+	kind := filter.Kinds[0]
+
+	cacheKey := make([]byte, 2+8)
+	binary.BigEndian.PutUint16(cacheKey[0:2], uint16(kind))
+
+	// name of the tag that carries the reference for this kind (empty when the key has no reference part)
+	var tagName string
+	switch kind {
+	case 3:
+		tagName = "p"
+	case 7:
+		tagName = "e"
+	}
+
+	if tagName != "" {
+		// a missing or short tag value would make the slicing below panic, so validate it first
+		values := filter.Tags[tagName]
+		if len(values) == 0 || len(values[0]) < 8*2 {
+			return 0, nil, fmt.Errorf("hll cache lookup for kind %d needs a %q tag with a hex reference", kind, tagName)
+		}
+		if _, err := hex.Decode(cacheKey[2:2+8], []byte(values[0][0:8*2])); err != nil {
+			return 0, nil, fmt.Errorf("decoding %q tag reference: %w", tagName, err)
+		}
+	}
+
+	var count int64
+	var hll *hyperloglog.HyperLogLog
+
+	err := b.lmdbEnv.View(func(txn *lmdb.Txn) error {
+		val, err := txn.Get(b.hllCache, cacheKey)
+		if err != nil {
+			if lmdb.IsNotFound(err) {
+				return nil
+			}
+			return err
+		}
+		hll = hyperloglog.NewWithRegisters(val, 0) // offset doesn't matter here
+		count = int64(hll.Count())
+		return nil
+	})
+
+	return count, hll, err
+}
+
+// updateHyperLogLogCachedValues adds the pubkey of evt to the cached hyperloglog value of every reference that
+// nip45.HyperLogLogEventPubkeyOffsetsAndReferencesForEvent yields for it, reading and writing the hllCache
+// database through txn.
+//
+// Each cache key is the event kind (2 bytes, big-endian) followed by the first 8 bytes of the reference, and
+// a value that is not cached yet starts from hyperloglog.New with the offset yielded for that reference.
+// It returns the first error hit while decoding a reference or while reading from or writing to the cache.
+func (b *LMDBBackend) updateHyperLogLogCachedValues(txn *lmdb.Txn, evt *nostr.Event) error {
+	cacheKey := make([]byte, 2+8)
+	binary.BigEndian.PutUint16(cacheKey[0:2], uint16(evt.Kind))
+
+	for ref, offset := range nip45.HyperLogLogEventPubkeyOffsetsAndReferencesForEvent(evt) {
+		// the key buffer is reused, so a reference that can't be decoded must not go any further:
+		// it would leave a half-written key behind and update the wrong cache entry
+		if len(ref) < 8*2 {
+			return fmt.Errorf("hll reference %q is too short", ref)
+		}
+
+		// setup cache key (reusing buffer)
+		if _, err := hex.Decode(cacheKey[2:2+8], []byte(ref[0:8*2])); err != nil {
+			return fmt.Errorf("decoding hll reference %q: %w", ref, err)
+		}
+
+		// fetch hll value from cache db
+		hll := hyperloglog.New(offset)
+		val, err := txn.Get(b.hllCache, cacheKey)
+		if err == nil {
+			hll.SetRegisters(val)
+		} else if !lmdb.IsNotFound(err) {
+			return err
+		}
+
+		// add this event
+		hll.Add(evt.PubKey)
+
+		// save values back again
+		if err := txn.Put(b.hllCache, cacheKey, hll.GetRegisters(), 0); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
diff
--git a/lmdb/lib.go b/lmdb/lib.go index f425ba2..2d8c23a 100644 --- a/lmdb/lib.go +++ b/lmdb/lib.go @@ -37,6 +37,9 @@ type LMDBBackend struct { indexTagAddr lmdb.DBI indexPTagKind lmdb.DBI + hllCache lmdb.DBI + EnableHLLCacheFor func(kind int) (useCache bool, skipSavingActualEvent bool) + lastId atomic.Uint32 } @@ -55,7 +58,7 @@ func (b *LMDBBackend) Init() error { return err } - env.SetMaxDBs(11) + env.SetMaxDBs(12) env.SetMaxReaders(1000) if b.MapSize == 0 { env.SetMapSize(1 << 38) // ~273GB @@ -133,6 +136,11 @@ func (b *LMDBBackend) Init() error { } else { b.indexPTagKind = dbi } + if dbi, err := txn.OpenDBI("hllCache", lmdb.Create); err != nil { + return err + } else { + b.hllCache = dbi + } return nil }); err != nil { return err diff --git a/lmdb/save.go b/lmdb/save.go index 13a0861..37c70d9 100644 --- a/lmdb/save.go +++ b/lmdb/save.go @@ -18,6 +18,19 @@ func (b *LMDBBackend) SaveEvent(ctx context.Context, evt *nostr.Event) error { } return b.lmdbEnv.Update(func(txn *lmdb.Txn) error { + // modify hyperloglog caches relative to this + useCache, skipSaving := b.EnableHLLCacheFor(evt.Kind) + + if useCache { + err := b.updateHyperLogLogCachedValues(txn, evt) + if err != nil { + return fmt.Errorf("failed to update hll cache: %w", err) + } + if skipSaving { + return nil + } + } + // check if we already have this id id, _ := hex.DecodeString(evt.ID) _, err := txn.Get(b.indexId, id) @@ -38,6 +51,7 @@ func (b *LMDBBackend) SaveEvent(ctx context.Context, evt *nostr.Event) error { return err } + // put indexes for k := range b.getIndexKeysForEvent(evt) { err := txn.Put(k.dbi, k.key, idx, 0) if err != nil {