Skip to content

stateloader: untangle SynthesizeRaftState #147469

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/keys/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ var (
ppFunc: raftLogKeyPrint,
psFunc: raftLogKeyParse,
},
{name: "RaftReplicaID", suffix: LocalRaftReplicaIDSuffix},
{name: "RaftTruncatedState", suffix: LocalRaftTruncatedStateSuffix},
{name: "RangeLastReplicaGCTimestamp", suffix: LocalRangeLastReplicaGCTimestampSuffix},
{name: "RangeLease", suffix: LocalRangeLeaseSuffix},
Expand Down
9 changes: 6 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -1460,11 +1460,14 @@ func splitTriggerHelper(
if err != nil {
return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to load replica version")
}
*h.AbsPostSplitRight(), err = stateloader.WriteInitialReplicaState(
if *h.AbsPostSplitRight(), err = stateloader.WriteInitialReplicaState(
ctx, batch, *h.AbsPostSplitRight(), split.RightDesc, rightLease,
*gcThreshold, *gcHint, replicaVersion,
)
if err != nil {
); err != nil {
return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to write initial Replica state")
}
// TODO(arulajmani): remove WriteInitialTruncState.
if err := stateloader.WriteInitialTruncState(ctx, batch, split.RightDesc.RangeID); err != nil {
return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to write initial Replica state")
}
}
Expand Down
14 changes: 5 additions & 9 deletions pkg/kv/kvserver/logstore/stateloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,17 +167,13 @@ func (sl StateLoader) SetHardState(
// SynthesizeHardState synthesizes an on-disk HardState from the given input,
// taking care that a HardState compatible with the existing data is written.
func (sl StateLoader) SynthesizeHardState(
ctx context.Context,
writer storage.Writer,
oldHS raftpb.HardState,
truncState kvserverpb.RaftTruncatedState,
raftAppliedIndex kvpb.RaftIndex,
ctx context.Context, writer storage.Writer, oldHS raftpb.HardState, applied EntryID,
) error {
newHS := raftpb.HardState{
Term: uint64(truncState.Term),
// Note that when applying a Raft snapshot, the applied index is
// equal to the Commit index represented by the snapshot.
Commit: uint64(raftAppliedIndex),
Term: uint64(applied.Term),
// NB: when applying a Raft snapshot, the applied index is equal to the
// Commit index represented by the snapshot.
Commit: uint64(applied.Index),
}

if oldHS.Commit > newHS.Commit {
Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/print/debug_print.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,9 @@ func tryRangeIDKey(kv storage.MVCCKeyValue) (string, error) {
case bytes.Equal(suffix, keys.LocalRaftTruncatedStateSuffix):
msg = &kvserverpb.RaftTruncatedState{}

case bytes.Equal(suffix, keys.LocalRangeGCHintSuffix):
msg = &roachpb.GCHint{}

case bytes.Equal(suffix, keys.LocalRangeLeaseSuffix):
msg = &roachpb.Lease{}

Expand All @@ -449,6 +452,9 @@ func tryRangeIDKey(kv storage.MVCCKeyValue) (string, error) {
case bytes.Equal(suffix, keys.LocalRaftLogSuffix):
return tryRaftLogEntry(kv)

case bytes.Equal(suffix, keys.LocalRaftReplicaIDSuffix):
msg = &kvserverpb.RaftReplicaID{}

case bytes.Equal(suffix, keys.LocalRangeLastReplicaGCTimestampSuffix):
msg = &hlc.Timestamp{}

Expand Down
7 changes: 6 additions & 1 deletion pkg/kv/kvserver/stateloader/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/logstore",
"//pkg/raft/raftpb",
"//pkg/roachpb",
"//pkg/storage",
"//pkg/storage/enginepb",
Expand All @@ -30,15 +31,19 @@ go_test(
"initial_test.go",
"stateloader_test.go",
],
data = glob(["testdata/**"]),
embed = [":stateloader"],
deps = [
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/logstore",
"//pkg/kv/kvserver/print",
"//pkg/raft/raftpb",
"//pkg/roachpb",
"//pkg/storage",
"//pkg/testutils",
"//pkg/testutils/echotest",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/stop",
"@com_github_stretchr_testify//require",
],
Expand Down
61 changes: 41 additions & 20 deletions pkg/kv/kvserver/stateloader/initial.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
Expand Down Expand Up @@ -56,13 +58,9 @@ func WriteInitialReplicaState(
gcHint roachpb.GCHint,
replicaVersion roachpb.Version,
) (enginepb.MVCCStats, error) {
truncState := &kvserverpb.RaftTruncatedState{
Term: RaftInitialLogTerm,
Index: RaftInitialLogIndex,
}
s := kvserverpb.ReplicaState{
RaftAppliedIndex: truncState.Index,
RaftAppliedIndexTerm: truncState.Term,
RaftAppliedIndex: RaftInitialLogIndex,
RaftAppliedIndexTerm: RaftInitialLogTerm,
LeaseAppliedIndex: InitialLeaseAppliedIndex,
Desc: &roachpb.RangeDescriptor{
RangeID: desc.RangeID,
Expand Down Expand Up @@ -101,11 +99,6 @@ func WriteInitialReplicaState(
log.Fatalf(ctx, "expected trivial version, but found %+v", existingVersion)
}

// TODO(sep-raft-log): SetRaftTruncatedState will be in a separate batch when
// the Raft log engine is separated. Figure out the ordering required here.
if err := rsl.SetRaftTruncatedState(ctx, readWriter, truncState); err != nil {
return enginepb.MVCCStats{}, err
}
newMS, err := rsl.Save(ctx, readWriter, s)
if err != nil {
return enginepb.MVCCStats{}, err
Expand All @@ -114,6 +107,16 @@ func WriteInitialReplicaState(
return newMS, nil
}

// WriteInitialTruncState writes the initial RaftTruncatedState.
// TODO(arulajmani): remove this.
func WriteInitialTruncState(ctx context.Context, w storage.Writer, rangeID roachpb.RangeID) error {
return logstore.NewStateLoader(rangeID).SetRaftTruncatedState(ctx, w,
&kvserverpb.RaftTruncatedState{
Index: RaftInitialLogIndex,
Term: RaftInitialLogTerm,
})
}

// WriteInitialRangeState writes the initial range state. It's called during
// bootstrap.
func WriteInitialRangeState(
Expand All @@ -134,17 +137,35 @@ func WriteInitialRangeState(
); err != nil {
return err
}

// TODO(sep-raft-log): when the log storage is separated, the below can't be
// written in the same batch. Figure out the ordering required here.
sl := Make(desc.RangeID)
if err := sl.SynthesizeRaftState(ctx, readWriter); err != nil {
return err
}
// Maintain the invariant that any replica (uninitialized or initialized),
// with persistent state, has a RaftReplicaID.
if err := sl.SetRaftReplicaID(ctx, readWriter, replicaID); err != nil {
if err := Make(desc.RangeID).SetRaftReplicaID(ctx, readWriter, replicaID); err != nil {
return err
}
return nil

// TODO(sep-raft-log): when the log storage is separated, raft state must be
// written separately.
return WriteInitialRaftState(ctx, readWriter, desc.RangeID)
}

// WriteInitialRaftState writes raft state for an initialized replica created
// during cluster bootstrap.
func WriteInitialRaftState(
ctx context.Context, writer storage.Writer, rangeID roachpb.RangeID,
) error {
sl := logstore.NewStateLoader(rangeID)
// Initialize the HardState with the term and commit index matching the
// initial applied state of the replica.
if err := sl.SetHardState(ctx, writer, raftpb.HardState{
Term: RaftInitialLogTerm,
Commit: RaftInitialLogIndex,
}); err != nil {
return err
}
// The raft log is initialized empty, with the truncated state matching the
// committed / applied initial state of the replica.
return sl.SetRaftTruncatedState(ctx, writer, &kvserverpb.RaftTruncatedState{
Index: RaftInitialLogIndex,
Term: RaftInitialLogTerm,
})
}
61 changes: 47 additions & 14 deletions pkg/kv/kvserver/stateloader/initial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,53 @@ package stateloader

import (
"context"
"path/filepath"
"reflect"
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/print"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/echotest"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/stretchr/testify/require"
)

// TestWriteInitialRangeState captures the typical initial range state written
// to storage at cluster bootstrap.
func TestWriteInitialRangeState(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

eng := storage.NewDefaultInMemForTesting()
defer eng.Close()
b := eng.NewBatch() // TODO(pav-kv): make it write-only batch
defer b.Close()

require.NoError(t, WriteInitialRangeState(context.Background(), b,
roachpb.RangeDescriptor{
RangeID: 5,
StartKey: roachpb.RKey("a"),
EndKey: roachpb.RKey("z"),
NextReplicaID: 4,
},
roachpb.ReplicaID(3),
// Use arbitrary version instead of things like clusterversion.Latest, so
// that the test doesn't sporadically fail when version bumps occur.
roachpb.Version{Major: 10, Minor: 2, Patch: 17},
))

str, err := print.DecodeWriteBatch(b.Repr())
require.NoError(t, err)
echotest.Require(t, str, filepath.Join("testdata", t.Name()+".txt"))
}

func TestSynthesizeHardState(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper := stop.NewStopper()
Expand All @@ -30,20 +64,20 @@ func TestSynthesizeHardState(t *testing.T) {
tHS := raftpb.HardState{Term: 2, Vote: 3, Commit: 4, Lead: 5, LeadEpoch: 6}

testCases := []struct {
TruncTerm kvpb.RaftTerm
RaftAppliedIndex kvpb.RaftIndex
OldHS *raftpb.HardState
NewHS raftpb.HardState
Err string
AppliedTerm kvpb.RaftTerm
AppliedIndex kvpb.RaftIndex
OldHS *raftpb.HardState
NewHS raftpb.HardState
Err string
}{
{OldHS: nil, TruncTerm: 42, RaftAppliedIndex: 24, NewHS: raftpb.HardState{Term: 42, Vote: 0, Commit: 24}},
{OldHS: nil, AppliedTerm: 42, AppliedIndex: 24, NewHS: raftpb.HardState{Term: 42, Vote: 0, Commit: 24}},
// Can't wind back the committed index of the new HardState.
{OldHS: &tHS, RaftAppliedIndex: kvpb.RaftIndex(tHS.Commit - 1), Err: "can't decrease HardState.Commit"},
{OldHS: &tHS, RaftAppliedIndex: kvpb.RaftIndex(tHS.Commit), NewHS: tHS},
{OldHS: &tHS, RaftAppliedIndex: kvpb.RaftIndex(tHS.Commit + 1), NewHS: raftpb.HardState{Term: tHS.Term, Vote: 3, Commit: tHS.Commit + 1, Lead: 5, LeadEpoch: 6}},
{OldHS: &tHS, AppliedIndex: kvpb.RaftIndex(tHS.Commit - 1), Err: "can't decrease HardState.Commit"},
{OldHS: &tHS, AppliedIndex: kvpb.RaftIndex(tHS.Commit), NewHS: tHS},
{OldHS: &tHS, AppliedIndex: kvpb.RaftIndex(tHS.Commit + 1), NewHS: raftpb.HardState{Term: tHS.Term, Vote: 3, Commit: tHS.Commit + 1, Lead: 5, LeadEpoch: 6}},
// Higher Term is picked up, but Vote, Lead, and LeadEpoch aren't carried
// over when the term changes.
{OldHS: &tHS, RaftAppliedIndex: kvpb.RaftIndex(tHS.Commit), TruncTerm: 11, NewHS: raftpb.HardState{Term: 11, Vote: 0, Commit: tHS.Commit, Lead: 0}},
{OldHS: &tHS, AppliedIndex: kvpb.RaftIndex(tHS.Commit), AppliedTerm: 11, NewHS: raftpb.HardState{Term: 11, Vote: 0, Commit: tHS.Commit, Lead: 0}},
}

for i, test := range testCases {
Expand All @@ -63,9 +97,8 @@ func TestSynthesizeHardState(t *testing.T) {
t.Fatal(err)
}

err = rsl.SynthesizeHardState(
context.Background(), batch, oldHS, kvserverpb.RaftTruncatedState{Term: test.TruncTerm}, test.RaftAppliedIndex,
)
err = rsl.SynthesizeHardState(context.Background(), batch, oldHS,
logstore.EntryID{Index: test.AppliedIndex, Term: test.AppliedTerm})
if !testutils.IsError(err, test.Err) {
t.Fatalf("%d: expected %q got %v", i, test.Err, err)
} else if err != nil {
Expand Down
22 changes: 13 additions & 9 deletions pkg/kv/kvserver/stateloader/stateloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,24 +442,28 @@ func UninitializedReplicaState(rangeID roachpb.RangeID) kvserverpb.ReplicaState

// The rest is not technically part of ReplicaState.

// SynthesizeRaftState creates a Raft state which synthesizes both a HardState
// and a lastIndex from pre-seeded data in the engine (typically created via
// WriteInitialReplicaState and, on a split, perhaps the activity of an
// uninitialized Raft group)
// SynthesizeRaftState creates a Raft state which synthesizes HardState from
// pre-seeded data in the engine: the state machine state created by
// WriteInitialReplicaState on a split, and the existing HardState of an
// uninitialized replica.
//
// TODO(sep-raft-log): this is now only used in splits, when initializing a
// replica. Make the implementation straightforward, most of the stuff here is
// constant except the existing HardState.
func (rsl StateLoader) SynthesizeRaftState(
ctx context.Context, readWriter storage.ReadWriter,
) error {
hs, err := rsl.LoadHardState(ctx, readWriter)
if err != nil {
return err
}
truncState, err := rsl.LoadRaftTruncatedState(ctx, readWriter)
if err != nil {
return err
}
as, err := rsl.LoadRangeAppliedState(ctx, readWriter)
if err != nil {
return err
}
return rsl.SynthesizeHardState(ctx, readWriter, hs, truncState, as.RaftAppliedIndex)
applied := logstore.EntryID{
Index: as.RaftAppliedIndex,
Term: as.RaftAppliedIndexTerm,
}
return rsl.SynthesizeHardState(ctx, readWriter, hs, applied)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
echo
----
Put: 0,0 /Local/RangeID/5/r/RangeLease (0x01698d72726c6c2d00): <empty>
Put: 0,0 /Local/RangeID/5/r/RangeGCThreshold (0x01698d726c67632d00): 0,0
Put: 0,0 /Local/RangeID/5/r/RangeGCHint (0x01698d727267636800): latest_range_delete_timestamp:<> gc_timestamp:<> gc_timestamp_next:<>
Put: 0,0 /Local/RangeID/5/r/RangeVersion (0x01698d727276657200): 10.2
Put: 0,0 /Local/RangeID/5/r/RangeAppliedState (0x01698d727261736b00): raft_applied_index:10 lease_applied_index:10 range_stats:<sys_bytes:142 sys_count:4 > raft_closed_timestamp:<> raft_applied_index_term:5
Put: 0,0 /Local/RangeID/5/u/RaftReplicaID (0x01698d757266747200): replica_id:3
Put: 0,0 /Local/RangeID/5/u/RaftHardState (0x01698d757266746800): term:5 vote:0 commit:10 lead:0 lead_epoch:0
Put: 0,0 /Local/RangeID/5/u/RaftTruncatedState (0x01698d757266747400): index:10 term:5