Skip to content

record: add sync offsets to manifest writer #4617

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ func (d *DB) writeCheckpointManifest(
// append a record after a raw data copy; see
// https://github.com/cockroachdb/cockroach/issues/100935).
r := record.NewReader(&io.LimitedReader{R: src, N: manifestSize}, manifestFileNum)
w := record.NewWriter(dst)
w := record.NewWriter(dst, manifestFileNum, d.FormatMajorVersion() >= FormatManifestSyncChunks)
for {
rr, err := r.Next()
if err != nil {
Expand Down
11 changes: 9 additions & 2 deletions format_major_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,10 @@ const (
// once stable.
FormatExperimentalValueSeparation

// FormatManifestSyncChunks is a format major version enabling the writing of
// sync offset chunks for Manifest files. See comment for FormatWALSyncChunks.
FormatManifestSyncChunks

// -- Add new versions here --

// FormatNewest is the most recent format major version.
Expand Down Expand Up @@ -266,7 +270,7 @@ func (v FormatMajorVersion) MaxTableFormat() sstable.TableFormat {
return sstable.TableFormatPebblev4
case FormatColumnarBlocks, FormatWALSyncChunks:
return sstable.TableFormatPebblev5
case FormatTableFormatV6, FormatExperimentalValueSeparation:
case FormatTableFormatV6, FormatExperimentalValueSeparation, FormatManifestSyncChunks:
return sstable.TableFormatPebblev6
default:
panic(fmt.Sprintf("pebble: unsupported format major version: %s", v))
Expand All @@ -280,7 +284,7 @@ func (v FormatMajorVersion) MinTableFormat() sstable.TableFormat {
case FormatDefault, FormatFlushableIngest, FormatPrePebblev1MarkedCompacted,
FormatDeleteSizedAndObsolete, FormatVirtualSSTables, FormatSyntheticPrefixSuffix,
FormatFlushableIngestExcises, FormatColumnarBlocks, FormatWALSyncChunks,
FormatTableFormatV6, FormatExperimentalValueSeparation:
FormatTableFormatV6, FormatExperimentalValueSeparation, FormatManifestSyncChunks:
return sstable.TableFormatPebblev1
default:
panic(fmt.Sprintf("pebble: unsupported format major version: %s", v))
Expand Down Expand Up @@ -332,6 +336,9 @@ var formatMajorVersionMigrations = map[FormatMajorVersion]func(*DB) error{
FormatExperimentalValueSeparation: func(d *DB) error {
return d.finalizeFormatVersUpgrade(FormatExperimentalValueSeparation)
},
FormatManifestSyncChunks: func(d *DB) error {
return d.finalizeFormatVersUpgrade(FormatManifestSyncChunks)
},
}

const formatVersionMarkerName = `format-version`
Expand Down
6 changes: 5 additions & 1 deletion format_major_version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ func TestFormatMajorVersionStableValues(t *testing.T) {
require.Equal(t, FormatWALSyncChunks, FormatMajorVersion(20))
require.Equal(t, FormatTableFormatV6, FormatMajorVersion(21))
require.Equal(t, FormatExperimentalValueSeparation, FormatMajorVersion(22))
require.Equal(t, FormatManifestSyncChunks, FormatMajorVersion(23))

// When we add a new version, we should add a check for the new version in
// addition to updating these expected values.
require.Equal(t, FormatNewest, FormatMajorVersion(21))
require.Equal(t, internalFormatNewest, FormatMajorVersion(22))
require.Equal(t, internalFormatNewest, FormatMajorVersion(23))
}

func TestFormatMajorVersion_MigrationDefined(t *testing.T) {
Expand Down Expand Up @@ -70,6 +71,8 @@ func TestRatchetFormat(t *testing.T) {
require.Equal(t, FormatTableFormatV6, d.FormatMajorVersion())
require.NoError(t, d.RatchetFormatMajorVersion(FormatExperimentalValueSeparation))
require.Equal(t, FormatExperimentalValueSeparation, d.FormatMajorVersion())
require.NoError(t, d.RatchetFormatMajorVersion(FormatManifestSyncChunks))
require.Equal(t, FormatManifestSyncChunks, d.FormatMajorVersion())

require.NoError(t, d.Close())

Expand Down Expand Up @@ -230,6 +233,7 @@ func TestFormatMajorVersions_TableFormat(t *testing.T) {
FormatWALSyncChunks: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev5},
FormatTableFormatV6: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev6},
FormatExperimentalValueSeparation: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev6},
FormatManifestSyncChunks: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev6},
}

// Valid versions.
Expand Down
2 changes: 1 addition & 1 deletion objstorage/objstorageprovider/remoteobjcat/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ func (c *Catalog) createNewCatalogFileLocked() (outErr error) {
if err != nil {
return err
}
recWriter := record.NewWriter(file)
recWriter := record.NewWriter(file, 0, false)
err = func() error {
// Create a VersionEdit that gets us from an empty catalog to the current state.
var ve VersionEdit
Expand Down
2 changes: 1 addition & 1 deletion open_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func TestNewDBFilenames(t *testing.T) {
"LOCK",
"MANIFEST-000001",
"OPTIONS-000003",
"marker.format-version.000009.022",
"marker.format-version.000010.023",
"marker.manifest.000001.MANIFEST-000001",
},
}
Expand Down
55 changes: 49 additions & 6 deletions record/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,10 +692,22 @@ type Writer struct {
err error
// buf is the buffer.
buf [blockSize]byte

// logNum is the log number of the log.
logNum base.DiskFileNum

// fillHeader fills in the header for the pending chunk.
// Its implementation is chosen at construction time, based on
// whether or not the log is writing sync offsets.
fillHeader func(bool)

// headerSize is the size of the type of header that the writer
// is writing. It can either be legacyHeaderSize or walSyncHeaderSize.
headerSize int
}

// NewWriter returns a new Writer.
func NewWriter(w io.Writer) *Writer {
func NewWriter(w io.Writer, logNum base.DiskFileNum, writingSyncOffsets bool) *Writer {
f, _ := w.(flusher)

var o int64
Expand All @@ -705,16 +717,24 @@ func NewWriter(w io.Writer) *Writer {
o = 0
}
}
return &Writer{
wr := &Writer{
w: w,
f: f,
baseOffset: o,
lastRecordOffset: -1,
logNum: logNum,
}
if writingSyncOffsets {
wr.fillHeader = wr.fillHeaderSyncOffsets
wr.headerSize = walSyncHeaderSize
} else {
wr.fillHeader = wr.fillHeaderLegacy
wr.headerSize = legacyHeaderSize
}
return wr
}

// fillHeader fills in the header for the pending chunk.
func (w *Writer) fillHeader(last bool) {
func (w *Writer) fillHeaderLegacy(last bool) {
if w.i+legacyHeaderSize > w.j || w.j > blockSize {
panic("pebble/record: bad writer state")
}
Expand All @@ -735,12 +755,35 @@ func (w *Writer) fillHeader(last bool) {
binary.LittleEndian.PutUint16(w.buf[w.i+4:w.i+6], uint16(w.j-w.i-legacyHeaderSize))
}

// fillHeaderSyncOffsets fills in the pending chunk's header using the
// WAL-sync chunk format: after the 4-byte CRC and 2-byte length, it writes
// the chunk-type byte, the 4-byte log number, and the 8-byte offset that
// must be synced before this chunk is durable. The CRC covers everything
// from the chunk-type byte through the end of the payload.
func (w *Writer) fillHeaderSyncOffsets(last bool) {
	if w.i+walSyncHeaderSize > w.j || w.j > blockSize {
		panic("pebble/record: bad writer state")
	}
	// Select the chunk type from the (first, last) flags: a record that both
	// starts and ends in this chunk is "full"; otherwise it is the first,
	// middle, or last fragment of a multi-chunk record.
	var chunkType byte
	switch {
	case last && w.first:
		chunkType = walSyncFullChunkEncoding
	case last:
		chunkType = walSyncLastChunkEncoding
	case w.first:
		chunkType = walSyncFirstChunkEncoding
	default:
		chunkType = walSyncMiddleChunkEncoding
	}
	w.buf[w.i+6] = chunkType
	binary.LittleEndian.PutUint32(w.buf[w.i+7:w.i+11], uint32(w.logNum))
	// NOTE(review): w.lastRecordOffset is -1 before the first record is
	// written; presumably the first header is filled only after it has been
	// set — confirm against Next/WriteRecord.
	binary.LittleEndian.PutUint64(w.buf[w.i+11:w.i+19], uint64(w.lastRecordOffset)+uint64(w.written))
	binary.LittleEndian.PutUint32(w.buf[w.i+0:w.i+4], crc.New(w.buf[w.i+6:w.j]).Value())
	binary.LittleEndian.PutUint16(w.buf[w.i+4:w.i+6], uint16(w.j-w.i-walSyncHeaderSize))
}

// writeBlock flushes the filled portion of the buffered block to the
// underlying writer, then resets the buffer state and reserves space for
// the next chunk's header at the start of the fresh block.
func (w *Writer) writeBlock() {
	_, w.err = w.w.Write(w.buf[w.written:])
	w.written = 0
	w.blockNumber++
	// Reserve the header bytes for the next pending chunk; the header size
	// depends on whether this writer emits sync-offset chunks.
	w.i = 0
	w.j = w.headerSize
}
Expand Down Expand Up @@ -796,7 +839,7 @@ func (w *Writer) Next() (io.Writer, error) {
w.fillHeader(true)
}
w.i = w.j
w.j = w.j + legacyHeaderSize
w.j = w.j + w.headerSize
// Check if there is room in the block for the header.
if w.j > blockSize {
// Fill in the rest of the block with zeroes.
Expand Down
49 changes: 42 additions & 7 deletions record/record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func testGeneratorWriter(
func testGenerator(t *testing.T, reset func(), gen func() (string, bool)) {
t.Run("Writer", func(t *testing.T) {
testGeneratorWriter(t, reset, gen, func(w io.Writer) recordWriter {
return NewWriter(w)
return NewWriter(w, 0, false)
})
})

Expand Down Expand Up @@ -175,7 +175,7 @@ func TestBoundary(t *testing.T) {

func TestFlush(t *testing.T) {
buf := new(bytes.Buffer)
w := NewWriter(buf)
w := NewWriter(buf, 0, false)
// Write a couple of records. Everything should still be held
// in the record.Writer buffer, so that buf.Len should be 0.
w0, _ := w.Next()
Expand Down Expand Up @@ -240,7 +240,7 @@ func TestNonExhaustiveRead(t *testing.T) {
p := make([]byte, 10)
rnd := rand.New(rand.NewPCG(0, 1))

w := NewWriter(buf)
w := NewWriter(buf, 0, false)
for i := 0; i < n; i++ {
length := len(p) + rnd.IntN(3*blockSize)
s := string(uint8(i)) + "123456789abcdefgh"
Expand All @@ -267,7 +267,7 @@ func TestNonExhaustiveRead(t *testing.T) {
func TestStaleReader(t *testing.T) {
buf := new(bytes.Buffer)

w := NewWriter(buf)
w := NewWriter(buf, 0, false)
_, err := w.WriteRecord([]byte("0"))
require.NoError(t, err)

Expand Down Expand Up @@ -313,7 +313,7 @@ func makeTestRecords(recordLengths ...int) (*testRecords, error) {
}

buf := new(bytes.Buffer)
w := NewWriter(buf)
w := NewWriter(buf, base.DiskFileNum(0), false)
for i, rec := range ret.records {
wRec, err := w.Next()
if err != nil {
Expand Down Expand Up @@ -616,7 +616,7 @@ func TestLastRecordOffset(t *testing.T) {

func TestNoLastRecordOffset(t *testing.T) {
buf := new(bytes.Buffer)
w := NewWriter(buf)
w := NewWriter(buf, 0, false)
defer w.Close()

if _, err := w.LastRecordOffset(); err != ErrNoLastRecord {
Expand Down Expand Up @@ -682,7 +682,7 @@ func TestInvalidLogNum(t *testing.T) {
func TestSize(t *testing.T) {
var buf bytes.Buffer
zeroes := make([]byte, 8<<10)
w := NewWriter(&buf)
w := NewWriter(&buf, 0, false)
for i := 0; i < 100; i++ {
n := rand.IntN(len(zeroes))
_, err := w.WriteRecord(zeroes[:n])
Expand Down Expand Up @@ -1099,6 +1099,41 @@ func describeWALSyncBlocks(
f.ToTreePrinter(n)
}

// TestManifestSyncOffset verifies that records written with sync offsets
// enabled round-trip cleanly through the reader, and that corrupting a
// chunk causes the reader to read ahead and surface ErrInvalidChunk.
func TestManifestSyncOffset(t *testing.T) {
	buf := new(bytes.Buffer)
	w := NewWriter(buf, 0, true)
	// Each record fills exactly one block's payload, forcing block writes.
	_, err := w.WriteRecord(bytes.Repeat([]byte{1}, blockSize-walSyncHeaderSize))
	require.NoError(t, err)
	_, err = w.WriteRecord(bytes.Repeat([]byte{2}, blockSize-walSyncHeaderSize))
	require.NoError(t, err)

	// A clean read should end in io.EOF without the reader logging any
	// invalid chunks.
	raw := buf.Bytes()
	r := NewReader(bytes.NewReader(raw), 0)
	r.loggerForTesting = &readerLogger{}
	for {
		_, err := r.Next()
		if err != nil {
			require.True(t, errors.Is(err, io.EOF))
			require.Empty(t, r.loggerForTesting.(*readerLogger).getLog())
			break
		}
	}

	// Check that corrupting a chunk results in us reading ahead and
	// returning an ErrInvalidChunk, with the invalid chunk logged.
	raw[0] ^= 0xFF
	r = NewReader(bytes.NewReader(raw), 0)
	r.loggerForTesting = &readerLogger{}
	for {
		_, err := r.Next()
		if err != nil {
			require.True(t, errors.Is(err, ErrInvalidChunk))
			require.NotEmpty(t, r.loggerForTesting.(*readerLogger).getLog())
			break
		}
	}
}

func BenchmarkRecordWrite(b *testing.B) {
for _, size := range []int{8, 16, 32, 64, 256, 1028, 4096, 65_536} {
b.Run(fmt.Sprintf("size=%d", size), func(b *testing.B) {
Expand Down
Loading