Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions frac/sealed.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ func (f *Sealed) Release() {

func (f *Sealed) Suicide() {
f.Release()

// Rename docs atomically first — this commits the intent to delete.
oldPath := f.BaseFileName + consts.DocsFileSuffix
newPath := f.BaseFileName + consts.DocsDelFileSuffix
Expand Down
50 changes: 42 additions & 8 deletions fracmanager/fracmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ func New(ctx context.Context, cfg *Config, s3cli *s3.Client, skipMaskProvider sk
wg.Wait()

// finalize appender to prevent new writes
appender := lc.registry.Appender()
if err := appender.Finalize(); err != nil {
appender := lc.registry.appender()
if err := appender.finalize(); err != nil {
logger.Fatal("shutdown fraction freezing error", zap.Error(err))
}
appender.WaitWriteIdle()
appender.waitWriteIdle()

stopIdx()

Expand All @@ -96,16 +96,50 @@ func New(ctx context.Context, cfg *Config, s3cli *s3.Client, skipMaskProvider sk
return &fm, stop, nil
}

type CompactionSnapshot struct {
claimed []*refCountedSealed
}

func (cs *CompactionSnapshot) Fractions() []*frac.Sealed {
result := make([]*frac.Sealed, len(cs.claimed))
for i, f := range cs.claimed {
result[i] = f.Sealed
}
return result
}

func (cs *CompactionSnapshot) Destroy() {
for _, f := range cs.claimed {
f.Destroy()
}
}

func (fm *FracManager) SealedFractionsSnapshot() []*frac.Sealed {
return fm.lc.registry.sealedSnapshot()
}

func (fm *FracManager) ClaimForCompaction(names []string) (*CompactionSnapshot, error) {
claimed, err := fm.lc.registry.claimForCompaction(names)
if err != nil {
return nil, err
}
return &CompactionSnapshot{claimed: claimed}, nil
}

func (fm *FracManager) SubstituteWithSealed(produced *frac.Sealed, snapshot *CompactionSnapshot) {
fm.lc.registry.substituteWithSealed(produced, snapshot.claimed...)
}

func (fm *FracManager) AcquireFraction(name string) (frac.Fraction, func(), bool) {
return fm.lc.registry.AcquireOneFraction(name)
return fm.lc.registry.acquireOneFraction(name)
}

func (fm *FracManager) AcquireFractions() (List, func()) {
return fm.lc.registry.AcquireAllFractions()
return fm.lc.registry.acquireAllFractions()
}

func (fm *FracManager) Oldest() uint64 {
return fm.lc.registry.OldestTotal()
return fm.lc.registry.oldestTotal()
}

func (fm *FracManager) Flags() *StateManager {
Expand All @@ -121,7 +155,7 @@ func (fm *FracManager) Append(ctx context.Context, docs storage.DocBlock, metas
return ctx.Err()
default:
// Try to append data to the currently active fraction
err := fm.lc.registry.Appender().Append(docs, metas)
err := fm.lc.registry.appender().append(docs, metas)
if err != nil {
logger.Info("append fail", zap.Error(err))
if err == ErrFractionNotWritable {
Expand Down Expand Up @@ -167,7 +201,7 @@ func startStatsWorker(ctx context.Context, reg *fractionRegistry, wg *sync.WaitG
logger.Info("stats loop is started")
// Run stats collection every 10 seconds
util.RunEvery(ctx.Done(), time.Second*10, func() {
stats := reg.Stats()
stats := reg.statistics()
stats.Log() // Log statistics
stats.SetMetrics() // Update Prometheus metrics
})
Expand Down
2 changes: 1 addition & 1 deletion fracmanager/fracmanager_for_tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package fracmanager
import "sync"

func (fm *FracManager) WaitIdleForTests() {
fm.lc.registry.Appender().WaitWriteIdle()
fm.lc.registry.appender().waitWriteIdle()
}

func (fm *FracManager) SealForcedForTests() {
Expand Down
6 changes: 3 additions & 3 deletions fracmanager/fracmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ func TestSealingOnShutdown(t *testing.T) {
cfg, fm, stop := setupFracManager(t, cfg)
appendDocsToFracManager(t, fm, 10)

activeName := fm.lc.registry.all.fractions[0].Info().Name()
activeName := fm.lc.registry.snapshot.fractions[0].Info().Name()

stop()

// second start
cfg.MinSealFracSize = 1 // to ensure that the frac will be sealed on shutdown
cfg, fm, stop = setupFracManager(t, cfg)

allFractions := fm.lc.registry.all.fractions
allFractions := fm.lc.registry.snapshot.fractions
assert.Equal(t, 1, len(allFractions), "should have one fraction")
assert.Equal(t, activeName, allFractions[0].Info().Name(), "fraction should have the same name")
_, ok := allFractions[0].(*syncAppender)
Expand All @@ -80,7 +80,7 @@ func TestSealingOnShutdown(t *testing.T) {
// third start
_, fm, stop = setupFracManager(t, cfg)

allFractions = fm.lc.registry.all.fractions
allFractions = fm.lc.registry.snapshot.fractions
assert.Equal(t, 2, len(allFractions), "should have 2 fraction: new active and old sealed")
_, ok = allFractions[0].(*refCountedSealed)
assert.True(t, ok, "first fraction should be sealed")
Expand Down
5 changes: 4 additions & 1 deletion fracmanager/fracs_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type registryStats struct {
active fracsStats // Statistics for active fraction
sealing fracsStats // Statistics for fractions in the sealing process
sealed fracsStats // Statistics for fractions on sealed disk
compacting fracsStats // Statistics for fractions participating in compaction
offloading fracsStats // Statistics for fractions in the offloading process
remotes fracsStats // Statistics for fractions in remote storage
}
Expand All @@ -84,6 +85,7 @@ func (s *registryStats) Log() {
s.active.Log("active")
s.sealing.Log("sealing")
s.sealed.Log("sealed")
s.compacting.Log("compacting")
s.offloading.Log("offloading")
s.remotes.Log("remotes")
}
Expand All @@ -92,10 +94,11 @@ func (s *registryStats) SetMetrics() {
s.active.SetMetrics(dataSizeTotal, "active")
s.sealing.SetMetrics(dataSizeTotal, "sealing")
s.sealed.SetMetrics(dataSizeTotal, "sealed")
s.compacting.SetMetrics(dataSizeTotal, "compacting")
s.offloading.SetMetrics(dataSizeTotal, "offloading")
s.remotes.SetMetrics(dataSizeTotal, "remotes")
}

func (s registryStats) TotalSizeOnDiskLocal() uint64 {
return s.sealing.totalSizeOnDisk + s.sealed.totalSizeOnDisk
return s.sealing.totalSizeOnDisk + s.sealed.totalSizeOnDisk + s.compacting.totalSizeOnDisk
}
Loading