diff --git a/mmm/mmmm.go b/mmm/mmmm.go index 6113c02..7cc3eeb 100644 --- a/mmm/mmmm.go +++ b/mmm/mmmm.go @@ -24,7 +24,7 @@ type MultiMmapManager struct { Dir string Logger *zerolog.Logger - layers []*IndexingLayer + layers IndexingLayers mmapfPath string mmapf mmap @@ -169,9 +169,9 @@ func (b *MultiMmapManager) EnsureLayer(name string, il *IndexingLayer) error { if err := il.runThroughEvents(txn); err != nil { return fmt.Errorf("failed to run %s through events: %w", name, err) } - return txn.Put(b.knownLayers, []byte(name), binary.LittleEndian.AppendUint16(nil, il.id), 0) + return txn.Put(b.knownLayers, []byte(name), binary.BigEndian.AppendUint16(nil, il.id), 0) } else if err == nil { - il.id = binary.LittleEndian.Uint16(idv) + il.id = binary.BigEndian.Uint16(idv) if err := il.Init(); err != nil { return fmt.Errorf("failed to init old layer %s: %w", name, err) @@ -201,9 +201,13 @@ func (b *MultiMmapManager) DropLayer(name string) error { } il := b.layers[idx] - // remove layer references from global indexes + // remove layer references err := b.lmdbEnv.Update(func(txn *lmdb.Txn) error { - return b.removeAllReferencesFromLayer(txn, il.id) + if err := b.removeAllReferencesFromLayer(txn, il.id); err != nil { + return err + } + + return txn.Del(b.knownLayers, []byte(il.name), nil) }) if err != nil { return err @@ -301,7 +305,7 @@ func (b *MultiMmapManager) getNextAvailableLayerId(txn *lmdb.Txn) (uint16, error _, val, err := cursor.Get(nil, nil, lmdb.First) for err == nil { // something was found - used[binary.LittleEndian.Uint16(val)] = true + used[binary.BigEndian.Uint16(val)] = true // next _, val, err = cursor.Get(nil, nil, lmdb.Next) } diff --git a/mmm/query.go b/mmm/query.go index 0db215f..da67084 100644 --- a/mmm/query.go +++ b/mmm/query.go @@ -15,10 +15,32 @@ import ( "github.com/nbd-wtf/go-nostr" ) -func (b *MultiMmapManager) queryByIDs(_ context.Context, ch chan *nostr.Event, ids []string) { +// GetByID returns the event -- if found in this mmm -- and all the IndexingLayers it belongs to. +func (b *MultiMmapManager) GetByID(id string) (*nostr.Event, IndexingLayers) { + events := make(chan *nostr.Event) + presence := make(chan []uint16) + b.queryByIDs(events, []string{id}, presence) + for evt := range events { + p := <-presence + present := make([]*IndexingLayer, len(p)) + for i, id := range p { + present[i] = b.layers.ByID(id) + } + return evt, present + } + return nil, nil +} + +// queryByIDs emits the events of the given id to the given channel if they exist anywhere in this mmm. +// if presence is given it will also be used to emit slices of the ids of the IndexingLayers this event is stored in. +// it closes the channels when it ends. +func (b *MultiMmapManager) queryByIDs(ch chan *nostr.Event, ids []string, presence chan []uint16) { go b.lmdbEnv.View(func(txn *lmdb.Txn) error { txn.RawRead = true defer close(ch) + if presence != nil { + defer close(presence) + } for _, id := range ids { if len(id) != 64 { @@ -30,8 +52,17 @@ func (b *MultiMmapManager) queryByIDs(_ context.Context, ch chan *nostr.Event, i if err == nil { pos := positionFromBytes(val[0:12]) evt := &nostr.Event{} - if err := b.loadEvent(pos, evt); err == nil { - ch <- evt + if err := b.loadEvent(pos, evt); err != nil { + panic(fmt.Errorf("failed to decode event from %v: %w", pos, err)) + } + ch <- evt + + if presence != nil { + layers := make([]uint16, 0, (len(val)-12)/2) + for s := 12; s < len(val); s += 2 { + layers = append(layers, binary.BigEndian.Uint16(val[s:s+2])) + } + presence <- layers } } } @@ -44,7 +75,7 @@ func (il *IndexingLayer) QueryEvents(ctx context.Context, filter nostr.Filter) ( ch := make(chan *nostr.Event) if len(filter.IDs) > 0 { - go il.mmmm.queryByIDs(ctx, ch, filter.IDs) + il.mmmm.queryByIDs(ch, filter.IDs, nil) return ch, nil } diff --git a/mmm/save.go b/mmm/save.go index 334ac7a..1b767b7 100644 --- a/mmm/save.go +++ b/mmm/save.go @@ -147,15 +147,15 @@ func (b *MultiMmapManager) storeOn( return false, nil } - // find a suitable place for this to be stored in + // get event binary size pos := position{ size: uint32(betterbinary.Measure(*evt)), } - if pos.size >= 1<<16 { return false, fmt.Errorf("event too large to store, max %d, got %d", 1<<16, pos.size) } + // find a suitable place for this to be stored in appendToMmap := true for f, fr := range b.freeRanges { if fr.size >= pos.size {