Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/seq-db/seq-db.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ func startStore(
TokenTableZstdLevel: cfg.Compression.SealedZstdCompressionLevel,
DocBlocksZstdLevel: cfg.Compression.DocBlockZstdCompressionLevel,
DocBlockSize: int(cfg.DocsSorting.DocBlockSize),
SkipFsync: cfg.Resources.SkipFsync,
},
Fraction: frac.Config{
Search: frac.SearchConfig{
Expand All @@ -283,6 +284,7 @@ func startStore(
},
SkipSortDocs: !cfg.DocsSorting.Enabled,
KeepMetaFile: false,
SkipFsync: cfg.Resources.SkipFsync,
},
OffloadingEnabled: cfg.Offloading.Enabled,
OffloadingRetention: cfg.Offloading.Retention,
Expand Down
14 changes: 7 additions & 7 deletions frac/active.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ func NewActive(
cfg *Config,
skipMaskProvider skipMaskProvider,
) *Active {
docsFile, docsStats := mustOpenFile(baseFileName+consts.DocsFileSuffix, config.SkipFsync)
docsFile, docsStats := mustOpenFile(baseFileName+consts.DocsFileSuffix, cfg.SkipFsync)

metaFile, writer, metaReader, walReader, metaSize := mustOpenMetaWriter(baseFileName, readLimiter, docsFile, docsStats)
metaFile, writer, metaReader, walReader, metaSize := mustOpenMetaWriter(baseFileName, readLimiter, docsFile, docsStats, cfg.SkipFsync)

f := &Active{
TokenList: NewActiveTokenList(config.IndexWorkers),
Expand Down Expand Up @@ -117,24 +117,24 @@ func mustOpenMetaWriter(
readLimiter *storage.ReadLimiter,
docsFile *os.File,
docsStats os.FileInfo,
) (*os.File, *ActiveWriter, *storage.DocBlocksReader, *storage.WalReader, uint64) {
skipFsync bool) (*os.File, *ActiveWriter, *storage.DocBlocksReader, *storage.WalReader, uint64) {
legacyMetaFileName := baseFileName + consts.MetaFileSuffix

if _, err := os.Stat(legacyMetaFileName); err == nil {
// .meta file exists
metaFile, metaStats := mustOpenFile(legacyMetaFileName, config.SkipFsync)
metaFile, metaStats := mustOpenFile(legacyMetaFileName, skipFsync)
metaSize := uint64(metaStats.Size())
metaReader := storage.NewDocBlocksReader(readLimiter, metaFile)
writer := NewActiveWriterLegacy(docsFile, metaFile, docsStats.Size(), metaStats.Size(), config.SkipFsync)
writer := NewActiveWriterLegacy(docsFile, metaFile, docsStats.Size(), metaStats.Size(), skipFsync)
logger.Info("using legacy meta file format", zap.String("fraction", baseFileName))
return metaFile, writer, &metaReader, nil, metaSize
}

logger.Info("using new WAL format", zap.String("fraction", baseFileName))
walFileName := baseFileName + consts.WalFileSuffix
metaFile, metaStats := mustOpenFile(walFileName, config.SkipFsync)
metaFile, metaStats := mustOpenFile(walFileName, skipFsync)
metaSize := uint64(metaStats.Size())
writer := NewActiveWriter(docsFile, metaFile, docsStats.Size(), metaStats.Size(), config.SkipFsync)
writer := NewActiveWriter(docsFile, metaFile, docsStats.Size(), metaStats.Size(), skipFsync)
walReader, err := storage.NewWalReader(readLimiter, metaFile, baseFileName)
if err != nil {
logger.Fatal("failed to initialize WAL reader", zap.String("fraction", baseFileName), zap.Error(err))
Expand Down
55 changes: 27 additions & 28 deletions frac/active_sealing_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,24 @@ type (
)

type ActiveSealingSource struct {
params common.SealParams // Sealing parameters

info *common.Info // fraction Info
created time.Time // Creation time of the source

blocksOffsets []uint64 // Document block offsets

sortedLIDs []uint32 // Sorted LIDs (Local ID)
oldToNewLIDs []uint32 // Mapping from old LIDs to new ones (after sorting)

mids *UInt64s // MIDs
rids *UInt64s // RIDs

fields []string // Sorted field names
fieldTIDs [][]uint32 // Each field contains sorted TIDs based on token value

tokens [][]byte // Tokens (values) by TID
lids []*TokenLIDs // LID lists for each token

docPosMap map[seq.ID]seq.DocPos // Original document positions
docPosSorted []seq.DocPos // Document positions after sorting
docsReader *storage.DocsReader // Document storage reader
params common.SealParams // Sealing parameters
info *common.Info // fraction Info
created time.Time // Creation time of the source
sortedLIDs []uint32 // Sorted LIDs (Local ID)
oldToNewLIDs []uint32 // Mapping from old LIDs to new ones (after sorting)
mids *UInt64s // MIDs
rids *UInt64s // RIDs
fields []string // Sorted field names
fieldsMaxTIDs []uint32 // Maximum TIDs for each field
tids []uint32 // Sorted TIDs (Token ID)
tokens [][]byte // Tokens (values) by TID
lids []*TokenLIDs // LID lists for each token
docPosMap map[seq.ID]seq.DocPos // Original document positions
docPosSorted []seq.DocPos // Document positions after sorting
blocksOffsets []uint64 // Document block offsets
docsReader *storage.DocsReader // Document storage reader
lastErr error // Last error
skipFsync bool
}

func NewActiveSealingSource(active *Active, params common.SealParams) (*ActiveSealingSource, error) {
Expand Down Expand Up @@ -81,6 +77,7 @@ func NewActiveSealingSource(active *Active, params common.SealParams) (*ActiveSe
docPosMap: active.DocsPositions.idToPos,
blocksOffsets: active.DocBlocks.vals,
docsReader: &active.sortReader,
skipFsync: active.Config.SkipFsync,
}

src.prepareInfo()
Expand Down Expand Up @@ -313,9 +310,10 @@ func (src *ActiveSealingSource) SortDocs() error {
}
src.info.DocsOnDisk = uint64(stat.Size())

// Synchronize and rename file
if err := sdocsFile.Sync(); err != nil {
return err
if !src.skipFsync {
if err := sdocsFile.Sync(); err != nil {
return err
}
}

if err := sdocsFile.Close(); err != nil {
Expand All @@ -325,9 +323,10 @@ func (src *ActiveSealingSource) SortDocs() error {
if err := os.Rename(sdocsFile.Name(), src.info.Path+consts.SdocsFileSuffix); err != nil {
return err
}

if err := util.SyncPath(filepath.Dir(src.info.Path)); err != nil {
return err
if !src.skipFsync {
if err := util.SyncPath(filepath.Dir(src.info.Path)); err != nil {
return err
}
}

// Log compression statistics
Expand Down
2 changes: 2 additions & 0 deletions frac/common/seal_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ type SealParams struct {

DocBlocksZstdLevel int // DocBlocksZstdLevel is the zstd compress level of each document block.
DocBlockSize int // DocBlockSize is decompressed payload size of document block.

SkipFsync bool
}
1 change: 1 addition & 0 deletions frac/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ type Config struct {

SkipSortDocs bool
KeepMetaFile bool
SkipFsync bool
}

type SearchConfig struct {
Expand Down
17 changes: 9 additions & 8 deletions frac/sealed/sealing/sealer.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,11 @@ func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) {
return nil, err
}

if err := createAndWrite(
info.Path+consts.IDTmpFileSuffix,
info.Path+consts.IDFileSuffix,
func(f *os.File) error { return sealer.WriteIDFile(f, src) },
); err != nil {
return nil, err
if !params.SkipFsync {
// Ensure data is flushed to disk
if err := indexFile.Sync(); err != nil {
return nil, err
}
}

if err := createAndWriteBoth(
Expand Down Expand Up @@ -99,8 +98,10 @@ func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) {
totalSize += uint64(st.Size())
}

info.IndexOnDisk = totalSize
lidsTable := sealer.LIDsTable()
if !params.SkipFsync {
// Ensure directory metadata is synced to disk
util.MustSyncPath(filepath.Dir(info.Path))
}

preloaded := &sealed.PreloadedData{
Info: info,
Expand Down