Skip to content

Commit

Permalink
mdbx backend (copied from lmdb).
Browse files Browse the repository at this point in the history
  • Loading branch information
fiatjaf committed Dec 16, 2024
1 parent 90b4b5a commit 88deb04
Show file tree
Hide file tree
Showing 13 changed files with 1,657 additions and 0 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ require (
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect
github.com/elastic/go-elasticsearch/v7 v7.17.10 // indirect
github.com/erigontech/mdbx-go v0.38.5 // indirect
github.com/fatih/structs v1.1.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
Expand All @@ -63,6 +64,7 @@ require (
github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v24.3.25+incompatible // indirect
github.com/ianlancetaylor/cgosymbolizer v0.0.0-20240503222823-736c933a666d // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.11 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/erigontech/mdbx-go v0.38.5 h1:WJ7xNtwwHjxqyuq/iG+gQFssC+bz/9V8+Ja7kN8n6Yw=
github.com/erigontech/mdbx-go v0.38.5/go.mod h1:lkqHAZqXtFaIPlvTaGAx3VUDuGYZcuhve1l4JVVN1Z0=
github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/fergusstrange/embedded-postgres v1.28.0 h1:Atixd24HCuBHBavnG4eiZAjRizOViwUahKGSjJdz1SU=
Expand Down Expand Up @@ -151,6 +153,8 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/ianlancetaylor/cgosymbolizer v0.0.0-20240503222823-736c933a666d h1:Azx2B59D4+zpVVtuYb8Oe3uOLi/ift4xfwKdhBX0Cy0=
github.com/ianlancetaylor/cgosymbolizer v0.0.0-20240503222823-736c933a666d/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/influxdb v1.7.6/go.mod h1:qZna6X/4elxqT3yI9iZYdZrWWdeFOOprn86kgg4+IzY=
github.com/jgroeneveld/schema v1.0.0 h1:J0E10CrOkiSEsw6dfb1IfrDJD14pf6QLVJ3tRPl/syI=
Expand Down
270 changes: 270 additions & 0 deletions mdbx/count.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
package mdbx

import (
"bytes"
"context"
"encoding/binary"
"encoding/hex"

"github.com/erigontech/mdbx-go/mdbx"
bin "github.com/fiatjaf/eventstore/internal/binary"
"github.com/nbd-wtf/go-nostr"
"github.com/nbd-wtf/go-nostr/nip45"
"github.com/nbd-wtf/go-nostr/nip45/hyperloglog"
"golang.org/x/exp/slices"
)

func (b *MDBXBackend) CountEvents(ctx context.Context, filter nostr.Filter) (int64, error) {
var count int64 = 0

queries, extraAuthors, extraKinds, extraTagKey, extraTagValues, since, err := b.prepareQueries(filter)
if err != nil {
return 0, err
}

err = b.mdbxEnv.View(func(txn *mdbx.Txn) error {
// actually iterate
for _, q := range queries {
cursor, err := txn.OpenCursor(q.dbi)
if err != nil {
continue
}

var k []byte
var idx []byte
var iterr error

if _, _, errsr := cursor.Get(q.startingPoint, nil, mdbx.SetRange); errsr != nil {
if operr, ok := errsr.(*mdbx.OpError); !ok || operr.Errno != mdbx.NotFound {
// in this case it's really an error
panic(operr)
} else {
// we're at the end and we just want notes before this,
// so we just need to set the cursor the last key, this is not a real error
k, idx, iterr = cursor.Get(nil, nil, mdbx.Last)
}
} else {
// move one back as the first step
k, idx, iterr = cursor.Get(nil, nil, mdbx.Prev)
}

for {
// we already have a k and a v and an err from the cursor setup, so check and use these
if iterr != nil ||
len(k) != q.keySize ||
!bytes.HasPrefix(k, q.prefix) {
// either iteration has errored or we reached the end of this prefix
break // stop this cursor and move to the next one
}

// "id" indexes don't contain a timestamp
if q.timestampSize == 4 {
createdAt := binary.BigEndian.Uint32(k[len(k)-4:])
if createdAt < since {
break
}
}

if extraAuthors == nil && extraKinds == nil && extraTagValues == nil {
count++
} else {
// fetch actual event
val, err := txn.Get(b.rawEventStore, idx)
if err != nil {
panic(err)
}

// check it against pubkeys without decoding the entire thing
if !slices.Contains(extraAuthors, [32]byte(val[32:64])) {
goto loopend
}

// check it against kinds without decoding the entire thing
if !slices.Contains(extraKinds, [2]byte(val[132:134])) {
goto loopend
}

evt := &nostr.Event{}
if err := bin.Unmarshal(val, evt); err != nil {
goto loopend
}

// if there is still a tag to be checked, do it now
if !evt.Tags.ContainsAny(extraTagKey, extraTagValues) {
goto loopend
}

count++
}

// move one back (we'll look into k and v and err in the next iteration)
loopend:
k, idx, iterr = cursor.Get(nil, nil, mdbx.Prev)
}
}

return nil
})

return count, err
}

// CountEventsHLL is like CountEvents, but it will build a hyperloglog value while iterating through results, following NIP-45
func (b *MDBXBackend) CountEventsHLL(ctx context.Context, filter nostr.Filter, offset int) (int64, *hyperloglog.HyperLogLog, error) {
if useCache, _ := b.EnableHLLCacheFor(filter.Kinds[0]); useCache {
return b.countEventsHLLCached(filter)
}

var count int64 = 0

// this is different than CountEvents because some of these extra checks are not applicable in HLL-valid filters
queries, _, extraKinds, extraTagKey, extraTagValues, since, err := b.prepareQueries(filter)
if err != nil {
return 0, nil, err
}

hll := hyperloglog.New(offset)

err = b.mdbxEnv.View(func(txn *mdbx.Txn) error {
// actually iterate
for _, q := range queries {
cursor, err := txn.OpenCursor(q.dbi)
if err != nil {
continue
}

var k []byte
var idx []byte
var iterr error

if _, _, errsr := cursor.Get(q.startingPoint, nil, mdbx.SetRange); errsr != nil {
if operr, ok := errsr.(*mdbx.OpError); !ok || operr.Errno != mdbx.NotFound {
// in this case it's really an error
panic(operr)
} else {
// we're at the end and we just want notes before this,
// so we just need to set the cursor the last key, this is not a real error
k, idx, iterr = cursor.Get(nil, nil, mdbx.Last)
}
} else {
// move one back as the first step
k, idx, iterr = cursor.Get(nil, nil, mdbx.Prev)
}

for {
// we already have a k and a v and an err from the cursor setup, so check and use these
if iterr != nil ||
len(k) != q.keySize ||
!bytes.HasPrefix(k, q.prefix) {
// either iteration has errored or we reached the end of this prefix
break // stop this cursor and move to the next one
}

// "id" indexes don't contain a timestamp
if q.timestampSize == 4 {
createdAt := binary.BigEndian.Uint32(k[len(k)-4:])
if createdAt < since {
break
}
}

// fetch actual event (we need it regardless because we need the pubkey for the hll)
val, err := txn.Get(b.rawEventStore, idx)
if err != nil {
panic(err)
}

if extraKinds == nil && extraTagValues == nil {
// nothing extra to check
count++
hll.AddBytes(val[32:64])
} else {
// check it against kinds without decoding the entire thing
if !slices.Contains(extraKinds, [2]byte(val[132:134])) {
goto loopend
}

evt := &nostr.Event{}
if err := bin.Unmarshal(val, evt); err != nil {
goto loopend
}

// if there is still a tag to be checked, do it now
if !evt.Tags.ContainsAny(extraTagKey, extraTagValues) {
goto loopend
}

count++
hll.Add(evt.PubKey)
}

// move one back (we'll look into k and v and err in the next iteration)
loopend:
k, idx, iterr = cursor.Get(nil, nil, mdbx.Prev)
}
}

return nil
})

return count, hll, err
}

// countEventsHLLCached will just return a cached value from disk (and presumably we don't even have the events required to compute this anymore).
func (b *MDBXBackend) countEventsHLLCached(filter nostr.Filter) (int64, *hyperloglog.HyperLogLog, error) {
cacheKey := make([]byte, 2+8)
binary.BigEndian.PutUint16(cacheKey[0:2], uint16(filter.Kinds[0]))
switch filter.Kinds[0] {
case 3:
hex.Decode(cacheKey[2:2+8], []byte(filter.Tags["p"][0][0:8*2]))
case 7:
hex.Decode(cacheKey[2:2+8], []byte(filter.Tags["e"][0][0:8*2]))
}

var count int64
var hll *hyperloglog.HyperLogLog

err := b.mdbxEnv.View(func(txn *mdbx.Txn) error {
val, err := txn.Get(b.hllCache, cacheKey)
if err != nil {
if mdbx.IsNotFound(err) {
return nil
}
return err
}
hll = hyperloglog.NewWithRegisters(val, 0) // offset doesn't matter here
count = int64(hll.Count())
return nil
})

return count, hll, err
}

func (b *MDBXBackend) updateHyperLogLogCachedValues(txn *mdbx.Txn, evt *nostr.Event) error {
cacheKey := make([]byte, 2+8)
binary.BigEndian.PutUint16(cacheKey[0:2], uint16(evt.Kind))

for ref, offset := range nip45.HyperLogLogEventPubkeyOffsetsAndReferencesForEvent(evt) {
// setup cache key (reusing buffer)
hex.Decode(cacheKey[2:2+8], []byte(ref[0:8*2]))

// fetch hll value from cache db
hll := hyperloglog.New(offset)
val, err := txn.Get(b.hllCache, cacheKey)
if err == nil {
hll.SetRegisters(val)
} else if !mdbx.IsNotFound(err) {
return err
}

// add this event
hll.Add(evt.PubKey)

// save values back again
if err := txn.Put(b.hllCache, cacheKey, hll.GetRegisters(), 0); err != nil {
return err
}
}

return nil
}
43 changes: 43 additions & 0 deletions mdbx/delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package mdbx

import (
"context"
"encoding/hex"
"fmt"

"github.com/erigontech/mdbx-go/mdbx"
"github.com/nbd-wtf/go-nostr"
)

func (b *MDBXBackend) DeleteEvent(ctx context.Context, evt *nostr.Event) error {
return b.mdbxEnv.Update(func(txn *mdbx.Txn) error {
return b.delete(txn, evt)
})
}

func (b *MDBXBackend) delete(txn *mdbx.Txn, evt *nostr.Event) error {
idPrefix8, _ := hex.DecodeString(evt.ID[0 : 8*2])
idx, err := txn.Get(b.indexId, idPrefix8)
if mdbx.IsNotFound(err) {
// we already do not have this
return nil
}
if err != nil {
return fmt.Errorf("failed to get current idx for deleting %x: %w", evt.ID[0:8*2], err)
}

// calculate all index keys we have for this event and delete them
for k := range b.getIndexKeysForEvent(evt) {
err := txn.Del(k.dbi, k.key, idx)
if err != nil {
return fmt.Errorf("failed to delete index entry %s for %x: %w", b.keyName(k), evt.ID[0:8*2], err)
}
}

// delete the raw event
if err := txn.Del(b.rawEventStore, idx, nil); err != nil {
return fmt.Errorf("failed to delete raw event %x (idx %x): %w", evt.ID[0:8*2], idx, err)
}

return nil
}
Loading

0 comments on commit 88deb04

Please sign in to comment.