Skip to content

Commit cc8750a

Browse files
authored
Add async-std support and replaced reqwest with surf (#58) (#72)
* Replaced tokio with async-std 1.6.5 and reqwest with surf 2.1.0 * Test with both async-std and tokio * Fix clipy warning * Add features to choose http lib: hyper-client (default), curl-client, h1-client or wasm-client
1 parent 7ee95f4 commit cc8750a

File tree

10 files changed

+110
-103
lines changed

10 files changed

+110
-103
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ use influxdb::{Client, Query, Timestamp};
6161
use influxdb::InfluxDbWriteable;
6262
use chrono::{DateTime, Utc};
6363

64-
#[tokio::main]
64+
#[async_std::main]
65+
// or #[tokio::main] if you prefer
6566
async fn main() {
6667
// Connect to db `test` on `http://localhost:8086`
6768
let client = Client::new("http://localhost:8086", "test");

benches/client.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use chrono::{DateTime, Utc};
2-
use futures::stream::StreamExt;
32
use influxdb::Error;
43
use influxdb::InfluxDbWriteable;
54
use influxdb::{Client, Query};
@@ -45,7 +44,7 @@ async fn main() {
4544

4645
let mut successful_count = 0;
4746
let mut error_count = 0;
48-
while let Some(res) = rx.next().await {
47+
while let Some(res) = rx.recv().await {
4948
if res.is_err() {
5049
error_count += 1;
5150
} else {

influxdb/Cargo.toml

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,20 @@ futures = "0.3.4"
2121
lazy_static = "1.4.0"
2222
influxdb_derive = { version = "0.2.0", optional = true }
2323
regex = "1.3.5"
24-
reqwest = { version = "0.10.8", features = ["json"] }
24+
surf = { version = "2.1.0", default-features = false }
2525
serde = { version = "1.0.104", features = ["derive"], optional = true }
2626
serde_json = { version = "1.0.48", optional = true }
2727
thiserror = "1.0"
2828

2929
[features]
3030
use-serde = ["serde", "serde_json"]
31-
default = ["use-serde"]
31+
curl-client = ["surf/curl-client"]
32+
h1-client = ["surf/h1-client"]
33+
hyper-client = ["surf/hyper-client"]
34+
wasm-client = ["surf/wasm-client"]
35+
default = ["use-serde", "hyper-client"]
3236
derive = ["influxdb_derive"]
3337

3438
[dev-dependencies]
35-
tokio = { version = "0.2.11", features = ["macros"] }
39+
async-std = { version = "1.6.5", features = ["attributes"] }
40+
tokio = { version = "0.2.22", features = ["rt-threaded", "macros"] }

influxdb/src/client/mod.rs

Lines changed: 46 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,20 @@
1616
//! ```
1717
1818
use futures::prelude::*;
19-
use reqwest::{self, Client as ReqwestClient, StatusCode};
19+
use surf::{self, Client as SurfClient, StatusCode};
2020

2121
use crate::query::QueryTypes;
2222
use crate::Error;
2323
use crate::Query;
24+
use std::collections::HashMap;
2425
use std::sync::Arc;
2526

2627
#[derive(Clone, Debug)]
2728
/// Internal Representation of a Client
2829
pub struct Client {
2930
pub(crate) url: Arc<String>,
30-
pub(crate) parameters: Arc<Vec<(&'static str, String)>>,
31-
pub(crate) client: ReqwestClient,
31+
pub(crate) parameters: Arc<HashMap<&'static str, String>>,
32+
pub(crate) client: SurfClient,
3233
}
3334

3435
impl Client {
@@ -51,10 +52,12 @@ impl Client {
5152
S1: Into<String>,
5253
S2: Into<String>,
5354
{
55+
let mut parameters = HashMap::<&str, String>::new();
56+
parameters.insert("db", database.into());
5457
Client {
5558
url: Arc::new(url.into()),
56-
parameters: Arc::new(vec![("db", database.into())]),
57-
client: ReqwestClient::new(),
59+
parameters: Arc::new(parameters),
60+
client: SurfClient::new(),
5861
}
5962
}
6063

@@ -78,16 +81,16 @@ impl Client {
7881
S2: Into<String>,
7982
{
8083
let mut with_auth = self.parameters.as_ref().clone();
81-
with_auth.push(("u", username.into()));
82-
with_auth.push(("p", password.into()));
84+
with_auth.insert("u", username.into());
85+
with_auth.insert("p", password.into());
8386
self.parameters = Arc::new(with_auth);
8487
self
8588
}
8689

8790
/// Returns the name of the database the client is using
8891
pub fn database_name(&self) -> &str {
8992
// safe to unwrap: we always set the database name in `Self::new`
90-
&self.parameters.first().unwrap().1
93+
self.parameters.get("db").unwrap()
9194
}
9295

9396
/// Returns the URL of the InfluxDB installation the client is using
@@ -109,18 +112,8 @@ impl Client {
109112
error: format!("{}", err),
110113
})?;
111114

112-
let build = res
113-
.headers()
114-
.get("X-Influxdb-Build")
115-
.unwrap()
116-
.to_str()
117-
.unwrap();
118-
let version = res
119-
.headers()
120-
.get("X-Influxdb-Version")
121-
.unwrap()
122-
.to_str()
123-
.unwrap();
115+
let build = res.header("X-Influxdb-Build").unwrap().as_str();
116+
let version = res.header("X-Influxdb-Version").unwrap().as_str();
124117

125118
Ok((build.to_owned(), version.to_owned()))
126119
}
@@ -140,7 +133,7 @@ impl Client {
140133
/// use influxdb::InfluxDbWriteable;
141134
/// use std::time::{SystemTime, UNIX_EPOCH};
142135
///
143-
/// # #[tokio::main]
136+
/// # #[async_std::main]
144137
/// # async fn main() -> Result<(), influxdb::Error> {
145138
/// let start = SystemTime::now();
146139
/// let since_the_epoch = start
@@ -169,60 +162,55 @@ impl Client {
169162
&'q Q: Into<QueryTypes<'q>>,
170163
{
171164
let query = q.build().map_err(|err| Error::InvalidQueryError {
172-
error: format!("{}", err),
165+
error: err.to_string(),
173166
})?;
174167

175168
let request_builder = match q.into() {
176169
QueryTypes::Read(_) => {
177170
let read_query = query.get();
178171
let url = &format!("{}/query", &self.url);
179-
let query = [("q", &read_query)];
172+
let mut parameters = self.parameters.as_ref().clone();
173+
parameters.insert("q", read_query.clone());
180174

181175
if read_query.contains("SELECT") || read_query.contains("SHOW") {
182-
self.client
183-
.get(url)
184-
.query(self.parameters.as_ref())
185-
.query(&query)
176+
self.client.get(url).query(&parameters)
186177
} else {
187-
self.client
188-
.post(url)
189-
.query(self.parameters.as_ref())
190-
.query(&query)
178+
self.client.post(url).query(&parameters)
191179
}
192180
}
193181
QueryTypes::Write(write_query) => {
194182
let url = &format!("{}/write", &self.url);
195-
let precision = [("precision", write_query.get_precision())];
183+
let mut parameters = self.parameters.as_ref().clone();
184+
parameters.insert("precision", write_query.get_precision());
196185

197-
self.client
198-
.post(url)
199-
.query(self.parameters.as_ref())
200-
.query(&precision)
201-
.body(query.get())
186+
self.client.post(url).body(query.get()).query(&parameters)
202187
}
203-
};
204-
205-
let request = request_builder
206-
.build()
207-
.map_err(|err| Error::UrlConstructionError {
208-
error: format!("{}", &err),
209-
})?;
188+
}
189+
.map_err(|err| Error::UrlConstructionError {
190+
error: err.to_string(),
191+
})?;
210192

211-
let res = self
193+
let request = request_builder.build();
194+
let mut res = self
212195
.client
213-
.execute(request)
214-
.map_err(|err| Error::ConnectionError { error: err })
196+
.send(request)
197+
.map_err(|err| Error::ConnectionError {
198+
error: err.to_string(),
199+
})
215200
.await?;
216201

217202
match res.status() {
218-
StatusCode::UNAUTHORIZED => return Err(Error::AuthorizationError),
219-
StatusCode::FORBIDDEN => return Err(Error::AuthenticationError),
203+
StatusCode::Unauthorized => return Err(Error::AuthorizationError),
204+
StatusCode::Forbidden => return Err(Error::AuthenticationError),
220205
_ => {}
221206
}
222207

223-
let s = res.text().await.map_err(|_| Error::DeserializationError {
224-
error: "response could not be converted to UTF-8".to_string(),
225-
})?;
208+
let s = res
209+
.body_string()
210+
.await
211+
.map_err(|_| Error::DeserializationError {
212+
error: "response could not be converted to UTF-8".to_string(),
213+
})?;
226214

227215
// todo: improve error parsing without serde
228216
if s.contains("\"error\"") {
@@ -249,16 +237,13 @@ mod tests {
249237
#[test]
250238
fn test_with_auth() {
251239
let client = Client::new("http://localhost:8068", "database");
252-
assert_eq!(vec![("db", "database".to_string())], *client.parameters);
240+
assert_eq!(client.parameters.len(), 1);
241+
assert_eq!(client.parameters.get("db").unwrap(), "database");
253242

254243
let with_auth = client.with_auth("username", "password");
255-
assert_eq!(
256-
vec![
257-
("db", "database".to_string()),
258-
("u", "username".to_string()),
259-
("p", "password".to_string())
260-
],
261-
*with_auth.parameters
262-
);
244+
assert_eq!(with_auth.parameters.len(), 3);
245+
assert_eq!(with_auth.parameters.get("db").unwrap(), "database");
246+
assert_eq!(with_auth.parameters.get("u").unwrap(), "username");
247+
assert_eq!(with_auth.parameters.get("p").unwrap(), "password");
263248
}
264249
}

influxdb/src/error.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,6 @@ pub enum Error {
3333
AuthorizationError,
3434

3535
#[error("connection error: {error}")]
36-
/// Error happens when reqwest fails
37-
ConnectionError {
38-
#[from]
39-
error: reqwest::Error,
40-
},
36+
/// Error happens when HTTP request fails
37+
ConnectionError { error: String },
4138
}

influxdb/src/integrations/serde_integration/mod.rs

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
//! weather: WeatherWithoutCityName,
2323
//! }
2424
//!
25-
//! # #[tokio::main]
25+
//! # #[async_std::main]
2626
//! # async fn main() -> Result<(), influxdb::Error> {
2727
//! let client = Client::new("http://localhost:8086", "test");
2828
//! let query = Query::raw_read_query(
@@ -48,7 +48,7 @@
4848
4949
mod de;
5050

51-
use reqwest::StatusCode;
51+
use surf::StatusCode;
5252

5353
use serde::{de::DeserializeOwned, Deserialize};
5454

@@ -140,31 +140,33 @@ impl Client {
140140
}
141141

142142
let url = &format!("{}/query", &self.url);
143-
let query = [("q", &read_query)];
143+
let mut parameters = self.parameters.as_ref().clone();
144+
parameters.insert("q", read_query);
144145
let request = self
145146
.client
146147
.get(url)
147-
.query(self.parameters.as_ref())
148-
.query(&query)
149-
.build()
148+
.query(&parameters)
150149
.map_err(|err| Error::UrlConstructionError {
151-
error: format!("{}", err),
152-
})?;
150+
error: err.to_string(),
151+
})?
152+
.build();
153153

154-
let res = self
154+
let mut res = self
155155
.client
156-
.execute(request)
156+
.send(request)
157157
.await
158-
.map_err(|err| Error::ConnectionError { error: err })?;
158+
.map_err(|err| Error::ConnectionError {
159+
error: err.to_string(),
160+
})?;
159161

160162
match res.status() {
161-
StatusCode::UNAUTHORIZED => return Err(Error::AuthorizationError),
162-
StatusCode::FORBIDDEN => return Err(Error::AuthenticationError),
163+
StatusCode::Unauthorized => return Err(Error::AuthorizationError),
164+
StatusCode::Forbidden => return Err(Error::AuthenticationError),
163165
_ => {}
164166
}
165167

166-
let body = res.bytes().await.map_err(|err| Error::ProtocolError {
167-
error: format!("{}", err),
168+
let body = res.body_bytes().await.map_err(|err| Error::ProtocolError {
169+
error: err.to_string(),
168170
})?;
169171

170172
// Try parsing InfluxDBs { "error": "error message here" }

influxdb/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@
3232
//! use influxdb::InfluxDbWriteable;
3333
//! use chrono::{DateTime, Utc};
3434
//!
35-
//! #[tokio::main]
35+
//! #[async_std::main]
36+
//! // or #[tokio::main] if you prefer
3637
//! async fn main() {
3738
//! // Connect to db `test` on `http://localhost:8086`
3839
//! let client = Client::new("http://localhost:8086", "test");

influxdb/src/query/write_query.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ impl Query for WriteQuery {
180180
.join(",");
181181

182182
if !tags.is_empty() {
183-
tags.insert_str(0, ",");
183+
tags.insert(0, ',');
184184
}
185185
let fields = self
186186
.fields

influxdb/tests/derive_integration_tests.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ fn test_build_query() {
4343
/// INTEGRATION TEST
4444
///
4545
/// This integration tests that writing data and retrieving the data again is working
46-
#[tokio::test]
46+
#[async_std::test]
4747
async fn test_derive_simple_write() {
4848
const TEST_NAME: &str = "test_derive_simple_write";
4949

@@ -72,7 +72,7 @@ async fn test_derive_simple_write() {
7272
/// This integration tests that writing data and retrieving the data again is working
7373
#[cfg(feature = "derive")]
7474
#[cfg(feature = "use-serde")]
75-
#[tokio::test]
75+
#[async_std::test]
7676
async fn test_write_and_read_option() {
7777
const TEST_NAME: &str = "test_write_and_read_option";
7878

0 commit comments

Comments
 (0)