Skip to content
This repository was archived by the owner on Oct 18, 2021. It is now read-only.

Commit 6cdf8dd

Browse files
authored
fix race condition causing deadlock when acquiring locks (#225)
1 parent 66f70dc commit 6cdf8dd

File tree

4 files changed

+386
-311
lines changed

4 files changed

+386
-311
lines changed

src/lib.rs

+113-87
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,9 @@
9292
#![cfg_attr(feature = "clippy", plugin(clippy))]
9393
#![cfg_attr(feature = "clippy", allow(
9494
doc_markdown,
95-
// allow double_parens for bson/doc macro.
95+
// allow double_parens for bson/doc macro.
9696
double_parens,
97-
// more explicit than catch-alls.
97+
// more explicit than catch-alls.
9898
match_wild_err_arm,
9999
too_many_arguments,
100100
))]
@@ -178,7 +178,7 @@ use error::Error::ResponseError;
178178
use pool::PooledStream;
179179
use stream::StreamConnector;
180180
use topology::{Topology, TopologyDescription, TopologyType, DEFAULT_HEARTBEAT_FREQUENCY_MS,
181-
DEFAULT_LOCAL_THRESHOLD_MS, DEFAULT_SERVER_SELECTION_TIMEOUT_MS};
181+
DEFAULT_LOCAL_THRESHOLD_MS, DEFAULT_SERVER_SELECTION_TIMEOUT_MS};
182182
use topology::server::Server;
183183

184184
/// Interfaces with a MongoDB server or replica set.
@@ -236,16 +236,17 @@ impl ClientOptions {
236236

237237
#[cfg(feature = "ssl")]
238238
/// Creates a new options struct with a specified SSL certificate and key files.
239-
pub fn with_ssl(ca_file: &str,
240-
certificate_file: &str,
241-
key_file: &str,
242-
verify_peer: bool)
243-
-> ClientOptions {
244-
let mut options = ClientOptions::new();
245-
options.stream_connector = StreamConnector::with_ssl(ca_file, certificate_file,
246-
key_file, verify_peer);
247-
options
248-
}
239+
pub fn with_ssl(
240+
ca_file: &str,
241+
certificate_file: &str,
242+
key_file: &str,
243+
verify_peer: bool,
244+
) -> ClientOptions {
245+
let mut options = ClientOptions::new();
246+
options.stream_connector =
247+
StreamConnector::with_ssl(ca_file, certificate_file, key_file, verify_peer);
248+
options
249+
}
249250
}
250251

