Skip to content

Commit 17b3af3

Browse files
authored
RUST-1712 Provide a connection pool warmup method (mongodb#932)
1 parent 66f2904 commit 17b3af3

File tree

7 files changed

+163
-28
lines changed

7 files changed

+163
-28
lines changed

src/client.rs

+18
Original file line numberDiff line numberDiff line change
@@ -612,6 +612,24 @@ impl Client {
612612
self.inner.shutdown.executed.store(true, Ordering::SeqCst);
613613
}
614614

615+
/// Add connections to the connection pool up to `min_pool_size`. This is normally not needed -
616+
/// the connection pool will be filled in the background, and new connections created as needed
617+
/// up to `max_pool_size`. However, it can sometimes be preferable to pay the (larger) cost of
618+
/// creating new connections up-front so that individual operations execute as quickly as
619+
/// possible.
620+
///
621+
/// Note that topology changes require rebuilding the connection pool, so this method cannot
622+
/// guarantee that the pool will always be filled for the lifetime of the `Client`.
623+
///
624+
/// Does nothing if `min_pool_size` is unset or zero.
625+
pub async fn warm_connection_pool(&self) {
626+
if !self.inner.options.min_pool_size.map_or(false, |s| s > 0) {
627+
// No-op when min_pool_size is zero.
628+
return;
629+
}
630+
self.inner.topology.warm_pool().await;
631+
}
632+
615633
/// Check in a server session to the server session pool. The session will be discarded if it is
616634
/// expired or dirty.
617635
pub(crate) async fn check_in_server_session(&self, session: ServerSession) {

src/cmap.rs

+3
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,9 @@ impl ConnectionPool {
134134
ConnectionRequestResult::PoolCleared(e) => {
135135
Err(Error::pool_cleared_error(&self.address, &e))
136136
}
137+
ConnectionRequestResult::PoolWarmed => {
138+
Err(Error::internal("Invalid result from connection requester"))
139+
}
137140
};
138141

139142
match conn {

src/cmap/connection_requester.rs

+47-7
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pub(super) fn channel(handle: WorkerHandle) -> (ConnectionRequester, ConnectionR
2323
/// the pool will stop servicing requests, drop its available connections, and close.
2424
#[derive(Clone, Debug)]
2525
pub(super) struct ConnectionRequester {
26-
sender: mpsc::UnboundedSender<oneshot::Sender<ConnectionRequestResult>>,
26+
sender: mpsc::UnboundedSender<ConnectionRequest>,
2727
_handle: WorkerHandle,
2828
}
2929

@@ -34,33 +34,66 @@ impl ConnectionRequester {
3434

3535
// this only errors if the receiver end is dropped, which can't happen because
3636
// we own a handle to the worker, keeping it alive.
37-
self.sender.send(sender).unwrap();
37+
self.sender
38+
.send(ConnectionRequest {
39+
sender,
40+
warm_pool: false,
41+
})
42+
.unwrap();
3843

3944
// similarly, the receiver only returns an error if the sender is dropped, which
4045
// can't happen due to the handle.
4146
receiver.await.unwrap()
4247
}
48+
49+
pub(super) fn weak(&self) -> WeakConnectionRequester {
50+
WeakConnectionRequester {
51+
sender: self.sender.clone(),
52+
}
53+
}
54+
}
55+
56+
/// Handle for requesting Connections from the pool. This does *not* keep the
57+
/// pool alive.
58+
#[derive(Clone, Debug)]
59+
pub(super) struct WeakConnectionRequester {
60+
sender: mpsc::UnboundedSender<ConnectionRequest>,
61+
}
62+
63+
impl WeakConnectionRequester {
64+
pub(super) async fn request_warm_pool(&self) -> Option<ConnectionRequestResult> {
65+
let (sender, receiver) = oneshot::channel();
66+
if self
67+
.sender
68+
.send(ConnectionRequest {
69+
sender,
70+
warm_pool: true,
71+
})
72+
.is_err()
73+
{
74+
return None;
75+
}
76+
receiver.await.ok()
77+
}
4378
}
4479

4580
/// Receiving end of a given ConnectionRequester.
4681
#[derive(Debug)]
4782
pub(super) struct ConnectionRequestReceiver {
48-
receiver: mpsc::UnboundedReceiver<oneshot::Sender<ConnectionRequestResult>>,
83+
receiver: mpsc::UnboundedReceiver<ConnectionRequest>,
4984
}
5085

5186
impl ConnectionRequestReceiver {
5287
pub(super) async fn recv(&mut self) -> Option<ConnectionRequest> {
53-
self.receiver
54-
.recv()
55-
.await
56-
.map(|sender| ConnectionRequest { sender })
88+
self.receiver.recv().await
5789
}
5890
}
5991

6092
/// Struct encapsulating a request for a connection.
6193
#[derive(Debug)]
6294
pub(super) struct ConnectionRequest {
6395
sender: oneshot::Sender<ConnectionRequestResult>,
96+
warm_pool: bool,
6497
}
6598

6699
impl ConnectionRequest {
@@ -72,6 +105,10 @@ impl ConnectionRequest {
72105
) -> std::result::Result<(), ConnectionRequestResult> {
73106
self.sender.send(result)
74107
}
108+
109+
pub(super) fn is_warm_pool(&self) -> bool {
110+
self.warm_pool
111+
}
75112
}
76113

77114
#[derive(Debug)]
@@ -86,6 +123,9 @@ pub(super) enum ConnectionRequestResult {
86123
/// The request was rejected because the pool was cleared before it could
87124
/// be fulfilled. The error that caused the pool to be cleared is returned.
88125
PoolCleared(Error),
126+
127+
/// The request set `warm_pool: true` and the pool has reached `min_pool_size`.
128+
PoolWarmed,
89129
}
90130

91131
impl ConnectionRequestResult {

src/cmap/worker.rs

+65-20
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use super::{
88
ConnectionRequestReceiver,
99
ConnectionRequestResult,
1010
ConnectionRequester,
11+
WeakConnectionRequester,
1112
},
1213
establish::ConnectionEstablisher,
1314
manager,
@@ -110,6 +111,10 @@ pub(crate) struct ConnectionPoolWorker {
110111
/// sender ends of this receiver drop, this worker will be notified and drop too.
111112
handle_listener: WorkerHandleListener,
112113

114+
/// Sender for connection check out requests. Does not keep the worker alive the way
115+
/// a `ConnectionRequeter` would since it doesn't hold a `WorkerHandle`.
116+
weak_requester: WeakConnectionRequester,
117+
113118
/// Receiver for incoming connection check out requests.
114119
request_receiver: ConnectionRequestReceiver,
115120

@@ -218,6 +223,7 @@ impl ConnectionPoolWorker {
218223
service_connection_count: HashMap::new(),
219224
available_connections: VecDeque::new(),
220225
max_pool_size,
226+
weak_requester: connection_requester.weak(),
221227
request_receiver,
222228
wait_queue: Default::default(),
223229
management_receiver,
@@ -312,6 +318,12 @@ impl ConnectionPoolWorker {
312318
shutdown_ack = Some(ack);
313319
break;
314320
}
321+
BroadcastMessage::FillPool => {
322+
crate::runtime::execute(fill_pool(
323+
self.weak_requester.clone(),
324+
ack,
325+
));
326+
}
315327
#[cfg(test)]
316328
BroadcastMessage::SyncWorkers => {
317329
ack.acknowledge(());
@@ -363,30 +375,39 @@ impl ConnectionPoolWorker {
363375
}
364376

365377
fn check_out(&mut self, request: ConnectionRequest) {
366-
// first attempt to check out an available connection
367-
while let Some(mut conn) = self.available_connections.pop_back() {
368-
// Close the connection if it's stale.
369-
if conn.generation.is_stale(&self.generation) {
370-
self.close_connection(conn, ConnectionClosedReason::Stale);
371-
continue;
378+
if request.is_warm_pool() {
379+
if self.total_connection_count >= self.min_pool_size.unwrap_or(0) {
380+
let _ = request.fulfill(ConnectionRequestResult::PoolWarmed);
381+
return;
372382
}
383+
} else {
384+
// first attempt to check out an available connection
385+
while let Some(mut conn) = self.available_connections.pop_back() {
386+
// Close the connection if it's stale.
387+
if conn.generation.is_stale(&self.generation) {
388+
self.close_connection(conn, ConnectionClosedReason::Stale);
389+
continue;
390+
}
373391

374-
// Close the connection if it's idle.
375-
if conn.is_idle(self.max_idle_time) {
376-
self.close_connection(conn, ConnectionClosedReason::Idle);
377-
continue;
378-
}
392+
// Close the connection if it's idle.
393+
if conn.is_idle(self.max_idle_time) {
394+
self.close_connection(conn, ConnectionClosedReason::Idle);
395+
continue;
396+
}
379397

380-
conn.mark_as_in_use(self.manager.clone());
381-
if let Err(request) = request.fulfill(ConnectionRequestResult::Pooled(Box::new(conn))) {
382-
// checking out thread stopped listening, indicating it hit the WaitQueue
383-
// timeout, so we put connection back into pool.
384-
let mut connection = request.unwrap_pooled_connection();
385-
connection.mark_as_available();
386-
self.available_connections.push_back(connection);
387-
}
398+
conn.mark_as_in_use(self.manager.clone());
399+
if let Err(request) =
400+
request.fulfill(ConnectionRequestResult::Pooled(Box::new(conn)))
401+
{
402+
// checking out thread stopped listening, indicating it hit the WaitQueue
403+
// timeout, so we put connection back into pool.
404+
let mut connection = request.unwrap_pooled_connection();
405+
connection.mark_as_available();
406+
self.available_connections.push_back(connection);
407+
}
388408

389-
return;
409+
return;
410+
}
390411
}
391412

392413
// otherwise, attempt to create a connection.
@@ -669,6 +690,30 @@ async fn establish_connection(
669690
establish_result.map_err(|e| e.cause)
670691
}
671692

693+
async fn fill_pool(
694+
requester: WeakConnectionRequester,
695+
ack: crate::runtime::AcknowledgmentSender<()>,
696+
) {
697+
let mut establishing = vec![];
698+
loop {
699+
let result = requester.request_warm_pool().await;
700+
match result {
701+
None => break,
702+
Some(ConnectionRequestResult::Establishing(handle)) => {
703+
// Let connections finish establishing in parallel.
704+
establishing.push(crate::runtime::spawn(async move {
705+
let _ = handle.await;
706+
// The connection is dropped here, returning it to the pool.
707+
}));
708+
}
709+
_ => break,
710+
};
711+
}
712+
// Wait for all connections to finish establishing before reporting completion.
713+
futures_util::future::join_all(establishing).await;
714+
ack.acknowledge(());
715+
}
716+
672717
/// Enum modeling the possible pool states as described in the CMAP spec.
673718
///
674719
/// The "closed" state is omitted here because the pool considered closed only

src/runtime.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ mod worker_handle;
2121
use std::{future::Future, net::SocketAddr, time::Duration};
2222

2323
pub(crate) use self::{
24-
acknowledged_message::{AcknowledgedMessage, AcknowledgmentReceiver},
24+
acknowledged_message::{AcknowledgedMessage, AcknowledgmentReceiver, AcknowledgmentSender},
2525
join_handle::AsyncJoinHandle,
2626
resolver::AsyncResolver,
2727
stream::AsyncStream,

src/sdam/topology.rs

+10
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,10 @@ impl Topology {
214214
self.updater.shutdown().await;
215215
}
216216

217+
pub(crate) async fn warm_pool(&self) {
218+
self.updater.fill_pool().await;
219+
}
220+
217221
/// Gets the addresses of the servers in the cluster.
218222
#[cfg(test)]
219223
pub(crate) fn server_addresses(&mut self) -> HashSet<ServerAddress> {
@@ -278,6 +282,7 @@ pub(crate) enum UpdateMessage {
278282
#[derive(Debug, Clone)]
279283
pub(crate) enum BroadcastMessage {
280284
Shutdown,
285+
FillPool,
281286
#[cfg(test)]
282287
SyncWorkers,
283288
}
@@ -877,6 +882,11 @@ impl TopologyUpdater {
877882
.await;
878883
}
879884

885+
pub(crate) async fn fill_pool(&self) {
886+
self.send_message(UpdateMessage::Broadcast(BroadcastMessage::FillPool))
887+
.await;
888+
}
889+
880890
#[cfg(test)]
881891
pub(crate) async fn sync_workers(&self) {
882892
self.send_message(UpdateMessage::Broadcast(BroadcastMessage::SyncWorkers))

src/test/client.rs

+19
Original file line numberDiff line numberDiff line change
@@ -968,3 +968,22 @@ async fn manual_shutdown_immediate_with_resources() {
968968
.is_empty());
969969
assert!(events.get_command_started_events(&["delete"]).is_empty());
970970
}
971+
972+
// Verifies that `Client::warm_connection_pool` succeeds.
973+
#[cfg_attr(feature = "tokio-runtime", tokio::test)]
974+
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
975+
async fn warm_connection_pool() {
976+
let _guard = LOCK.run_exclusively().await;
977+
let client = Client::test_builder()
978+
.options({
979+
let mut opts = CLIENT_OPTIONS.get().await.clone();
980+
opts.min_pool_size = Some(10);
981+
opts
982+
})
983+
.build()
984+
.await;
985+
986+
client.warm_connection_pool().await;
987+
// Validate that a command executes.
988+
client.list_database_names(None, None).await.unwrap();
989+
}

0 commit comments

Comments
 (0)