Skip to content

Commit

Permalink
lmdb: fixes for previous commit, now it works.
Browse files Browse the repository at this point in the history
benchmarks for LMDB:

goos: linux
goarch: amd64
pkg: github.com/fiatjaf/eventstore/test
cpu: AMD Ryzen 3 3200G with Radeon Vega Graphics
BenchmarkLMDB/filter/q-0-4     	  787862	      1465 ns/op	    1652 B/op	      28 allocs/op
BenchmarkLMDB/filter/q-1-4     	 1000000	      1271 ns/op	    1060 B/op	      18 allocs/op
BenchmarkLMDB/filter/q-2-4     	  754543	      1754 ns/op	    1803 B/op	      27 allocs/op
BenchmarkLMDB/filter/q-3-4     	 1000000	      1314 ns/op	     718 B/op	      10 allocs/op
BenchmarkLMDB/filter/q-4-4     	 1000000	      1296 ns/op	     716 B/op	      10 allocs/op
BenchmarkLMDB/filter/q-5-4     	  907354	      1317 ns/op	    1099 B/op	      17 allocs/op
BenchmarkLMDB/filter/q-6-4     	  747927	      1823 ns/op	    1803 B/op	      27 allocs/op
BenchmarkLMDB/filter/q-7-4     	  711120	      1824 ns/op	    1803 B/op	      27 allocs/op
BenchmarkLMDB/filter/q-8-4     	  204600	      6072 ns/op	    7148 B/op	     108 allocs/op
BenchmarkLMDB/filter/q-9-4     	  204714	      6193 ns/op	    7148 B/op	     108 allocs/op
BenchmarkLMDB/filter/q-10-4    	  217402	      6015 ns/op	    7148 B/op	     107 allocs/op
BenchmarkLMDB/filter/q-11-4    	  221179	      6032 ns/op	    7148 B/op	     107 allocs/op
BenchmarkLMDB/insert-4         	   10000	    101821 ns/op	   73984 B/op	      13 allocs/op

and here is how it was before:

goos: linux
goarch: amd64
pkg: github.com/fiatjaf/eventstore/test
cpu: AMD Ryzen 3 3200G with Radeon Vega Graphics
BenchmarkLMDB/filter/q-0-4     	  132798	      7966 ns/op	    4476 B/op	      58 allocs/op
BenchmarkLMDB/filter/q-1-4     	  643911	      5815 ns/op	    2860 B/op	      34 allocs/op
BenchmarkLMDB/filter/q-2-4     	  344509	      8224 ns/op	    4511 B/op	      58 allocs/op
BenchmarkLMDB/filter/q-3-4     	 1000000	     20966 ns/op	    1967 B/op	      20 allocs/op
BenchmarkLMDB/filter/q-4-4
signal: killed
FAIL	github.com/fiatjaf/eventstore/test	128.395s
FAIL

i.e. the old version couldn't even complete the run — it stalled until the benchmark harness killed it
(`signal: killed` after ~128s), most likely due to contention from all the goroutines trying to read
the same disk pages at once.

now it is so much better.

weirdly, though, if we run the benchmarks with a fixed number of runs for each, like -benchtime=100x, then
both versions will complete everything, and in similar time, so I don't know what to conclude. maybe that
the new version performs much better under heavy load? I don't know.

this is badger without a fixed number of runs, for comparison:

badger 2024/11/18 17:49:39 INFO: All 1 tables opened in 2ms
badger 2024/11/18 17:49:39 INFO: Discard stats nextEmptySlot: 0
badger 2024/11/18 17:49:39 INFO: Set nextTxnTs to 20002
badger 2024/11/18 17:49:39 INFO: Deleting empty file: /tmp/eventstore-testbadger/000001.vlog
goos: linux
goarch: amd64
pkg: github.com/fiatjaf/eventstore/test
cpu: AMD Ryzen 3 3200G with Radeon Vega Graphics
BenchmarkBadger/filter/q-0-4     	   49177	     93349 ns/op	  125044 B/op	    1609 allocs/op
BenchmarkBadger/filter/q-1-4     	signal: killed
FAIL	github.com/fiatjaf/eventstore/test	61.454s
FAIL

total failure.
  • Loading branch information
