lmdb: version 5, new ptagKind index and other optimizations.
fiatjaf committed Sep 24, 2024
1 parent ca5dc88 commit 727ff16
Showing 8 changed files with 254 additions and 160 deletions.
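
The new ptagKind index keys pack an 8-byte pubkey prefix, a big-endian 16-bit kind, and a big-endian 32-bit created_at (see the helpers.go and migration.go hunks below). A minimal sketch of that layout — the helper name is invented, not code from the commit:

package sketch

import (
	"encoding/binary"
	"encoding/hex"
)

// ptagKindKey is a hypothetical helper mirroring the key layout of the new
// ptagKind index: pubkey[0:8] + kind (2 bytes) + created_at (4 bytes).
func ptagKindKey(pubkeyHex string, kind uint16, createdAt uint32) []byte {
	k := make([]byte, 8+2+4)
	hex.Decode(k[0:8], []byte(pubkeyHex[0:16])) // only the first 8 raw bytes are indexed
	binary.BigEndian.PutUint16(k[8:10], kind)
	binary.BigEndian.PutUint32(k[10:14], createdAt)
	return k
}

As with the other multi-value indexes, the value stored under each such key is the event's 4-byte serial.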
4 changes: 2 additions & 2 deletions lmdb/count.go
@@ -48,8 +48,8 @@ func (b *LMDBBackend) CountEvents(ctx context.Context, filter nostr.Filter) (int
for {
// we already have a k and a v and an err from the cursor setup, so check and use these
if iterr != nil ||
len(k) != q.prefixSize+q.timestampSize ||
!bytes.Equal(k[:q.prefixSize], q.prefix) {
len(k) != q.keySize ||
!bytes.HasPrefix(k, q.prefix) {
// either iteration has errored or we reached the end of this prefix
break // stop this cursor and move to the next one
}
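
The cursor loops in count.go (and query.go below) now stop on a single precomputed key size plus a bytes.HasPrefix check, instead of summing prefixSize+timestampSize and comparing a sub-slice. A standalone restatement of that termination pattern — prefixScan is not a function from the commit, and the cursor setup and iteration direction are simplified:

package sketch

import (
	"bytes"

	"github.com/PowerDNS/lmdb-go/lmdb"
)

// prefixScan visits keys that have the expected total length and share the
// given prefix, stopping as soon as either condition breaks or iteration errors.
func prefixScan(cur *lmdb.Cursor, prefix []byte, keySize int, visit func(k, v []byte)) error {
	k, v, err := cur.Get(prefix, nil, lmdb.SetRange)
	for err == nil && len(k) == keySize && bytes.HasPrefix(k, prefix) {
		visit(k, v)
		k, v, err = cur.Get(nil, nil, lmdb.Next)
	}
	if err != nil && !lmdb.IsNotFound(err) {
		return err
	}
	return nil
}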
4 changes: 3 additions & 1 deletion lmdb/delete.go
@@ -22,7 +22,9 @@ func (b *LMDBBackend) DeleteEvent(ctx context.Context, evt *nostr.Event) error {

// calculate all index keys we have for this event and delete them
for _, k := range b.getIndexKeysForEvent(evt) {
if err := txn.Del(k.dbi, k.key, idx); err != nil {
err := txn.Del(k.dbi, k.key, idx)
k.free()
if err != nil {
return err
}
}
133 changes: 82 additions & 51 deletions lmdb/helpers.go
@@ -3,41 +3,26 @@ package lmdb
import (
"encoding/binary"
"encoding/hex"
"strconv"
"strings"
"sync"

"github.com/PowerDNS/lmdb-go/lmdb"
"github.com/fiatjaf/eventstore"
"github.com/nbd-wtf/go-nostr"
"golang.org/x/exp/slices"
)

func (b *LMDBBackend) getTagIndexPrefix(tagValue string) (lmdb.DBI, []byte, int) {
var k []byte // the key with full length for created_at and idx at the end, but not filled with these
var offset int // the offset -- i.e. where the prefix ends and the created_at and idx would start
var dbi lmdb.DBI
var indexKeyPool = sync.Pool{
New: func() any { return make([]byte, 2+8+4+30) },
}

if kind, pkb, d := eventstore.GetAddrTagElements(tagValue); len(pkb) == 32 {
// store value in the new special "a" tag index
k = make([]byte, 2+8+len(d)+4)
binary.BigEndian.PutUint16(k[1:], kind)
copy(k[2:], pkb[0:8])
copy(k[2+8:], d)
offset = 2 + 8 + len(d)
dbi = b.indexTagAddr
} else if vb, _ := hex.DecodeString(tagValue); len(vb) == 32 {
// store value as bytes
k = make([]byte, 8+4)
copy(k, vb[0:8])
offset = 8
dbi = b.indexTag32
} else {
// store whatever as utf-8
k = make([]byte, len(tagValue)+4)
copy(k, tagValue)
offset = len(tagValue)
dbi = b.indexTag
}
type key struct {
dbi lmdb.DBI
key []byte
}

return dbi, k, offset
func (key key) free() {
indexKeyPool.Put(key.key)
}
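
Index keys are now carved out of pooled 44-byte scratch buffers (2+8+4+30, the largest layout any index needs) and handed back with key.free() once the LMDB put or delete is done, as the delete.go hunk above already shows. A minimal self-contained sketch of the same get/slice/put cycle, with invented names:

package sketch

import (
	"encoding/binary"
	"sync"
)

// scratchPool mirrors indexKeyPool: fixed-size buffers big enough for the
// largest index key (2+8+4+30 bytes).
var scratchPool = sync.Pool{
	New: func() any { return make([]byte, 2+8+4+30) },
}

// kindDateKey builds a kind+created_at key in a pooled buffer and slices it
// down to the 6 bytes actually used; release returns it, like key.free().
func kindDateKey(kind uint16, createdAt uint32) []byte {
	k := scratchPool.Get().([]byte)
	binary.BigEndian.PutUint16(k[0:2], kind)
	binary.BigEndian.PutUint32(k[2:6], createdAt)
	return k[0:6]
}

func release(k []byte) { scratchPool.Put(k) }

Note that re-slicing a recycled buffer past its current length (as k[8:8+2] does below) stays legal as long as it is within capacity, which the fixed 44-byte allocation guarantees.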

func (b *LMDBBackend) getIndexKeysForEvent(evt *nostr.Event) []key {
@@ -46,39 +31,38 @@ func (b *LMDBBackend) getIndexKeysForEvent(evt *nostr.Event) []key {
// indexes
{
// ~ by id
idPrefix8, _ := hex.DecodeString(evt.ID[0 : 8*2])
k := idPrefix8
keys = append(keys, key{dbi: b.indexId, key: k})
k := indexKeyPool.Get().([]byte)
hex.Decode(k[0:8], []byte(evt.ID[0:8*2]))
keys = append(keys, key{dbi: b.indexId, key: k[0:8]})
}

{
// ~ by pubkey+date
pubkeyPrefix8, _ := hex.DecodeString(evt.PubKey[0 : 8*2])
k := make([]byte, 8+4)
copy(k[:], pubkeyPrefix8)
binary.BigEndian.PutUint32(k[8:], uint32(evt.CreatedAt))
keys = append(keys, key{dbi: b.indexPubkey, key: k})
k := indexKeyPool.Get().([]byte)
hex.Decode(k[0:8], []byte(evt.PubKey[0:8*2]))
binary.BigEndian.PutUint32(k[8:8+4], uint32(evt.CreatedAt))
keys = append(keys, key{dbi: b.indexPubkey, key: k[0 : 8+4]})
}

{
// ~ by kind+date
k := make([]byte, 2+4)
binary.BigEndian.PutUint16(k[:], uint16(evt.Kind))
binary.BigEndian.PutUint32(k[2:], uint32(evt.CreatedAt))
keys = append(keys, key{dbi: b.indexKind, key: k})
k := indexKeyPool.Get().([]byte)
binary.BigEndian.PutUint16(k[0:2], uint16(evt.Kind))
binary.BigEndian.PutUint32(k[2:2+4], uint32(evt.CreatedAt))
keys = append(keys, key{dbi: b.indexKind, key: k[0 : 2+4]})
}

{
// ~ by pubkey+kind+date
pubkeyPrefix8, _ := hex.DecodeString(evt.PubKey[0 : 8*2])
k := make([]byte, 8+2+4)
copy(k[:], pubkeyPrefix8)
binary.BigEndian.PutUint16(k[8:], uint16(evt.Kind))
binary.BigEndian.PutUint32(k[8+2:], uint32(evt.CreatedAt))
keys = append(keys, key{dbi: b.indexPubkeyKind, key: k})
k := indexKeyPool.Get().([]byte)
hex.Decode(k[0:8], []byte(evt.PubKey[0:8*2]))
binary.BigEndian.PutUint16(k[8:8+2], uint16(evt.Kind))
binary.BigEndian.PutUint32(k[8+2:8+2+4], uint32(evt.CreatedAt))
keys = append(keys, key{dbi: b.indexPubkeyKind, key: k[0 : 8+2+4]})
}

// ~ by tagvalue+date
// ~ by p-tag+kind+date
for i, tag := range evt.Tags {
if len(tag) < 2 || len(tag[0]) != 1 || len(tag[1]) == 0 || len(tag[1]) > 100 {
// not indexable
Expand All @@ -92,19 +76,66 @@ func (b *LMDBBackend) getIndexKeysForEvent(evt *nostr.Event) []key {

// get key prefix (with full length) and offset where to write the created_at
dbi, k, offset := b.getTagIndexPrefix(tag[1])

// write the created_at
binary.BigEndian.PutUint32(k[offset:], uint32(evt.CreatedAt))

keys = append(keys, key{dbi: dbi, key: k})

// now the p-tag+kind+date
if dbi == b.indexTag32 && tag[0] == "p" {
k := indexKeyPool.Get().([]byte)
hex.Decode(k[0:8], []byte(tag[1][0:8*2]))
binary.BigEndian.PutUint16(k[8:8+2], uint16(evt.Kind))
binary.BigEndian.PutUint32(k[8+2:8+2+4], uint32(evt.CreatedAt))
dbi := b.indexPTagKind
keys = append(keys, key{dbi: dbi, key: k[0 : 8+2+4]})
}
}

{
// ~ by date only
k := make([]byte, 4)
binary.BigEndian.PutUint32(k[:], uint32(evt.CreatedAt))
keys = append(keys, key{dbi: b.indexCreatedAt, key: k})
k := indexKeyPool.Get().([]byte)
binary.BigEndian.PutUint32(k[0:4], uint32(evt.CreatedAt))
keys = append(keys, key{dbi: b.indexCreatedAt, key: k[0:4]})
}

return keys
}
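
For orientation, the per-event index entries built above, restated (layouts as in the code, sizes in bytes, integers big-endian; the stored value is the event's 4-byte serial):

// indexId                 id[0:8]                                          8
// indexPubkey             pubkey[0:8] + created_at                         8+4
// indexKind               kind + created_at                                2+4
// indexPubkeyKind         pubkey[0:8] + kind + created_at                  8+2+4
// indexTag/Tag32/TagAddr  tag prefix (see getTagIndexPrefix below) + created_at
// indexPTagKind           p-tag pubkey[0:8] + kind + created_at            8+2+4  (only for "p" tags)
// indexCreatedAt          created_at                                       4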

func (b *LMDBBackend) getTagIndexPrefix(tagValue string) (lmdb.DBI, []byte, int) {
var k []byte // the key with full length for created_at and idx at the end, but not filled with these
var offset int // the offset -- i.e. where the prefix ends and the created_at and idx would start
var dbi lmdb.DBI

k = indexKeyPool.Get().([]byte)

// if it's 32 bytes as hex, save it as bytes
if len(tagValue) == 64 {
// but we actually only use the first 8 bytes
if _, err := hex.Decode(k[0:8], []byte(tagValue[0:8*2])); err == nil {
offset = 8
dbi = b.indexTag32
return dbi, k[0 : offset+4], offset
}
}

// if it looks like an "a" tag, index it in this special format
spl := strings.Split(tagValue, ":")
if len(spl) == 3 && len(spl[1]) == 64 {
if _, err := hex.Decode(k[2:2+8], []byte(tagValue[0:8*2])); err == nil {
if kind, err := strconv.ParseUint(spl[0], 10, 16); err == nil {
k[0] = byte(kind >> 8)
k[1] = byte(kind)
// limit "d" identifier to 30 bytes (so we don't have to grow our byte slice)
n := copy(k[2+8:2+8+30], spl[2])
offset = 2 + 8 + n
return dbi, k[0 : offset+4], offset
}
}
}

// index whatever else as utf-8, but limit it to 40 bytes
n := copy(k[0:40], tagValue)
offset = n
dbi = b.indexTag

return dbi, k[0 : n+4], offset
}
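
To make the three branches above concrete, a rough mapping of example tag values (values invented; the exact rules are in the code):

// "3bf0c63f…9d" (64 hex chars)        -> indexTag32: first 8 decoded bytes, offset 8
// "30023:<64-hex pubkey>:my-article"  -> composite "a"-tag layout: 2-byte kind + 8 decoded bytes + "d" identifier capped at 30 bytes
// "wss://relay.example.com"           -> indexTag: raw UTF-8 capped at 40 bytes
//
// In every case the returned slice reserves 4 trailing bytes where the caller
// (getIndexKeysForEvent) writes the big-endian created_at at `offset`.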
15 changes: 8 additions & 7 deletions lmdb/lib.go
@@ -19,7 +19,7 @@ var _ eventstore.Store = (*LMDBBackend)(nil)
type LMDBBackend struct {
Path string
MaxLimit int
MapSize int64
MapSize int64

lmdbEnv *lmdb.Env

@@ -33,6 +33,7 @@ type LMDBBackend struct {
indexTag lmdb.DBI
indexTag32 lmdb.DBI
indexTagAddr lmdb.DBI
indexPTagKind lmdb.DBI

lastId atomic.Uint32
}
@@ -48,7 +49,7 @@ func (b *LMDBBackend) Init() error {
return err
}

env.SetMaxDBs(10)
env.SetMaxDBs(11)
env.SetMaxReaders(1000)
if b.MapSize == 0 {
env.SetMapSize(1 << 38) // ~273GB
@@ -121,6 +122,11 @@ func (b *LMDBBackend) Init() error {
} else {
b.indexTagAddr = dbi
}
if dbi, err := txn.OpenDBI("ptagKind", multiIndexCreationFlags); err != nil {
return err
} else {
b.indexPTagKind = dbi
}
return nil
}); err != nil {
return err
@@ -161,8 +167,3 @@ func (b *LMDBBackend) Serial() []byte {
binary.BigEndian.PutUint32(vb[:], uint32(v))
return vb
}

type key struct {
dbi lmdb.DBI
key []byte
}
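
The new index needs its own named database, hence env.SetMaxDBs(11) and the extra OpenDBI("ptagKind", …) call in Init above. multiIndexCreationFlags is defined elsewhere in the package; below is a sketch of opening such an index with lmdb-go, assuming the typical flags for a multi-valued index with fixed-size values (lmdb.Create|lmdb.DupSort|lmdb.DupFixed — an assumption that may not match the commit's constant):

package sketch

import "github.com/PowerDNS/lmdb-go/lmdb"

// openPTagKind opens the ptagKind DBI inside a write transaction. The flag
// combination here is assumed, not taken from this commit.
func openPTagKind(env *lmdb.Env) (lmdb.DBI, error) {
	var dbi lmdb.DBI
	err := env.Update(func(txn *lmdb.Txn) error {
		var err error
		dbi, err = txn.OpenDBI("ptagKind", lmdb.Create|lmdb.DupSort|lmdb.DupFixed)
		return err
	})
	return dbi, err
}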
51 changes: 50 additions & 1 deletion lmdb/migration.go
@@ -2,9 +2,13 @@ package lmdb

import (
"encoding/binary"
"encoding/hex"
"fmt"
"log"

"github.com/PowerDNS/lmdb-go/lmdb"
bin "github.com/fiatjaf/eventstore/internal/binary"
"github.com/nbd-wtf/go-nostr"
)

const (
@@ -53,8 +57,53 @@ func (b *LMDBBackend) runMigrations() error {
b.bumpVersion(txn, 4)
}

// this is when we added the ptag-kind-createdat index
if version < 5 {
// ...
log.Println("[lmdb] migration 5: reindex events with \"p\" tags for the ptagKind index")

cursor, err := txn.OpenCursor(b.indexTag32)
if err != nil {
return fmt.Errorf("failed to open cursor in migration 5: %w", err)
}
defer cursor.Close()

key, idx, err := cursor.Get(nil, nil, lmdb.First)
for err == nil {
if val, err := txn.Get(b.rawEventStore, idx); err != nil {
return fmt.Errorf("error getting binary event for %x on migration 5: %w", idx, err)
} else {
evt := &nostr.Event{}
if err := bin.Unmarshal(val, evt); err != nil {
return fmt.Errorf("error decoding event %x on migration 5: %w", idx, err)
}

tagFirstChars := hex.EncodeToString(key[0:8])
// we do this to prevent indexing other tags as "p" and also to not index the same event twice
// this ensures we only add one ptagKind entry for each tag32 entry (if the tag32 happens to be a "p")
if evt.Tags.GetFirst([]string{"p", tagFirstChars}) != nil {
log.Printf("[lmdb] applying to key %x", key)
newkey := make([]byte, 8+2+4)
copy(newkey, key[0:8])
binary.BigEndian.PutUint16(newkey[8:8+2], uint16(evt.Kind))
binary.BigEndian.PutUint32(newkey[8+2:8+2+4], uint32(evt.CreatedAt))
if err := txn.Put(b.indexPTagKind, newkey, idx, 0); err != nil {
return fmt.Errorf("error saving tag on migration 5: %w", err)
}
}
}

// next
key, idx, err = cursor.Get(nil, nil, lmdb.Next)
}
if lmdbErr, ok := err.(*lmdb.OpError); ok && lmdbErr.Errno != lmdb.NotFound {
// exited the loop with an error different from NOTFOUND
return err
}

// bump version
if err := b.bumpVersion(txn, 5); err != nil {
return err
}
}

return nil
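
Migration 5 walks every entry of the existing tag32 index, loads the referenced event, and inserts a ptagKind entry whenever that tag32 entry corresponds to a "p" tag of the event. The cursor-walk-until-NotFound part of it, restated as a generic self-contained helper (forEach is not from the commit, which inlines this logic):

package sketch

import "github.com/PowerDNS/lmdb-go/lmdb"

// forEach visits every key/value pair in a DBI and stops cleanly on MDB_NOTFOUND.
func forEach(txn *lmdb.Txn, dbi lmdb.DBI, fn func(k, v []byte) error) error {
	cur, err := txn.OpenCursor(dbi)
	if err != nil {
		return err
	}
	defer cur.Close()

	k, v, err := cur.Get(nil, nil, lmdb.First)
	for err == nil {
		if err := fn(k, v); err != nil {
			return err
		}
		k, v, err = cur.Get(nil, nil, lmdb.Next)
	}
	if lmdb.IsNotFound(err) {
		return nil // reached the end of the index
	}
	return err
}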
25 changes: 13 additions & 12 deletions lmdb/query.go
@@ -14,16 +14,6 @@ import (
"github.com/nbd-wtf/go-nostr"
)

type query struct {
i int
dbi lmdb.DBI
prefix []byte
results chan *nostr.Event
prefixSize int
timestampSize int
startingPoint []byte
}

type queryEvent struct {
*nostr.Event
query int
@@ -50,6 +40,11 @@ func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha

go func() {
defer close(ch)
defer func() {
for _, q := range queries {
q.free()
}
}()

for _, q := range queries {
q := q
@@ -87,8 +82,8 @@ func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha
for {
// we already have a k and a v and an err from the cursor setup, so check and use these
if iterr != nil ||
len(k) != q.prefixSize+q.timestampSize ||
!bytes.Equal(k[:q.prefixSize], q.prefix) {
len(k) != q.keySize ||
!bytes.HasPrefix(k, q.prefix) {
// either iteration has errored or we reached the end of this prefix
return nil
}
@@ -173,11 +168,16 @@ func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha
heap.Init(&emitQueue)

// iterate until we've emitted all events required
var lastEmitted string
for {
// emit latest event in queue
latest := emitQueue[0]
if lastEmitted != "" && latest.ID == lastEmitted {
goto skip
}
select {
case ch <- latest.Event:
lastEmitted = latest.ID
case <-ctx.Done():
return
}
Expand All @@ -188,6 +188,7 @@ func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha
break
}

skip:
// fetch a new one from query results and replace the previous one with it
if evt, ok := <-queries[latest.query].results; ok {
emitQueue[0].Event = evt
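
The new lastEmitted guard drops back-to-back duplicates from the merge loop: a single event can surface from more than one sub-query (for example when two of its indexed tag values both match the filter), and since sub-results are merged newest-first those duplicates arrive consecutively. A reduced, LMDB-free sketch of the same merge-with-dedup idea (types and names invented):

package sketch

type item struct {
	ID        string
	CreatedAt int64
}

// mergeNewestFirst merges streams that are each sorted newest-first and
// drops consecutive duplicates (the same ID arriving via another index).
func mergeNewestFirst(streams [][]item, limit int) []item {
	out := make([]item, 0, limit)
	lastEmitted := ""
	for len(out) < limit {
		bestIdx := -1
		var best item
		for i, s := range streams {
			if len(s) > 0 && (bestIdx == -1 || s[0].CreatedAt > best.CreatedAt) {
				best, bestIdx = s[0], i
			}
		}
		if bestIdx == -1 {
			break // all streams exhausted
		}
		streams[bestIdx] = streams[bestIdx][1:]
		if best.ID == lastEmitted {
			continue // same event reached via another index: skip it
		}
		out = append(out, best)
		lastEmitted = best.ID
	}
	return out
}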