From b5d9a176c9d3c38a2aa4069f72921f1a7b6839ed Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Sat, 7 Dec 2024 00:24:19 -0300 Subject: [PATCH] lmdb: support hyperloglog counting (cached and not). --- go.mod | 5 +- go.sum | 12 +++- lmdb/count.go | 158 ++++++++++++++++++++++++++++++++++++++++++++++++++ lmdb/lib.go | 10 +++- lmdb/save.go | 14 +++++ 5 files changed, 195 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index f71b020..ad75c92 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/lib/pq v1.10.9 github.com/mailru/easyjson v0.7.7 github.com/mattn/go-sqlite3 v1.14.18 - github.com/nbd-wtf/go-nostr v0.42.0 + github.com/nbd-wtf/go-nostr v0.43.0 github.com/opensearch-project/opensearch-go/v4 v4.0.0 github.com/stretchr/testify v1.9.0 golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 @@ -59,8 +59,11 @@ require ( github.com/golang/snappy v0.0.4 // indirect github.com/google/flatbuffers v24.3.25+incompatible // indirect github.com/josharian/intern v1.0.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.10 // indirect github.com/kr/text v0.2.0 // indirect + github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mschoch/smat v0.2.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/go.sum b/go.sum index b4c35cc..510cedb 100644 --- a/go.sum +++ b/go.sum @@ -140,6 +140,7 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= @@ -152,6 +153,8 @@ github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g= github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= @@ -178,10 +181,14 @@ github.com/mattn/go-sqlite3 v1.14.18 h1:JL0eqdCOq6DJVNPSvArO/bIV9/P7fbGrV00LZHc+ github.com/mattn/go-sqlite3 v1.14.18/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= -github.com/nbd-wtf/go-nostr v0.42.0 h1:EofWfXEhKic9AYVf4RHuXZr+kKUZE2jVyJtJByNe1rE= -github.com/nbd-wtf/go-nostr v0.42.0/go.mod h1:FBa4FBJO7NuANvkeKSlrf0BIyxGufmrUbuelr6Q4Ick= +github.com/nbd-wtf/go-nostr v0.43.0 h1:KJh/HXjKkhNCejRswHWVg8IgoAyLjw4iWxu/JDUZzqM= +github.com/nbd-wtf/go-nostr v0.43.0/go.mod h1:8YfmT9tBuRT+4nWHuMBDh+xSIZqAdZC6QIOgQfBgWxU= github.com/opensearch-project/opensearch-go/v4 v4.0.0 h1:Nrh30HhaknKcaPcIzlqA6Jf0CBgWP5XUaSp0HMsRBlA= github.com/opensearch-project/opensearch-go/v4 v4.0.0/go.mod h1:amlBgHgAX9AwwW50eOuzYa5n/8aD18LoWO8eDLoe8KQ= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= @@ -206,6 +213,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= diff --git a/lmdb/count.go b/lmdb/count.go index 6e89146..e841592 100644 --- a/lmdb/count.go +++ b/lmdb/count.go @@ -4,10 +4,13 @@ import ( "bytes" "context" "encoding/binary" + "encoding/hex" "github.com/PowerDNS/lmdb-go/lmdb" bin "github.com/fiatjaf/eventstore/internal/binary" "github.com/nbd-wtf/go-nostr" + "github.com/nbd-wtf/go-nostr/nip45" + "github.com/nbd-wtf/go-nostr/nip45/hyperloglog" "golang.org/x/exp/slices" ) @@ -105,3 +108,158 @@ func (b *LMDBBackend) CountEvents(ctx context.Context, filter nostr.Filter) (int return count, err } + +// CountEventsHLL is like CountEvents, but it will build a hyperloglog value while iterating through results, following NIP-45 +func (b *LMDBBackend) CountEventsHLL(ctx context.Context, filter nostr.Filter, offset int) (int64, *hyperloglog.HyperLogLog, error) { + if useCache, _ := b.EnableHLLCacheFor(filter.Kinds[0]); useCache { + return b.countEventsHLLCached(filter) + } + + var count int64 = 0 + + // this is different than CountEvents because some of these extra checks are not applicable in HLL-valid filters + queries, _, extraKinds, _, _, since, err := b.prepareQueries(filter) + if err != nil { + return 0, nil, err + } + + hll := hyperloglog.New(offset) + + err = b.lmdbEnv.View(func(txn *lmdb.Txn) error { + // actually iterate + for _, q := range queries { + cursor, err := txn.OpenCursor(q.dbi) + if err != nil { + continue + } + + var k []byte + var idx []byte + var iterr error + + if _, _, errsr := cursor.Get(q.startingPoint, nil, lmdb.SetRange); errsr != nil { + if operr, ok := errsr.(*lmdb.OpError); !ok || operr.Errno != lmdb.NotFound { + // in this case it's really an error + panic(operr) + } else { + // we're at the end and we just want notes before this, + // so we just need to set the cursor the last key, this is not a real error + k, idx, iterr = cursor.Get(nil, nil, lmdb.Last) + } + } else { + // move one back as the first step + k, idx, iterr = cursor.Get(nil, nil, lmdb.Prev) + } + + for { + // we already have a k and a v and an err from the cursor setup, so check and use these + if iterr != nil || + len(k) != q.keySize || + !bytes.HasPrefix(k, q.prefix) { + // either iteration has errored or we reached the end of this prefix + break // stop this cursor and move to the next one + } + + // "id" indexes don't contain a timestamp + if q.timestampSize == 4 { + createdAt := binary.BigEndian.Uint32(k[len(k)-4:]) + if createdAt < since { + break + } + } + + // fetch actual event (we need it regardless because we need the pubkey for the hll) + val, err := txn.Get(b.rawEventStore, idx) + if err != nil { + panic(err) + } + + if extraKinds == nil { + // nothing extra to check + count++ + hll.AddBytes(val[32:64]) + } else { + // check it against kinds without decoding the entire thing + if !slices.Contains(extraKinds, [2]byte(val[132:134])) { + goto loopend + } + + evt := &nostr.Event{} + if err := bin.Unmarshal(val, evt); err != nil { + goto loopend + } + + count++ + hll.Add(evt.PubKey) + } + + // move one back (we'll look into k and v and err in the next iteration) + loopend: + k, idx, iterr = cursor.Get(nil, nil, lmdb.Prev) + } + } + + return nil + }) + + return count, hll, err +} + +// countEventsHLLCached will just return a cached value from disk (and presumably we don't even have the events required to compute this anymore). +func (b *LMDBBackend) countEventsHLLCached(filter nostr.Filter) (int64, *hyperloglog.HyperLogLog, error) { + cacheKey := make([]byte, 2+8) + binary.BigEndian.PutUint16(cacheKey[0:2], uint16(filter.Kinds[0])) + switch filter.Kinds[0] { + case 3: + hex.Decode(cacheKey[2:2+8], []byte(filter.Tags["p"][0][0:8*2])) + case 7: + hex.Decode(cacheKey[2:2+8], []byte(filter.Tags["e"][0][0:8*2])) + } + + var count int64 + var hll *hyperloglog.HyperLogLog + + err := b.lmdbEnv.View(func(txn *lmdb.Txn) error { + val, err := txn.Get(b.hllCache, cacheKey) + if err != nil { + if lmdb.IsNotFound(err) { + return nil + } + return err + } + hll = hyperloglog.NewWithRegisters(val, 0) // offset doesn't matter here + count = int64(hll.Count()) + return nil + }) + + return count, hll, err +} + +func (b *LMDBBackend) updateHyperLogLogCachedValues(txn *lmdb.Txn, evt *nostr.Event) error { + cacheKey := make([]byte, 2+8) + binary.BigEndian.PutUint16(cacheKey[0:2], uint16(evt.Kind)) + + for ref, offset := range nip45.HyperLogLogEventPubkeyOffsetsAndReferencesForEvent(evt) { + // setup cache key (reusing buffer) + hex.Decode(cacheKey[2:2+8], []byte(ref[0:8*2])) + + // fetch hll value from cache db + hll := hyperloglog.New(offset) + val, err := txn.Get(b.hllCache, cacheKey) + if err == nil { + hll.SetRegisters(val) + } else if !lmdb.IsNotFound(err) { + return err + } + + // add this event + hll.Add(evt.PubKey) + + // save values back again + if err := txn.Put(b.hllCache, cacheKey, hll.GetRegisters(), 0); err != nil { + return err + } + } + + return nil +} diff --git a/lmdb/lib.go b/lmdb/lib.go index f425ba2..2d8c23a 100644 --- a/lmdb/lib.go +++ b/lmdb/lib.go @@ -37,6 +37,9 @@ type LMDBBackend struct { indexTagAddr lmdb.DBI indexPTagKind lmdb.DBI + hllCache lmdb.DBI + EnableHLLCacheFor func(kind int) (useCache bool, skipSavingActualEvent bool) + lastId atomic.Uint32 } @@ -55,7 +58,7 @@ func (b *LMDBBackend) Init() error { return err } - env.SetMaxDBs(11) + env.SetMaxDBs(12) env.SetMaxReaders(1000) if b.MapSize == 0 { env.SetMapSize(1 << 38) // ~273GB @@ -133,6 +136,11 @@ func (b *LMDBBackend) Init() error { } else { b.indexPTagKind = dbi } + if dbi, err := txn.OpenDBI("hllCache", lmdb.Create); err != nil { + return err + } else { + b.hllCache = dbi + } return nil }); err != nil { return err diff --git a/lmdb/save.go b/lmdb/save.go index 13a0861..37c70d9 100644 --- a/lmdb/save.go +++ b/lmdb/save.go @@ -18,6 +18,19 @@ func (b *LMDBBackend) SaveEvent(ctx context.Context, evt *nostr.Event) error { } return b.lmdbEnv.Update(func(txn *lmdb.Txn) error { + // modify hyperloglog caches relative to this + useCache, skipSaving := b.EnableHLLCacheFor(evt.Kind) + + if useCache { + err := b.updateHyperLogLogCachedValues(txn, evt) + if err != nil { + return fmt.Errorf("failed to update hll cache: %w", err) + } + if skipSaving { + return nil + } + } + // check if we already have this id id, _ := hex.DecodeString(evt.ID) _, err := txn.Get(b.indexId, id) @@ -38,6 +51,7 @@ func (b *LMDBBackend) SaveEvent(ctx context.Context, evt *nostr.Event) error { return err } + // put indexes for k := range b.getIndexKeysForEvent(evt) { err := txn.Put(k.dbi, k.key, idx, 0) if err != nil {