251252
pub trait ThreadedClient: Sync + Sized {
@@ -261,18 +262,20 @@ pub trait ThreadedClient: Sync + Sized {
261262
fn with_uri_and_options(uri: &str, options: ClientOptions) -> Result<Self>;
262263
/// Create a new Client with manual connection configurations.
263264
/// `connect` and `with_uri` should generally be used as higher-level constructors.
264-
fn with_config(config: ConnectionString,
265-
options: Option<ClientOptions>,
266-
description: Option<TopologyDescription>)
267-
-> Result<Self>;
265+
fn with_config(
266+
config: ConnectionString,
267+
options: Option<ClientOptions>,
268+
description: Option<TopologyDescription>,
269+
) -> Result<Self>;
268270
/// Creates a database representation.
269271
fn db(&self, db_name: &str) -> Database;
270272
/// Creates a database representation with custom read and write controls.
271-
fn db_with_prefs(&self,
272-
db_name: &str,
273-
read_preference: Option<ReadPreference>,
274-
write_concern: Option<WriteConcern>)
275-
-> Database;
273+
fn db_with_prefs(
274+
&self,
275+
db_name: &str,
276+
read_preference: Option<ReadPreference>,
277+
write_concern: Option<WriteConcern>,
278+
) -> Database;
276279
/// Acquires a connection stream from the pool, along with slave_ok and should_send_read_pref.
277280
fn acquire_stream(&self, read_pref: ReadPreference) -> Result<(PooledStream, bool, bool)>;
278281
/// Acquires a connection stream from the pool for write operations.
@@ -319,75 +322,93 @@ impl ThreadedClient for Client {
319322
Client::with_config(config, Some(options), None)
320323
}
321324

322-
fn with_config(config: ConnectionString,
323-
options: Option<ClientOptions>,
324-
description: Option<TopologyDescription>)
325-
-> Result<Client> {
326-
327-
let client_options = options.unwrap_or_else(ClientOptions::new);
328-
329-
let rp = client_options.read_preference
330-
.unwrap_or_else(|| ReadPreference::new(ReadMode::Primary, None));
331-
let wc = client_options.write_concern.unwrap_or_else(WriteConcern::new);
332-
333-
let listener = Listener::new();
334-
let file = match client_options.log_file {
335-
Some(string) => {
336-
let _ = listener.add_start_hook(log_command_started);
337-
let _ = listener.add_completion_hook(log_command_completed);
338-
Some(Mutex::new(try!(OpenOptions::new()
339-
.write(true)
340-
.append(true)
341-
.create(true)
342-
.open(&string))))
343-
}
344-
None => None,
345-
};
346-
347-
let client = Arc::new(ClientInner {
348-
req_id: Arc::new(ATOMIC_ISIZE_INIT),
349-
topology: try!(Topology::new(config.clone(), description, client_options.stream_connector.clone())),
350-
listener: listener,
351-
read_preference: rp,
352-
write_concern: wc,
353-
log_file: file,
354-
});
355-
356-
// Fill servers array and set options
357-
{
358-
let top_description = &client.topology.description;
359-
let mut top = try!(top_description.write());
360-
top.heartbeat_frequency_ms = client_options.heartbeat_frequency_ms;
361-
top.server_selection_timeout_ms = client_options.server_selection_timeout_ms;
362-
top.local_threshold_ms = client_options.local_threshold_ms;
363-
364-
for host in &config.hosts {
365-
let server = Server::new(client.clone(), host.clone(), top_description.clone(), true, client_options.stream_connector.clone());
366-
367-
top.servers.insert(host.clone(), server);
368-
}
325+
fn with_config(
326+
config: ConnectionString,
327+
options: Option<ClientOptions>,
328+
description: Option<TopologyDescription>,
329+
) -> Result<Client> {
330+
331+
let client_options = options.unwrap_or_else(ClientOptions::new);
332+
333+
let rp = client_options.read_preference.unwrap_or_else(|| {
334+
ReadPreference::new(ReadMode::Primary, None)
335+
});
336+
let wc = client_options.write_concern.unwrap_or_else(
337+
WriteConcern::new,
338+
);
339+
340+
let listener = Listener::new();
341+
let file = match client_options.log_file {
342+
Some(string) => {
343+
let _ = listener.add_start_hook(log_command_started);
344+
let _ = listener.add_completion_hook(log_command_completed);
345+
Some(Mutex::new(try!(
346+
OpenOptions::new()
347+
.write(true)
348+
.append(true)
349+
.create(true)
350+
.open(&string)
351+
)))
352+
}
353+
None => None,
354+
};
355+
356+
let client = Arc::new(ClientInner {
357+
req_id: Arc::new(ATOMIC_ISIZE_INIT),
358+
topology: try!(Topology::new(
359+
config.clone(),
360+
description,
361+
client_options.stream_connector.clone(),
362+
)),
363+
listener: listener,
364+
read_preference: rp,
365+
write_concern: wc,
366+
log_file: file,
367+
});
368+
369+
// Fill servers array and set options
370+
{
371+
let top_description = &client.topology.description;
372+
let mut top = try!(top_description.write());
373+
top.heartbeat_frequency_ms = client_options.heartbeat_frequency_ms;
374+
top.server_selection_timeout_ms = client_options.server_selection_timeout_ms;
375+
top.local_threshold_ms = client_options.local_threshold_ms;
376+
377+
for host in &config.hosts {
378+
let server = Server::new(
379+
client.clone(),
380+
host.clone(),
381+
top_description.clone(),
382+
true,
383+
client_options.stream_connector.clone(),
384+
);
385+
386+
top.servers.insert(host.clone(), server);
369387
}
370-
371-
Ok(client)
372388
}
373389

390+
Ok(client)
391+
}
392+
374393
fn db(&self, db_name: &str) -> Database {
375394
Database::open(self.clone(), db_name, None, None)
376395
}
377396

378-
fn db_with_prefs(&self,
379-
db_name: &str,
380-
read_preference: Option<ReadPreference>,
381-
write_concern: Option<WriteConcern>)
382-
-> Database {
383-
Database::open(self.clone(), db_name, read_preference, write_concern)
384-
}
397+
fn db_with_prefs(
398+
&self,
399+
db_name: &str,
400+
read_preference: Option<ReadPreference>,
401+
write_concern: Option<WriteConcern>,
402+
) -> Database {
403+
Database::open(self.clone(), db_name, read_preference, write_concern)
404+
}
385405

386-
fn acquire_stream(&self,
387-
read_preference: ReadPreference)
388-
-> Result<(PooledStream, bool, bool)> {
389-
self.topology.acquire_stream(read_preference)
390-
}
406+
fn acquire_stream(
407+
&self,
408+
read_preference: ReadPreference,
409+
) -> Result<(PooledStream, bool, bool)> {
410+
self.topology.acquire_stream(read_preference)
411+
}
391412

392413
fn acquire_write_stream(&self) -> Result<PooledStream> {
393414
self.topology.acquire_write_stream()
@@ -405,7 +426,8 @@ impl ThreadedClient for Client {
405426
let res = try!(db.command(doc, CommandType::ListDatabases, None));
406427
if let Some(&Bson::Array(ref batch)) = res.get("databases") {
407428
// Extract database names
408-
let map = batch.iter()
429+
let map = batch
430+
.iter()
409431
.filter_map(|bdoc| {
410432
if let Bson::Document(ref doc) = *bdoc {
411433
if let Some(&Bson::String(ref name)) = doc.get("name") {
@@ -414,11 +436,13 @@ impl ThreadedClient for Client {
414436
}
415437
None
416438
})
417-
.collect();
439+
.collect();
418440
return Ok(map);
419441
}
420442

421-
Err(ResponseError(String::from("Server reply does not contain 'databases'.")))
443+
Err(ResponseError(
444+
String::from("Server reply does not contain 'databases'."),
445+
))
422446
}
423447

424448
fn drop_database(&self, db_name: &str) -> Result<()> {
@@ -436,7 +460,9 @@ impl ThreadedClient for Client {
436460

437461
match res.get("ismaster") {
438462
Some(&Bson::Boolean(is_master)) => Ok(is_master),
439-
_ => Err(ResponseError(String::from("Server reply does not contain 'ismaster'."))),
463+
_ => Err(ResponseError(
464+
String::from("Server reply does not contain 'ismaster'."),
465+
)),
440466
}
441467
}
442468

0 commit comments

Comments
 (0)