Skip to content

Commit aebbf46

Browse files
committed
Refactor to mutliple files, serde feature-flag
1 parent 03146a9 commit aebbf46

File tree

9 files changed

+538
-508
lines changed

9 files changed

+538
-508
lines changed

Cargo.toml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,10 @@ futures = "0.1.27"
1010
tokio = "0.1.20"
1111
itertools = "0.8"
1212
failure = "0.1.5"
13-
serde = "1.0.92"
14-
serde_json = "1.0"
13+
serde = { version = "1.0.92", optional = true }
14+
serde_json = { version = "1.0", optional = true }
15+
16+
[features]
17+
serde-orm = ["serde", "serde_json"]
18+
19+
default = ["serde-orm"]

src/client/mod.rs

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
use futures::{Future, Stream};
2+
use reqwest::r#async::{Client, Decoder};
3+
4+
use std::mem;
5+
6+
use crate::error::InfluxDbError;
7+
use crate::query::{InfluxDbQuery, QueryType};
8+
9+
/// Client which can read and write data from InfluxDB.
10+
///
11+
/// # Arguments
12+
///
13+
/// * `url`: The URL where InfluxDB is running (ex. `http://localhost:8086`).
14+
/// * `database`: The Database against which queries and writes will be run.
15+
///
16+
/// # Examples
17+
///
18+
/// ```rust
19+
/// use influxdb::client::InfluxDbClient;
20+
///
21+
/// let client = InfluxDbClient::new("http://localhost:8086", "test");
22+
///
23+
/// assert_eq!(client.database_name(), "test");
24+
/// ```
25+
pub struct InfluxDbClient {
26+
url: String,
27+
database: String,
28+
// auth: Option<InfluxDbAuthentication>
29+
}
30+
31+
impl InfluxDbClient {
32+
/// Instantiates a new [`InfluxDbClient`]
33+
///
34+
/// # Arguments
35+
///
36+
/// * `url`: The URL where InfluxDB is running (ex. `http://localhost:8086`).
37+
/// * `database`: The Database against which queries and writes will be run.
38+
///
39+
/// # Examples
40+
///
41+
/// ```rust
42+
/// use influxdb::client::InfluxDbClient;
43+
///
44+
/// let _client = InfluxDbClient::new("http://localhost:8086", "test");
45+
/// ```
46+
pub fn new<S>(url: S, database: S) -> Self
47+
where
48+
S: Into<String>,
49+
{
50+
InfluxDbClient {
51+
url: url.into(),
52+
database: database.into(),
53+
}
54+
}
55+
56+
pub fn database_name<'a>(&'a self) -> &'a str {
57+
&self.database
58+
}
59+
60+
pub fn database_url<'a>(&'a self) -> &'a str {
61+
&self.url
62+
}
63+
64+
pub fn ping(&self) -> impl Future<Item = (String, String), Error = InfluxDbError> {
65+
Client::new()
66+
.get(format!("{}/ping", self.url).as_str())
67+
.send()
68+
.map(|res| {
69+
let build = res
70+
.headers()
71+
.get("X-Influxdb-Build")
72+
.unwrap()
73+
.to_str()
74+
.unwrap();
75+
let version = res
76+
.headers()
77+
.get("X-Influxdb-Version")
78+
.unwrap()
79+
.to_str()
80+
.unwrap();
81+
82+
(String::from(build), String::from(version))
83+
})
84+
.map_err(|err| InfluxDbError::UnspecifiedError {
85+
error: format!("{}", err),
86+
})
87+
}
88+
89+
pub fn query<Q>(self, q: Q) -> Box<dyn Future<Item = String, Error = InfluxDbError>>
90+
where
91+
Q: InfluxDbQuery,
92+
{
93+
use futures::future;
94+
95+
let query_type = q.get_type();
96+
let endpoint = match query_type {
97+
QueryType::ReadQuery => "query",
98+
QueryType::WriteQuery => "write",
99+
};
100+
101+
let query = match q.build() {
102+
Err(err) => {
103+
let error = InfluxDbError::UnspecifiedError {
104+
error: format!("{}", err),
105+
};
106+
return Box::new(future::err::<String, InfluxDbError>(error));
107+
}
108+
Ok(query) => query,
109+
};
110+
111+
let query_str = query.get();
112+
let url_params = match query_type {
113+
QueryType::ReadQuery => format!("&q={}", query_str),
114+
QueryType::WriteQuery => String::from(""),
115+
};
116+
117+
let client = match query_type {
118+
QueryType::ReadQuery => Client::new().get(
119+
format!(
120+
"{url}/{endpoint}?db={db}{url_params}",
121+
url = self.url,
122+
endpoint = endpoint,
123+
db = self.database,
124+
url_params = url_params
125+
)
126+
.as_str(),
127+
),
128+
QueryType::WriteQuery => Client::new()
129+
.post(
130+
format!(
131+
"{url}/{endpoint}?db={db}",
132+
url = self.url,
133+
endpoint = endpoint,
134+
db = self.database,
135+
)
136+
.as_str(),
137+
)
138+
.body(query_str),
139+
};
140+
141+
Box::new(
142+
client
143+
.send()
144+
.and_then(|mut res| {
145+
let body = mem::replace(res.body_mut(), Decoder::empty());
146+
body.concat2()
147+
})
148+
.map_err(|err| InfluxDbError::UnspecifiedError {
149+
error: format!("{}", err),
150+
})
151+
.and_then(|body| {
152+
if let Ok(utf8) = std::str::from_utf8(&body) {
153+
let mut s = String::new();
154+
utf8.clone_into(&mut s);
155+
156+
// todo: improve error parsing without serde
157+
if s.contains("\"error\"") {
158+
return futures::future::err(InfluxDbError::UnspecifiedError {
159+
error: format!("influxdb error: \"{}\"", s),
160+
});
161+
}
162+
163+
return futures::future::ok(s);
164+
}
165+
166+
futures::future::err(InfluxDbError::UnspecifiedError {
167+
error: "some other error has happened here!".to_string(),
168+
})
169+
}),
170+
)
171+
}
172+
}

