Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1674,9 +1674,9 @@ func TestNoStopAfterNonTargetColumnDrop(t *testing.T) {

// Check that dropping a watched column still stops the changefeed.
sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN b`)
if _, err := cf.Next(); !testutils.IsError(err, `schema change occurred at`) {
require.Regexp(t, `expected "schema change occurred at ..." got: %+v`, err)
}
msg, err := cf.Next()
require.True(t, testutils.IsError(err, `schema change occurred at`),
`expected "schema change occurred at ..." got: msg=%s, err=%+v`, msg, err)
}

runWithAndWithoutRegression141453(t, testFn, func(t *testing.T, testFn cdcTestFn) {
Expand Down Expand Up @@ -1767,27 +1767,32 @@ func TestNoBackfillAfterNonTargetColumnDrop(t *testing.T) {
sqlDB.Exec(t, `INSERT INTO hasfams values (0, 'a', 'b', 'c')`)

// Open up the changefeed.
cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY b_and_c`)
// We specify `updated` so that identical messages with different timestamps
// aren't filtered out as duplicates. The appearance of such messages would
// indicate that a backfill did happen even though it should not have.
cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY b_and_c WITH updated`)
defer closeFeed(t, cf)
assertPayloads(t, cf, []string{
assertPayloadsStripTs(t, cf, []string{
`hasfams.b_and_c: [0]->{"after": {"b": "b", "c": "c"}}`,
})

sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN a`)
sqlDB.Exec(t, `INSERT INTO hasfams VALUES (1, 'b1', 'c1')`)
assertPayloads(t, cf, []string{
assertPayloadsStripTs(t, cf, []string{
`hasfams.b_and_c: [1]->{"after": {"b": "b1", "c": "c1"}}`,
})

// Check that dropping a watched column still backfills.
sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN c`)
assertPayloads(t, cf, []string{
assertPayloadsStripTs(t, cf, []string{
`hasfams.b_and_c: [0]->{"after": {"b": "b"}}`,
`hasfams.b_and_c: [1]->{"after": {"b": "b1"}}`,
})
}

cdcTest(t, testFn)
runWithAndWithoutRegression141453(t, testFn, func(t *testing.T, testFn cdcTestFn) {
cdcTest(t, testFn)
})
}

func TestChangefeedColumnDropsWithFamilyAndNonFamilyTargets(t *testing.T) {
Expand Down