RUST-1842 Update prose tests for mongos deprioritization during retryable ops #1397

Merged on Jul 7, 2025 (21 commits)

63 changes: 48 additions & 15 deletions src/test/spec/retryable_reads.rs
@@ -1,4 +1,4 @@
use std::{future::IntoFuture, time::Duration};
use std::{future::IntoFuture, sync::Arc, time::Duration};

use crate::bson::doc;

@@ -8,6 +8,7 @@ use crate::{
cmap::{CmapEvent, ConnectionCheckoutFailedReason},
command::CommandEvent,
},
options::SelectionCriteria,
runtime::{self, AsyncJoinHandle},
test::{
block_connection_supported,
@@ -174,23 +175,40 @@ async fn retry_read_different_mongos() {
client_options.hosts.drain(2..);
client_options.retry_reads = Some(true);

let mut guards = vec![];
for ix in [0, 1] {
let mut opts = client_options.clone();
opts.hosts.remove(ix);
opts.direct_connection = Some(true);
let client = Client::for_test().options(opts).await;
let hosts = client_options.hosts.clone();
let client = Client::for_test()
.options(client_options)
.monitor_events()
.await;

// NOTE: This test uses a single client to set failpoints on each mongos and run the find
// operation. This avoids flakiness caused by a race between server discovery and server
// selection.

// When a client is first created, it initializes its view of the topology with all configured
// mongos addresses, but marks each as Unknown until it completes the server discovery process
// by sending and receiving "hello" messages. Unknown servers are not eligible for server
// selection.

// Previously, we created a new client for each call to `enable_fail_point` and for the find
// operation. Each new client restarted the discovery process, and sometimes had not yet marked
// both mongos servers as usable, leading to test failures when the retry logic couldn't find a
// second eligible server.

// By reusing a single client, each `enable_fail_point` call forces discovery to complete for
// the corresponding mongos. As a result, when the find operation runs, the client has a
// fully discovered topology and can reliably select between both servers.
let mut guards = Vec::new();
for address in hosts {

Contributor:
For future reference, can you add a comment here explaining why we set the failpoints this way rather than with separate clients? And ditto elsewhere.

JamieTsai1024 (Collaborator, author), Jul 1, 2025:
Done! Let me know if you have any suggestions on the explanation!

Contributor:
Some of these details aren't quite accurate - the important distinction to note is that we're using the same client to set the failpoints on each mongos as we are for the find operation. The fundamental problem we were encountering was a race between server discovery, which happens in the background after a client is created, and the server selection process for find, which previously happened right after creating the client. Server discovery goes roughly as follows:

  • The client is created with two mongos addresses (localhost:27017 and localhost:27018) and stores each of them in its topology with an initial server type of Unknown. (Unknown servers are not eligible to be selected for operations.)
  • The client sends a hello message to each mongos and waits for a reply.
  • Each mongos replies with information about itself, and the client uses that information to update its server type from Unknown to Mongos.

Executing an operation (in this case, enable_fail_point) on each individual mongos forces the client to complete its discovery of that mongos and select it for the operation. This means that by the time we reach the find operation, the client has a list of two Mongos servers to select from. By contrast, when we created a new client for each call to enable_fail_point and then another for the find operation, each of those clients restarted the server discovery process from scratch.

The details here can be a little tricky to understand, so let me know if you have any questions and we can walk through it in more detail!

JamieTsai1024 (Collaborator, author):
Thanks so much for the detailed explanation, Isabel! I hadn't fully understood how server discovery works in the background or how using separate clients was restarting that process. I also realize now that some of my original terminology wasn't quite accurate (e.g., implying it was about a single mongos instead of the client's discovery state), so I appreciate the correction.

I've updated the comment to reflect that. Let me know if it looks good now or if I should tweak anything further - I'd be happy to chat about it more if my understanding is still off!

Contributor:
Looks great! Thanks for making those changes.

let address = address.clone();
let fail_point = FailPoint::fail_command(&["find"], FailPointMode::Times(1))
.error_code(6)
.close_connection(true);
.selection_criteria(SelectionCriteria::Predicate(Arc::new(move |info| {
info.description.address == address
})));
guards.push(client.enable_fail_point(fail_point).await.unwrap());
}

let client = Client::for_test()
.options(client_options)
.monitor_events()
.await;
let result = client
.database("test")
.collection::<crate::bson::Document>("retry_read_different_mongos")
@@ -211,6 +229,14 @@
"unexpected events: {:#?}",
events,
);
let first_failed = events[1].as_command_failed().unwrap();
let first_address = &first_failed.connection.address;
let second_failed = events[3].as_command_failed().unwrap();
let second_address = &second_failed.connection.address;
assert_ne!(
first_address, second_address,
"Failed commands did not occur on two different mongos instances"
);

drop(guards); // enforce lifetime
}
@@ -235,12 +261,11 @@ async fn retry_read_same_mongos() {
client_options.direct_connection = Some(true);
let client = Client::for_test().options(client_options).await;

let fail_point = FailPoint::fail_command(&["find"], FailPointMode::Times(1))
.error_code(6)
.close_connection(true);
let fail_point = FailPoint::fail_command(&["find"], FailPointMode::Times(1)).error_code(6);
client.enable_fail_point(fail_point).await.unwrap()
};

client_options.direct_connection = Some(false);
let client = Client::for_test()
.options(client_options)
.monitor_events()
@@ -265,6 +290,14 @@ async fn retry_read_same_mongos() {
"unexpected events: {:#?}",
events,
);
let first_failed = events[1].as_command_failed().unwrap();
let first_address = &first_failed.connection.address;
let second_succeeded = events[3].as_command_succeeded().unwrap();
let second_address = &second_succeeded.connection.address;
assert_eq!(
first_address, second_address,
"Failed command and retry did not occur on the same mongos instance",
);

drop(fp_guard); // enforce lifetime
}
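
For readers skimming the diff, the following is a minimal sketch of the single-client failpoint pattern that `retry_read_different_mongos` now uses, condensed from the changes above. It assumes the crate's test helpers behave as shown in the diff (`Client::for_test`, `FailPoint::fail_command`, `enable_fail_point`, `get_client_options`); the import paths, the exact shape of the `find` call, and the event bookkeeping are simplifications rather than verbatim driver test code.

// Sketch only (not part of the PR): condensed from the diff above.
// Import paths and the exact `find` invocation are assumptions.
use std::sync::Arc;

use crate::{
    bson::doc,
    options::SelectionCriteria,
    test::{get_client_options, FailPoint, FailPointMode},
    Client,
};

async fn different_mongos_sketch() {
    // Keep exactly two mongos hosts and enable retryable reads.
    let mut client_options = get_client_options().await.clone();
    client_options.hosts.drain(2..);
    client_options.retry_reads = Some(true);
    let hosts = client_options.hosts.clone();

    // One client both sets the failpoints and runs the read, so each
    // `enable_fail_point` call forces discovery of that mongos to complete
    // before the `find` is selected against the topology.
    let client = Client::for_test()
        .options(client_options)
        .monitor_events()
        .await;

    let mut guards = Vec::new();
    for address in hosts {
        let address = address.clone();
        let fail_point = FailPoint::fail_command(&["find"], FailPointMode::Times(1))
            .error_code(6)
            // Pin this failpoint to one specific mongos.
            .selection_criteria(SelectionCriteria::Predicate(Arc::new(move |info| {
                info.description.address == address
            })));
        guards.push(client.enable_fail_point(fail_point).await.unwrap());
    }

    // The find fails on one mongos and retries on the other; the real test
    // then checks the captured command events to assert that the two
    // failures were observed on different addresses.
    let _ = client
        .database("test")
        .collection::<crate::bson::Document>("retry_read_different_mongos")
        .find(doc! {})
        .await;

    drop(guards); // keep the failpoints active until here
}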
65 changes: 49 additions & 16 deletions src/test/spec/retryable_writes.rs
@@ -1,6 +1,6 @@
use std::{sync::Arc, time::Duration};

use crate::bson::Bson;
use crate::{bson::Bson, options::SelectionCriteria};
use tokio::sync::Mutex;

use crate::{
@@ -317,27 +317,44 @@ async fn retry_write_different_mongos() {
);
return;
}

// NOTE: This test uses a single client to set failpoints on each mongos and run the insert
// operation. This avoids flakiness caused by a race between server discovery and server
// selection.

// When a client is first created, it initializes its view of the topology with all configured
// mongos addresses, but marks each as Unknown until it completes the server discovery process
// by sending and receiving "hello" messages. Unknown servers are not eligible for server
// selection.

// Previously, we created a new client for each call to `enable_fail_point` and for the insert
// operation. Each new client restarted the discovery process, and sometimes had not yet marked
// both mongos servers as usable, leading to test failures when the retry logic couldn't select
// a second eligible server.

// By reusing a single client, each `enable_fail_point` call forces discovery to complete for
// the corresponding mongos. As a result, when the insert operation runs, the client has a
// fully discovered topology and can reliably select between both servers.
client_options.hosts.drain(2..);
client_options.retry_writes = Some(true);
let hosts = client_options.hosts.clone();
let client = Client::for_test()
.options(client_options)
.monitor_events()
.await;

let mut guards = vec![];
for ix in [0, 1] {
let mut opts = client_options.clone();
opts.hosts.remove(ix);
opts.direct_connection = Some(true);
let client = Client::for_test().options(opts).await;

let mut guards = Vec::new();
for address in hosts {
let address = address.clone();
let fail_point = FailPoint::fail_command(&["insert"], FailPointMode::Times(1))
.error_code(6)
.error_labels(vec![RETRYABLE_WRITE_ERROR])
.close_connection(true);
.error_labels([RETRYABLE_WRITE_ERROR])
.selection_criteria(SelectionCriteria::Predicate(Arc::new(move |info| {
info.description.address == address
})));
guards.push(client.enable_fail_point(fail_point).await.unwrap());
}

let client = Client::for_test()
.options(client_options)
.monitor_events()
.await;
let result = client
.database("test")
.collection::<crate::bson::Document>("retry_write_different_mongos")
@@ -358,6 +375,14 @@
"unexpected events: {:#?}",
events,
);
let first_failed = events[1].as_command_failed().unwrap();
let first_address = &first_failed.connection.address;
let second_failed = events[3].as_command_failed().unwrap();
let second_address = &second_failed.connection.address;
assert_ne!(
first_address, second_address,
"Failed commands did not occur on two different mongos instances"
);

drop(guards); // enforce lifetime
}
@@ -384,11 +409,11 @@ async fn retry_write_same_mongos() {

let fail_point = FailPoint::fail_command(&["insert"], FailPointMode::Times(1))
.error_code(6)
.error_labels(vec![RETRYABLE_WRITE_ERROR])
.close_connection(true);
.error_labels(vec![RETRYABLE_WRITE_ERROR]);
client.enable_fail_point(fail_point).await.unwrap()
};

client_options.direct_connection = Some(false);
let client = Client::for_test()
.options(client_options)
.monitor_events()
@@ -413,6 +438,14 @@
"unexpected events: {:#?}",
events,
);
let first_failed = events[1].as_command_failed().unwrap();
let first_address = &first_failed.connection.address;
let second_succeeded = events[3].as_command_succeeded().unwrap();
let second_address = &second_succeeded.connection.address;
assert_eq!(
first_address, second_address,
"Failed command and retry did not occur on the same mongos instance",
);

drop(fp_guard); // enforce lifetime
}
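
Similarly, here is a hedged sketch of the shape `retry_write_same_mongos` takes after this change: the failpoint is set over a direct connection to the single mongos, and the client that runs the operation explicitly sets `direct_connection` back to `false`. Helper names, import paths, the single-host setup, and the exact `insert_one` call are assumptions based on the diff, not verbatim test code.

// Sketch only (not part of the PR): condensed from the diff above.
use crate::{
    bson::doc,
    error::RETRYABLE_WRITE_ERROR,
    test::{get_client_options, FailPoint, FailPointMode},
    Client,
};

async fn same_mongos_sketch() {
    // Assumed setup: a single mongos host with retryable writes enabled.
    let mut client_options = get_client_options().await.clone();
    client_options.hosts.drain(1..);
    client_options.retry_writes = Some(true);

    // Set the failpoint through a direct connection to that mongos.
    let fp_guard = {
        let mut options = client_options.clone();
        options.direct_connection = Some(true);
        let client = Client::for_test().options(options).await;
        let fail_point = FailPoint::fail_command(&["insert"], FailPointMode::Times(1))
            .error_code(6)
            .error_labels(vec![RETRYABLE_WRITE_ERROR]);
        client.enable_fail_point(fail_point).await.unwrap()
    };

    // The operation client must not use a direct connection, so that the
    // retry goes through normal mongos server selection.
    client_options.direct_connection = Some(false);
    let client = Client::for_test()
        .options(client_options)
        .monitor_events()
        .await;

    // The insert fails once and is retried; the real test then asserts that
    // the failed attempt and the successful retry were observed on the same
    // server address.
    let _ = client
        .database("test")
        .collection::<crate::bson::Document>("retry_write_same_mongos")
        .insert_one(doc! {})
        .await;

    drop(fp_guard); // keep the failpoint active until here
}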
9 changes: 8 additions & 1 deletion src/test/util/event.rs
@@ -9,7 +9,7 @@ use crate::{
bson::doc,
event::{
cmap::CmapEvent,
command::{CommandEvent, CommandSucceededEvent},
command::{CommandEvent, CommandFailedEvent, CommandSucceededEvent},
sdam::SdamEvent,
},
test::get_client_options,
@@ -101,6 +101,13 @@ impl CommandEvent {
_ => None,
}
}

pub(crate) fn as_command_failed(&self) -> Option<&CommandFailedEvent> {
match self {
CommandEvent::Failed(e) => Some(e),
_ => None,
}
}
}

#[derive(Clone, Debug)]
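
A small, hypothetical illustration of how the new `as_command_failed` accessor pairs with the existing `as_command_succeeded` in the assertions above; this helper is not part of the diff and is shown only to make the intended usage concrete.

use crate::event::command::CommandEvent;

// Hypothetical helper (not in the PR): true when a failed attempt and its
// successful retry were observed on the same server address, mirroring the
// assertions in the retryable read/write "same mongos" tests.
pub(crate) fn retried_on_same_server(attempt: &CommandEvent, retry: &CommandEvent) -> bool {
    match (attempt.as_command_failed(), retry.as_command_succeeded()) {
        (Some(failed), Some(succeeded)) => {
            failed.connection.address == succeeded.connection.address
        }
        _ => false,
    }
}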