Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
fiatjaf committed Nov 24, 2024
1 parent 65632c8 commit 9175e8b
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 3 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,5 @@ require (
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/nbd-wtf/go-nostr => ../go-nostr
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,6 @@ github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrk
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
github.com/nbd-wtf/go-nostr v0.42.0 h1:EofWfXEhKic9AYVf4RHuXZr+kKUZE2jVyJtJByNe1rE=
github.com/nbd-wtf/go-nostr v0.42.0/go.mod h1:FBa4FBJO7NuANvkeKSlrf0BIyxGufmrUbuelr6Q4Ick=
github.com/opensearch-project/opensearch-go/v4 v4.0.0 h1:Nrh30HhaknKcaPcIzlqA6Jf0CBgWP5XUaSp0HMsRBlA=
github.com/opensearch-project/opensearch-go/v4 v4.0.0/go.mod h1:amlBgHgAX9AwwW50eOuzYa5n/8aD18LoWO8eDLoe8KQ=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
Expand Down
97 changes: 97 additions & 0 deletions lmdb/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/PowerDNS/lmdb-go/lmdb"
bin "github.com/fiatjaf/eventstore/internal/binary"
"github.com/nbd-wtf/go-nostr"
"github.com/nbd-wtf/go-nostr/nip45/hyperloglog"
"golang.org/x/exp/slices"
)

Expand Down Expand Up @@ -105,3 +106,99 @@ func (b *LMDBBackend) CountEvents(ctx context.Context, filter nostr.Filter) (int

return count, err
}

func (b *LMDBBackend) CountEventsHLL(ctx context.Context, filter nostr.Filter) (int64, *hyperloglog.HyperLogLog, error) {
cacheReference := ""

if filter.Authors == 1 {
cacheReference = filter.Authors[0]
} else if filter.

filter.Tags

err = b.lmdbEnv.View(func(txn *lmdb.Txn) error {
// actually iterate
for _, q := range queries {
cursor, err := txn.OpenCursor(q.dbi)
if err != nil {
continue
}

var k []byte
var idx []byte
var iterr error

if _, _, errsr := cursor.Get(q.startingPoint, nil, lmdb.SetRange); errsr != nil {
if operr, ok := errsr.(*lmdb.OpError); !ok || operr.Errno != lmdb.NotFound {
// in this case it's really an error
panic(operr)
} else {
// we're at the end and we just want notes before this,
// so we just need to set the cursor the last key, this is not a real error
k, idx, iterr = cursor.Get(nil, nil, lmdb.Last)
}
} else {
// move one back as the first step
k, idx, iterr = cursor.Get(nil, nil, lmdb.Prev)
}

for {
// we already have a k and a v and an err from the cursor setup, so check and use these
if iterr != nil ||
len(k) != q.keySize ||
!bytes.HasPrefix(k, q.prefix) {
// either iteration has errored or we reached the end of this prefix
break // stop this cursor and move to the next one
}

// "id" indexes don't contain a timestamp
if q.timestampSize == 4 {
createdAt := binary.BigEndian.Uint32(k[len(k)-4:])
if createdAt < since {
break
}
}

if extraAuthors == nil && extraKinds == nil && extraTagValues == nil {
count++
} else {
// fetch actual event
val, err := txn.Get(b.rawEventStore, idx)
if err != nil {
panic(err)
}

// check it against pubkeys without decoding the entire thing
if !slices.Contains(extraAuthors, [32]byte(val[32:64])) {
goto loopend
}

// check it against kinds without decoding the entire thing
if !slices.Contains(extraKinds, [2]byte(val[132:134])) {
goto loopend
}

evt := &nostr.Event{}
if err := bin.Unmarshal(val, evt); err != nil {
goto loopend
}

// if there is still a tag to be checked, do it now
if !evt.Tags.ContainsAny(extraTagKey, extraTagValues) {
goto loopend
}

count++
}

// move one back (we'll look into k and v and err in the next iteration)
loopend:
k, idx, iterr = cursor.Get(nil, nil, lmdb.Prev)
}
}

return nil
})

return count, err
}
9 changes: 8 additions & 1 deletion lmdb/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type LMDBBackend struct {
indexTagAddr lmdb.DBI
indexPTagKind lmdb.DBI

hllCache lmdb.DBI

lastId atomic.Uint32
}

Expand All @@ -55,7 +57,7 @@ func (b *LMDBBackend) Init() error {
return err
}

env.SetMaxDBs(11)
env.SetMaxDBs(12)
env.SetMaxReaders(1000)
if b.MapSize == 0 {
env.SetMapSize(1 << 38) // ~273GB
Expand Down Expand Up @@ -133,6 +135,11 @@ func (b *LMDBBackend) Init() error {
} else {
b.indexPTagKind = dbi
}
if dbi, err := txn.OpenDBI("hllCache", lmdb.Create); err != nil {
return err
} else {
b.hllCache = dbi
}
return nil
}); err != nil {
return err
Expand Down
5 changes: 5 additions & 0 deletions store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/nbd-wtf/go-nostr"
"github.com/nbd-wtf/go-nostr/nip45/hyperloglog"

Check failure on line 7 in store.go

View workflow job for this annotation

GitHub Actions / test

github.com/nbd-wtf/[email protected]: replacement directory ../go-nostr does not exist
)

// Store is a persistence layer for nostr events handled by a relay.
Expand All @@ -27,3 +28,7 @@ type Store interface {
type Counter interface {
CountEvents(context.Context, nostr.Filter) (int64, error)
}

type HLLCounter interface {
CountEventsHLL(context.Context, nostr.Filter) (int64, *hyperloglog.HyperLogLog, error)
}

0 comments on commit 9175e8b

Please sign in to comment.