Skip to content

REP-5963 Migrate recheck queue to per-generation collections #105

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions internal/verifier/change_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,9 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() {

suite.Assert().Equal(
bson.M{
"db": suite.DBNameForTest(),
"coll": "testColl",
"generation": int32(0),
"docID": "heyhey",
"db": suite.DBNameForTest(),
"coll": "testColl",
"docID": "heyhey",
},
recheckDocs[0]["_id"],
"recheck doc should have expected ID",
Expand All @@ -297,7 +296,7 @@ func (suite *IntegrationTestSuite) getClusterTime(ctx context.Context, client *m
func (suite *IntegrationTestSuite) fetchVerifierRechecks(ctx context.Context, verifier *Verifier) []bson.M {
recheckDocs := []bson.M{}

recheckColl := verifier.verificationDatabase().Collection(recheckQueue)
recheckColl := verifier.getRecheckQueueCollection(verifier.generation)
cursor, err := recheckColl.Find(ctx, bson.D{})

if !errors.Is(err, mongo.ErrNoDocuments) {
Expand Down Expand Up @@ -839,7 +838,7 @@ func (suite *IntegrationTestSuite) TestRecheckDocsWithDstChangeEvents() {
require.Eventually(
suite.T(),
func() bool {
recheckColl := verifier.verificationDatabase().Collection(recheckQueue)
recheckColl := verifier.getRecheckQueueCollection(verifier.generation)
cursor, err := recheckColl.Find(ctx, bson.D{})
if errors.Is(err, mongo.ErrNoDocuments) {
return false
Expand Down
2 changes: 1 addition & 1 deletion internal/verifier/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
return err
}

err = verifier.ClearRecheckDocsWhileLocked(ctx)
err = verifier.DropOldRecheckQueueWhileLocked(ctx)
if err != nil {
verifier.logger.Warn().
Err(err).
Expand Down
6 changes: 1 addition & 5 deletions internal/verifier/migration_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,11 +339,7 @@ func (verifier *Verifier) SetMetaURI(ctx context.Context, uri string) error {
func (verifier *Verifier) AddMetaIndexes(ctx context.Context) error {
model := mongo.IndexModel{Keys: bson.M{"generation": 1}}
_, err := verifier.verificationTaskCollection().Indexes().CreateOne(ctx, model)
if err != nil {
return err
}
model = mongo.IndexModel{Keys: bson.D{{"_id.generation", 1}}}
_, err = verifier.verificationDatabase().Collection(recheckQueue).Indexes().CreateOne(ctx, model)

return err
}

Expand Down
6 changes: 1 addition & 5 deletions internal/verifier/migration_verifier_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,7 @@ func BenchmarkGeneric(t *testing.B) {
t.Fatal(err)
}
verifier.SetMetaDBName(metaDBName)
err = verifier.verificationTaskCollection().Drop(context.Background())
if err != nil {
t.Fatal(err)
}
err = verifier.verificationDatabase().Collection(recheckQueue).Drop(context.Background())
err = verifier.verificationDatabase().Drop(context.Background())
if err != nil {
t.Fatal(err)
}
Expand Down
3 changes: 2 additions & 1 deletion internal/verifier/migration_verifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1593,7 +1593,8 @@ func (suite *IntegrationTestSuite) TestVerifierWithFilter() {

func (suite *IntegrationTestSuite) waitForRecheckDocs(verifier *Verifier) {
suite.Eventually(func() bool {
cursor, err := suite.metaMongoClient.Database(verifier.metaDBName).Collection(recheckQueue).Find(suite.Context(), bson.D{})
cursor, err := verifier.getRecheckQueueCollection(verifier.generation).
Find(suite.Context(), bson.D{})
var docs []bson.D
suite.Require().NoError(err)
suite.Require().NoError(cursor.All(suite.Context(), &docs))
Expand Down
36 changes: 18 additions & 18 deletions internal/verifier/recheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ import (
)

const (
recheckQueue = "recheckQueue"
maxBSONObjSize = 16 * 1024 * 1024
recheckQueueCollectionNameBase = "recheckQueue"

// This is the upper limit on the BSON-encoded length of document IDs
// per recheck task.
Expand All @@ -33,7 +32,6 @@ const (
// sorting by _id will guarantee that all rechecks for a given
// namespace appear consecutively.
type RecheckPrimaryKey struct {
Generation int `bson:"generation"`
SrcDatabaseName string `bson:"db"`
SrcCollectionName string `bson:"coll"`
DocumentID any `bson:"docID"`
Expand Down Expand Up @@ -108,6 +106,8 @@ func (verifier *Verifier) insertRecheckDocs(

eg, groupCtx := contextplus.ErrGroup(ctx)

genCollection := verifier.getRecheckQueueCollection(generation)

for _, curThreadIndexes := range indexesPerThread {
curThreadIndexes := curThreadIndexes

Expand All @@ -116,7 +116,6 @@ func (verifier *Verifier) insertRecheckDocs(
for m, i := range curThreadIndexes {
recheckDoc := RecheckDoc{
PrimaryKey: RecheckPrimaryKey{
Generation: generation,
SrcDatabaseName: dbNames[i],
SrcCollectionName: collNames[i],
DocumentID: documentIDs[i],
Expand All @@ -131,7 +130,7 @@ func (verifier *Verifier) insertRecheckDocs(
retryer := retry.New()
err := retryer.WithCallback(
func(retryCtx context.Context, _ *retry.FuncInfo) error {
_, err := verifier.verificationDatabase().Collection(recheckQueue).BulkWrite(
_, err := genCollection.BulkWrite(
retryCtx,
models,
options.BulkWrite().SetOrdered(false),
Expand Down Expand Up @@ -186,25 +185,22 @@ func (verifier *Verifier) insertRecheckDocs(
return nil
}

// ClearRecheckDocsWhileLocked deletes the previous generation’s recheck
// DropOldRecheckQueueWhileLocked deletes the previous generation’s recheck
// documents from the verifier’s metadata.
//
// The verifier **MUST** be locked when this function is called (or panic).
func (verifier *Verifier) ClearRecheckDocsWhileLocked(ctx context.Context) error {
func (verifier *Verifier) DropOldRecheckQueueWhileLocked(ctx context.Context) error {
prevGeneration := verifier.getPreviousGenerationWhileLocked()

verifier.logger.Debug().
Int("previousGeneration", prevGeneration).
Msg("Deleting previous generation's enqueued rechecks.")

genCollection := verifier.getRecheckQueueCollection(prevGeneration)

return retry.New().WithCallback(
func(ctx context.Context, i *retry.FuncInfo) error {
_, err := verifier.verificationDatabase().Collection(recheckQueue).DeleteMany(
ctx,
bson.D{{"_id.generation", prevGeneration}},
)

return err
return genCollection.Drop(ctx)
},
"deleting generation %d's enqueued rechecks",
prevGeneration,
Expand All @@ -231,14 +227,13 @@ func (verifier *Verifier) getPreviousGenerationWhileLocked() int {
func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) error {
prevGeneration := verifier.getPreviousGenerationWhileLocked()

findFilter := bson.D{{"_id.generation", prevGeneration}}

verifier.logger.Debug().
Int("priorGeneration", prevGeneration).
Msgf("Counting prior generation’s enqueued rechecks.")

recheckColl := verifier.verificationDatabase().Collection(recheckQueue)
rechecksCount, err := recheckColl.CountDocuments(ctx, findFilter)
recheckColl := verifier.getRecheckQueueCollection(prevGeneration)

rechecksCount, err := recheckColl.CountDocuments(ctx, bson.D{})
if err != nil {
return errors.Wrapf(err,
"failed to count generation %d’s rechecks",
Expand Down Expand Up @@ -275,7 +270,7 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e
// namespace will be consecutive in this query’s result.
cursor, err := recheckColl.Find(
ctx,
findFilter,
bson.D{},
options.Find().SetSort(bson.D{{"_id", 1}}),
)
if err != nil {
Expand Down Expand Up @@ -378,3 +373,8 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e

return err
}

// getRecheckQueueCollection returns the recheck queue collection for the
// given generation.
//
// Each generation's enqueued rechecks are kept in a separate collection of
// the database returned by verificationDatabase(). The collection name is
// recheckQueueCollectionNameBase followed by "_gen" and the generation
// number, so callers select a generation by choosing the collection rather
// than by filtering on a field of the documents.
func (verifier *Verifier) getRecheckQueueCollection(generation int) *mongo.Collection {
	collName := fmt.Sprintf("%s_gen%d", recheckQueueCollectionNameBase, generation)

	return verifier.verificationDatabase().Collection(collName)
}
47 changes: 7 additions & 40 deletions internal/verifier/recheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() {
[]RecheckDoc{
{
PrimaryKey: RecheckPrimaryKey{
Generation: verifier.generation,
SrcDatabaseName: "the",
SrcCollectionName: "namespace",
DocumentID: "theDocID",
Expand Down Expand Up @@ -74,7 +73,6 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() {
[]RecheckDoc{
{
PrimaryKey: RecheckPrimaryKey{
Generation: verifier.generation,
SrcDatabaseName: "the",
SrcCollectionName: "namespace",
DocumentID: "theDocID",
Expand All @@ -87,7 +85,7 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() {
}

func (suite *IntegrationTestSuite) fetchRecheckDocs(ctx context.Context, verifier *Verifier) []RecheckDoc {
metaColl := suite.metaMongoClient.Database(verifier.metaDBName).Collection(recheckQueue)
metaColl := verifier.getRecheckQueueCollection(verifier.generation)

cursor, err := metaColl.Find(
ctx,
Expand Down Expand Up @@ -283,7 +281,6 @@ func (suite *IntegrationTestSuite) TestLargeIDInsertions() {

d1 := RecheckDoc{
PrimaryKey: RecheckPrimaryKey{
Generation: 0,
SrcDatabaseName: "testDB",
SrcCollectionName: "testColl",
DocumentID: id1,
Expand Down Expand Up @@ -343,7 +340,6 @@ func (suite *IntegrationTestSuite) TestLargeDataInsertions() {
suite.Require().NoError(err)
d1 := RecheckDoc{
PrimaryKey: RecheckPrimaryKey{
Generation: 0,
SrcDatabaseName: "testDB",
SrcCollectionName: "testColl",
DocumentID: id1,
Expand Down Expand Up @@ -451,60 +447,31 @@ func (suite *IntegrationTestSuite) TestGenerationalClear() {
err := insertRecheckDocs(ctx, verifier, "testDB", "testColl", ids, dataSizes)
suite.Require().NoError(err)

verifier.generation++

err = insertRecheckDocs(ctx, verifier, "testDB", "testColl", ids, dataSizes)
suite.Require().NoError(err)

verifier.generation++

err = insertRecheckDocs(ctx, verifier, "testDB", "testColl", ids, dataSizes)
suite.Require().NoError(err)

d1 := RecheckDoc{
PrimaryKey: RecheckPrimaryKey{
Generation: 0,
SrcDatabaseName: "testDB",
SrcCollectionName: "testColl",
DocumentID: id1,
},
}
d2 := d1
d2.PrimaryKey.DocumentID = id2
d3 := d1
d3.PrimaryKey.Generation = 1
d4 := d2
d4.PrimaryKey.Generation = 1
d5 := d1
d5.PrimaryKey.Generation = 2
d6 := d2
d6.PrimaryKey.Generation = 2

results := suite.fetchRecheckDocs(ctx, verifier)
suite.ElementsMatch([]any{d1, d2, d3, d4, d5, d6}, results)
suite.Assert().ElementsMatch([]any{d1, d2}, results)

verifier.mux.Lock()

verifier.generation = 2
err = verifier.ClearRecheckDocsWhileLocked(ctx)
suite.Require().NoError(err)

results = suite.fetchRecheckDocs(ctx, verifier)
suite.ElementsMatch([]any{d1, d2, d5, d6}, results)
verifier.generation++

verifier.generation = 1
err = verifier.ClearRecheckDocsWhileLocked(ctx)
err = verifier.DropOldRecheckQueueWhileLocked(ctx)
suite.Require().NoError(err)

results = suite.fetchRecheckDocs(ctx, verifier)
suite.ElementsMatch([]any{d5, d6}, results)

verifier.generation = 3
err = verifier.ClearRecheckDocsWhileLocked(ctx)
suite.Require().NoError(err)
// This never happens in real life but is needed for this test.
verifier.generation--

results = suite.fetchRecheckDocs(ctx, verifier)
suite.ElementsMatch([]any{}, results)
suite.Assert().ElementsMatch([]any{}, results)
}

func insertRecheckDocs(
Expand Down