src/error.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#[derive(Debug, Fail)]
2+
/// Errors that might happen in the crate
3+
pub enum InfluxDbError {
4+
#[fail(display = "query must contain at least one field")]
5+
/// Error happens when query has zero fields
6+
InvalidQueryError,
7+
8+
#[fail(
9+
display = "an error happened: \"{}\". this case should be handled better, please file an issue.",
10+
error
11+
)]
12+
/// todo: Error which is a placeholder for more meaningful errors. This should be refactored away.
13+
UnspecifiedError { error: String },
14+
15+
#[fail(display = "InfluxDB encountered the following error: {}", error)]
16+
/// Error which has happened inside InfluxDB
17+
DatabaseError { error: String },
18+
}

src/integrations/serde_integration.rs

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
use crate::client::InfluxDbClient;
2+
3+
use serde::de::DeserializeOwned;
4+
5+
use futures::{Future, Stream};
6+
use reqwest::r#async::{Client, Decoder};
7+
8+
use serde_json;
9+
use serde::Deserialize;
10+
use std::mem;
11+
12+
use crate::error::InfluxDbError;
13+
use crate::query::{InfluxDbQuery, QueryType};
14+
15+
#[derive(Deserialize)]
16+
#[doc(hidden)]
17+
struct _DatabaseError {
18+
error: String,
19+
}
20+
21+
pub trait InfluxDbSerdeORM {
22+
fn json_query<T: 'static, Q>(self, q: Q) -> Box<dyn Future<Item = T, Error = InfluxDbError>> where
23+
Q: InfluxDbQuery,
24+
T: DeserializeOwned;
25+
}
26+
27+
impl InfluxDbSerdeORM for InfluxDbClient {
28+
fn json_query<T: 'static, Q>(self, q: Q) -> Box<dyn Future<Item = T, Error = InfluxDbError>>
29+
where
30+
Q: InfluxDbQuery,
31+
T: DeserializeOwned,
32+
{
33+
use futures::future;
34+
35+
let query_type = q.get_type();
36+
let endpoint = match query_type {
37+
QueryType::ReadQuery => "query",
38+
QueryType::WriteQuery => "write",
39+
};
40+
41+
let query = match q.build() {
42+
Err(err) => {
43+
let error = InfluxDbError::UnspecifiedError {
44+
error: format!("{}", err),
45+
};
46+
return Box::new(future::err::<T, InfluxDbError>(error));
47+
}
48+
Ok(query) => query,
49+
};
50+
51+
let query_str = query.get();
52+
let url_params = match query_type {
53+
QueryType::ReadQuery => format!("&q={}", query_str),
54+
QueryType::WriteQuery => String::from(""),
55+
};
56+
57+
let client = match query_type {
58+
QueryType::ReadQuery => Client::new().get(
59+
format!(
60+
"{url}/{endpoint}?db={db}{url_params}",
61+
url = self.database_url(),
62+
endpoint = endpoint,
63+
db = self.database_name(),
64+
url_params = url_params
65+
)
66+
.as_str(),
67+
),
68+
QueryType::WriteQuery => Client::new()
69+
.post(
70+
format!(
71+
"{url}/{endpoint}?db={db}",
72+
url = self.database_url(),
73+
endpoint = endpoint,
74+
db = self.database_name(),
75+
)
76+
.as_str(),
77+
)
78+
.body(query_str),
79+
};
80+
81+
Box::new(
82+
client
83+
.send()
84+
.and_then(|mut res| {
85+
let body = mem::replace(res.body_mut(), Decoder::empty());
86+
body.concat2()
87+
})
88+
.map_err(|err| InfluxDbError::UnspecifiedError {
89+
error: format!("{}", err)
90+
})
91+
.and_then(|body| {
92+
// Try parsing InfluxDBs { "error": "error message here" }
93+
if let Ok(error) = serde_json::from_slice::<_DatabaseError>(&body) {
94+
return futures::future::err(InfluxDbError::DatabaseError {
95+
error: error.error.to_string()
96+
})
97+
} else if let Ok(t_result) = serde_json::from_slice::<T>(&body) {
98+
// Json has another structure, let's try actually parsing it to the type we're deserializing to
99+
return futures::future::result(Ok(t_result));
100+
} else {
101+
return futures::future::err(InfluxDbError::UnspecifiedError {
102+
error: "something wen't wrong during deserializsation of the database response. this might be a bug!".to_string()
103+
})
104+
}
105+
})
106+
)
107+
}
108+
}

0 commit comments

Comments
 (0)