Skip to content

Commit

Permalink
Merge branch 'hyperloglog'
Browse files Browse the repository at this point in the history
  • Loading branch information
fiatjaf committed Dec 7, 2024
2 parents 43a32d1 + b5d9a17 commit 30c7f08
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 4 deletions.
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/lib/pq v1.10.9
github.com/mailru/easyjson v0.7.7
github.com/mattn/go-sqlite3 v1.14.24
github.com/nbd-wtf/go-nostr v0.42.3
github.com/nbd-wtf/go-nostr v0.43.0
github.com/opensearch-project/opensearch-go/v4 v4.3.0
github.com/rs/zerolog v1.33.0
github.com/stretchr/testify v1.10.0
Expand Down Expand Up @@ -64,10 +64,13 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v24.3.25+incompatible // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand Down
12 changes: 10 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
Expand All @@ -160,6 +161,8 @@ github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o=
github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
github.com/klauspost/compress v1.15.2/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
Expand All @@ -186,10 +189,14 @@ github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBW
github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
github.com/nbd-wtf/go-nostr v0.42.3 h1:wimwmXLhF9ScrNTG4by3eSj2p7HUGkLUospX4bHjxQk=
github.com/nbd-wtf/go-nostr v0.42.3/go.mod h1:p29g9i1UiSBKdyXkNa6V8rFqE+wrIn4UY0Emabwdu6A=
github.com/nbd-wtf/go-nostr v0.43.0 h1:KJh/HXjKkhNCejRswHWVg8IgoAyLjw4iWxu/JDUZzqM=
github.com/nbd-wtf/go-nostr v0.43.0/go.mod h1:8YfmT9tBuRT+4nWHuMBDh+xSIZqAdZC6QIOgQfBgWxU=
github.com/opensearch-project/opensearch-go/v4 v4.3.0 h1:gmQ+ILFJW6AJimivf+lHGVqCS2SCr/PBBf2Qr1xOCgE=
github.com/opensearch-project/opensearch-go/v4 v4.3.0/go.mod h1:+w6KAvEX3S0fVVmZciNLN0CkXhxxem26+F6Y7DoPp04=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
Expand Down Expand Up @@ -221,6 +228,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand Down
158 changes: 158 additions & 0 deletions lmdb/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ import (
"bytes"
"context"
"encoding/binary"
"encoding/hex"

"github.com/PowerDNS/lmdb-go/lmdb"
bin "github.com/fiatjaf/eventstore/internal/binary"
"github.com/nbd-wtf/go-nostr"
"github.com/nbd-wtf/go-nostr/nip45"
"github.com/nbd-wtf/go-nostr/nip45/hyperloglog"
"golang.org/x/exp/slices"
)

Expand Down Expand Up @@ -105,3 +108,158 @@ func (b *LMDBBackend) CountEvents(ctx context.Context, filter nostr.Filter) (int

return count, err
}

// CountEventsHLL is like CountEvents, but it will build a hyperloglog value while iterating through results, following NIP-45
func (b *LMDBBackend) CountEventsHLL(ctx context.Context, filter nostr.Filter, offset int) (int64, *hyperloglog.HyperLogLog, error) {
if useCache, _ := b.EnableHLLCacheFor(filter.Kinds[0]); useCache {
return b.countEventsHLLCached(filter)
}

var count int64 = 0

// this is different than CountEvents because some of these extra checks are not applicable in HLL-valid filters
queries, _, extraKinds, _, _, since, err := b.prepareQueries(filter)
if err != nil {
return 0, nil, err
}

hll := hyperloglog.New(offset)

err = b.lmdbEnv.View(func(txn *lmdb.Txn) error {
// actually iterate
for _, q := range queries {
cursor, err := txn.OpenCursor(q.dbi)
if err != nil {
continue
}

var k []byte
var idx []byte
var iterr error

if _, _, errsr := cursor.Get(q.startingPoint, nil, lmdb.SetRange); errsr != nil {
if operr, ok := errsr.(*lmdb.OpError); !ok || operr.Errno != lmdb.NotFound {
// in this case it's really an error
panic(operr)
} else {
// we're at the end and we just want notes before this,
// so we just need to set the cursor the last key, this is not a real error
k, idx, iterr = cursor.Get(nil, nil, lmdb.Last)
}
} else {
// move one back as the first step
k, idx, iterr = cursor.Get(nil, nil, lmdb.Prev)
}

for {
// we already have a k and a v and an err from the cursor setup, so check and use these
if iterr != nil ||
len(k) != q.keySize ||
!bytes.HasPrefix(k, q.prefix) {
// either iteration has errored or we reached the end of this prefix
break // stop this cursor and move to the next one
}

// "id" indexes don't contain a timestamp
if q.timestampSize == 4 {
createdAt := binary.BigEndian.Uint32(k[len(k)-4:])
if createdAt < since {
break
}
}

// fetch actual event (we need it regardless because we need the pubkey for the hll)
val, err := txn.Get(b.rawEventStore, idx)
if err != nil {
panic(err)
}

if extraKinds == nil {
// nothing extra to check
count++
hll.AddBytes(val[32:64])
} else {
// check it against kinds without decoding the entire thing
if !slices.Contains(extraKinds, [2]byte(val[132:134])) {
goto loopend
}

evt := &nostr.Event{}
if err := bin.Unmarshal(val, evt); err != nil {
goto loopend
}

count++
hll.Add(evt.PubKey)
}

// move one back (we'll look into k and v and err in the next iteration)
loopend:
k, idx, iterr = cursor.Get(nil, nil, lmdb.Prev)
}
}

return nil
})

return count, hll, err
}

