Skip to content

Commit ccff9aa

Browse files
authored
REP-5963 Migrate recheck queue to per-generation collections (#105)
Previously we stored the entire recheck queue in a single collection. This meant that, when we finished a generation, we had to “surgically” delete all rechecks for the old generation. This changeset alters the metadata so that, rather than a storing rechecks in a single collection, we store them in generation-specific collections like `recheckQueue_gen0`, `recheckQueue_gen1`, etc. That way, when we want to clean out a prior generation’s recheck queue, we just drop a collection, which is much faster than deleting 100s of millions of individual recheck documents.
1 parent f5f94a1 commit ccff9aa

File tree

7 files changed

+35
-76
lines changed

7 files changed

+35
-76
lines changed

internal/verifier/change_stream_test.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -271,10 +271,9 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() {
271271

272272
suite.Assert().Equal(
273273
bson.M{
274-
"db": suite.DBNameForTest(),
275-
"coll": "testColl",
276-
"generation": int32(0),
277-
"docID": "heyhey",
274+
"db": suite.DBNameForTest(),
275+
"coll": "testColl",
276+
"docID": "heyhey",
278277
},
279278
recheckDocs[0]["_id"],
280279
"recheck doc should have expected ID",
@@ -297,7 +296,7 @@ func (suite *IntegrationTestSuite) getClusterTime(ctx context.Context, client *m
297296
func (suite *IntegrationTestSuite) fetchVerifierRechecks(ctx context.Context, verifier *Verifier) []bson.M {
298297
recheckDocs := []bson.M{}
299298

300-
recheckColl := verifier.verificationDatabase().Collection(recheckQueue)
299+
recheckColl := verifier.getRecheckQueueCollection(verifier.generation)
301300
cursor, err := recheckColl.Find(ctx, bson.D{})
302301

303302
if !errors.Is(err, mongo.ErrNoDocuments) {
@@ -839,7 +838,7 @@ func (suite *IntegrationTestSuite) TestRecheckDocsWithDstChangeEvents() {
839838
require.Eventually(
840839
suite.T(),
841840
func() bool {
842-
recheckColl := verifier.verificationDatabase().Collection(recheckQueue)
841+
recheckColl := verifier.getRecheckQueueCollection(verifier.generation)
843842
cursor, err := recheckColl.Find(ctx, bson.D{})
844843
if errors.Is(err, mongo.ErrNoDocuments) {
845844
return false

internal/verifier/check.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
390390
return err
391391
}
392392

393-
err = verifier.ClearRecheckDocsWhileLocked(ctx)
393+
err = verifier.DropOldRecheckQueueWhileLocked(ctx)
394394
if err != nil {
395395
verifier.logger.Warn().
396396
Err(err).

internal/verifier/migration_verifier.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -339,11 +339,7 @@ func (verifier *Verifier) SetMetaURI(ctx context.Context, uri string) error {
339339
func (verifier *Verifier) AddMetaIndexes(ctx context.Context) error {
340340
model := mongo.IndexModel{Keys: bson.M{"generation": 1}}
341341
_, err := verifier.verificationTaskCollection().Indexes().CreateOne(ctx, model)
342-
if err != nil {
343-
return err
344-
}
345-
model = mongo.IndexModel{Keys: bson.D{{"_id.generation", 1}}}
346-
_, err = verifier.verificationDatabase().Collection(recheckQueue).Indexes().CreateOne(ctx, model)
342+
347343
return err
348344
}
349345

internal/verifier/migration_verifier_bench_test.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,7 @@ func BenchmarkGeneric(t *testing.B) {
7272
t.Fatal(err)
7373
}
7474
verifier.SetMetaDBName(metaDBName)
75-
err = verifier.verificationTaskCollection().Drop(context.Background())
76-
if err != nil {
77-
t.Fatal(err)
78-
}
79-
err = verifier.verificationDatabase().Collection(recheckQueue).Drop(context.Background())
75+
err = verifier.verificationDatabase().Drop(context.Background())
8076
if err != nil {
8177
t.Fatal(err)
8278
}

internal/verifier/migration_verifier_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1593,7 +1593,8 @@ func (suite *IntegrationTestSuite) TestVerifierWithFilter() {
15931593

15941594
func (suite *IntegrationTestSuite) waitForRecheckDocs(verifier *Verifier) {
15951595
suite.Eventually(func() bool {
1596-
cursor, err := suite.metaMongoClient.Database(verifier.metaDBName).Collection(recheckQueue).Find(suite.Context(), bson.D{})
1596+
cursor, err := verifier.getRecheckQueueCollection(verifier.generation).
1597+
Find(suite.Context(), bson.D{})
15971598
var docs []bson.D
15981599
suite.Require().NoError(err)
15991600
suite.Require().NoError(cursor.All(suite.Context(), &docs))

internal/verifier/recheck.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@ import (
1717
)
1818

1919
const (
20-
recheckQueue = "recheckQueue"
21-
maxBSONObjSize = 16 * 1024 * 1024
20+
recheckQueueCollectionNameBase = "recheckQueue"
2221

2322
// This is the upper limit on the BSON-encoded length of document IDs
2423
// per recheck task.
@@ -33,7 +32,6 @@ const (
3332
// sorting by _id will guarantee that all rechecks for a given
3433
// namespace appear consecutively.
3534
type RecheckPrimaryKey struct {
36-
Generation int `bson:"generation"`
3735
SrcDatabaseName string `bson:"db"`
3836
SrcCollectionName string `bson:"coll"`
3937
DocumentID any `bson:"docID"`
@@ -108,6 +106,8 @@ func (verifier *Verifier) insertRecheckDocs(
108106

109107
eg, groupCtx := contextplus.ErrGroup(ctx)
110108

109+
genCollection := verifier.getRecheckQueueCollection(generation)
110+
111111
for _, curThreadIndexes := range indexesPerThread {
112112
curThreadIndexes := curThreadIndexes
113113

@@ -116,7 +116,6 @@ func (verifier *Verifier) insertRecheckDocs(
116116
for m, i := range curThreadIndexes {
117117
recheckDoc := RecheckDoc{
118118
PrimaryKey: RecheckPrimaryKey{
119-
Generation: generation,
120119
SrcDatabaseName: dbNames[i],
121120
SrcCollectionName: collNames[i],
122121
DocumentID: documentIDs[i],
@@ -131,7 +130,7 @@ func (verifier *Verifier) insertRecheckDocs(
131130
retryer := retry.New()
132131
err := retryer.WithCallback(
133132
func(retryCtx context.Context, _ *retry.FuncInfo) error {
134-
_, err := verifier.verificationDatabase().Collection(recheckQueue).BulkWrite(
133+
_, err := genCollection.BulkWrite(
135134
retryCtx,
136135
models,
137136
options.BulkWrite().SetOrdered(false),
@@ -186,25 +185,22 @@ func (verifier *Verifier) insertRecheckDocs(
186185
return nil
187186
}
188187

189-
// ClearRecheckDocsWhileLocked deletes the previous generation’s recheck
188+
// DropOldRecheckQueueWhileLocked deletes the previous generation’s recheck
190189
// documents from the verifier’s metadata.
191190
//
192191
// The verifier **MUST** be locked when this function is called (or panic).
193-
func (verifier *Verifier) ClearRecheckDocsWhileLocked(ctx context.Context) error {
192+
func (verifier *Verifier) DropOldRecheckQueueWhileLocked(ctx context.Context) error {
194193
prevGeneration := verifier.getPreviousGenerationWhileLocked()
195194

196195
verifier.logger.Debug().
197196
Int("previousGeneration", prevGeneration).
198197
Msg("Deleting previous generation's enqueued rechecks.")
199198

199+
genCollection := verifier.getRecheckQueueCollection(prevGeneration)
200+
200201
return retry.New().WithCallback(
201202
func(ctx context.Context, i *retry.FuncInfo) error {
202-
_, err := verifier.verificationDatabase().Collection(recheckQueue).DeleteMany(
203-
ctx,
204-
bson.D{{"_id.generation", prevGeneration}},
205-
)
206-
207-
return err
203+
return genCollection.Drop(ctx)
208204
},
209205
"deleting generation %d's enqueued rechecks",
210206
prevGeneration,
@@ -231,14 +227,13 @@ func (verifier *Verifier) getPreviousGenerationWhileLocked() int {
231227
func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) error {
232228
prevGeneration := verifier.getPreviousGenerationWhileLocked()
233229

234-
findFilter := bson.D{{"_id.generation", prevGeneration}}
235-
236230
verifier.logger.Debug().
237231
Int("priorGeneration", prevGeneration).
238232
Msgf("Counting prior generation’s enqueued rechecks.")
239233

240-
recheckColl := verifier.verificationDatabase().Collection(recheckQueue)
241-
rechecksCount, err := recheckColl.CountDocuments(ctx, findFilter)
234+
recheckColl := verifier.getRecheckQueueCollection(prevGeneration)
235+
236+
rechecksCount, err := recheckColl.CountDocuments(ctx, bson.D{})
242237
if err != nil {
243238
return errors.Wrapf(err,
244239
"failed to count generation %d’s rechecks",
@@ -275,7 +270,7 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e
275270
// namespace will be consecutive in this query’s result.
276271
cursor, err := recheckColl.Find(
277272
ctx,
278-
findFilter,
273+
bson.D{},
279274
options.Find().SetSort(bson.D{{"_id", 1}}),
280275
)
281276
if err != nil {
@@ -378,3 +373,8 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e
378373

379374
return err
380375
}
376+
377+
func (v *Verifier) getRecheckQueueCollection(generation int) *mongo.Collection {
378+
return v.verificationDatabase().
379+
Collection(fmt.Sprintf("%s_gen%d", recheckQueueCollectionNameBase, generation))
380+
}

internal/verifier/recheck_test.go

Lines changed: 7 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() {
3636
[]RecheckDoc{
3737
{
3838
PrimaryKey: RecheckPrimaryKey{
39-
Generation: verifier.generation,
4039
SrcDatabaseName: "the",
4140
SrcCollectionName: "namespace",
4241
DocumentID: "theDocID",
@@ -74,7 +73,6 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() {
7473
[]RecheckDoc{
7574
{
7675
PrimaryKey: RecheckPrimaryKey{
77-
Generation: verifier.generation,
7876
SrcDatabaseName: "the",
7977
SrcCollectionName: "namespace",
8078
DocumentID: "theDocID",
@@ -87,7 +85,7 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() {
8785
}
8886

8987
func (suite *IntegrationTestSuite) fetchRecheckDocs(ctx context.Context, verifier *Verifier) []RecheckDoc {
90-
metaColl := suite.metaMongoClient.Database(verifier.metaDBName).Collection(recheckQueue)
88+
metaColl := verifier.getRecheckQueueCollection(verifier.generation)
9189

9290
cursor, err := metaColl.Find(
9391
ctx,
@@ -283,7 +281,6 @@ func (suite *IntegrationTestSuite) TestLargeIDInsertions() {
283281

284282
d1 := RecheckDoc{
285283
PrimaryKey: RecheckPrimaryKey{
286-
Generation: 0,
287284
SrcDatabaseName: "testDB",
288285
SrcCollectionName: "testColl",
289286
DocumentID: id1,
@@ -343,7 +340,6 @@ func (suite *IntegrationTestSuite) TestLargeDataInsertions() {
343340
suite.Require().NoError(err)
344341
d1 := RecheckDoc{
345342
PrimaryKey: RecheckPrimaryKey{
346-
Generation: 0,
347343
SrcDatabaseName: "testDB",
348344
SrcCollectionName: "testColl",
349345
DocumentID: id1,
@@ -451,60 +447,31 @@ func (suite *IntegrationTestSuite) TestGenerationalClear() {
451447
err := insertRecheckDocs(ctx, verifier, "testDB", "testColl", ids, dataSizes)
452448
suite.Require().NoError(err)
453449

454-
verifier.generation++
455-
456-
err = insertRecheckDocs(ctx, verifier, "testDB", "testColl", ids, dataSizes)
457-
suite.Require().NoError(err)
458-
459-
verifier.generation++
460-
461-
err = insertRecheckDocs(ctx, verifier, "testDB", "testColl", ids, dataSizes)
462-
suite.Require().NoError(err)
463-
464450
d1 := RecheckDoc{
465451
PrimaryKey: RecheckPrimaryKey{
466-
Generation: 0,
467452
SrcDatabaseName: "testDB",
468453
SrcCollectionName: "testColl",
469454
DocumentID: id1,
470455
},
471456
}
472457
d2 := d1
473458
d2.PrimaryKey.DocumentID = id2
474-
d3 := d1
475-
d3.PrimaryKey.Generation = 1
476-
d4 := d2
477-
d4.PrimaryKey.Generation = 1
478-
d5 := d1
479-
d5.PrimaryKey.Generation = 2
480-
d6 := d2
481-
d6.PrimaryKey.Generation = 2
482459

483460
results := suite.fetchRecheckDocs(ctx, verifier)
484-
suite.ElementsMatch([]any{d1, d2, d3, d4, d5, d6}, results)
461+
suite.Assert().ElementsMatch([]any{d1, d2}, results)
485462

486463
verifier.mux.Lock()
487464

488-
verifier.generation = 2
489-
err = verifier.ClearRecheckDocsWhileLocked(ctx)
490-
suite.Require().NoError(err)
491-
492-
results = suite.fetchRecheckDocs(ctx, verifier)
493-
suite.ElementsMatch([]any{d1, d2, d5, d6}, results)
465+
verifier.generation++
494466

495-
verifier.generation = 1
496-
err = verifier.ClearRecheckDocsWhileLocked(ctx)
467+
err = verifier.DropOldRecheckQueueWhileLocked(ctx)
497468
suite.Require().NoError(err)
498469

499-
results = suite.fetchRecheckDocs(ctx, verifier)
500-
suite.ElementsMatch([]any{d5, d6}, results)
501-
502-
verifier.generation = 3
503-
err = verifier.ClearRecheckDocsWhileLocked(ctx)
504-
suite.Require().NoError(err)
470+
// This never happens in real life but is needed for this test.
471+
verifier.generation--
505472

506473
results = suite.fetchRecheckDocs(ctx, verifier)
507-
suite.ElementsMatch([]any{}, results)
474+
suite.Assert().ElementsMatch([]any{}, results)
508475
}
509476

510477
func insertRecheckDocs(

0 commit comments

Comments
 (0)