Skip to content

try this #817

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 39 additions & 17 deletions mongodump/mongodump.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,43 @@ func (dump *MongoDump) verifyCollectionExists() (bool, error) {
}

// Dump handles some final options checking and executes MongoDump.
func (dump *MongoDump) Dump() (err error) {
func (dump *MongoDump) Dump() (dumpErr error) {
defer dump.SessionProvider.Close()
defer dump.CloseMuxIfNeeded(dumpErr)

dumpErr = dump.DumpUntilOplog()
if dumpErr != nil {
return
}

dumpErr = dump.DumpOplogAndAfter()

return
}

func (dump *MongoDump) CloseMuxIfNeeded(dumpErr error) {
if dump.archive == nil {
return
}

// The Mux runs until its Control is closed
close(dump.archive.Mux.Control)
muxErr := <-dump.archive.Mux.Completed
dump.archive.Out.Close()
if muxErr != nil {
if dumpErr != nil {
dumpErr = fmt.Errorf("archive writer: %v / %v", dumpErr, muxErr)
} else {
dumpErr = fmt.Errorf("archive writer: %v", muxErr)
}
log.Logvf(log.DebugLow, "%v", dumpErr)
} else {
log.Logvf(log.DebugLow, "mux completed successfully")
}
}

// Dump handles some final options checking and executes MongoDump.
func (dump *MongoDump) DumpUntilOplog() (err error) {
if !dump.OutputOptions.Oplog && (dump.InputOptions.SourceWritesDoneBarrier != "") {
// Wait for tests to stop writes before dumping any collections.
//
Expand Down Expand Up @@ -287,22 +321,6 @@ func (dump *MongoDump) Dump() (err error) {
Mux: archive.NewMultiplexer(archiveOut, dump.shutdownIntentsNotifier),
}
go dump.archive.Mux.Run()
defer func() {
// The Mux runs until its Control is closed
close(dump.archive.Mux.Control)
muxErr := <-dump.archive.Mux.Completed
archiveOut.Close()
if muxErr != nil {
if err != nil {
err = fmt.Errorf("archive writer: %v / %v", err, muxErr)
} else {
err = fmt.Errorf("archive writer: %v", muxErr)
}
log.Logvf(log.DebugLow, "%v", err)
} else {
log.Logvf(log.DebugLow, "mux completed successfully")
}
}()
}

// Confirm connectivity
Expand Down Expand Up @@ -432,6 +450,10 @@ func (dump *MongoDump) Dump() (err error) {
return err
}

return nil
}

func (dump *MongoDump) DumpOplogAndAfter() (err error) {
// IO Phase III
// oplog

Expand Down
105 changes: 101 additions & 4 deletions mongorestore/mongorestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3280,17 +3280,116 @@ func uniqueDBName() string {
return fmt.Sprintf("mongorestore_test_%d_%d", os.Getpid(), time.Now().UnixMilli())
}

type nopWriteCloser struct {
io.Writer
}

func (nwc nopWriteCloser) Close() error {
return nil
}

func TestNamespaceFilterWithBulkWriteAndTxn(t *testing.T) {
testtype.SkipUnlessTestType(t, testtype.IntegrationTestType)

// TODO: Make this t.Context() once we move to Go 1.24.
ctx := context.Background()

provider, _, err := testutil.GetBareSessionProvider()
require.NoError(t, err, "should get session provider")

client, err := provider.GetSession()
require.NoError(t, err, "should get session")

db := client.Database(uniqueDBName())
require.NoError(t, db.Drop(ctx), "should pre-drop DB %#q", db.Name())

require.NoError(t, db.CreateCollection(ctx, "stuff"))
coll := db.Collection("stuff")

backup := bytes.Buffer{}

dump, err := GetArchiveMongoDump(nopWriteCloser{&backup})
require.NoError(t, err, "should create dump")

dump.OutputOptions.Oplog = true

dumpErr := dump.DumpUntilOplog()
defer dump.CloseMuxIfNeeded(dumpErr)

require.NoError(t, dumpErr, "dump until oplog should work")

_, err = coll.InsertMany(
ctx,
lo.RepeatBy(
1000,
func(index int) any {
return bson.D{{"num", index}}
},
),
)
require.NoError(t, err, "should bulk-insert docs")

require.NoError(
t,
client.UseSession(
ctx,
func(sc mongo.SessionContext) error {
_, err := sc.WithTransaction(
sc,
func(ctx mongo.SessionContext) (any, error) {
_, err = coll.InsertMany(
ctx,
lo.RepeatBy(
1000,
func(index int) any {
return bson.D{{"num", index}}
},
),
)
return nil, err
},
)

return err
},
),
"should do txn",
)

dumpErr = dump.DumpOplogAndAfter()
require.NoError(t, dumpErr, "finishing dump should work")
dump.CloseMuxIfNeeded(dumpErr)

// ------------------------------------------

require.NoError(t, db.Drop(ctx), "should drop database")

restore, err := GetArchiveMongoRestore(
io.NopCloser(bytes.NewReader(backup.Bytes())),
)
require.NoError(t, err, "should create restore")

restore.NSOptions = &NSOptions{
NSInclude: []string{db.Name()},
}

assert.NoError(t, restore.Restore().Err, "restore should work")

docs, err := coll.CountDocuments(ctx, bson.D{})
require.NoError(t, err, "should count documents")

assert.Equal(t, 200, docs, "all inserted docs should be archived & restored")
}

func TestPipedDumpRestore(t *testing.T) {
testtype.SkipUnlessTestType(t, testtype.IntegrationTestType)

t.Logf("start %#q", t.Name())
// TODO: Make this t.Context() once we move to Go 1.24.
ctx := context.Background()

provider, _, err := testutil.GetBareSessionProvider()
require.NoError(t, err, "should get session provider")

t.Logf("getting session")
sess, err := provider.GetSession()
require.NoError(t, err, "should get session")

Expand All @@ -3299,8 +3398,6 @@ func TestPipedDumpRestore(t *testing.T) {
db := sess.Database(uniqueDBName())
require.NoError(t, db.Drop(ctx), "should pre-drop DB %#q", db.Name())

t.Logf("creating collections")

for _, collName := range srcCollNames {
docs := lo.RepeatBy(
10_000,
Expand Down