Skip to content

Commit c5b3f83

Browse files
authored
RUST-1712 Support User Configuration for max_connecting (mongodb#923)
1 parent 17b3af3 commit c5b3f83

File tree

6 files changed

+128
-6
lines changed

6 files changed

+128
-6
lines changed

src/client/options.rs

+24
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ const URI_OPTIONS: &[&str] = &[
6262
"maxstalenessseconds",
6363
"maxpoolsize",
6464
"minpoolsize",
65+
"maxconnecting",
6566
"readconcernlevel",
6667
"readpreference",
6768
"readpreferencetags",
@@ -491,6 +492,12 @@ pub struct ClientOptions {
491492
#[builder(default)]
492493
pub min_pool_size: Option<u32>,
493494

495+
/// The maximum number of new connections that can be created concurrently.
496+
///
497+
/// If specified, this value must be greater than 0. The default is 2.
498+
#[builder(default)]
499+
pub max_connecting: Option<u32>,
500+
494501
/// Specifies the default read concern for operations performed on the Client. See the
495502
/// ReadConcern type documentation for more details.
496503
#[builder(default)]
@@ -669,6 +676,8 @@ impl Serialize for ClientOptions {
669676

670677
minpoolsize: &'a Option<u32>,
671678

679+
maxconnecting: &'a Option<u32>,
680+
672681
#[serde(flatten, serialize_with = "ReadConcern::serialize_for_client_options")]
673682
readconcern: &'a Option<ReadConcern>,
674683

@@ -711,6 +720,7 @@ impl Serialize for ClientOptions {
711720
maxidletimems: &self.max_idle_time,
712721
maxpoolsize: &self.max_pool_size,
713722
minpoolsize: &self.min_pool_size,
723+
maxconnecting: &self.max_connecting,
714724
readconcern: &self.read_concern,
715725
replicaset: &self.repl_set_name,
716726
retryreads: &self.retry_reads,
@@ -802,6 +812,11 @@ pub struct ConnectionString {
802812
/// The default value is 0.
803813
pub min_pool_size: Option<u32>,
804814

815+
/// The maximum number of new connections that can be created concurrently.
816+
///
817+
/// If specified, this value must be greater than 0. The default is 2.
818+
pub max_connecting: Option<u32>,
819+
805820
/// The amount of time that a connection can remain idle in a connection pool before being
806821
/// closed. A value of zero indicates that connections should not be closed due to being idle.
807822
///
@@ -1285,6 +1300,7 @@ impl ClientOptions {
12851300
};
12861301
}
12871302
}
1303+
12881304
Self {
12891305
hosts: vec![],
12901306
app_name: conn_str.app_name,
@@ -1298,6 +1314,7 @@ impl ClientOptions {
12981314
max_pool_size: conn_str.max_pool_size,
12991315
min_pool_size: conn_str.min_pool_size,
13001316
max_idle_time: conn_str.max_idle_time,
1317+
max_connecting: conn_str.max_connecting,
13011318
server_selection_timeout: conn_str.server_selection_timeout,
13021319
compressors: conn_str.compressors,
13031320
connect_timeout: conn_str.connect_timeout,
@@ -1378,6 +1395,10 @@ impl ClientOptions {
13781395
return Err(Error::invalid_argument("cannot specify maxPoolSize=0"));
13791396
}
13801397

1398+
if let Some(0) = self.max_connecting {
1399+
return Err(Error::invalid_argument("cannot specify maxConnecting=0"));
1400+
}
1401+
13811402
if let Some(SelectionCriteria::ReadPreference(ref rp)) = self.selection_criteria {
13821403
if let Some(max_staleness) = rp.max_staleness() {
13831404
verify_max_staleness(
@@ -2028,6 +2049,9 @@ impl ConnectionString {
20282049
k @ "minpoolsize" => {
20292050
self.min_pool_size = Some(get_u32!(value, k));
20302051
}
2052+
k @ "maxconnecting" => {
2053+
self.max_connecting = Some(get_u32!(value, k));
2054+
}
20312055
"readconcernlevel" => {
20322056
self.read_concern = Some(ReadConcernLevel::from_str(value).into());
20332057
}

src/client/options/test.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,9 @@ async fn run_test(test_file: TestFile) {
161161
if let Ok(max) = json_options.get_i32("maxpoolsize") {
162162
json_options.insert("maxpoolsize", Bson::Int64(max.into()));
163163
}
164+
if let Ok(max_connecting) = json_options.get_i32("maxconnecting") {
165+
json_options.insert("maxconnecting", Bson::Int64(max_connecting.into()));
166+
}
164167

165168
options_doc = options_doc
166169
.into_iter()
@@ -185,7 +188,6 @@ async fn run_test(test_file: TestFile) {
185188
}
186189
}
187190
}
188-
189191
assert_eq!(options_doc, json_options, "{}", test_case.description)
190192
}
191193

src/cmap/options.rs

+6
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,11 @@ pub(crate) struct ConnectionPoolOptions {
6262

6363
/// Whether or not the client is connecting to a MongoDB cluster through a load balancer.
6464
pub(crate) load_balanced: Option<bool>,
65+
66+
/// The maximum number of new connections that can be created concurrently.
67+
///
68+
/// The default is 2.
69+
pub(crate) max_connecting: Option<u32>,
6570
}
6671

6772
impl ConnectionPoolOptions {
@@ -77,6 +82,7 @@ impl ConnectionPoolOptions {
7782
ready: None,
7883
load_balanced: options.load_balanced,
7984
credential: options.credential.clone(),
85+
max_connecting: options.max_connecting,
8086
}
8187
}
8288

src/cmap/worker.rs

+11-3
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ use std::{
4141
time::Duration,
4242
};
4343

44-
const MAX_CONNECTING: u32 = 2;
44+
const DEFAULT_MAX_CONNECTING: u32 = 2;
4545
const MAINTENACE_FREQUENCY: Duration = Duration::from_millis(500);
4646

4747
/// A worker task that manages the shared state of the pool.
@@ -134,6 +134,9 @@ pub(crate) struct ConnectionPoolWorker {
134134
/// A handle used to notify SDAM that a connection establishment error happened. This will
135135
/// allow the server to transition to Unknown and clear the pool as necessary.
136136
server_updater: TopologyUpdater,
137+
138+
/// The maximum number of new connections that can be created concurrently.
139+
max_connecting: u32,
137140
}
138141

139142
impl ConnectionPoolWorker {
@@ -158,6 +161,10 @@ impl ConnectionPoolWorker {
158161
.as_ref()
159162
.and_then(|opts| opts.max_pool_size)
160163
.unwrap_or(DEFAULT_MAX_POOL_SIZE);
164+
let max_connecting = options
165+
.as_ref()
166+
.and_then(|opts| opts.max_connecting)
167+
.unwrap_or(DEFAULT_MAX_CONNECTING);
161168

162169
let min_pool_size = options.as_ref().and_then(|opts| opts.min_pool_size);
163170

@@ -233,6 +240,7 @@ impl ConnectionPoolWorker {
233240
generation_publisher,
234241
maintenance_frequency,
235242
server_updater,
243+
max_connecting,
236244
};
237245

238246
runtime::execute(async move {
@@ -371,7 +379,7 @@ impl ConnectionPoolWorker {
371379
return true;
372380
}
373381

374-
self.below_max_connections() && self.pending_connection_count < MAX_CONNECTING
382+
self.below_max_connections() && self.pending_connection_count < self.max_connecting
375383
}
376384

377385
fn check_out(&mut self, request: ConnectionRequest) {
@@ -612,7 +620,7 @@ impl ConnectionPoolWorker {
612620
fn ensure_min_connections(&mut self) {
613621
if let Some(min_pool_size) = self.min_pool_size {
614622
while self.total_connection_count < min_pool_size
615-
&& self.pending_connection_count < MAX_CONNECTING
623+
&& self.pending_connection_count < self.max_connecting
616624
{
617625
let pending_connection = self.create_pending_connection();
618626
let event_handler = self.event_emitter.clone();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
{
2+
"version": 1,
3+
"style": "integration",
4+
"description": "custom maxConnecting is enforced",
5+
"runOn": [
6+
{
7+
"minServerVersion": "4.4.0"
8+
}
9+
],
10+
"failPoint": {
11+
"configureFailPoint": "failCommand",
12+
"mode": "alwaysOn",
13+
"data": {
14+
"failCommands": [
15+
"isMaster",
16+
"hello"
17+
],
18+
"closeConnection": false,
19+
"blockConnection": true,
20+
"blockTimeMS": 500
21+
}
22+
},
23+
"poolOptions": {
24+
"maxConnecting": 1,
25+
"maxPoolSize": 2,
26+
"waitQueueTimeoutMS": 5000
27+
},
28+
"operations": [
29+
{
30+
"name": "ready"
31+
},
32+
{
33+
"name": "start",
34+
"target": "thread1"
35+
},
36+
{
37+
"name": "start",
38+
"target": "thread2"
39+
},
40+
{
41+
"name": "checkOut",
42+
"thread": "thread1"
43+
},
44+
{
45+
"name": "waitForEvent",
46+
"event": "ConnectionCreated",
47+
"count": 1
48+
},
49+
{
50+
"name": "checkOut",
51+
"thread": "thread2"
52+
},
53+
{
54+
"name": "waitForEvent",
55+
"event": "ConnectionReady",
56+
"count": 2
57+
}
58+
],
59+
"events": [
60+
{
61+
"type": "ConnectionCreated"
62+
},
63+
{
64+
"type": "ConnectionReady"
65+
},
66+
{
67+
"type": "ConnectionCreated"
68+
},
69+
{
70+
"type": "ConnectionReady"
71+
}
72+
],
73+
"ignore": [
74+
"ConnectionCheckOutStarted",
75+
"ConnectionCheckedIn",
76+
"ConnectionCheckedOut",
77+
"ConnectionClosed",
78+
"ConnectionPoolCreated",
79+
"ConnectionPoolReady"
80+
]
81+
}

src/test/spec/json/uri-options/connection-pool-options.json

+3-2
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,16 @@
22
"tests": [
33
{
44
"description": "Valid connection pool options are parsed correctly",
5-
"uri": "mongodb://example.com/?maxIdleTimeMS=50000&maxPoolSize=5&minPoolSize=3",
5+
"uri": "mongodb://example.com/?maxIdleTimeMS=50000&maxPoolSize=5&minPoolSize=3&maxConnecting=5",
66
"valid": true,
77
"warning": false,
88
"hosts": null,
99
"auth": null,
1010
"options": {
1111
"maxIdleTimeMS": 50000,
1212
"maxPoolSize": 5,
13-
"minPoolSize": 3
13+
"minPoolSize": 3,
14+
"maxConnecting": 5
1415
}
1516
},
1617
{

0 commit comments

Comments
 (0)