Skip to content

Commit

Permalink
mdbx and lmdb related fixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
fiatjaf committed Dec 16, 2024
1 parent 88deb04 commit f731c34
Show file tree
Hide file tree
Showing 11 changed files with 97 additions and 142 deletions.
87 changes: 28 additions & 59 deletions lmdb/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,36 +30,21 @@ func (b *LMDBBackend) CountEvents(ctx context.Context, filter nostr.Filter) (int
continue
}

var k []byte
var idx []byte
var iterr error

if _, _, errsr := cursor.Get(q.startingPoint, nil, lmdb.SetRange); errsr != nil {
if operr, ok := errsr.(*lmdb.OpError); !ok || operr.Errno != lmdb.NotFound {
// in this case it's really an error
panic(operr)
} else {
// we're at the end and we just want notes before this,
// so we just need to set the cursor the last key, this is not a real error
k, idx, iterr = cursor.Get(nil, nil, lmdb.Last)
}
} else {
// move one back as the first step
k, idx, iterr = cursor.Get(nil, nil, lmdb.Prev)
}
it := &iterator{cursor: cursor}
it.seek(q.startingPoint)

for {
// we already have a k and a v and an err from the cursor setup, so check and use these
if iterr != nil ||
len(k) != q.keySize ||
!bytes.HasPrefix(k, q.prefix) {
if it.err != nil ||
len(it.key) != q.keySize ||
!bytes.HasPrefix(it.key, q.prefix) {
// either iteration has errored or we reached the end of this prefix
break // stop this cursor and move to the next one
}

// "id" indexes don't contain a timestamp
if q.timestampSize == 4 {
createdAt := binary.BigEndian.Uint32(k[len(k)-4:])
createdAt := binary.BigEndian.Uint32(it.key[len(it.key)-4:])
if createdAt < since {
break
}
Expand All @@ -69,37 +54,37 @@ func (b *LMDBBackend) CountEvents(ctx context.Context, filter nostr.Filter) (int
count++
} else {
// fetch actual event
val, err := txn.Get(b.rawEventStore, idx)
val, err := txn.Get(b.rawEventStore, it.valIdx)
if err != nil {
panic(err)
}

// check it against pubkeys without decoding the entire thing
if !slices.Contains(extraAuthors, [32]byte(val[32:64])) {
goto loopend
it.next()
continue
}

// check it against kinds without decoding the entire thing
if !slices.Contains(extraKinds, [2]byte(val[132:134])) {
goto loopend
it.next()
continue
}

evt := &nostr.Event{}
if err := bin.Unmarshal(val, evt); err != nil {
goto loopend
it.next()
continue
}

// if there is still a tag to be checked, do it now
if !evt.Tags.ContainsAny(extraTagKey, extraTagValues) {
goto loopend
it.next()
continue
}

count++
}

// move one back (we'll look into k and v and err in the next iteration)
loopend:
k, idx, iterr = cursor.Get(nil, nil, lmdb.Prev)
}
}

Expand Down Expand Up @@ -133,43 +118,28 @@ func (b *LMDBBackend) CountEventsHLL(ctx context.Context, filter nostr.Filter, o
continue
}

var k []byte
var idx []byte
var iterr error

if _, _, errsr := cursor.Get(q.startingPoint, nil, lmdb.SetRange); errsr != nil {
if operr, ok := errsr.(*lmdb.OpError); !ok || operr.Errno != lmdb.NotFound {
// in this case it's really an error
panic(operr)
} else {
// we're at the end and we just want notes before this,
// so we just need to set the cursor the last key, this is not a real error
k, idx, iterr = cursor.Get(nil, nil, lmdb.Last)
}
} else {
// move one back as the first step
k, idx, iterr = cursor.Get(nil, nil, lmdb.Prev)
}
it := &iterator{cursor: cursor}
it.seek(q.startingPoint)

for {
// we already have a k and a v and an err from the cursor setup, so check and use these
if iterr != nil ||
len(k) != q.keySize ||
!bytes.HasPrefix(k, q.prefix) {
if it.err != nil ||
len(it.key) != q.keySize ||
!bytes.HasPrefix(it.key, q.prefix) {
// either iteration has errored or we reached the end of this prefix
break // stop this cursor and move to the next one
}

// "id" indexes don't contain a timestamp
if q.timestampSize == 4 {
createdAt := binary.BigEndian.Uint32(k[len(k)-4:])
createdAt := binary.BigEndian.Uint32(it.key[len(it.key)-4:])
if createdAt < since {
break
}
}

// fetch actual event (we need it regardless because we need the pubkey for the hll)
val, err := txn.Get(b.rawEventStore, idx)
val, err := txn.Get(b.rawEventStore, it.valIdx)
if err != nil {
panic(err)
}
Expand All @@ -181,26 +151,25 @@ func (b *LMDBBackend) CountEventsHLL(ctx context.Context, filter nostr.Filter, o
} else {
// check it against kinds without decoding the entire thing
if !slices.Contains(extraKinds, [2]byte(val[132:134])) {
goto loopend
it.next()
continue
}

evt := &nostr.Event{}
if err := bin.Unmarshal(val, evt); err != nil {
goto loopend
it.next()
continue
}

// if there is still a tag to be checked, do it now
if !evt.Tags.ContainsAny(extraTagKey, extraTagValues) {
goto loopend
it.next()
continue
}

count++
hll.Add(evt.PubKey)
}

// move one back (we'll look into k and v and err in the next iteration)
loopend:
k, idx, iterr = cursor.Get(nil, nil, lmdb.Prev)
}
}

Expand Down
3 changes: 2 additions & 1 deletion lmdb/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,12 @@ func (b *LMDBBackend) initialize() error {
}
defer cursor.Close()
k, _, err := cursor.Get(nil, nil, lmdb.Last)
if operr, ok := err.(*lmdb.OpError); ok && operr.Errno == lmdb.NotFound {
if lmdb.IsNotFound(err) {
// nothing found, so we're at zero
return nil
}
if err != nil {
return err
}
b.lastId.Store(binary.BigEndian.Uint32(k))

Expand Down
2 changes: 1 addition & 1 deletion lmdb/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func (b *LMDBBackend) runMigrations() error {
var version uint16
v, err := txn.Get(b.settingsStore, []byte{DB_VERSION})
if err != nil {
if lmdbErr, ok := err.(*lmdb.OpError); ok && lmdbErr.Errno == lmdb.NotFound {
if lmdb.IsNotFound(err) {
version = 0
} else if v == nil {
return fmt.Errorf("failed to read database version: %w", err)
Expand Down
20 changes: 11 additions & 9 deletions lmdb/save.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@ func (b *LMDBBackend) SaveEvent(ctx context.Context, evt *nostr.Event) error {
}

return b.lmdbEnv.Update(func(txn *lmdb.Txn) error {
// modify hyperloglog caches relative to this
useCache, skipSaving := b.EnableHLLCacheFor(evt.Kind)
if b.EnableHLLCacheFor != nil {
// modify hyperloglog caches relative to this
useCache, skipSaving := b.EnableHLLCacheFor(evt.Kind)

if useCache {
err := b.updateHyperLogLogCachedValues(txn, evt)
if err != nil {
return fmt.Errorf("failed to update hll cache: %w", err)
}
if skipSaving {
return nil
if useCache {
err := b.updateHyperLogLogCachedValues(txn, evt)
if err != nil {
return fmt.Errorf("failed to update hll cache: %w", err)
}
if skipSaving {
return nil
}
}
}

Expand Down
85 changes: 28 additions & 57 deletions mdbx/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,36 +30,21 @@ func (b *MDBXBackend) CountEvents(ctx context.Context, filter nostr.Filter) (int
continue
}

var k []byte
var idx []byte
var iterr error

if _, _, errsr := cursor.Get(q.startingPoint, nil, mdbx.SetRange); errsr != nil {
if operr, ok := errsr.(*mdbx.OpError); !ok || operr.Errno != mdbx.NotFound {
// in this case it's really an error
panic(operr)
} else {
// we're at the end and we just want notes before this,
// so we just need to set the cursor the last key, this is not a real error
k, idx, iterr = cursor.Get(nil, nil, mdbx.Last)
}
} else {
// move one back as the first step
k, idx, iterr = cursor.Get(nil, nil, mdbx.Prev)
}
it := &iterator{cursor: cursor}
it.seek(q.startingPoint)

for {
// we already have a k and a v and an err from the cursor setup, so check and use these
if iterr != nil ||
len(k) != q.keySize ||
!bytes.HasPrefix(k, q.prefix) {
if it.err != nil ||
len(it.key) != q.keySize ||
!bytes.HasPrefix(it.key, q.prefix) {
// either iteration has errored or we reached the end of this prefix
break // stop this cursor and move to the next one
}

// "id" indexes don't contain a timestamp
if q.timestampSize == 4 {
createdAt := binary.BigEndian.Uint32(k[len(k)-4:])
createdAt := binary.BigEndian.Uint32(it.key[len(it.key)-4:])
if createdAt < since {
break
}
Expand All @@ -69,37 +54,39 @@ func (b *MDBXBackend) CountEvents(ctx context.Context, filter nostr.Filter) (int
count++
} else {
// fetch actual event
val, err := txn.Get(b.rawEventStore, idx)
val, err := txn.Get(b.rawEventStore, it.valIdx)
if err != nil {
panic(err)
}

// check it against pubkeys without decoding the entire thing
if !slices.Contains(extraAuthors, [32]byte(val[32:64])) {
goto loopend
it.next()
continue
}

// check it against kinds without decoding the entire thing
if !slices.Contains(extraKinds, [2]byte(val[132:134])) {
goto loopend
it.next()
continue
}

evt := &nostr.Event{}
if err := bin.Unmarshal(val, evt); err != nil {
goto loopend
it.next()
continue
}

// if there is still a tag to be checked, do it now
if !evt.Tags.ContainsAny(extraTagKey, extraTagValues) {
goto loopend
it.next()
continue
}

count++
}

// move one back (we'll look into k and v and err in the next iteration)
loopend:
k, idx, iterr = cursor.Get(nil, nil, mdbx.Prev)
}
}

Expand Down Expand Up @@ -133,43 +120,28 @@ func (b *MDBXBackend) CountEventsHLL(ctx context.Context, filter nostr.Filter, o
continue
}

var k []byte
var idx []byte
var iterr error

if _, _, errsr := cursor.Get(q.startingPoint, nil, mdbx.SetRange); errsr != nil {
if operr, ok := errsr.(*mdbx.OpError); !ok || operr.Errno != mdbx.NotFound {
// in this case it's really an error
panic(operr)
} else {
// we're at the end and we just want notes before this,
// so we just need to set the cursor the last key, this is not a real error
k, idx, iterr = cursor.Get(nil, nil, mdbx.Last)
}
} else {
// move one back as the first step
k, idx, iterr = cursor.Get(nil, nil, mdbx.Prev)
}
it := &iterator{cursor: cursor}
it.seek(q.startingPoint)

for {
// we already have a k and a v and an err from the cursor setup, so check and use these
if iterr != nil ||
len(k) != q.keySize ||
!bytes.HasPrefix(k, q.prefix) {
if it.err != nil ||
len(it.key) != q.keySize ||
!bytes.HasPrefix(it.key, q.prefix) {
// either iteration has errored or we reached the end of this prefix
break // stop this cursor and move to the next one
}

// "id" indexes don't contain a timestamp
if q.timestampSize == 4 {
createdAt := binary.BigEndian.Uint32(k[len(k)-4:])
createdAt := binary.BigEndian.Uint32(it.key[len(it.key)-4:])
if createdAt < since {
break
}
}

// fetch actual event (we need it regardless because we need the pubkey for the hll)
val, err := txn.Get(b.rawEventStore, idx)
val, err := txn.Get(b.rawEventStore, it.valIdx)
if err != nil {
panic(err)
}
Expand All @@ -181,26 +153,25 @@ func (b *MDBXBackend) CountEventsHLL(ctx context.Context, filter nostr.Filter, o
} else {
// check it against kinds without decoding the entire thing
if !slices.Contains(extraKinds, [2]byte(val[132:134])) {
goto loopend
it.next()
continue
}

evt := &nostr.Event{}
if err := bin.Unmarshal(val, evt); err != nil {
goto loopend
it.next()
continue
}

// if there is still a tag to be checked, do it now
if !evt.Tags.ContainsAny(extraTagKey, extraTagValues) {
goto loopend
it.next()
continue
}

count++
hll.Add(evt.PubKey)
}

// move one back (we'll look into k and v and err in the next iteration)
loopend:
k, idx, iterr = cursor.Get(nil, nil, mdbx.Prev)
}
}

Expand Down
Loading

0 comments on commit f731c34

Please sign in to comment.