Skip to content

Commit

Permalink
mmm: support inspecting if an id is in a given layer.
Browse files Browse the repository at this point in the history
  • Loading branch information
fiatjaf committed Feb 16, 2025
1 parent 0adaf58 commit d883fb2
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 12 deletions.
16 changes: 10 additions & 6 deletions mmm/mmmm.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type MultiMmapManager struct {
Dir string
Logger *zerolog.Logger

layers []*IndexingLayer
layers IndexingLayers

mmapfPath string
mmapf mmap
Expand Down Expand Up @@ -169,9 +169,9 @@ func (b *MultiMmapManager) EnsureLayer(name string, il *IndexingLayer) error {
if err := il.runThroughEvents(txn); err != nil {
return fmt.Errorf("failed to run %s through events: %w", name, err)
}
return txn.Put(b.knownLayers, []byte(name), binary.LittleEndian.AppendUint16(nil, il.id), 0)
return txn.Put(b.knownLayers, []byte(name), binary.BigEndian.AppendUint16(nil, il.id), 0)
} else if err == nil {
il.id = binary.LittleEndian.Uint16(idv)
il.id = binary.BigEndian.Uint16(idv)

if err := il.Init(); err != nil {
return fmt.Errorf("failed to init old layer %s: %w", name, err)
Expand Down Expand Up @@ -201,9 +201,13 @@ func (b *MultiMmapManager) DropLayer(name string) error {
}
il := b.layers[idx]

// remove layer references from global indexes
// remove layer references
err := b.lmdbEnv.Update(func(txn *lmdb.Txn) error {
return b.removeAllReferencesFromLayer(txn, il.id)
if err := b.removeAllReferencesFromLayer(txn, il.id); err != nil {
return err
}

return txn.Del(b.knownLayers, []byte(il.name), nil)
})
if err != nil {
return err
Expand Down Expand Up @@ -301,7 +305,7 @@ func (b *MultiMmapManager) getNextAvailableLayerId(txn *lmdb.Txn) (uint16, error
_, val, err := cursor.Get(nil, nil, lmdb.First)
for err == nil {
// something was found
used[binary.LittleEndian.Uint16(val)] = true
used[binary.BigEndian.Uint16(val)] = true
// next
_, val, err = cursor.Get(nil, nil, lmdb.Next)
}
Expand Down
39 changes: 35 additions & 4 deletions mmm/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,32 @@ import (
"github.com/nbd-wtf/go-nostr"
)

func (b *MultiMmapManager) queryByIDs(_ context.Context, ch chan *nostr.Event, ids []string) {
// GetByID returns the event -- if found in this mmm -- and all the IndexingLayers it belongs to.
func (b *MultiMmapManager) GetByID(id string) (*nostr.Event, IndexingLayers) {
events := make(chan *nostr.Event)
presence := make(chan []uint16)
b.queryByIDs(events, []string{id}, presence)
for evt := range events {
p := <-presence
present := make([]*IndexingLayer, len(p))
for i, id := range p {
present[i] = b.layers.ByID(id)
}
return evt, present
}
return nil, nil
}

// queryByIDs emits the events of the given id to the given channel if they exist anywhere in this mmm.
// if presence is given it will also be used to emit slices of the ids of the IndexingLayers this event is stored in.
// it closes the channels when it ends.
func (b *MultiMmapManager) queryByIDs(ch chan *nostr.Event, ids []string, presence chan []uint16) {
go b.lmdbEnv.View(func(txn *lmdb.Txn) error {
txn.RawRead = true
defer close(ch)
if presence != nil {
defer close(presence)
}

for _, id := range ids {
if len(id) != 64 {
Expand All @@ -30,8 +52,17 @@ func (b *MultiMmapManager) queryByIDs(_ context.Context, ch chan *nostr.Event, i
if err == nil {
pos := positionFromBytes(val[0:12])
evt := &nostr.Event{}
if err := b.loadEvent(pos, evt); err == nil {
ch <- evt
if err := b.loadEvent(pos, evt); err != nil {
panic(fmt.Errorf("failed to decode event from %v: %w", pos, err))
}
ch <- evt

if presence != nil {
layers := make([]uint16, 0, (len(val)-12)/2)
for s := 12; s < len(val); s += 2 {
layers = append(layers, binary.BigEndian.Uint16(val[s:s+2]))
}
presence <- layers
}
}
}
Expand All @@ -44,7 +75,7 @@ func (il *IndexingLayer) QueryEvents(ctx context.Context, filter nostr.Filter) (
ch := make(chan *nostr.Event)

if len(filter.IDs) > 0 {
go il.mmmm.queryByIDs(ctx, ch, filter.IDs)
il.mmmm.queryByIDs(ch, filter.IDs, nil)
return ch, nil
}

Expand Down
4 changes: 2 additions & 2 deletions mmm/save.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,15 @@ func (b *MultiMmapManager) storeOn(
return false, nil
}

// find a suitable place for this to be stored in
// get event binary size
pos := position{
size: uint32(betterbinary.Measure(*evt)),
}

if pos.size >= 1<<16 {
return false, fmt.Errorf("event too large to store, max %d, got %d", 1<<16, pos.size)
}

// find a suitable place for this to be stored in
appendToMmap := true
for f, fr := range b.freeRanges {
if fr.size >= pos.size {
Expand Down

0 comments on commit d883fb2

Please sign in to comment.