
Commit dbc7e3b

db: check problem spans when picking auto compactions
We check the problem spans to avoid any overlap when picking compactions. Note
that normally the problem spans are empty and these checks are avoided.

We add a test that verifies that compactions still go through after trying to
compact a non-existent external file.

Fixes #4285
Informs #1192
1 parent f551b59

11 files changed: +449 −50 lines
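Taken together, the hunks below wire problem spans through two paths: a failed compaction records the key bounds of its non-L0 inputs as problem spans with an expiration, and the compaction picker skips candidate tables that overlap an unexpired span. The following is a minimal, self-contained sketch of that flow; span, problemSpans, and pickFile are illustrative stand-ins, not the actual Pebble types (the real code uses base.UserKeyBounds, problemspans.ByLevel, and pickCompactionSeedFile).

package main

import (
	"fmt"
	"time"
)

// span is a simplified closed-open user-key interval over string keys. The
// real code uses base.UserKeyBounds.
type span struct{ start, end string }

func (s span) overlaps(o span) bool { return s.start < o.end && o.start < s.end }

// problemSpan pairs a span with the time at which it stops being a problem.
type problemSpan struct {
	span
	expires time.Time
}

// problemSpans is a toy stand-in for internal/problemspans.ByLevel: spans are
// recorded per level and ignored once expired.
type problemSpans struct {
	byLevel map[int][]problemSpan
}

func newProblemSpans() *problemSpans {
	return &problemSpans{byLevel: map[int][]problemSpan{}}
}

func (p *problemSpans) add(level int, s span, d time.Duration) {
	p.byLevel[level] = append(p.byLevel[level], problemSpan{span: s, expires: time.Now().Add(d)})
}

func (p *problemSpans) overlaps(level int, s span) bool {
	for _, ps := range p.byLevel[level] {
		if time.Now().Before(ps.expires) && ps.span.overlaps(s) {
			return true
		}
	}
	return false
}

// pickFile mirrors the picker-side check in this commit: skip any candidate
// that is already compacting or that touches an active problem span.
func pickFile(level int, candidates []span, compacting map[int]bool, ps *problemSpans) (span, bool) {
	for i, c := range candidates {
		if compacting[i] || ps.overlaps(level, c) {
			continue
		}
		return c, true
	}
	return span{}, false
}

func main() {
	ps := newProblemSpans()
	// A failed compaction over [b, d) on L3 records a problem span for 30
	// seconds (the commit uses 5 minutes for corruption errors).
	ps.add(3, span{"b", "d"}, 30*time.Second)

	candidates := []span{{"a", "c"}, {"e", "g"}}
	if f, ok := pickFile(3, candidates, nil, ps); ok {
		// [a, c) intersects the recorded problem span, so [e, g) is picked.
		fmt.Printf("picked [%s, %s)\n", f.start, f.end)
	}
}

Running the sketch prints "picked [e, g)", since the first candidate overlaps the recorded problem span.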

compaction.go

Lines changed: 28 additions & 3 deletions
@@ -1788,7 +1788,7 @@ func (d *DB) makeCompactionEnvLocked() *compactionEnv {
 	if d.closed.Load() != nil || d.opts.ReadOnly {
 		return nil
 	}
-	return &compactionEnv{
+	env := &compactionEnv{
 		diskAvailBytes:          d.diskAvailBytes.Load(),
 		earliestSnapshotSeqNum:  d.mu.snapshots.earliest(),
 		earliestUnflushedSeqNum: d.getEarliestUnflushedSeqNumLocked(),
@@ -1799,6 +1799,10 @@ func (d *DB) makeCompactionEnvLocked() *compactionEnv {
 			rescheduleReadCompaction: &d.mu.compact.rescheduleReadCompaction,
 		},
 	}
+	if !d.problemSpans.IsEmpty() {
+		env.problemSpans = &d.problemSpans
+	}
+	return env
 }

 // pickAnyCompaction tries to pick a manual or automatic compaction.
@@ -2324,7 +2328,7 @@ func (d *DB) compact(c *compaction, errChannel chan error) {
 	d.mu.Lock()
 	c.grantHandle.Started()
 	if err := d.compact1(c, errChannel); err != nil {
-		d.handleCompactFailure(err)
+		d.handleCompactFailure(c, err)
 	}
 	if c.isDownload {
 		d.mu.compact.downloadingCount--
@@ -2362,13 +2366,34 @@ func (d *DB) compact(c *compaction, errChannel chan error) {
 	})
 }

-func (d *DB) handleCompactFailure(err error) {
+func (d *DB) handleCompactFailure(c *compaction, err error) {
 	if errors.Is(err, ErrCancelledCompaction) {
 		// ErrCancelledCompaction is expected during normal operation, so we don't
 		// want to report it as a background error.
 		d.opts.Logger.Infof("%v", err)
 		return
 	}
+
+	// Record problem spans for a short duration, unless the error is a corruption.
+	expiration := 30 * time.Second
+	if IsCorruptionError(err) {
+		// TODO(radu): ideally, we should be using the corruption reporting
+		// mechanism which has a tighter span for the corruption. We would need to
+		// somehow plumb the level of the file.
+		expiration = 5 * time.Minute
+	}
+	for i := range c.inputs {
+		level := c.inputs[i].level
+		if level == 0 {
+			// We do not set problem spans on L0, as they could block flushes.
+			continue
+		}
+		it := c.inputs[i].files.Iter()
+		for f := it.First(); f != nil; f = it.Next() {
+			d.problemSpans.Add(level, f.UserKeyBounds(), expiration)
+		}
+	}
+
 	// TODO(peter): count consecutive compaction errors and backoff.
 	d.opts.EventListener.BackgroundError(err)
 }
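The internal/problemspans package itself is not part of the hunks shown on this page. Judging only from the call sites in this commit (IsEmpty and Add above, Overlaps in compaction_picker.go, Init in the test), its ByLevel type appears to expose roughly the following surface, summarized here as a hypothetical interface inside the pebble package rather than the package's actual definition:

package pebble

import (
	"time"

	"github.com/cockroachdb/pebble/internal/base"
)

// problemSpanChecker lists, as an interface, the methods this commit invokes
// on problemspans.ByLevel. Inferred from the call sites only.
type problemSpanChecker interface {
	// Init sets up tracking for numLevels levels using the user-key comparator.
	Init(numLevels int, cmp base.Compare)
	// Add records bounds as a problem span on level; the span stops blocking
	// compactions after expiration (30s for generic failures, 5m for corruption).
	Add(level int, bounds base.UserKeyBounds, expiration time.Duration)
	// Overlaps reports whether bounds intersects an unexpired span on level.
	Overlaps(level int, bounds base.UserKeyBounds) bool
	// IsEmpty reports whether no unexpired problem spans are recorded.
	IsEmpty() bool
}

Note the design choice visible in makeCompactionEnvLocked: env.problemSpans stays nil whenever there are no problem spans, so in the common case the picker's overlap checks (guarded by problemSpans != nil below) are skipped entirely.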

compaction_picker.go

Lines changed: 62 additions & 36 deletions
@@ -17,6 +17,7 @@ import (
 	"github.com/cockroachdb/pebble/internal/humanize"
 	"github.com/cockroachdb/pebble/internal/invariants"
 	"github.com/cockroachdb/pebble/internal/manifest"
+	"github.com/cockroachdb/pebble/internal/problemspans"
 )

 // The minimum count for an intra-L0 compaction. This matches the RocksDB
@@ -39,6 +40,10 @@ type compactionEnv struct {
 	earliestSnapshotSeqNum  base.SeqNum
 	inProgressCompactions   []compactionInfo
 	readCompactionEnv       readCompactionEnv
+	// problemSpans is checked by the compaction picker to avoid compactions that
+	// overlap an active "problem span". It can be nil when there are no problem
+	// spans.
+	problemSpans *problemspans.ByLevel
 }

 type compactionPicker interface {
@@ -414,7 +419,10 @@ func (pc *pickedCompaction) maybeExpandBounds(smallest InternalKey, largest Inte
 // setupInputs returns true if a compaction has been set up. It returns false if
 // a concurrent compaction is occurring on the start or output level files.
 func (pc *pickedCompaction) setupInputs(
-	opts *Options, diskAvailBytes uint64, startLevel *compactionLevel,
+	opts *Options,
+	diskAvailBytes uint64,
+	startLevel *compactionLevel,
+	problemSpans *problemspans.ByLevel,
 ) bool {
 	// maxExpandedBytes is the maximum size of an expanded compaction. If
 	// growing a compaction results in a larger size, the original compaction
@@ -423,7 +431,7 @@ func (pc *pickedCompaction) setupInputs(
 		opts, adjustedOutputLevel(pc.outputLevel.level, pc.baseLevel), diskAvailBytes,
 	)

-	if anyTablesCompacting(startLevel.files) {
+	if !canCompactTables(startLevel.files, startLevel.level, problemSpans) {
 		return false
 	}

@@ -434,7 +442,7 @@ func (pc *pickedCompaction) setupInputs(
 	// left empty for those.
 	if startLevel.level != pc.outputLevel.level {
 		pc.outputLevel.files = pc.version.Overlaps(pc.outputLevel.level, pc.userKeyBounds())
-		if anyTablesCompacting(pc.outputLevel.files) {
+		if !canCompactTables(pc.outputLevel.files, pc.outputLevel.level, problemSpans) {
 			return false
 		}

@@ -504,7 +512,7 @@ func (pc *pickedCompaction) setupInputs(
 				*pc.lcf = *oldLcf
 			}
 		}
-	} else if pc.grow(pc.smallest, pc.largest, maxExpandedBytes, startLevel) {
+	} else if pc.grow(pc.smallest, pc.largest, maxExpandedBytes, startLevel, problemSpans) {
 		pc.maybeExpandBounds(manifest.KeyRange(pc.cmp,
 			startLevel.files.All(), pc.outputLevel.files.All()))
 	}
@@ -521,13 +529,16 @@
 // c.level+1 files in the compaction, and returns whether the inputs grew. sm
 // and la are the smallest and largest InternalKeys in all of the inputs.
 func (pc *pickedCompaction) grow(
-	sm, la InternalKey, maxExpandedBytes uint64, startLevel *compactionLevel,
+	sm, la InternalKey,
+	maxExpandedBytes uint64,
+	startLevel *compactionLevel,
+	problemSpans *problemspans.ByLevel,
 ) bool {
 	if pc.outputLevel.files.Empty() {
 		return false
 	}
 	grow0 := pc.version.Overlaps(startLevel.level, base.UserKeyBoundsFromInternal(sm, la))
-	if anyTablesCompacting(grow0) {
+	if !canCompactTables(grow0, startLevel.level, problemSpans) {
 		return false
 	}
 	if grow0.Len() <= startLevel.files.Len() {
@@ -540,10 +551,10 @@ func (pc *pickedCompaction) grow(
 	// sm1 and la1 could shift the output level keyspace when pc.outputLevel.files is set to grow1.
 	sm1, la1 := manifest.KeyRange(pc.cmp, grow0.All(), pc.outputLevel.files.All())
 	grow1 := pc.version.Overlaps(pc.outputLevel.level, base.UserKeyBoundsFromInternal(sm1, la1))
-	if anyTablesCompacting(grow1) {
+	if grow1.Len() != pc.outputLevel.files.Len() {
 		return false
 	}
-	if grow1.Len() != pc.outputLevel.files.Len() {
+	if !canCompactTables(grow1, pc.outputLevel.level, problemSpans) {
 		return false
 	}
 	startLevel.files = grow0
@@ -570,18 +581,23 @@ func (pc *pickedCompaction) setupMultiLevelCandidate(opts *Options, diskAvailByt
 	pc.startLevel = &pc.inputs[0]
 	pc.extraLevels = []*compactionLevel{&pc.inputs[1]}
 	pc.outputLevel = &pc.inputs[2]
-	return pc.setupInputs(opts, diskAvailBytes, pc.extraLevels[len(pc.extraLevels)-1])
+	return pc.setupInputs(opts, diskAvailBytes, pc.extraLevels[len(pc.extraLevels)-1], nil /* TODO(radu) */)
 }

-// anyTablesCompacting returns true if any tables in the level slice are
-// compacting.
-func anyTablesCompacting(inputs manifest.LevelSlice) bool {
+// canCompactTables returns true if the tables in the level slice are not
+// compacting already and don't intersect any problem spans.
+func canCompactTables(
+	inputs manifest.LevelSlice, level int, problemSpans *problemspans.ByLevel,
+) bool {
 	for f := range inputs.All() {
 		if f.IsCompacting() {
-			return true
+			return false
+		}
+		if problemSpans != nil && problemSpans.Overlaps(level, f.UserKeyBounds()) {
+			return false
 		}
 	}
-	return false
+	return true
 }

 // newCompactionPickerByScore creates a compactionPickerByScore associated with
@@ -1033,6 +1049,7 @@ func pickCompactionSeedFile(
 	opts *Options,
 	level, outputLevel int,
 	earliestSnapshotSeqNum base.SeqNum,
+	problemSpans *problemspans.ByLevel,
 ) (manifest.LevelFile, bool) {
 	// Select the file within the level to compact. We want to minimize write
 	// amplification, but also ensure that (a) deletes are propagated to the
@@ -1067,22 +1084,36 @@ func pickCompactionSeedFile(

 	for f := startIter.First(); f != nil; f = startIter.Next() {
 		var overlappingBytes uint64
-		compacting := f.IsCompacting()
-		if compacting {
+		if f.IsCompacting() {
 			// Move on if this file is already being compacted. We'll likely
 			// still need to move past the overlapping output files regardless,
 			// but in cases where all start-level files are compacting we won't.
 			continue
 		}
+		if problemSpans != nil && problemSpans.Overlaps(level, f.UserKeyBounds()) {
+			// File touches problem span which temporarily disallows auto compactions.
+			continue
+		}

 		// Trim any output-level files smaller than f.
 		for outputFile != nil && sstableKeyCompare(cmp, outputFile.Largest, f.Smallest) < 0 {
 			outputFile = outputIter.Next()
 		}

-		for outputFile != nil && sstableKeyCompare(cmp, outputFile.Smallest, f.Largest) <= 0 && !compacting {
+		skip := false
+		for outputFile != nil && sstableKeyCompare(cmp, outputFile.Smallest, f.Largest) <= 0 {
 			overlappingBytes += outputFile.Size
-			compacting = compacting || outputFile.IsCompacting()
+			if outputFile.IsCompacting() {
+				// If one of the overlapping files is compacting, we're not going to be
+				// able to compact f anyway, so skip it.
+				skip = true
+				break
+			}
+			if problemSpans != nil && problemSpans.Overlaps(outputLevel, outputFile.UserKeyBounds()) {
+				// Overlapping file touches problem span which temporarily disallows auto compactions.
+				skip = true
+				break
+			}

 			// For files in the bottommost level of the LSM, the
 			// Stats.RangeDeletionsBytesEstimate field is set to the estimate
@@ -1129,11 +1160,7 @@ func pickCompactionSeedFile(
 			}
 			outputFile = outputIter.Next()
 		}
-
-		// If the input level file or one of the overlapping files is
-		// compacting, we're not going to be able to compact this file
-		// anyways, so skip it.
-		if compacting {
+		if skip {
 			continue
 		}

@@ -1312,7 +1339,7 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompact

 		// info.level > 0
 		var ok bool
-		info.file, ok = pickCompactionSeedFile(p.vers, p.virtualBackings, p.opts, info.level, info.outputLevel, env.earliestSnapshotSeqNum)
+		info.file, ok = pickCompactionSeedFile(p.vers, p.virtualBackings, p.opts, info.level, info.outputLevel, env.earliestSnapshotSeqNum, env.problemSpans)
 		if !ok {
 			continue
 		}
@@ -1504,9 +1531,7 @@ func (p *compactionPickerByScore) pickedCompactionFromCandidateFile(
 		return nil
 	}

-	if !pc.setupInputs(p.opts, env.diskAvailBytes, pc.startLevel) {
-		// TODO(radu): do we expect this to happen? (it does seem to happen if I add
-		// a log here).
+	if !pc.setupInputs(p.opts, env.diskAvailBytes, pc.startLevel, env.problemSpans) {
 		return nil
 	}

@@ -1639,8 +1664,7 @@ func pickAutoLPositive(
 		}
 	}

-	if !pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel) {
-		opts.Logger.Errorf("%v", base.AssertionFailedf("setupInputs failed"))
+	if !pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel, env.problemSpans) {
 		return nil
 	}
 	return pc.maybeAddLevel(opts, env.diskAvailBytes)
@@ -1781,10 +1805,10 @@ func pickL0(
 	//
 	// TODO(bilal) Remove the minCompactionDepth parameter once fixing it at 1
 	// has been shown to not cause a performance regression.
-	lcf := l0Organizer.PickBaseCompaction(opts.Logger, 1, vers.Levels[baseLevel].Slice())
+	lcf := l0Organizer.PickBaseCompaction(opts.Logger, 1, vers.Levels[baseLevel].Slice(), baseLevel, env.problemSpans)
 	if lcf != nil {
 		pc := newPickedCompactionFromL0(lcf, opts, vers, l0Organizer, baseLevel, true)
-		if pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel) {
+		if pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel, env.problemSpans) {
 			if pc.startLevel.files.Empty() {
 				opts.Logger.Errorf("%v", base.AssertionFailedf("empty compaction chosen"))
 			}
@@ -1798,10 +1822,10 @@ func pickL0(
 	// compaction. Note that we pass in L0CompactionThreshold here as opposed to
 	// 1, since choosing a single sublevel intra-L0 compaction is
 	// counterproductive.
-	lcf = l0Organizer.PickIntraL0Compaction(env.earliestUnflushedSeqNum, minIntraL0Count)
+	lcf = l0Organizer.PickIntraL0Compaction(env.earliestUnflushedSeqNum, minIntraL0Count, env.problemSpans)
 	if lcf != nil {
 		pc := newPickedCompactionFromL0(lcf, opts, vers, l0Organizer, 0, false)
-		if pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel) {
+		if pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel, env.problemSpans) {
 			if pc.startLevel.files.Empty() {
 				opts.Logger.Fatalf("empty compaction chosen")
 			}
@@ -1854,7 +1878,9 @@ func newPickedManualCompaction(
 		// Nothing to do
 		return nil, false
 	}
-	if !pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel) {
+	// We use nil problemSpans because we don't want problem spans to prevent
+	// manual compactions.
+	if !pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel, nil /* problemSpans */) {
 		// setupInputs returned false indicating there's a conflicting
 		// concurrent compaction.
 		return nil, true
@@ -1899,7 +1925,7 @@ func pickDownloadCompaction(
 	pc = newPickedCompaction(opts, vers, l0Organizer, level, level, baseLevel)
 	pc.kind = kind
 	pc.startLevel.files = manifest.NewLevelSliceKeySorted(opts.Comparer.Compare, []*tableMetadata{file})
-	if !pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel) {
+	if !pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel, nil /* problemSpans */) {
 		// setupInputs returned false indicating there's a conflicting
 		// concurrent compaction.
 		return nil
@@ -1950,7 +1976,7 @@ func pickReadTriggeredCompactionHelper(
 	pc = newPickedCompaction(p.opts, p.vers, p.l0Organizer, rc.level, defaultOutputLevel(rc.level, p.baseLevel), p.baseLevel)

 	pc.startLevel.files = overlapSlice
-	if !pc.setupInputs(p.opts, env.diskAvailBytes, pc.startLevel) {
+	if !pc.setupInputs(p.opts, env.diskAvailBytes, pc.startLevel, env.problemSpans) {
 		return nil
 	}
 	if inputRangeAlreadyCompacting(env, pc) {

compaction_picker_test.go

Lines changed: 21 additions & 2 deletions
@@ -21,6 +21,7 @@ import (
 	"github.com/cockroachdb/pebble/internal/base"
 	"github.com/cockroachdb/pebble/internal/humanize"
 	"github.com/cockroachdb/pebble/internal/manifest"
+	"github.com/cockroachdb/pebble/internal/problemspans"
 	"github.com/cockroachdb/pebble/internal/testkeys"
 	"github.com/cockroachdb/pebble/vfs"
 	"github.com/stretchr/testify/require"
@@ -1108,7 +1109,7 @@ func TestPickedCompactionSetupInputs(t *testing.T) {
 			)

 			var isCompacting bool
-			if !pc.setupInputs(opts, availBytes, pc.startLevel) {
+			if !pc.setupInputs(opts, availBytes, pc.startLevel, nil /* problemSpans */) {
 				isCompacting = true
 			}
 			origPC := pc
@@ -1410,9 +1411,12 @@ func TestCompactionPickerPickFile(t *testing.T) {
 		}
 	}()

+	var problemSpans *problemspans.ByLevel
+
 	datadriven.RunTest(t, "testdata/compaction_picker_pick_file", func(t *testing.T, td *datadriven.TestData) string {
 		switch td.Cmd {
 		case "define":
+			problemSpans = nil
 			require.NoError(t, d.Close())

 			d, err = runDBDefineCmd(td, opts)
@@ -1445,6 +1449,21 @@ func TestCompactionPickerPickFile(t *testing.T) {
 			d.mu.Unlock()
 			return s

+		case "problem-spans":
+			problemSpans = &problemspans.ByLevel{}
+			problemSpans.Init(manifest.NumLevels, opts.Comparer.Compare)
+			for _, line := range crstrings.Lines(td.Input) {
+				var level int
+				var span1, span2 string
+				n, err := fmt.Sscanf(line, "L%d %s %s", &level, &span1, &span2)
+				if err != nil || n != 3 {
+					td.Fatalf(t, "malformed problem span %q", line)
+				}
+				bounds := base.ParseUserKeyBounds(span1 + " " + span2)
+				problemSpans.Add(level, bounds, time.Hour*10)
+			}
+			return ""
+
 		case "pick-file":
 			s := strings.TrimPrefix(td.CmdArgs[0].String(), "L")
 			level, err := strconv.Atoi(s)
@@ -1471,7 +1490,7 @@ func TestCompactionPickerPickFile(t *testing.T) {
 					return
 				}
 				p := d.mu.versions.picker.(*compactionPickerByScore)
-				lf, ok = pickCompactionSeedFile(p.vers, p.virtualBackings, opts, level, level+1, env.earliestSnapshotSeqNum)
+				lf, ok = pickCompactionSeedFile(p.vers, p.virtualBackings, opts, level, level+1, env.earliestSnapshotSeqNum, problemSpans)
 			}()
 			if !ok {
 				return "(none)"