
Commit b5d087d

record: add sync offsets to manifest writer
Add sync offsets to the manifest writer to enable read-ahead and to detect corruption.
1 parent 57c482f commit b5d087d

12 files changed: +189 -78 lines

checkpoint.go

Lines changed: 1 addition & 1 deletion
@@ -494,7 +494,7 @@ func (d *DB) writeCheckpointManifest(
     // append a record after a raw data copy; see
     // https://github.com/cockroachdb/cockroach/issues/100935).
     r := record.NewReader(&io.LimitedReader{R: src, N: manifestSize}, manifestFileNum)
-    w := record.NewWriter(dst)
+    w := record.NewWriter(dst, manifestFileNum, d.FormatMajorVersion() >= FormatManifestSyncChunks)
     for {
         rr, err := r.Next()
         if err != nil {
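
A reading aid, not part of the commit: the point of this hunk is that the manifest copy only emits the wider sync-offset chunk headers once the DB's format major version has reached FormatManifestSyncChunks, so the new on-disk chunk format never appears before the version ratchet that introduces it. A minimal sketch, assuming code living inside the pebble package; the helper name is hypothetical, while the NewWriter signature and the version gate come from the hunk above:

package pebble

import (
    "io"

    "github.com/cockroachdb/pebble/internal/base"
    "github.com/cockroachdb/pebble/record"
)

// newManifestCopyWriter is a hypothetical helper mirroring the call in
// writeCheckpointManifest: it enables sync-offset chunks only after the DB
// has been ratcheted to FormatManifestSyncChunks.
func (d *DB) newManifestCopyWriter(dst io.Writer, fileNum base.DiskFileNum) *record.Writer {
    writeSyncOffsets := d.FormatMajorVersion() >= FormatManifestSyncChunks
    return record.NewWriter(dst, fileNum, writeSyncOffsets)
}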

format_major_version.go

Lines changed: 9 additions & 2 deletions
@@ -226,6 +226,10 @@ const (
     // once stable.
     FormatExperimentalValueSeparation

+    // FormatManifestSyncChunks is a format major version enabling the writing of
+    // sync offset chunks for Manifest files. See comment for FormatWALSyncChunks.
+    FormatManifestSyncChunks
+
     // -- Add new versions here --

     // FormatNewest is the most recent format major version.
@@ -266,7 +270,7 @@ func (v FormatMajorVersion) MaxTableFormat() sstable.TableFormat {
         return sstable.TableFormatPebblev4
     case FormatColumnarBlocks, FormatWALSyncChunks:
         return sstable.TableFormatPebblev5
-    case FormatTableFormatV6, FormatExperimentalValueSeparation:
+    case FormatTableFormatV6, FormatExperimentalValueSeparation, FormatManifestSyncChunks:
         return sstable.TableFormatPebblev6
     default:
         panic(fmt.Sprintf("pebble: unsupported format major version: %s", v))
@@ -280,7 +284,7 @@ func (v FormatMajorVersion) MinTableFormat() sstable.TableFormat {
     case FormatDefault, FormatFlushableIngest, FormatPrePebblev1MarkedCompacted,
         FormatDeleteSizedAndObsolete, FormatVirtualSSTables, FormatSyntheticPrefixSuffix,
         FormatFlushableIngestExcises, FormatColumnarBlocks, FormatWALSyncChunks,
-        FormatTableFormatV6, FormatExperimentalValueSeparation:
+        FormatTableFormatV6, FormatExperimentalValueSeparation, FormatManifestSyncChunks:
         return sstable.TableFormatPebblev1
     default:
         panic(fmt.Sprintf("pebble: unsupported format major version: %s", v))
@@ -332,6 +336,9 @@ var formatMajorVersionMigrations = map[FormatMajorVersion]func(*DB) error{
     FormatExperimentalValueSeparation: func(d *DB) error {
         return d.finalizeFormatVersUpgrade(FormatExperimentalValueSeparation)
     },
+    FormatManifestSyncChunks: func(d *DB) error {
+        return d.finalizeFormatVersUpgrade(FormatManifestSyncChunks)
+    },
 }

 const formatVersionMarkerName = `format-version`
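
Since FormatManifestSyncChunks (value 23, per the test below) still sits past FormatNewest, it is an internal/experimental version at the time of this commit. For completeness, a hedged sketch of how a database would opt in, using only exported pebble API; the directory name is illustrative:

package main

import (
    "log"

    "github.com/cockroachdb/pebble"
)

func main() {
    // Open (or create) a database, then ratchet it to the new format major
    // version. Ratcheting is one-way; once finalized, the manifest writer is
    // allowed to emit sync-offset chunk headers.
    db, err := pebble.Open("demo-db", &pebble.Options{})
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    if err := db.RatchetFormatMajorVersion(pebble.FormatManifestSyncChunks); err != nil {
        log.Fatal(err)
    }
    log.Printf("format major version: %s", db.FormatMajorVersion())
}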

format_major_version_test.go

Lines changed: 5 additions & 1 deletion
@@ -29,11 +29,12 @@ func TestFormatMajorVersionStableValues(t *testing.T) {
     require.Equal(t, FormatWALSyncChunks, FormatMajorVersion(20))
     require.Equal(t, FormatTableFormatV6, FormatMajorVersion(21))
     require.Equal(t, FormatExperimentalValueSeparation, FormatMajorVersion(22))
+    require.Equal(t, FormatManifestSyncChunks, FormatMajorVersion(23))

     // When we add a new version, we should add a check for the new version in
     // addition to updating these expected values.
     require.Equal(t, FormatNewest, FormatMajorVersion(21))
-    require.Equal(t, internalFormatNewest, FormatMajorVersion(22))
+    require.Equal(t, internalFormatNewest, FormatMajorVersion(23))
 }

 func TestFormatMajorVersion_MigrationDefined(t *testing.T) {
@@ -70,6 +71,8 @@ func TestRatchetFormat(t *testing.T) {
     require.Equal(t, FormatTableFormatV6, d.FormatMajorVersion())
     require.NoError(t, d.RatchetFormatMajorVersion(FormatExperimentalValueSeparation))
     require.Equal(t, FormatExperimentalValueSeparation, d.FormatMajorVersion())
+    require.NoError(t, d.RatchetFormatMajorVersion(FormatManifestSyncChunks))
+    require.Equal(t, FormatManifestSyncChunks, d.FormatMajorVersion())

     require.NoError(t, d.Close())
@@ -230,6 +233,7 @@ func TestFormatMajorVersions_TableFormat(t *testing.T) {
         FormatWALSyncChunks:               {sstable.TableFormatPebblev1, sstable.TableFormatPebblev5},
         FormatTableFormatV6:               {sstable.TableFormatPebblev1, sstable.TableFormatPebblev6},
         FormatExperimentalValueSeparation: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev6},
+        FormatManifestSyncChunks:          {sstable.TableFormatPebblev1, sstable.TableFormatPebblev6},
     }

     // Valid versions.

objstorage/objstorageprovider/remoteobjcat/catalog.go

Lines changed: 1 addition & 1 deletion
@@ -339,7 +339,7 @@ func (c *Catalog) createNewCatalogFileLocked() (outErr error) {
     if err != nil {
         return err
     }
-    recWriter := record.NewWriter(file)
+    recWriter := record.NewWriter(file, 0, false)
     err = func() error {
         // Create a VersionEdit that gets us from an empty catalog to the current state.
         var ve VersionEdit

open_test.go

Lines changed: 1 addition & 1 deletion
@@ -333,7 +333,7 @@ func TestNewDBFilenames(t *testing.T) {
             "LOCK",
             "MANIFEST-000001",
             "OPTIONS-000003",
-            "marker.format-version.000009.022",
+            "marker.format-version.000010.023",
             "marker.manifest.000001.MANIFEST-000001",
         },
     }

record/record.go

Lines changed: 49 additions & 6 deletions
@@ -678,10 +678,22 @@ type Writer struct {
     err error
     // buf is the buffer.
     buf [blockSize]byte
+
+    // logNum is the log number of the log.
+    logNum base.DiskFileNum
+
+    // fillHeader fills in the header for the pending chunk.
+    // Its implementation is chosen at construction time, based on
+    // whether or not the log is writing sync offsets.
+    fillHeader func(bool)
+
+    // headerSize is the size of the chunk header that the writer
+    // writes: either legacyHeaderSize or walSyncHeaderSize.
+    headerSize int
 }

 // NewWriter returns a new Writer.
-func NewWriter(w io.Writer) *Writer {
+func NewWriter(w io.Writer, logNum base.DiskFileNum, writingSyncOffsets bool) *Writer {
     f, _ := w.(flusher)

     var o int64
@@ -691,16 +703,24 @@ func NewWriter(w io.Writer) *Writer {
             o = 0
         }
     }
-    return &Writer{
+    wr := &Writer{
         w:                w,
         f:                f,
         baseOffset:       o,
         lastRecordOffset: -1,
+        logNum:           logNum,
+    }
+    if writingSyncOffsets {
+        wr.fillHeader = wr.fillHeaderSyncOffsets
+        wr.headerSize = walSyncHeaderSize
+    } else {
+        wr.fillHeader = wr.fillHeaderLegacy
+        wr.headerSize = legacyHeaderSize
     }
+    return wr
 }

-// fillHeader fills in the header for the pending chunk.
-func (w *Writer) fillHeader(last bool) {
+func (w *Writer) fillHeaderLegacy(last bool) {
     if w.i+legacyHeaderSize > w.j || w.j > blockSize {
         panic("pebble/record: bad writer state")
     }
@@ -721,12 +741,35 @@ func (w *Writer) fillHeader(last bool) {
     binary.LittleEndian.PutUint16(w.buf[w.i+4:w.i+6], uint16(w.j-w.i-legacyHeaderSize))
 }

+func (w *Writer) fillHeaderSyncOffsets(last bool) {
+    if w.i+walSyncHeaderSize > w.j || w.j > blockSize {
+        panic("pebble/record: bad writer state")
+    }
+    if last {
+        if w.first {
+            w.buf[w.i+6] = walSyncFullChunkEncoding
+        } else {
+            w.buf[w.i+6] = walSyncLastChunkEncoding
+        }
+    } else {
+        if w.first {
+            w.buf[w.i+6] = walSyncFirstChunkEncoding
+        } else {
+            w.buf[w.i+6] = walSyncMiddleChunkEncoding
+        }
+    }
+    binary.LittleEndian.PutUint32(w.buf[w.i+7:w.i+11], uint32(w.logNum))
+    binary.LittleEndian.PutUint64(w.buf[w.i+11:w.i+19], uint64(w.lastRecordOffset)+uint64(w.written))
+    binary.LittleEndian.PutUint32(w.buf[w.i+0:w.i+4], crc.New(w.buf[w.i+6:w.j]).Value())
+    binary.LittleEndian.PutUint16(w.buf[w.i+4:w.i+6], uint16(w.j-w.i-walSyncHeaderSize))
+}
+
 // writeBlock writes the buffered block to the underlying writer, and reserves
 // space for the next chunk's header.
 func (w *Writer) writeBlock() {
     _, w.err = w.w.Write(w.buf[w.written:])
     w.i = 0
-    w.j = legacyHeaderSize
+    w.j = w.headerSize
     w.written = 0
     w.blockNumber++
 }
@@ -782,7 +825,7 @@ func (w *Writer) Next() (io.Writer, error) {
         w.fillHeader(true)
     }
     w.i = w.j
-    w.j = w.j + legacyHeaderSize
+    w.j = w.j + w.headerSize
     // Check if there is room in the block for the header.
     if w.j > blockSize {
         // Fill in the rest of the block with zeroes.
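
A reading aid for fillHeaderSyncOffsets: the slice bounds above imply the chunk-header layout sketched below. The constant values are inferred from this hunk rather than stated in the diff, so treat them as an illustration:

// Sync-offset chunk header, as written by fillHeaderSyncOffsets (byte offsets
// are relative to w.i, the start of the pending chunk's header):
//
//   [0, 4)   CRC over w.buf[w.i+6 : w.j] (chunk type, log number, offset, payload)
//   [4, 6)   payload length, excluding the header
//   [6, 7)   chunk type (walSyncFull/First/Middle/LastChunkEncoding)
//   [7, 11)  log number of the file being written
//   [11, 19) sync offset (w.lastRecordOffset + w.written)
const (
    // Illustrative values only; the real constants live in record.go.
    inferredLegacyHeaderSize  = 7  // crc(4) + length(2) + chunk type(1)
    inferredWALSyncHeaderSize = 19 // + log number(4) + sync offset(8)
)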

record/record_test.go

Lines changed: 42 additions & 7 deletions
@@ -89,7 +89,7 @@ func testGeneratorWriter(
 func testGenerator(t *testing.T, reset func(), gen func() (string, bool)) {
     t.Run("Writer", func(t *testing.T) {
         testGeneratorWriter(t, reset, gen, func(w io.Writer) recordWriter {
-            return NewWriter(w)
+            return NewWriter(w, 0, false)
         })
     })

@@ -175,7 +175,7 @@ func TestBoundary(t *testing.T) {

 func TestFlush(t *testing.T) {
     buf := new(bytes.Buffer)
-    w := NewWriter(buf)
+    w := NewWriter(buf, 0, false)
     // Write a couple of records. Everything should still be held
     // in the record.Writer buffer, so that buf.Len should be 0.
     w0, _ := w.Next()
@@ -240,7 +240,7 @@ func TestNonExhaustiveRead(t *testing.T) {
     p := make([]byte, 10)
     rnd := rand.New(rand.NewPCG(0, 1))

-    w := NewWriter(buf)
+    w := NewWriter(buf, 0, false)
     for i := 0; i < n; i++ {
         length := len(p) + rnd.IntN(3*blockSize)
         s := string(uint8(i)) + "123456789abcdefgh"
@@ -267,7 +267,7 @@ func TestNonExhaustiveRead(t *testing.T) {
 func TestStaleReader(t *testing.T) {
     buf := new(bytes.Buffer)

-    w := NewWriter(buf)
+    w := NewWriter(buf, 0, false)
     _, err := w.WriteRecord([]byte("0"))
     require.NoError(t, err)

@@ -313,7 +313,7 @@ func makeTestRecords(recordLengths ...int) (*testRecords, error) {
     }

     buf := new(bytes.Buffer)
-    w := NewWriter(buf)
+    w := NewWriter(buf, base.DiskFileNum(0), false)
     for i, rec := range ret.records {
         wRec, err := w.Next()
         if err != nil {
@@ -616,7 +616,7 @@ func TestLastRecordOffset(t *testing.T) {

 func TestNoLastRecordOffset(t *testing.T) {
     buf := new(bytes.Buffer)
-    w := NewWriter(buf)
+    w := NewWriter(buf, 0, false)
     defer w.Close()

     if _, err := w.LastRecordOffset(); err != ErrNoLastRecord {
@@ -682,7 +682,7 @@ func TestInvalidLogNum(t *testing.T) {
 func TestSize(t *testing.T) {
     var buf bytes.Buffer
     zeroes := make([]byte, 8<<10)
-    w := NewWriter(&buf)
+    w := NewWriter(&buf, 0, false)
     for i := 0; i < 100; i++ {
         n := rand.IntN(len(zeroes))
         _, err := w.WriteRecord(zeroes[:n])
@@ -1099,6 +1099,41 @@ func describeWALSyncBlocks(
     f.ToTreePrinter(n)
 }

+func TestManifestSyncOffset(t *testing.T) {
+    buf := new(bytes.Buffer)
+    w := NewWriter(buf, 0, true)
+    w.WriteRecord(bytes.Repeat([]byte{1}, blockSize-walSyncHeaderSize))
+    w.WriteRecord(bytes.Repeat([]byte{2}, blockSize-walSyncHeaderSize))
+
+    raw := buf.Bytes()
+    r := NewReader(bytes.NewReader(raw), 0)
+    r.loggerForTesting = &readerLogger{}
+    for {
+        _, err := r.Next()
+        if err != nil {
+            require.True(t, errors.Is(err, io.EOF))
+            require.True(t, r.loggerForTesting.(*readerLogger).getLog() == "")
+            break
+        }
+    }
+
+    // Check that corrupting a chunk results in us reading ahead and returning
+    // an ErrInvalidChunk.
+    raw[0] ^= 0xFF
+    r = NewReader(bytes.NewReader(raw), 0)
+    r.loggerForTesting = &readerLogger{}
+    for {
+        _, err := r.Next()
+        if err != nil {
+            require.True(t, errors.Is(err, ErrInvalidChunk))
+            logStr := r.loggerForTesting.(*readerLogger).getLog()
+            require.True(t, logStr != "")
+            println(logStr)
+            break
+        }
+    }
+}
+
 func BenchmarkRecordWrite(b *testing.B) {
     for _, size := range []int{8, 16, 32, 64, 256, 1028, 4096, 65_536} {
         b.Run(fmt.Sprintf("size=%d", size), func(b *testing.B) {
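
For callers outside the record package, the only API change is the two extra arguments to record.NewWriter; reading is unchanged, which is what TestManifestSyncOffset above relies on. A minimal, hedged usage sketch using only exported identifiers; the log number and payloads are illustrative:

package main

import (
    "bytes"
    "fmt"
    "io"
    "log"

    "github.com/cockroachdb/pebble/record"
)

func main() {
    // Write two records with sync-offset chunk headers enabled. The second
    // argument is the log (file) number embedded in each chunk header; the
    // third selects the sync-offset header format added by this commit.
    buf := new(bytes.Buffer)
    w := record.NewWriter(buf, 1, true)
    for _, payload := range []string{"version edit #1", "version edit #2"} {
        if _, err := w.WriteRecord([]byte(payload)); err != nil {
            log.Fatal(err)
        }
    }
    if err := w.Close(); err != nil {
        log.Fatal(err)
    }

    // Reading is unchanged: NewReader takes the source and the expected log
    // number, and Next iterates over records until io.EOF.
    r := record.NewReader(bytes.NewReader(buf.Bytes()), 1)
    for {
        rr, err := r.Next()
        if err == io.EOF {
            break
        } else if err != nil {
            log.Fatal(err)
        }
        b, _ := io.ReadAll(rr)
        fmt.Printf("read record: %q\n", b)
    }
}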

0 commit comments