Skip to content

Commit 88e1e23

Browse files
authored
Merge pull request #18 from pganalyze/connection-manager
Replace use of r2d2 with redis-rs ConnectionManager
2 parents fa2db19 + 8a85a0b commit 88e1e23

File tree

3 files changed

+76
-72
lines changed

3 files changed

+76
-72
lines changed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ edition = "2018"
1616
travis-ci = { repository = "spk/rust-sidekiq" }
1717

1818
[dependencies]
19+
futures = "*"
1920
rand = "0.8"
2021
serde = "1.0"
2122
serde_json = "1.0"
22-
r2d2 = "0.8"
23-
r2d2_redis = "0.14"
23+
redis = { version = "*", features = ["connection-manager", "async-std-comp", "async-std-tls-comp"] }
2424
time = "0.3"

src/lib.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,13 @@
1010
#![deny(warnings)]
1111
#![crate_name = "sidekiq"]
1212

13-
extern crate r2d2;
14-
extern crate r2d2_redis;
1513
extern crate rand;
1614
extern crate serde;
1715
extern crate serde_json;
1816

1917
mod sidekiq;
2018
pub use crate::sidekiq::{
21-
create_redis_pool, Client, ClientError, ClientOpts, Job, JobOpts, RedisPool,
22-
RedisPooledConnection,
19+
create_async_redis_pool, create_redis_pool, Client, ClientError, ClientOpts, Job, JobOpts,
20+
RedisPool,
2321
};
2422
pub use serde_json::value::Value;

src/sidekiq/mod.rs

Lines changed: 72 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,20 @@ use std::fmt;
44
use std::time::{SystemTime, UNIX_EPOCH};
55

66
use crate::Value;
7-
use r2d2_redis::{r2d2, redis, RedisConnectionManager};
87
use rand::distributions::Alphanumeric;
98
use rand::{thread_rng, Rng};
109
use serde::ser::SerializeStruct;
1110
use serde::{Serialize, Serializer};
1211

1312
use time::{Duration, OffsetDateTime};
1413

14+
use futures::executor::block_on;
15+
use futures::future::TryFutureExt;
16+
use redis::aio::ConnectionManager;
17+
1518
const REDIS_URL_ENV: &str = "REDIS_URL";
1619
const REDIS_URL_DEFAULT: &str = "redis://127.0.0.1/";
17-
pub type RedisPooledConnection = r2d2::PooledConnection<RedisConnectionManager>;
18-
pub type RedisPool = r2d2::Pool<RedisConnectionManager>;
20+
pub type RedisPool = ConnectionManager;
1921