// countEventsHLLCached will just return a cached value from disk (and presumably we don't even have the events required to compute this anymore).
func (b *LMDBBackend) countEventsHLLCached(filter nostr.Filter) (int64, *hyperloglog.HyperLogLog, error) {
cacheKey := make([]byte, 2+8)
binary.BigEndian.PutUint16(cacheKey[0:2], uint16(filter.Kinds[0]))
switch filter.Kinds[0] {
case 3:
hex.Decode(cacheKey[2:2+8], []byte(filter.Tags["p"][0][0:8*2]))
case 7:
hex.Decode(cacheKey[2:2+8], []byte(filter.Tags["e"][0][0:8*2]))
}

var count int64
var hll *hyperloglog.HyperLogLog

err := b.lmdbEnv.View(func(txn *lmdb.Txn) error {
val, err := txn.Get(b.hllCache, cacheKey)
if err != nil {
if lmdb.IsNotFound(err) {
return nil
}
return err
}
hll = hyperloglog.NewWithRegisters(val, 0) // offset doesn't matter here
count = int64(hll.Count())
return nil
})

return count, hll, err
}

func (b *LMDBBackend) updateHyperLogLogCachedValues(txn *lmdb.Txn, evt *nostr.Event) error {
cacheKey := make([]byte, 2+8)
binary.BigEndian.PutUint16(cacheKey[0:2], uint16(evt.Kind))

for ref, offset := range nip45.HyperLogLogEventPubkeyOffsetsAndReferencesForEvent(evt) {
// setup cache key (reusing buffer)
hex.Decode(cacheKey[2:2+8], []byte(ref[0:8*2]))

// fetch hll value from cache db
hll := hyperloglog.New(offset)
val, err := txn.Get(b.hllCache, cacheKey)
if err == nil {
hll.SetRegisters(val)
} else if !lmdb.IsNotFound(err) {
return err
}

// add this event
hll.Add(evt.PubKey)

// save values back again
if err := txn.Put(b.hllCache, cacheKey, hll.GetRegisters(), 0); err != nil {
return err
}
}

return nil
}
10 changes: 9 additions & 1 deletion lmdb/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ type LMDBBackend struct {
indexTagAddr lmdb.DBI
indexPTagKind lmdb.DBI

hllCache lmdb.DBI
EnableHLLCacheFor func(kind int) (useCache bool, skipSavingActualEvent bool)

lastId atomic.Uint32
}

Expand All @@ -55,7 +58,7 @@ func (b *LMDBBackend) Init() error {
return err
}

env.SetMaxDBs(11)
env.SetMaxDBs(12)
env.SetMaxReaders(1000)
if b.MapSize == 0 {
env.SetMapSize(1 << 38) // ~273GB
Expand Down Expand Up @@ -133,6 +136,11 @@ func (b *LMDBBackend) Init() error {
} else {
b.indexPTagKind = dbi
}
if dbi, err := txn.OpenDBI("hllCache", lmdb.Create); err != nil {
return err
} else {
b.hllCache = dbi
}
return nil
}); err != nil {
return err
Expand Down
14 changes: 14 additions & 0 deletions lmdb/save.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,19 @@ func (b *LMDBBackend) SaveEvent(ctx context.Context, evt *nostr.Event) error {
}

return b.lmdbEnv.Update(func(txn *lmdb.Txn) error {
// modify hyperloglog caches relative to this
useCache, skipSaving := b.EnableHLLCacheFor(evt.Kind)

if useCache {
err := b.updateHyperLogLogCachedValues(txn, evt)
if err != nil {
return fmt.Errorf("failed to update hll cache: %w", err)
}
if skipSaving {
return nil
}
}

// check if we already have this id
id, _ := hex.DecodeString(evt.ID)
_, err := txn.Get(b.indexId, id)
Expand All @@ -38,6 +51,7 @@ func (b *LMDBBackend) SaveEvent(ctx context.Context, evt *nostr.Event) error {
return err
}

// put indexes
for k := range b.getIndexKeysForEvent(evt) {
err := txn.Put(k.dbi, k.key, idx, 0)
if err != nil {
Expand Down

0 comments on commit 30c7f08

Please sign in to comment.