Skip to content

Commit 223514f

Browse files
committed
Fix custom spawn interface in sync API
1 parent 47d97f0 commit 223514f

File tree

2 files changed

+21
-31
lines changed

2 files changed

+21
-31
lines changed

postgres/Cargo.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,7 @@ with-uuid-0_8 = ["tokio-postgres/with-uuid-0_8"]
3232
bytes = "0.5"
3333
fallible-iterator = "0.2"
3434
futures = "0.3"
35-
pin-utils = "=0.1.0-alpha.4"
3635
tokio-postgres = { version = "=0.5.0-alpha.2", path = "../tokio-postgres", default-features = false }
37-
tokio-executor = "=0.2.0-alpha.6"
3836

3937
tokio = { version = "0.2", optional = true, features = ["rt-threaded"] }
4038
lazy_static = { version = "1.0", optional = true }

postgres/src/config.rs

Lines changed: 21 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,20 @@
22
//!
33
//! Requires the `runtime` Cargo feature (enabled by default).
44
5-
use futures::{FutureExt, executor};
5+
use crate::{Client, RUNTIME};
6+
use futures::{executor, FutureExt};
67
use log::error;
78
use std::fmt;
9+
use std::future::Future;
810
use std::path::Path;
11+
use std::pin::Pin;
912
use std::str::FromStr;
10-
use std::sync::{mpsc, Arc, Mutex};
13+
use std::sync::{mpsc, Arc};
1114
use std::time::Duration;
12-
use tokio_executor::Executor;
13-
use tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
14-
use tokio_postgres::{Error, Socket};
15-
1615
#[doc(inline)]
1716
pub use tokio_postgres::config::{ChannelBinding, SslMode, TargetSessionAttrs};
18-
19-
use crate::{Client, RUNTIME};
17+
use tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
18+
use tokio_postgres::{Error, Socket};
2019

2120
/// Connection configuration.
2221
///
@@ -94,8 +93,7 @@ use crate::{Client, RUNTIME};
9493
#[derive(Clone)]
9594
pub struct Config {
9695
config: tokio_postgres::Config,
97-
// this is an option since we don't want to boot up our default runtime unless we're actually going to use it.
98-
executor: Option<Arc<Mutex<dyn Executor + Send>>>,
96+
spawner: Option<Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Sync + Send>>,
9997
}
10098

10199
impl fmt::Debug for Config {
@@ -117,7 +115,7 @@ impl Config {
117115
pub fn new() -> Config {
118116
Config {
119117
config: tokio_postgres::Config::new(),
120-
executor: None,
118+
spawner: None,
121119
}
122120
}
123121

@@ -242,14 +240,14 @@ impl Config {
242240
self
243241
}
244242

245-
/// Sets the executor used to run the connection futures.
243+
/// Sets the spawner used to run the connection futures.
246244
///
247245
/// Defaults to a postgres-specific tokio `Runtime`.
248-
pub fn executor<E>(&mut self, executor: E) -> &mut Config
246+
pub fn spawner<F>(&mut self, spawn: F) -> &mut Config
249247
where
250-
E: Executor + 'static + Send,
248+
F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + 'static + Sync + Send,
251249
{
252-
self.executor = Some(Arc::new(Mutex::new(executor)));
250+
self.spawner = Some(Arc::new(spawn));
253251
self
254252
}
255253

@@ -261,22 +259,20 @@ impl Config {
261259
T::Stream: Send,
262260
<T::TlsConnect as TlsConnect<Socket>>::Future: Send,
263261
{
264-
let (client, connection) = match &self.executor {
265-
Some(executor) => {
262+
let (client, connection) = match &self.spawner {
263+
Some(spawn) => {
266264
let (tx, rx) = mpsc::channel();
267265
let config = self.config.clone();
268266
let connect = async move {
269267
let r = config.connect(tls).await;
270268
let _ = tx.send(r);
271269
};
272-
executor.lock().unwrap().spawn(Box::pin(connect)).unwrap();
270+
spawn(Box::pin(connect));
273271
rx.recv().unwrap()?
274272
}
275273
None => {
276274
let connect = self.config.connect(tls);
277-
RUNTIME.handle().enter(|| {
278-
executor::block_on(connect)
279-
})?
275+
RUNTIME.handle().enter(|| executor::block_on(connect))?
280276
}
281277
};
282278

@@ -285,13 +281,9 @@ impl Config {
285281
error!("postgres connection error: {}", e)
286282
}
287283
});
288-
match &self.executor {
289-
Some(executor) => {
290-
executor
291-
.lock()
292-
.unwrap()
293-
.spawn(Box::pin(connection))
294-
.unwrap();
284+
match &self.spawner {
285+
Some(spawn) => {
286+
spawn(Box::pin(connection));
295287
}
296288
None => {
297289
RUNTIME.spawn(connection);
@@ -314,7 +306,7 @@ impl From<tokio_postgres::Config> for Config {
314306
fn from(config: tokio_postgres::Config) -> Config {
315307
Config {
316308
config,
317-
executor: None,
309+
spawner: None,
318310
}
319311
}
320312
}

0 commit comments

Comments
 (0)