Skip to content

Commit

Permalink
badger/lmdb: make negentropy work better and guard against giant make…
Browse files Browse the repository at this point in the history
…slices when that is completely unnecessary.
  • Loading branch information
fiatjaf committed Nov 3, 2024
1 parent 1e245c5 commit 37d4373
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 75 deletions.
8 changes: 7 additions & 1 deletion badger/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var _ eventstore.Store = (*BadgerBackend)(nil)
type BadgerBackend struct {
Path string
MaxLimit int
MaxLimitNegentropy int
BadgerOptionsModifier func(badger.Options) badger.Options

// Experimental
Expand Down Expand Up @@ -56,8 +57,13 @@ func (b *BadgerBackend) Init() error {
return fmt.Errorf("error running migrations: %w", err)
}

if b.MaxLimit == 0 {
if b.MaxLimit != 0 {
b.MaxLimitNegentropy = b.MaxLimit
} else {
b.MaxLimit = 500
if b.MaxLimitNegentropy == 0 {
b.MaxLimitNegentropy = 16777216
}
}

if err := b.DB.View(func(txn *badger.Txn) error {
Expand Down
18 changes: 16 additions & 2 deletions badger/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"log"

"github.com/dgraph-io/badger/v4"
"github.com/fiatjaf/eventstore"
bin "github.com/fiatjaf/eventstore/internal/binary"
"github.com/nbd-wtf/go-nostr"
"golang.org/x/exp/slices"
Expand All @@ -35,10 +36,23 @@ func (b *BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (c
}

// max number of events we'll return
limit := b.MaxLimit / 4
if filter.Limit > 0 && filter.Limit <= b.MaxLimit {
maxLimit := b.MaxLimit
var limit int
if eventstore.IsNegentropySession(ctx) {
maxLimit = b.MaxLimitNegentropy
limit = maxLimit
} else {
limit = maxLimit / 4
}
if filter.Limit > 0 && filter.Limit <= maxLimit {
limit = filter.Limit
}
if tlimit := nostr.GetTheoreticalLimit(filter); tlimit == 0 {
close(ch)
return ch, nil
} else if tlimit > 0 {
limit = tlimit
}

// fmt.Println("limit", limit)

Expand Down
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module github.com/fiatjaf/eventstore

go 1.23.0
go 1.23.1

toolchain go1.23.2

require (
fiatjaf.com/lib v0.2.0
Expand All @@ -16,7 +18,7 @@ require (
github.com/lib/pq v1.10.9
github.com/mailru/easyjson v0.7.7
github.com/mattn/go-sqlite3 v1.14.18
github.com/nbd-wtf/go-nostr v0.38.1
github.com/nbd-wtf/go-nostr v0.42.0
github.com/opensearch-project/opensearch-go/v4 v4.0.0
github.com/stretchr/testify v1.9.0
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,8 @@ github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrk
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
github.com/nbd-wtf/go-nostr v0.38.1 h1:D0moEtIpjhWs2zbgeRyokA4TOLzBdumtpL1/O7/frww=
github.com/nbd-wtf/go-nostr v0.38.1/go.mod h1:TGKGj00BmJRXvRe0LlpDN3KKbELhhPXgBwUEhzu3Oq0=
github.com/nbd-wtf/go-nostr v0.42.0 h1:EofWfXEhKic9AYVf4RHuXZr+kKUZE2jVyJtJByNe1rE=
github.com/nbd-wtf/go-nostr v0.42.0/go.mod h1:FBa4FBJO7NuANvkeKSlrf0BIyxGufmrUbuelr6Q4Ick=
github.com/opensearch-project/opensearch-go/v4 v4.0.0 h1:Nrh30HhaknKcaPcIzlqA6Jf0CBgWP5XUaSp0HMsRBlA=
github.com/opensearch-project/opensearch-go/v4 v4.0.0/go.mod h1:amlBgHgAX9AwwW50eOuzYa5n/8aD18LoWO8eDLoe8KQ=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
Expand Down
14 changes: 10 additions & 4 deletions lmdb/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ const (
var _ eventstore.Store = (*LMDBBackend)(nil)

type LMDBBackend struct {
Path string
MaxLimit int
MapSize int64
Path string
MaxLimit int
MaxLimitNegentropy int
MapSize int64

lmdbEnv *lmdb.Env

Expand All @@ -39,8 +40,13 @@ type LMDBBackend struct {
}

func (b *LMDBBackend) Init() error {
if b.MaxLimit == 0 {
if b.MaxLimit != 0 {
b.MaxLimitNegentropy = b.MaxLimit
} else {
b.MaxLimit = 500
if b.MaxLimitNegentropy == 0 {
b.MaxLimitNegentropy = 16777216
}
}

env, err := lmdb.NewEnv()
Expand Down
22 changes: 18 additions & 4 deletions lmdb/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"slices"

"github.com/PowerDNS/lmdb-go/lmdb"
"github.com/fiatjaf/eventstore"
bin "github.com/fiatjaf/eventstore/internal/binary"
"github.com/nbd-wtf/go-nostr"
)
Expand All @@ -22,20 +23,33 @@ type queryEvent struct {
func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) {
ch := make(chan *nostr.Event)

if filter.Search != "" {
close(ch)
return ch, nil
}

queries, extraAuthors, extraKinds, extraTagKey, extraTagValues, since, err := b.prepareQueries(filter)
if err != nil {
return nil, err
}

// max number of events we'll return
limit := b.MaxLimit / 4
if filter.Limit > 0 && filter.Limit <= b.MaxLimit {
maxLimit := b.MaxLimit
var limit int
if eventstore.IsNegentropySession(ctx) {
maxLimit = b.MaxLimitNegentropy
limit = maxLimit
} else {
limit = maxLimit / 4
}
if filter.Limit > 0 && filter.Limit <= maxLimit {
limit = filter.Limit
}

if filter.Search != "" {
if tlimit := nostr.GetTheoreticalLimit(filter); tlimit == 0 {
close(ch)
return ch, nil
} else if tlimit > 0 {
limit = tlimit
}

go func() {
Expand Down
13 changes: 13 additions & 0 deletions negentropy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package eventstore

import "context"

var negentropySessionKey = struct{}{}

func IsNegentropySession(ctx context.Context) bool {
return ctx.Value(negentropySessionKey) != nil
}

func SetNegentropy(ctx context.Context) context.Context {
return context.WithValue(ctx, negentropySessionKey, struct{}{})
}
93 changes: 38 additions & 55 deletions relay_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,69 +14,52 @@ type RelayWrapper struct {
var _ nostr.RelayStore = (*RelayWrapper)(nil)

func (w RelayWrapper) Publish(ctx context.Context, evt nostr.Event) error {
if nostr.IsEphemeralKind(evt.Kind) {
// do not store ephemeral events
return nil
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()

if 20000 <= evt.Kind && evt.Kind < 30000 {
// do not store ephemeral events
return nil
} else if evt.Kind == 0 || evt.Kind == 3 || (10000 <= evt.Kind && evt.Kind < 20000) {
// replaceable event, delete before storing
ch, err := w.Store.QueryEvents(ctx, nostr.Filter{Authors: []string{evt.PubKey}, Kinds: []int{evt.Kind}})
if err != nil {
return fmt.Errorf("failed to query before replacing: %w", err)
}
isNewer := true
for previous := range ch {
if previous == nil {
continue
}
if isOlder(previous, &evt) {
if err := w.Store.DeleteEvent(ctx, previous); err != nil {
return fmt.Errorf("failed to delete event for replacing: %w", err)
}
} else {
// already, newer event is stored.
isNewer = false
break
}
}
if !isNewer {
return nil
}
} else if 30000 <= evt.Kind && evt.Kind < 40000 {
// parameterized replaceable event, delete before storing
d := evt.Tags.GetFirst([]string{"d", ""})
if d == nil {
return fmt.Errorf("failed to add event missing d tag for parameterized replacing")
}
ch, err := w.Store.QueryEvents(ctx, nostr.Filter{Authors: []string{evt.PubKey}, Kinds: []int{evt.Kind}, Tags: nostr.TagMap{"d": []string{d.Value()}}})
if err != nil {
return fmt.Errorf("failed to query before parameterized replacing: %w", err)
if nostr.IsRegularKind(evt.Kind) {
// regular events are just saved directly
if err := w.SaveEvent(ctx, &evt); err != nil && err != ErrDupEvent {
return fmt.Errorf("failed to save: %w", err)
}
isNewer := true
for previous := range ch {
if previous == nil {
continue
}
}

if !isOlder(previous, &evt) {
if err := w.Store.DeleteEvent(ctx, previous); err != nil {
return fmt.Errorf("failed to delete event for parameterized replacing: %w", err)
}
} else {
// already, newer event is stored.
isNewer = false
break
}
// from now on we know they are replaceable or addressable
filter := nostr.Filter{Limit: 1, Kinds: []int{evt.Kind}, Authors: []string{evt.PubKey}}
if nostr.IsAddressableKind(evt.Kind) {
// when addressable, add the "d" tag to the filter
filter.Tags = nostr.TagMap{"d": []string{evt.Tags.GetD()}}
}

// now we fetch the event, whatever it is, delete it and then save the new
ch, err := w.Store.QueryEvents(ctx, filter)
if err != nil {
return fmt.Errorf("failed to query before replacing: %w", err)
}

shouldStore := true
for previous := range ch {
if previous == nil {
continue
}
if !isNewer {
return nil
if isOlder(previous, &evt) {
if err := w.Store.DeleteEvent(ctx, previous); err != nil {
return fmt.Errorf("failed to delete event for replacing: %w", err)
}
} else {
// already, newer event is stored.
shouldStore = false
}
}

if err := w.SaveEvent(ctx, &evt); err != nil && err != ErrDupEvent {
return fmt.Errorf("failed to save: %w", err)
if shouldStore {
if err := w.SaveEvent(ctx, &evt); err != nil && err != ErrDupEvent {
return fmt.Errorf("failed to save: %w", err)
}
}

return nil
Expand Down
9 changes: 4 additions & 5 deletions store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@ type Store interface {
// Close must be called after you're done using the store, to free up resources and so on.
Close()

// QueryEvents is invoked upon a client's REQ as described in NIP-01.
// it should return a channel with the events as they're recovered from a database.
// the channel should be closed after the events are all delivered.
// QueryEvents should return a channel with the events as they're recovered from a database.
// the channel should be closed after the events are all delivered.
QueryEvents(context.Context, nostr.Filter) (chan *nostr.Event, error)
// DeleteEvent is used to handle deletion events, as per NIP-09.
// DeleteEvent just deletes an event, no side-effects.
DeleteEvent(context.Context, *nostr.Event) error
// SaveEvent is called once Relay.AcceptEvent reports true.
// SaveEvent just saves an event, no side-effects.
SaveEvent(context.Context, *nostr.Event) error
}

Expand Down

0 comments on commit 37d4373

Please sign in to comment.