2022
#[derive(Debug)]
2123
pub struct ClientError {
@@ -25,19 +27,25 @@ pub struct ClientError {
2527
#[derive(Debug)]
2628
enum ErrorKind {
2729
Redis(redis::RedisError),
28-
PoolInit(r2d2::Error),
2930
}
3031

3132
impl std::error::Error for ClientError {}
3233

33-
pub fn create_redis_pool() -> Result<RedisPool, ClientError> {
34+
pub fn create_redis_pool() -> Result<ConnectionManager, ClientError> {
35+
block_on(create_async_redis_pool())
36+
}
37+
38+
pub async fn create_async_redis_pool() -> Result<ConnectionManager, ClientError> {
3439
let redis_url =
3540
&env::var(&REDIS_URL_ENV.to_owned()).unwrap_or_else(|_| REDIS_URL_DEFAULT.to_owned());
36-
let url = redis::parse_redis_url(redis_url).unwrap();
37-
let manager = RedisConnectionManager::new(url).unwrap();
38-
r2d2::Pool::new(manager).map_err(|err| ClientError {
39-
kind: ErrorKind::PoolInit(err),
40-
})
41+
// Note: this connection is multiplexed. Users of this object will call clone(), but the same underlying connection will be used.
42+
// https://docs.rs/redis/latest/redis/aio/struct.ConnectionManager.html
43+
match ConnectionManager::new(redis::Client::open((*redis_url).clone()).unwrap()).await {
44+
Ok(pool) => Ok(pool),
45+
Err(err) => Err(ClientError {
46+
kind: ErrorKind::Redis(err),
47+
}),
48+
}
4149
}
4250

4351
pub struct Job {
@@ -54,7 +62,6 @@ impl fmt::Display for ClientError {
5462
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
5563
match self.kind {
5664
ErrorKind::Redis(ref err) => err.fmt(f),
57-
ErrorKind::PoolInit(ref err) => err.fmt(f),
5865
}
5966
}
6067
}
@@ -67,14 +74,6 @@ impl From<redis::RedisError> for ClientError {
6774
}
6875
}
6976

70-
impl From<r2d2::Error> for ClientError {
71-
fn from(error: r2d2::Error) -> ClientError {
72-
ClientError {
73-
kind: ErrorKind::PoolInit(error),
74-
}
75-
}
76-
}
77-
7877
pub struct JobOpts {
7978
pub retry: i64,
8079
pub queue: String,
@@ -157,7 +156,7 @@ pub struct ClientOpts {
157156
}
158157

159158
pub struct Client {
160-
pub redis_pool: RedisPool,
159+
pub redis_pool: ConnectionManager,
161160
pub namespace: Option<String>,
162161
}
163162

@@ -202,22 +201,13 @@ pub struct Client {
202201
/// }
203202
/// ```
204203
impl Client {
205-
pub fn new(redis_pool: RedisPool, opts: ClientOpts) -> Client {
204+
pub fn new(redis_pool: ConnectionManager, opts: ClientOpts) -> Client {
206205
Client {
207206
redis_pool,
208207
namespace: opts.namespace,
209208
}
210209
}
211210

212-
fn connect(&self) -> Result<RedisPooledConnection, ClientError> {
213-
match self.redis_pool.get() {
214-
Ok(conn) => Ok(conn),
215-
Err(err) => Err(ClientError {
216-
kind: ErrorKind::PoolInit(err),
217-
}),
218-
}
219-
}
220-
221211
fn calc_at(&self, target_millsec_number: f64) -> Option<f64> {
222212
let maximum_target: f64 = 1_000_000_000_f64;
223213
let target_millsec: f64 = target_millsec_number;
@@ -237,61 +227,77 @@ impl Client {
237227
}
238228

239229
pub fn perform_in(&self, interval: Duration, job: Job) -> Result<(), ClientError> {
240-
let interval: f64 = interval.whole_seconds() as f64;
241-
self.raw_push(&[job], self.calc_at(interval))
230+
block_on(self.perform_in_async(interval, job))
242231
}
243232

244233
pub fn perform_at(&self, datetime: OffsetDateTime, job: Job) -> Result<(), ClientError> {
245-
let timestamp: f64 = datetime.unix_timestamp() as f64;
246-
self.raw_push(&[job], self.calc_at(timestamp))
234+
block_on(self.perform_at_async(datetime, job))
247235
}
248236

249237
pub fn push(&self, job: Job) -> Result<(), ClientError> {
250-
self.raw_push(&[job], None)
238+
block_on(self.push_async(job))
251239
}
252240

253241
pub fn push_bulk(&self, jobs: &[Job]) -> Result<(), ClientError> {
254-
self.raw_push(jobs, None)
242+
block_on(self.push_bulk_async(jobs))
243+
}
244+
245+
pub async fn perform_in_async(&self, interval: Duration, job: Job) -> Result<(), ClientError> {
246+
let interval: f64 = interval.whole_seconds() as f64;
247+
self.raw_push(&[job], self.calc_at(interval)).await
248+
}
249+
250+
pub async fn perform_at_async(
251+
&self,
252+
datetime: OffsetDateTime,
253+
job: Job,
254+
) -> Result<(), ClientError> {
255+
let timestamp: f64 = datetime.unix_timestamp() as f64;
256+
self.raw_push(&[job], self.calc_at(timestamp)).await
257+
}
258+
259+
pub async fn push_async(&self, job: Job) -> Result<(), ClientError> {
260+
self.raw_push(&[job], None).await
261+
}
262+
263+
pub async fn push_bulk_async(&self, jobs: &[Job]) -> Result<(), ClientError> {
264+
self.raw_push(jobs, None).await
255265
}
256266

257-
fn raw_push(&self, payloads: &[Job], at: Option<f64>) -> Result<(), ClientError> {
267+
async fn raw_push(&self, payloads: &[Job], at: Option<f64>) -> Result<(), ClientError> {
258268
let payload = &payloads[0];
259269
let to_push = payloads
260270
.iter()
261271
.map(|entry| serde_json::to_string(&entry).unwrap())
262272
.collect::<Vec<_>>();
263273

264274
if let Some(value) = at {
265-
match self.connect() {
266-
Ok(mut conn) => redis::pipe()
267-
.atomic()
268-
.cmd("ZADD")
269-
.arg(self.schedule_queue_name())
270-
.arg(value)
271-
.arg(to_push)
272-
.query(&mut *conn)
273-
.map_err(|err| ClientError {
274-
kind: ErrorKind::Redis(err),
275-
}),
276-
Err(err) => Err(err),
277-
}
275+
redis::pipe()
276+
.atomic()
277+
.cmd("ZADD")
278+
.arg(self.schedule_queue_name())
279+
.arg(value)
280+
.arg(to_push)
281+
.query_async(&mut self.redis_pool.clone())
282+
.map_err(|err| ClientError {
283+
kind: ErrorKind::Redis(err),
284+
})
285+
.await
278286
} else {
279-
match self.connect() {
280-
Ok(mut conn) => redis::pipe()
281-
.atomic()
282-
.cmd("SADD")
283-
.arg("queues")
284-
.arg(payload.queue.to_string())
285-
.ignore()
286-
.cmd("LPUSH")
287-
.arg(self.queue_name(&payload.queue))
288-
.arg(to_push)
289-
.query(&mut *conn)
290-
.map_err(|err| ClientError {
291-
kind: ErrorKind::Redis(err),
292-
}),
293-
Err(err) => Err(err),
294-
}
287+
redis::pipe()
288+
.atomic()
289+
.cmd("SADD")
290+
.arg("queues")
291+
.arg(payload.queue.to_string())
292+
.ignore()
293+
.cmd("LPUSH")
294+
.arg(self.queue_name(&payload.queue))
295+
.arg(to_push)
296+
.query_async(&mut self.redis_pool.clone())
297+
.map_err(|err| ClientError {
298+
kind: ErrorKind::Redis(err),
299+
})
300+
.await
295301
}
296302
}
297303

0 commit comments

Comments
 (0)