lmdb: make the querying work like badger, with a single goroutine and smarter iteration.
fiatjaf committed Nov 17, 2024
1 parent 2390632 commit 06d2cfe
Showing 10 changed files with 494 additions and 317 deletions.
101 changes: 0 additions & 101 deletions badger/helpers.go
@@ -1,15 +1,12 @@
package badger

import (
"cmp"
"encoding/binary"
"encoding/hex"
"iter"
"math"
"strconv"
"strings"

mergesortedslices "fiatjaf.com/lib/merge-sorted-slices"
"github.com/nbd-wtf/go-nostr"
"golang.org/x/exp/slices"
)
@@ -155,82 +152,6 @@ func getAddrTagElements(tagValue string) (kind uint16, pkb []byte, d string) {
return 0, nil, ""
}

// mergeSortMultiple takes the results of multiple iterators, which are already sorted,
// and merges them into a single big sorted slice
func mergeSortMultiple(batches [][]iterEvent, limit int, dst []iterEvent) []iterEvent {
// clear out empty lists here while simultaneously computing the total count.
// this helps because if there are a bunch of empty lists this pre-clean step
// gets us into the faster 'merge' branch below, which would otherwise have to
// do the same cleaning itself; and even if we still end up in the other branch
// we save one iteration by having already computed the total count.
total := 0
for i := len(batches) - 1; i >= 0; i-- {
if len(batches[i]) == 0 {
batches = swapDelete(batches, i)
} else {
total += len(batches[i])
}
}

if limit == -1 {
limit = total
}

// this amazing equation will ensure that if one of the two sides goes very small (like 1 or 2)
// the other can go very high (like 500) and we're still in the 'merge' branch.
// if values go somewhere in the middle then they may match the 'merge' branch (batches=20,limit=70)
// or not (batches=25, limit=60)
if math.Log(float64(len(batches)*2))+math.Log(float64(limit)) < 8 {
if dst == nil {
dst = make([]iterEvent, limit)
} else if cap(dst) < limit {
dst = slices.Grow(dst, limit-len(dst))
}
dst = dst[0:limit]
return mergesortedslices.MergeFuncNoEmptyListsIntoSlice(dst, batches, compareIterEvent)
} else {
if dst == nil {
dst = make([]iterEvent, total)
} else if cap(dst) < total {
dst = slices.Grow(dst, total-len(dst))
}
dst = dst[0:total]

// concatenate everything and sort it in one go; this is still fast because the input batches are already sorted
lastIndex := 0
for _, batch := range batches {
copy(dst[lastIndex:], batch)
lastIndex += len(batch)
}

slices.SortFunc(dst, compareIterEvent)

for i, j := 0, total-1; i < j; i, j = i+1, j-1 {
dst[i], dst[j] = dst[j], dst[i]
}

if limit < len(dst) {
return dst[0:limit]
}
return dst
}
}
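
For reference, the branch selection above boils down to 2*len(batches)*limit < e^8 ≈ 2981. A minimal standalone sketch (not part of this commit) that evaluates the heuristic for the two pairs mentioned in the comment:

```go
package main

import (
	"fmt"
	"math"
)

// usesMergeBranch mirrors the heuristic above: log(2*batches) + log(limit) < 8,
// which is equivalent to 2*batches*limit < e^8 (about 2981).
func usesMergeBranch(batches, limit int) bool {
	return math.Log(float64(batches*2))+math.Log(float64(limit)) < 8
}

func main() {
	fmt.Println(usesMergeBranch(20, 70)) // true: 2*20*70 = 2800, merge branch
	fmt.Println(usesMergeBranch(25, 60)) // false: 2*25*60 = 3000, concatenate-and-sort branch
}
```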

// batchSizePerNumberOfQueries tries to make an educated guess for the batch size given the total filter limit and
// the number of abstract queries we'll be conducting at the same time
func batchSizePerNumberOfQueries(totalFilterLimit int, numberOfQueries int) int {
if numberOfQueries == 1 || totalFilterLimit*numberOfQueries < 50 {
return totalFilterLimit
}

return int(
math.Ceil(
math.Pow(float64(totalFilterLimit), 0.80) / math.Pow(float64(numberOfQueries), 0.71),
),
)
}
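
For a feel of the formula above, here is a standalone sketch (not part of this commit) that reproduces it and prints a few sample values; the inputs are arbitrary:

```go
package main

import (
	"fmt"
	"math"
)

// batchSize reproduces the formula above: the per-query batch grows sub-linearly
// with the filter limit and shrinks sub-linearly with the number of queries.
func batchSize(totalFilterLimit, numberOfQueries int) int {
	if numberOfQueries == 1 || totalFilterLimit*numberOfQueries < 50 {
		return totalFilterLimit
	}
	return int(math.Ceil(
		math.Pow(float64(totalFilterLimit), 0.80) / math.Pow(float64(numberOfQueries), 0.71),
	))
}

func main() {
	fmt.Println(batchSize(10, 4))   // 10: small workload, short-circuits to the full limit
	fmt.Println(batchSize(50, 4))   // 9: 50^0.8 / 4^0.71 ≈ 8.5, rounded up
	fmt.Println(batchSize(100, 10)) // 8: 100^0.8 / 10^0.71 ≈ 7.8, rounded up
}
```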

func filterMatchesTags(ef *nostr.Filter, event *nostr.Event) bool {
for f, v := range ef.Tags {
if v != nil && !event.Tags.ContainsAny(f, v) {
Expand All @@ -239,25 +160,3 @@ func filterMatchesTags(ef *nostr.Filter, event *nostr.Event) bool {
}
return true
}

func swapDelete[A any](arr []A, i int) []A {
arr[i] = arr[len(arr)-1]
return arr[:len(arr)-1]
}
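
swapDelete removes an element in O(1) by overwriting it with the last element, so slice order is not preserved. A tiny standalone usage sketch (not part of this commit):

```go
package main

import "fmt"

func swapDelete[A any](arr []A, i int) []A {
	arr[i] = arr[len(arr)-1]
	return arr[:len(arr)-1]
}

func main() {
	arr := []int{1, 2, 3, 4}
	// removing index 1 moves the last element into its slot, so order is not preserved
	fmt.Println(swapDelete(arr, 1)) // [1 4 3]
}
```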

func compareIterEvent(a, b iterEvent) int {
if a.Event == nil {
if b.Event == nil {
return 0
} else {
return -1
}
} else if b.Event == nil {
return 1
}

if a.CreatedAt == b.CreatedAt {
return strings.Compare(a.ID, b.ID)
}
return cmp.Compare(a.CreatedAt, b.CreatedAt)
}
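
compareIterEvent orders nil events first, then by ascending CreatedAt, breaking ties by ID; mergeSortMultiple reverses the sorted slice afterwards so callers end up with newest-first results. A standalone sketch of that ordering (not part of this commit), using a stand-in struct instead of iterEvent and the Go 1.21 standard-library slices/cmp packages; the nil-first branch is omitted:

```go
package main

import (
	"cmp"
	"fmt"
	"slices"
	"strings"
)

// stub stands in for iterEvent here: just the two fields the comparator looks at.
type stub struct {
	CreatedAt int64
	ID        string
}

// same shape as compareIterEvent, minus the nil-Event checks
func compareStub(a, b stub) int {
	if a.CreatedAt == b.CreatedAt {
		return strings.Compare(a.ID, b.ID) // ties broken by ID
	}
	return cmp.Compare(a.CreatedAt, b.CreatedAt) // ascending by timestamp
}

func main() {
	evs := []stub{{30, "c"}, {10, "a"}, {30, "b"}}
	slices.SortFunc(evs, compareStub) // oldest first: {10 a} {30 b} {30 c}
	slices.Reverse(evs)               // newest first, which is what mergeSortMultiple returns
	fmt.Println(evs)                  // [{30 c} {30 b} {10 a}]
}
```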
70 changes: 33 additions & 37 deletions badger/query.go
@@ -10,17 +10,13 @@ import (

"github.com/dgraph-io/badger/v4"
"github.com/fiatjaf/eventstore"
"github.com/fiatjaf/eventstore/internal"
bin "github.com/fiatjaf/eventstore/internal/binary"
"github.com/nbd-wtf/go-nostr"
"golang.org/x/exp/slices"
)

type iterEvent struct {
*nostr.Event
q int
}

var BatchFilled = errors.New("batch-filled")
var batchFilled = errors.New("batch-filled")

func (b *BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) {
ch := make(chan *nostr.Event)
@@ -61,38 +57,38 @@ func (b *BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (c

iterators := make([]*badger.Iterator, len(queries))
exhausted := make([]bool, len(queries)) // indicates that a query won't be used anymore
results := make([][]iterEvent, len(queries))
results := make([][]internal.IterEvent, len(queries))
pulledPerQuery := make([]int, len(queries))

// this is kept updated so we never pull from the iterator that is furthest behind
// (i.e. the one whose current event is the oldest among all);
// we will resume pulling from it once some other iterator takes that position
oldest := iterEvent{q: -1}
oldest := internal.IterEvent{Q: -1}

secondPhase := false // after we have gathered enough events we will change the way we iterate
secondBatch := make([][]iterEvent, 0, len(queries)+1)
secondBatch := make([][]internal.IterEvent, 0, len(queries)+1)
secondPhaseParticipants := make([]int, 0, len(queries)+1)

// while merging results in the second phase we will alternate between these two lists
// to avoid having to create new lists all the time
var secondPhaseResultsA []iterEvent
var secondPhaseResultsB []iterEvent
var secondPhaseResultsA []internal.IterEvent
var secondPhaseResultsB []internal.IterEvent
var secondPhaseResultsToggle bool // this is just a dummy thing we use to keep track of the alternating
var secondPhaseHasResultsPending bool

remainingUnexhausted := len(queries) // when all queries are exhausted we can finally end this thing
batchSizePerQuery := batchSizePerNumberOfQueries(limit, remainingUnexhausted)
batchSizePerQuery := internal.BatchSizePerNumberOfQueries(limit, remainingUnexhausted)
firstPhaseTotalPulled := 0

exhaust := func(q int) {
exhausted[q] = true
remainingUnexhausted--
if q == oldest.q {
oldest = iterEvent{q: -1}
if q == oldest.Q {
oldest = internal.IterEvent{Q: -1}
}
}

var firstPhaseResults []iterEvent
var firstPhaseResults []internal.IterEvent

for q := range queries {
iterators[q] = txn.NewIterator(badger.IteratorOptions{
@@ -102,7 +98,7 @@ func (b *BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (c
})
defer iterators[q].Close()
iterators[q].Seek(queries[q].startingPoint)
results[q] = make([]iterEvent, 0, batchSizePerQuery*2)
results[q] = make([]internal.IterEvent, 0, batchSizePerQuery*2)
}

// we will reuse this throughout the iteration
@@ -111,15 +107,15 @@ func (b *BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (c
// fmt.Println("queries", len(queries))

for c := 0; ; c++ {
batchSizePerQuery = batchSizePerNumberOfQueries(limit, remainingUnexhausted)
batchSizePerQuery = internal.BatchSizePerNumberOfQueries(limit, remainingUnexhausted)

// fmt.Println(" iteration", c, "remaining", remainingUnexhausted, "batchsize", batchSizePerQuery)
// we will go through all the iterators in batches until we have pulled all the required results
for q, query := range queries {
if exhausted[q] {
continue
}
if oldest.q == q && remainingUnexhausted > 1 {
if oldest.Q == q && remainingUnexhausted > 1 {
continue
}
// fmt.Println(" query", q, unsafe.Pointer(&results[q]), hex.EncodeToString(query.prefix), len(results[q]))
@@ -193,7 +189,7 @@ func (b *BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (c
}

// this event is good to be used
evt := iterEvent{Event: event, q: q}
evt := internal.IterEvent{Event: event, Q: q}
//
//
if secondPhase {
@@ -217,7 +213,7 @@ func (b *BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (c
// fmt.Println(" new since", since)
// we null the oldest Event as we can't rely on it anymore
// (we'll fall under BWWDHTOE above) until we have a new oldest set.
oldest = iterEvent{q: -1}
oldest = internal.IterEvent{Q: -1}
// anything we got that would be above this won't trigger an update to
// the oldest anyway, because it will be discarded as being after the limit.
//
@@ -261,15 +257,15 @@ func (b *BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (c
pulledPerQuery[q]++
pulledThisIteration++
if pulledThisIteration > batchSizePerQuery {
return BatchFilled
return batchFilled
}
if pulledPerQuery[q] >= limit {
exhaust(q)
return BatchFilled
return batchFilled
}

return nil
}); err == BatchFilled {
}); err == batchFilled {
// fmt.Println(" #")
it.Next()
break
@@ -296,7 +292,7 @@ func (b *BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (c
}

if exhausted[q] {
secondPhaseParticipants = swapDelete(secondPhaseParticipants, s)
secondPhaseParticipants = internal.SwapDelete(secondPhaseParticipants, s)
s--
}
}
@@ -307,12 +303,12 @@ func (b *BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (c
// fmt.Println(" xxx", secondPhaseResultsToggle)
if secondPhaseResultsToggle {
secondBatch = append(secondBatch, secondPhaseResultsB)
secondPhaseResultsA = mergeSortMultiple(secondBatch, limit, secondPhaseResultsA)
secondPhaseResultsA = internal.MergeSortMultiple(secondBatch, limit, secondPhaseResultsA)
oldest = secondPhaseResultsA[len(secondPhaseResultsA)-1]
// fmt.Println(" new aggregated a", len(secondPhaseResultsB))
} else {
secondBatch = append(secondBatch, secondPhaseResultsA)
secondPhaseResultsB = mergeSortMultiple(secondBatch, limit, secondPhaseResultsB)
secondPhaseResultsB = internal.MergeSortMultiple(secondBatch, limit, secondPhaseResultsB)
oldest = secondPhaseResultsB[len(secondPhaseResultsB)-1]
// fmt.Println(" new aggregated b", len(secondPhaseResultsB))
}
@@ -331,7 +327,7 @@ func (b *BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (c

// we will exclude this oldest number as it is not relevant anymore
// (we now want to keep track only of the oldest among the remaining iterators)
oldest = iterEvent{q: -1}
oldest = internal.IterEvent{Q: -1}

// HOW IT WORKS AFTER WE'VE REACHED THIS POINT (HIWAWVRTP)
// now we can combine the results we have and check what is our current oldest event.
@@ -344,9 +340,9 @@ func (b *BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (c
// keep it but also discard the previous since, moving the needle one back -- for example,
// if we get an `8` we can keep it and move the `since` parameter to `10`, discarding `15`
// in the process.
all := make([][]iterEvent, len(results))
all := make([][]internal.IterEvent, len(results))
copy(all, results) // we have to use this otherwise mergeSortMultiple will scramble our results slice
firstPhaseResults = mergeSortMultiple(all, limit, nil)
firstPhaseResults = internal.MergeSortMultiple(all, limit, nil)
oldest = firstPhaseResults[limit-1]
since = uint32(oldest.CreatedAt)
// fmt.Println("new since", since)
@@ -375,8 +371,8 @@ func (b *BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (c

// we create these two lists and alternate between them so we don't have to create a
// a new one every time
secondPhaseResultsA = make([]iterEvent, 0, limit*2)
secondPhaseResultsB = make([]iterEvent, 0, limit*2)
secondPhaseResultsA = make([]internal.IterEvent, 0, limit*2)
secondPhaseResultsB = make([]internal.IterEvent, 0, limit*2)

// from now on we won't run this block anymore
secondPhase = true
@@ -390,13 +386,13 @@ func (b *BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (c

// fmt.Println("is secondPhase?", secondPhase)

var combinedResults []iterEvent
var combinedResults []internal.IterEvent

if secondPhase {
// fmt.Println("ending second phase")
// when we reach this point either secondPhaseResultsA or secondPhaseResultsB will be full of stuff,
// the other will be empty
var secondPhaseResults []iterEvent
var secondPhaseResults []internal.IterEvent
// fmt.Println("xxx", secondPhaseResultsToggle, len(secondPhaseResultsA), len(secondPhaseResultsB))
if secondPhaseResultsToggle {
secondPhaseResults = secondPhaseResultsB
@@ -408,12 +404,12 @@ func (b *BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (c
// fmt.Println(" using a", len(secondPhaseResultsA))
}

all := [][]iterEvent{firstPhaseResults, secondPhaseResults}
combinedResults = mergeSortMultiple(all, limit, combinedResults)
all := [][]internal.IterEvent{firstPhaseResults, secondPhaseResults}
combinedResults = internal.MergeSortMultiple(all, limit, combinedResults)
// fmt.Println("final combinedResults", len(combinedResults), cap(combinedResults), limit)
} else {
combinedResults = make([]iterEvent, limit)
combinedResults = mergeSortMultiple(results, limit, combinedResults)
combinedResults = make([]internal.IterEvent, limit)
combinedResults = internal.MergeSortMultiple(results, limit, combinedResults)
}

for _, evt := range combinedResults {
… (diff truncated; the remaining 8 changed files are not shown here)
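
The second phase above alternates between secondPhaseResultsA and secondPhaseResultsB so that repeated merging does not allocate a fresh slice on every iteration. A simplified, standalone sketch of that double-buffer pattern (not part of this commit; it uses ints and a plain merge in place of internal.MergeSortMultiple, and all names are illustrative):

```go
package main

import (
	"cmp"
	"fmt"
	"slices"
)

// mergeInto combines the accumulated results with a new batch, writing into dst
// (a reused buffer) and keeping only the newest `limit` values, largest first.
func mergeInto(dst, acc, batch []int, limit int) []int {
	dst = append(dst[:0], acc...)
	dst = append(dst, batch...)
	slices.SortFunc(dst, func(a, b int) int { return cmp.Compare(b, a) }) // descending
	if len(dst) > limit {
		dst = dst[:limit]
	}
	return dst
}

func main() {
	const limit = 4

	// two preallocated buffers, alternated on every merge so no new slice is
	// allocated per iteration -- the same idea as secondPhaseResultsA/B above
	bufA := make([]int, 0, limit*2)
	bufB := make([]int, 0, limit*2)
	toggle := false

	var current []int
	for _, batch := range [][]int{{9, 3}, {7, 8}, {1, 12}} {
		if toggle {
			bufA = mergeInto(bufA, bufB, batch, limit)
			current = bufA
		} else {
			bufB = mergeInto(bufB, bufA, batch, limit)
			current = bufB
		}
		toggle = !toggle
	}
	fmt.Println(current) // [12 9 8 7]
}
```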
