Skip to content
Draft

[WIP] #5143

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pkg/txnutil/gc/gc_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,9 @@ func checkStaleCheckpointTs(
)
}
} else {
// if `isTiCDCBlockGC` is false, it means there is another service gc
// point less than the min checkpoint ts.
// If `isTiCDCBlockGC` is false, another service GC safepoint is at or
// below the min checkpoint ts. Since CDC reads from checkpointTs - 1,
// that upper bound must not be earlier than the actual GC safepoint.
if gcSafepointUpperBound < lastSafePointTs {
return errors.ErrSnapshotLostByGC.
GenWithStackByArgs(
Expand Down
22 changes: 22 additions & 0 deletions pkg/txnutil/gc/gc_manager_nextgen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,25 @@ func TestTryUpdateKeyspaceGCBarrierDoesNotReturnSnapshotLost(t *testing.T) {
require.True(t, ok)
require.Equal(t, cerrors.ErrSnapshotLostByGC.RFCCode(), errCode)
}

func TestCheckStaleCheckpointTsRejectsCheckpointEqualGCBarrier(t *testing.T) {
appcontext.SetService(appcontext.DefaultPDClock, pdutil.NewClock4Test())

keyspaceID := uint32(1)
keyspaceName := "test"
pdClient := &MockPDClient{}
m := NewManager("test-service", pdClient).(*gcManager)
m.keyspaceGCBarrierInfoMap.Store(keyspaceID, &keyspaceGCBarrierInfo{
lastSafePointTs: 100,
isTiCDCBlockGC: false,
})

cfID := common.NewChangeFeedIDWithName("test-changefeed", keyspaceName)
err := m.CheckStaleCheckpointTs(keyspaceID, cfID, 100)
require.Error(t, err)
errCode, ok := cerrors.RFCCode(err)
require.True(t, ok)
require.Equal(t, cerrors.ErrSnapshotLostByGC.RFCCode(), errCode)

require.NoError(t, m.CheckStaleCheckpointTs(keyspaceID, cfID, 101))
}
18 changes: 18 additions & 0 deletions pkg/txnutil/gc/gc_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,24 @@ func TestTryUpdateServiceGCSafepointDoesNotReturnSnapshotLost(t *testing.T) {
require.Equal(t, cerrors.ErrSnapshotLostByGC.RFCCode(), errCode)
}

func TestCheckStaleCheckpointTsRejectsCheckpointEqualGCSafepoint(t *testing.T) {
appcontext.SetService(appcontext.DefaultPDClock, pdutil.NewClock4Test())

pdClient := &MockPDClient{}
m := NewManager("test-service", pdClient).(*gcManager)
m.lastSafePointTs.Store(100)
m.isTiCDCBlockGC.Store(false)

cfID := common.NewChangeFeedIDWithName("test-changefeed", "test")
err := m.CheckStaleCheckpointTs(0, cfID, 100)
require.Error(t, err)
errCode, ok := cerrors.RFCCode(err)
require.True(t, ok)
require.Equal(t, cerrors.ErrSnapshotLostByGC.RFCCode(), errCode)

require.NoError(t, m.CheckStaleCheckpointTs(0, cfID, 101))
}

func TestTryDeleteServiceGCSafepointClearsCachedState(t *testing.T) {
appcontext.SetService(appcontext.DefaultPDClock, pdutil.NewClock4Test())

Expand Down
37 changes: 27 additions & 10 deletions pkg/txnutil/gc/gc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ const (
EnsureGCServiceInitializing = "-initializing-"
)

// EnsureChangefeedStartTsSafety checks if the startTs less than the minimum of
// service GC safepoint and this function will update the service GC to startTs
// EnsureChangefeedStartTsSafety checks if the startTs is earlier than the
// minimum service GC safepoint. It keeps startTs from being collected by
// pinning the changefeed-level service GC safepoint to startTs - 1.
func EnsureChangefeedStartTsSafety(
ctx context.Context, pdCli GCServiceClient,
gcServiceIDPrefix string,
Expand All @@ -52,32 +53,48 @@ func EnsureChangefeedStartTsSafety(
return ensureChangefeedStartTsSafetyNextGen(ctx, pdCli, gcServiceID, keyspaceID, TTL, startTs)
}

func getGCSafepointUpperBound(startTs uint64) uint64 {
if startTs == 0 {
return 0
}
return startTs - 1
}

func ensureChangefeedStartTsSafetyClassic(ctx context.Context, pdCli GCServiceClient, gcServiceID string, ttl int64, startTs uint64) error {
// set gc safepoint for the changefeed gc service
minServiceGCTs, err := SetServiceGCSafepoint(ctx, pdCli, gcServiceID, ttl, startTs)
gcSafepointUpperBound := getGCSafepointUpperBound(startTs)
minServiceGCTs, err := SetServiceGCSafepoint(ctx, pdCli, gcServiceID, ttl, gcSafepointUpperBound)
if err != nil {
return errors.Trace(err)
}
log.Info("set gc safepoint for changefeed",
zap.String("gcServiceID", gcServiceID),
zap.Uint64("expectedGCSafepoint", startTs),
zap.Uint64("startTs", startTs),
zap.Uint64("expectedGCSafepoint", gcSafepointUpperBound),
zap.Uint64("actualGCSafepoint", minServiceGCTs),
zap.Int64("ttl", ttl))

// startTs should be greater than or equal to minServiceGCTs + 1, otherwise gcManager
// would return a ErrSnapshotLostByGC even though the changefeed would appear to be successfully
// created/resumed. See issue #6350 for more detail.
if startTs > 0 && startTs < minServiceGCTs+1 {
if startTs > 0 && gcSafepointUpperBound < minServiceGCTs {
return errors.ErrStartTsBeforeGC.GenWithStackByArgs(startTs, minServiceGCTs)
}
return nil
}

func ensureChangefeedStartTsSafetyNextGen(ctx context.Context, pdCli GCServiceClient, gcServiceID string, keyspaceID uint32, ttl int64, startTs uint64) error {
gcCli := pdCli.GetGCStatesClient(keyspaceID)
_, err := SetGCBarrier(ctx, gcCli, gcServiceID, startTs, time.Duration(ttl)*time.Second)
gcSafepointUpperBound := getGCSafepointUpperBound(startTs)
_, err := SetGCBarrier(ctx, gcCli, gcServiceID, gcSafepointUpperBound, time.Duration(ttl)*time.Second)
if err != nil {
return errors.ErrStartTsBeforeGC.GenWithStackByArgs(startTs)
if !errors.IsGCBarrierTSBehindTxnSafePointError(err) {
return errors.ErrStartTsBeforeGC.GenWithStackByArgs(startTs, gcSafepointUpperBound)
}
gcSafepoint, getErr := UnifyGetServiceGCSafepoint(ctx, pdCli, keyspaceID, gcServiceID)
if getErr != nil {
return getErr
}
if startTs > 0 && gcSafepointUpperBound < gcSafepoint {
return errors.ErrStartTsBeforeGC.GenWithStackByArgs(startTs, gcSafepoint)
}
}
Comment on lines 87 to 98

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

In next-gen mode, if SetGCBarrier fails because gcSafepointUpperBound (startTs - 1) is behind the current GC safepoint, and startTs == gcSafepoint, the current implementation simply returns nil (success) without setting any barrier. This leaves the changefeed unprotected, and a subsequent GC run could advance the safepoint past startTs before the changefeed starts and sets its own barrier.

To prevent this potential data loss, we should explicitly set the GC barrier to startTs when startTs == gcSafepoint.

	if err != nil {
		if !errors.IsGCBarrierTSBehindTxnSafePointError(err) {
			return errors.ErrStartTsBeforeGC.GenWithStackByArgs(startTs, gcSafepointUpperBound)
		}
		gcSafepoint, getErr := UnifyGetServiceGCSafepoint(ctx, pdCli, keyspaceID, gcServiceID)
		if getErr != nil {
			return getErr
		}
		if startTs > 0 && startTs < gcSafepoint {
			return errors.ErrStartTsBeforeGC.GenWithStackByArgs(startTs, gcSafepoint)
		}
		if startTs == gcSafepoint {
			_, err = SetGCBarrier(ctx, gcCli, gcServiceID, startTs, time.Duration(ttl)*time.Second)
			if err != nil {
				return errors.Trace(err)
			}
		}
	}

return nil
}
Expand Down
17 changes: 15 additions & 2 deletions pkg/txnutil/gc/gc_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ func TestCheckSafetyOfStartTs(t *testing.T) {
require.Equal(t,
"[CDC:ErrStartTsBeforeGC]fail to create or maintain changefeed "+
"because start-ts 50 is earlier than or equal to GC safepoint at 60", err.Error())
err = EnsureChangefeedStartTsSafety(ctx, pdCli,
"ticdc-creating-",
0,
common.NewChangeFeedIDWithName("changefeed-boundary", "default"), TTL, 60)
require.Equal(t,
"[CDC:ErrStartTsBeforeGC]fail to create or maintain changefeed "+
"because start-ts 60 is earlier than or equal to GC safepoint at 60", err.Error())
pdCli.UpdateServiceGCSafePoint(ctx, "service2", 10, 80) //nolint:errcheck
pdCli.UpdateServiceGCSafePoint(ctx, "service3", 10, 70) //nolint:errcheck
err = EnsureChangefeedStartTsSafety(ctx, pdCli,
Expand All @@ -62,7 +69,7 @@ func TestCheckSafetyOfStartTs(t *testing.T) {
"service1": 60,
"service2": 80,
"service3": 70,
"ticdc-creating-default_changefeed2": 65,
"ticdc-creating-default_changefeed2": 64,
})
err = UndoEnsureChangefeedStartTsSafety(ctx, pdCli,
0,
Expand Down Expand Up @@ -114,12 +121,18 @@ func TestCheckSafetyOfStartTs(t *testing.T) {
common.NewChangeFeedIDWithName("changefeed1", "default"), TTL, 50)
require.True(t, cerror.ErrStartTsBeforeGC.Equal(errors.Cause(err)))

err = EnsureChangefeedStartTsSafety(ctx, pdCli,
"ticdc-creating-",
0,
common.NewChangeFeedIDWithName("changefeed-boundary", "default"), TTL, 60)
require.True(t, cerror.ErrStartTsBeforeGC.Equal(errors.Cause(err)))

err = EnsureChangefeedStartTsSafety(ctx, pdCli,
"ticdc-creating-",
0,
common.NewChangeFeedIDWithName("changefeed2", "default"), TTL, 65)
require.NoError(t, err)
require.Equal(t, uint64(65), pdCli.gcBarriers["ticdc-creating-default_changefeed2"])
require.Equal(t, uint64(64), pdCli.gcBarriers["ticdc-creating-default_changefeed2"])

err = UndoEnsureChangefeedStartTsSafety(ctx, pdCli,
0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ function resume_changefeed_in_failed_state() {
result=$(cdc_cli_changefeed resume --changefeed-id=$changefeed_id --pd=$pd_addr --overwrite-checkpoint-ts=$gc_safepoint --no-confirm=true 2>&1 || true)
if [[ $result != *"ErrStartTsBeforeGC"* ]]; then
echo "changefeeed resume result is expected to contain 'ErrStartTsBeforeGC', \
but actually got $resulservice-gc-safepointt"
but actually got $result"
exit 1
fi
fi
Expand Down
Loading