fiatjaf committed Nov 18, 2024
1 parent 06d2cfe commit 6adb45c
Show file tree
Hide file tree
Showing 16 changed files with 41 additions and 31 deletions.
8 changes: 3 additions & 5 deletions badger/fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func FuzzQuery(f *testing.F) {
w := eventstore.RelayWrapper{Store: db}

start := time.Now()
fmt.Println(filter)
// fmt.Println(filter)
res, err := w.QuerySync(ctx, filter)
end := time.Now()

Expand All @@ -143,10 +143,8 @@ func FuzzQuery(f *testing.F) {
}

// fmt.Println(" expected result")
// ets := getTimestamps(expected)
// rts := getTimestamps(res)
// for i := range ets {
// fmt.Println(" ", ets[i], " ", rts[i], " ", i)
// for i := range expected {
// fmt.Println(" ", expected[i].CreatedAt, expected[i].ID[0:8], " ", res[i].CreatedAt, res[i].ID[0:8], " ", i)
// }

require.Equal(t, expected[0].CreatedAt, res[0].CreatedAt, "first result is wrong")
Expand Down
2 changes: 1 addition & 1 deletion badger/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (b *BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (c
}

// we will do this check if we don't accumulated the requested number of events yet
// fmt.Println("oldest", oldest.Event, "from iter", oldest.q)
// fmt.Println("oldest", oldest.Event, "from iter", oldest.Q)
if secondPhase && secondPhaseHasResultsPending && (oldest.Event == nil || remainingUnexhausted == 0) {
// fmt.Println("second phase aggregation!")
// when we are in the second phase we will aggressively aggregate results on every iteration
Expand Down
1 change: 1 addition & 0 deletions badger/testdata/fuzz/FuzzQuery
8 changes: 8 additions & 0 deletions internal/testdata/fuzz/FuzzQuery/dabb8bfe01b215a2
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
go test fuzz v1
uint(200)
uint(50)
uint(13)
uint(8)
uint(2)
uint(0)
uint(1)
27 changes: 13 additions & 14 deletions lmdb/fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/binary"
"encoding/hex"
"fmt"
"os"
"testing"
"time"

Expand Down Expand Up @@ -33,6 +34,10 @@ func FuzzQuery(f *testing.F) {
}

// ~ setup db
if err := os.RemoveAll("/tmp/lmdbtest"); err != nil {
t.Fatal(err)
return
}
db := &LMDBBackend{}
db.Path = "/tmp/lmdbtest"
db.extraFlags = lmdb.NoSync
Expand Down Expand Up @@ -96,20 +101,14 @@ func FuzzQuery(f *testing.F) {
w := eventstore.RelayWrapper{Store: db}

start := time.Now()
// fmt.Println(filter)

res, err := w.QuerySync(ctx, filter)
end := time.Now()

require.NoError(t, err)
require.Equal(t, len(expected), len(res), "number of results is different than expected")

require.Less(t, end.Sub(start).Milliseconds(), int64(1500), "query took too long")
fmt.Println("---")
for _, evt := range res {
fmt.Println(evt.CreatedAt)
}
require.True(t, slices.IsSortedFunc(res, func(a, b *nostr.Event) int { return cmp.Compare(b.CreatedAt, a.CreatedAt) }), "results are not sorted")

nresults := len(expected)

getTimestamps := func(events []*nostr.Event) []nostr.Timestamp {
Expand All @@ -120,19 +119,19 @@ func FuzzQuery(f *testing.F) {
return res
}

// fmt.Println(" expected result")
// ets := getTimestamps(expected)
// rts := getTimestamps(res)
// for i := range ets {
// fmt.Println(" ", ets[i], " ", rts[i], " ", i)
// }
fmt.Println(" expected result")
for i := range expected {
fmt.Println(" ", expected[i].CreatedAt, expected[i].ID[0:8], " ", res[i].CreatedAt, res[i].ID[0:8], " ", i)
}

require.Equal(t, expected[0].CreatedAt, res[0].CreatedAt, "first result is wrong")
require.Equal(t, expected[nresults-1].CreatedAt, res[nresults-1].CreatedAt, "last result is wrong")
require.Equal(t, expected[nresults-1].CreatedAt, res[nresults-1].CreatedAt, "last result (%d) is wrong", nresults-1)
require.Equal(t, getTimestamps(expected), getTimestamps(res))

for _, evt := range res {
require.True(t, filter.Matches(evt), "event %s doesn't match filter %s", evt, filter)
}

require.True(t, slices.IsSortedFunc(res, func(a, b *nostr.Event) int { return cmp.Compare(b.CreatedAt, a.CreatedAt) }), "results are not sorted")
})
}
4 changes: 2 additions & 2 deletions lmdb/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type iterator struct {
err error
}

func (it iterator) seek(key []byte) {
func (it *iterator) seek(key []byte) {
if _, _, errsr := it.cursor.Get(key, nil, lmdb.SetRange); errsr != nil {
if operr, ok := errsr.(*lmdb.OpError); !ok || operr.Errno != lmdb.NotFound {
// in this case it's really an error
Expand All @@ -36,7 +36,7 @@ func (it iterator) seek(key []byte) {
}
}

// next advances the cursor one entry backward (lmdb.Prev) and stores the
// resulting key, value index, and error on the iterator's fields; callers
// inspect it.key/it.valIdx/it.err on the following loop iteration rather
// than getting a return value. NOTE(review): iteration goes backward,
// presumably because the index keys are ordered so that Prev yields
// progressively older entries — confirm against the index key layout.
// The receiver is a pointer (changed from a value receiver in this commit)
// so these field assignments are visible to the caller.
func (it *iterator) next() {
// move one back (we'll look into k and v and err in the next iteration)
it.key, it.valIdx, it.err = it.cursor.Get(nil, nil, lmdb.Prev)
}
Expand Down
21 changes: 12 additions & 9 deletions lmdb/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha
txn.RawRead = true
defer close(ch)

iterators := make([]iterator, len(queries))
iterators := make([]*iterator, len(queries))
exhausted := make([]bool, len(queries)) // indicates that a query won't be used anymore
results := make([][]internal.IterEvent, len(queries))
pulledPerQuery := make([]int, len(queries))
Expand Down Expand Up @@ -91,7 +91,7 @@ func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha
if err != nil {
return err
}
iterators[q] = iterator{cursor: cursor}
iterators[q] = &iterator{cursor: cursor}
defer cursor.Close()
iterators[q].seek(queries[q].startingPoint)
results[q] = make([]internal.IterEvent, 0, batchSizePerQuery*2)
Expand Down Expand Up @@ -122,7 +122,7 @@ func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha
len(it.key) != query.keySize ||
!bytes.HasPrefix(it.key, query.prefix) {
// either iteration has errored or we reached the end of this prefix
// fmt.Println(" reached end")
// fmt.Println(" reached end", it.key, query.keySize, query.prefix)
exhaust(q)
break
}
Expand Down Expand Up @@ -166,6 +166,8 @@ func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha
return fmt.Errorf("event read error: %w", err)
}

// fmt.Println(" event", hex.EncodeToString(val[0:4]), "kind", binary.BigEndian.Uint16(val[132:134]), "author", hex.EncodeToString(val[32:36]), "ts", nostr.Timestamp(binary.BigEndian.Uint32(val[128:132])), hex.EncodeToString(it.key), it.valIdx)

// if there is still a tag to be checked, do it now
if extraTagValues != nil && !event.Tags.ContainsAny(extraTagKey, extraTagValues) {
it.next()
Expand All @@ -182,19 +184,19 @@ func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha
// now we have to eliminate the event currently at the `since` threshold.
nextThreshold := firstPhaseResults[len(firstPhaseResults)-2]
if oldest.Event == nil {
// fmt.Println(" b1")
// fmt.Println(" b1", evt.ID[0:8])
// BRANCH WHEN WE DON'T HAVE THE OLDEST EVENT (BWWDHTOE)
// when we don't have the oldest set, we will keep the results
// and not change the cutting point -- it's bad, but hopefully not that bad.
results[q] = append(results[q], evt)
secondPhaseHasResultsPending = true
} else if nextThreshold.CreatedAt > oldest.CreatedAt {
// fmt.Println(" b2", nextThreshold.CreatedAt, ">", oldest.CreatedAt)
// fmt.Println(" b2", nextThreshold.CreatedAt, ">", oldest.CreatedAt, evt.ID[0:8])
// one of the events we have stored is the actual next threshold
// eliminate last, update since with oldest
firstPhaseResults = firstPhaseResults[0 : len(firstPhaseResults)-1]
since = uint32(oldest.CreatedAt)
// fmt.Println(" new since", since)
// fmt.Println(" new since", since, evt.ID[0:8])
// we null the oldest Event as we can't rely on it anymore
// (we'll fall under BWWDHTOE above) until we have a new oldest set.
oldest = internal.IterEvent{Q: -1}
Expand All @@ -207,7 +209,7 @@ func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha
secondPhaseHasResultsPending = true
} else if nextThreshold.CreatedAt < evt.CreatedAt {
// the next last event in the firstPhaseResults is the next threshold
// fmt.Println(" b3", nextThreshold.CreatedAt, "<", oldest.CreatedAt)
// fmt.Println(" b3", nextThreshold.CreatedAt, "<", oldest.CreatedAt, evt.ID[0:8])
// eliminate last, update since with the antelast
firstPhaseResults = firstPhaseResults[0 : len(firstPhaseResults)-1]
since = uint32(nextThreshold.CreatedAt)
Expand All @@ -220,7 +222,7 @@ func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha
oldest = evt
}
} else {
// fmt.Println(" b4")
// fmt.Println(" b4", evt.ID[0:8])
// oops, _we_ are the next `since` threshold
firstPhaseResults[len(firstPhaseResults)-1] = evt
since = uint32(evt.CreatedAt)
Expand All @@ -243,6 +245,7 @@ func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha
if pulledThisIteration > batchSizePerQuery {
// batch filled
it.next()
// fmt.Println(" filled", hex.EncodeToString(it.key), it.valIdx)
break
}
if pulledPerQuery[q] >= limit {
Expand All @@ -257,7 +260,7 @@ func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha
}

// we will do this check if we don't accumulated the requested number of events yet
// fmt.Println("oldest", oldest.Event, "from iter", oldest.q)
// fmt.Println("oldest", oldest.Event, "from iter", oldest.Q)
if secondPhase && secondPhaseHasResultsPending && (oldest.Event == nil || remainingUnexhausted == 0) {
// fmt.Println("second phase aggregation!")
// when we are in the second phase we will aggressively aggregate results on every iteration
Expand Down
1 change: 1 addition & 0 deletions lmdb/testdata/fuzz/FuzzQuery

0 comments on commit 6adb45c

Please sign in to comment.