badger: nice fuzz tests and some fixes to query.
fiatjaf committed Oct 3, 2024
1 parent a741bda commit 14c3818
Showing 8 changed files with 207 additions and 16 deletions.
146 changes: 146 additions & 0 deletions badger/fuzz_test.go
@@ -0,0 +1,146 @@
package badger

import (
	"cmp"
	"context"
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"testing"
	"time"

	"github.com/dgraph-io/badger/v4"
	"github.com/fiatjaf/eventstore"
	"github.com/nbd-wtf/go-nostr"
	"github.com/stretchr/testify/require"
	"golang.org/x/exp/slices"
)

func FuzzQuery(f *testing.F) {
	ctx := context.Background()

	f.Add(uint(200), uint(50), uint(13), uint(2), uint(2), uint(0), uint(1))
	f.Fuzz(func(t *testing.T, total, limit, authors, timestampAuthorFactor, seedFactor, kinds, kindFactor uint) {
		total++
		authors++
		seedFactor++
		kindFactor++
		if kinds == 1 {
			kinds++
		}
		if limit == 0 {
			return
		}

		// ~ setup db

		bdb, err := badger.Open(badger.DefaultOptions("").WithInMemory(true))
		if err != nil {
			t.Fatalf("failed to create database: %s", err)
			return
		}
		db := &BadgerBackend{}
		db.DB = bdb
		db.seq, err = db.GetSequence([]byte("events"), 1000)
		if err != nil {
			t.Fatalf("error: %s", err)
			return
		}

		if err := db.runMigrations(); err != nil {
			t.Fatalf("error: %s", err)
			return
		}

		db.MaxLimit = 500
		defer db.Close()

		// ~ start actual test

		filter := nostr.Filter{
			Authors: make([]string, authors),
			Limit:   int(limit),
		}
		maxKind := 1
		if kinds > 0 {
			filter.Kinds = make([]int, kinds)
			for i := range filter.Kinds {
				filter.Kinds[i] = int(kindFactor) * i
			}
			maxKind = filter.Kinds[len(filter.Kinds)-1]
		}

		for i := 0; i < int(authors); i++ {
			sk := make([]byte, 32)
			binary.BigEndian.PutUint32(sk, uint32(i%int(authors*seedFactor))+1)
			pk, _ := nostr.GetPublicKey(hex.EncodeToString(sk))
			filter.Authors[i] = pk
		}

		expected := make([]*nostr.Event, 0, total)
		for i := 0; i < int(total); i++ {
			skseed := uint32(i%int(authors*seedFactor)) + 1
			sk := make([]byte, 32)
			binary.BigEndian.PutUint32(sk, skseed)

			evt := &nostr.Event{
				CreatedAt: nostr.Timestamp(skseed)*nostr.Timestamp(timestampAuthorFactor) + nostr.Timestamp(i),
				Content:   fmt.Sprintf("unbalanced %d", i),
				Tags:      nostr.Tags{},
				Kind:      i % maxKind,
			}
			err := evt.Sign(hex.EncodeToString(sk))
			require.NoError(t, err)

			err = db.SaveEvent(ctx, evt)
			require.NoError(t, err)

			if filter.Matches(evt) {
				expected = append(expected, evt)
			}
		}

		slices.SortFunc(expected, nostr.CompareEventPtrReverse)
		if len(expected) > int(limit) {
			expected = expected[0:limit]
		}

		w := eventstore.RelayWrapper{Store: db}

		start := time.Now()
		fmt.Println(filter)
		res, err := w.QuerySync(ctx, filter)
		end := time.Now()

		require.NoError(t, err)
		require.Equal(t, len(expected), len(res), "number of results is different than expected")

		require.Less(t, end.Sub(start).Milliseconds(), int64(1500), "query took too long")
		require.True(t, slices.IsSortedFunc(res, func(a, b *nostr.Event) int { return cmp.Compare(b.CreatedAt, a.CreatedAt) }), "results are not sorted")

		nresults := len(expected)

		getTimestamps := func(events []*nostr.Event) []nostr.Timestamp {
			res := make([]nostr.Timestamp, len(events))
			for i, evt := range events {
				res[i] = evt.CreatedAt
			}
			return res
		}

		// fmt.Println(" expected result")
		// ets := getTimestamps(expected)
		// rts := getTimestamps(res)
		// for i := range ets {
		// 	fmt.Println("  ", ets[i], " ", rts[i], " ", i)
		// }

		require.Equal(t, expected[0].CreatedAt, res[0].CreatedAt, "first result is wrong")
		require.Equal(t, expected[nresults-1].CreatedAt, res[nresults-1].CreatedAt, "last result is wrong")
		require.Equal(t, getTimestamps(expected), getTimestamps(res))

		for _, evt := range res {
			require.True(t, filter.Matches(evt), "event %s doesn't match filter %s", evt, filter)
		}
	})
}
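Running `go test -fuzz FuzzQuery ./badger` lets the fuzzer mutate the seven arguments starting from the `f.Add` seed above, while a plain `go test ./badger` replays the eight seed files committed under `badger/testdata/fuzz/FuzzQuery/` below as ordinary regression cases; those seeds are presumably failing inputs the fuzzer found, which the query.go changes in this commit fix.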
29 changes: 13 additions & 16 deletions badger/query.go
@@ -66,6 +66,7 @@ func (b BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (ch
 		var secondPhaseResultsA []iterEvent
 		var secondPhaseResultsB []iterEvent
 		var secondPhaseResultsToggle bool // this is just a dummy thing we use to keep track of the alternating
+		var secondPhaseHasResultsPending bool
 
 		remainingUnexhausted := len(queries) // when all queries are exhausted we can finally end this thing
 		batchSizePerQuery := batchSizePerNumberOfQueries(limit, remainingUnexhausted)
@@ -100,25 +101,23 @@ func (b BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (ch
 		for c := 0; ; c++ {
 			batchSizePerQuery = batchSizePerNumberOfQueries(limit, remainingUnexhausted)
 
-			// fmt.Println("  iteration", c, "remaining", remainingUnexhausted, "batch size", batchSizePerQuery)
+			// fmt.Println("  iteration", c, "remaining", remainingUnexhausted, "batchsize", batchSizePerQuery)
 			// we will go through all the iterators in batches until we have pulled all the required results
 			for q, query := range queries {
 				if exhausted[q] {
-					// fmt.Println("    exhausted")
 					continue
 				}
 				if oldest.q == q && remainingUnexhausted > 1 {
-					// fmt.Println("    skipping for now")
 					continue
 				}
-				// fmt.Println("    query", q, unsafe.Pointer(&results[q]), len(results[q]))
+				// fmt.Println("    query", q, unsafe.Pointer(&results[q]), hex.EncodeToString(query.prefix), len(results[q]))
 
 				it := iterators[q]
 				pulledThisIteration := 0
 
 				for {
 					if !it.Valid() {
-						// fmt.Println("    reached end")
+						// fmt.Println("        reached end")
 						exhaust(q)
 						it.Next()
 						break
@@ -198,6 +197,7 @@ func (b BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (ch
 							// when we don't have the oldest set, we will keep the results
 							// and not change the cutting point -- it's bad, but hopefully not that bad.
 							results[q] = append(results[q], evt)
+							secondPhaseHasResultsPending = true
 						} else if nextThreshold.CreatedAt > oldest.CreatedAt {
 							// fmt.Println("      b2", nextThreshold.CreatedAt, ">", oldest.CreatedAt)
 							// one of the events we have stored is the actual next threshold
@@ -214,6 +214,7 @@ func (b BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (ch
 							// finally
 							// add this to the results to be merged later
 							results[q] = append(results[q], evt)
+							secondPhaseHasResultsPending = true
 						} else if nextThreshold.CreatedAt < evt.CreatedAt {
 							// the next last event in the firstPhaseResults is the next threshold
 							// fmt.Println("      b3", nextThreshold.CreatedAt, "<", oldest.CreatedAt)
@@ -223,6 +224,7 @@ func (b BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (ch
 							// fmt.Println("      new since", since)
 							// add this to the results to be merged later
 							results[q] = append(results[q], evt)
+							secondPhaseHasResultsPending = true
 							// update the oldest event
 							if evt.CreatedAt < oldest.CreatedAt {
 								oldest = evt
@@ -271,7 +273,7 @@ func (b BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (ch
 
 			// we will do this check if we don't accumulated the requested number of events yet
 			// fmt.Println("oldest", oldest.Event, "from iter", oldest.q)
-			if secondPhase {
+			if secondPhase && secondPhaseHasResultsPending && (oldest.Event == nil || remainingUnexhausted == 0) {
 				// fmt.Println("second phase aggregation!")
 				// when we are in the second phase we will aggressively aggregate results on every iteration
 				//
@@ -314,8 +316,8 @@ func (b BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (ch
 				for _, q := range secondPhaseParticipants {
 					results[q] = results[q][:0]
 				}
-			} else if firstPhaseTotalPulled >= limit {
-				// fmt.Println("have enough!", firstPhaseTotalPulled, "/", limit)
+			} else if !secondPhase && firstPhaseTotalPulled >= limit && remainingUnexhausted > 0 {
+				// fmt.Println("have enough!", firstPhaseTotalPulled, "/", limit, "remaining", remainingUnexhausted)
 
 				// we will exclude this oldest number as it is not relevant anymore
 				// (we now want to keep track only of the oldest among the remaining iterators)
@@ -332,7 +334,9 @@ func (b BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (ch
 				// keep it but also discard the previous since, moving the needle one back -- for example,
 				// if we get an `8` we can keep it and move the `since` parameter to `10`, discarding `15`
 				// in the process.
-				firstPhaseResults = mergeSortMultiple(results, limit, nil)
+				all := make([][]iterEvent, len(results))
+				copy(all, results) // we have to use this otherwise mergeSortMultiple will scramble our results slice
+				firstPhaseResults = mergeSortMultiple(all, limit, nil)
 				oldest = firstPhaseResults[limit-1]
 				since = uint32(oldest.CreatedAt)
 				// fmt.Println("new since", since)
@@ -366,8 +370,6 @@ func (b BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (ch
 
 				// from now on we won't run this block anymore
 				secondPhase = true
-			} else {
-				// fmt.Println("not enough", firstPhaseTotalPulled, "/", limit)
 			}
 
 			// fmt.Println("remaining", remainingUnexhausted)
@@ -376,10 +378,6 @@ func (b BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (ch
 			}
 		}
 
-		// fmt.Println("results", len(results))
-		// for q, res := range results {
-		// 	fmt.Println("  ", q, len(res))
-		// }
 		// fmt.Println("is secondPhase?", secondPhase)
 
 		var combinedResults []iterEvent
@@ -401,7 +399,6 @@ func (b BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (ch
 		}
 
 		all := [][]iterEvent{firstPhaseResults, secondPhaseResults}
-		// fmt.Println("~", len(all[0]), len(all[1]))
 		combinedResults = mergeSortMultiple(all, limit, combinedResults)
 		// fmt.Println("final combinedResults", len(combinedResults), cap(combinedResults), limit)
 	} else {
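The `copy(all, results)` change above merits a gloss: per the commit's own comment, `mergeSortMultiple` "scrambles" the `[][]iterEvent` it is handed, so the caller now passes a copy of the outer slice to keep `results` usable afterwards. Below is a minimal, self-contained sketch of the hazard and the fix, with a hypothetical `mergeSorted` standing in for the real `mergeSortMultiple` (the name, the ascending order, and the pop-by-reslice mechanism are illustrative assumptions, not the library's actual implementation):

package main

import "fmt"

// mergeSorted is a hypothetical stand-in for mergeSortMultiple: it
// destructively pops the smallest head among its input lists until
// limit items are produced, re-slicing lists[i] as it goes.
func mergeSorted(lists [][]int, limit int) []int {
	out := make([]int, 0, limit)
	for len(out) < limit {
		best := -1
		for i, l := range lists {
			if len(l) == 0 {
				continue
			}
			if best == -1 || l[0] < lists[best][0] {
				best = i
			}
		}
		if best == -1 {
			break // every list is exhausted
		}
		out = append(out, lists[best][0])
		lists[best] = lists[best][1:] // this re-slice "scrambles" the caller's outer slice
	}
	return out
}

func main() {
	results := [][]int{{1, 4}, {2, 3}}

	// Copying just the outer slice headers, as the commit does, keeps
	// `results` intact after the merge; the inner slices are shared,
	// which is fine because the merge only reassigns lists[i] and
	// never writes through it.
	all := make([][]int, len(results))
	copy(all, results)

	fmt.Println(mergeSorted(all, 3)) // [1 2 3]
	fmt.Println(results)             // still [[1 4] [2 3]]
}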
8 changes: 8 additions & 0 deletions badger/testdata/fuzz/FuzzQuery/2387982a59ec5d22
@@ -0,0 +1,8 @@
go test fuzz v1
uint(256)
uint(31)
uint(260)
uint(2)
uint(69)
uint(385)
uint(1)
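Read against the `f.Fuzz` signature in fuzz_test.go above, this seed decodes to total=256, limit=31, authors=260, timestampAuthorFactor=2, seedFactor=69, kinds=385, kindFactor=1; the remaining seeds below follow the same layout.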
8 changes: 8 additions & 0 deletions badger/testdata/fuzz/FuzzQuery/25234b78dd36a5fd
@@ -0,0 +1,8 @@
go test fuzz v1
uint(267)
uint(50)
uint(355)
uint(2)
uint(69)
uint(213)
uint(1)
8 changes: 8 additions & 0 deletions badger/testdata/fuzz/FuzzQuery/35a474e7be3cdc57
@@ -0,0 +1,8 @@
go test fuzz v1
uint(280)
uint(0)
uint(13)
uint(2)
uint(2)
uint(0)
uint(0)
8 changes: 8 additions & 0 deletions badger/testdata/fuzz/FuzzQuery/6e88633b00eff43d
@@ -0,0 +1,8 @@
go test fuzz v1
uint(259)
uint(126)
uint(5)
uint(23)
uint(0)
uint(0)
uint(92)
8 changes: 8 additions & 0 deletions badger/testdata/fuzz/FuzzQuery/70a3844d6c7ec116
@@ -0,0 +1,8 @@
go test fuzz v1
uint(201)
uint(50)
uint(13)
uint(97)
uint(0)
uint(0)
uint(77)
8 changes: 8 additions & 0 deletions badger/testdata/fuzz/FuzzQuery/98cca88a26b20e30
@@ -0,0 +1,8 @@
go test fuzz v1
uint(164)
uint(50)
uint(13)
uint(1)
uint(2)
uint(13)
uint(0)
