Skip to content
This repository was archived by the owner on Sep 4, 2024. It is now read-only.

Commit 35cbdf7

Browse files
committed
conditional compilation for async roundtripper
Signed-off-by: Gregory Hill <[email protected]>
1 parent e651798 commit 35cbdf7

File tree

4 files changed

+195
-58
lines changed

4 files changed

+195
-58
lines changed

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ documentation = "https://docs.rs/jsonrpc/"
99
description = "Rust support for the JSON-RPC 2.0 protocol"
1010
keywords = [ "protocol", "json", "http", "jsonrpc" ]
1111
readme = "README.md"
12+
edition = "2018"
13+
14+
[features]
15+
async = []
1216

1317
[lib]
1418
name = "jsonrpc"

src/client.rs

Lines changed: 187 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,18 @@
1818
//! and parsing responses
1919
//!
2020
21-
use std::{error, io};
2221
use std::collections::HashMap;
2322
use std::sync::{Arc, Mutex};
23+
use std::{error, io};
2424

25-
use serde;
2625
use base64;
2726
use http;
27+
use serde;
2828
use serde_json;
2929

3030
use super::{Request, Response};
31-
use util::HashableValue;
32-
use error::Error;
31+
use crate::error::Error;
32+
use crate::util::HashableValue;
3333

3434
/// An interface for an HTTP roundtripper that handles HTTP requests.
3535
pub trait HttpRoundTripper {
@@ -38,30 +38,53 @@ pub trait HttpRoundTripper {
3838
/// The type for errors generated by the roundtripper.
3939
type Err: error::Error;
4040

41-
/// Make an HTTP request. In practice only POST request will be made.
41+
/// Make a synchronous HTTP request. In practice only POST request will be made.
4242
fn request(
4343
&self,
44-
http::Request<&[u8]>,
44+
_request: http::Request<Vec<u8>>,
4545
) -> Result<http::Response<Self::ResponseBody>, Self::Err>;
4646
}
4747

48+
/// An interface for an asynchronous HTTP roundtripper that handles HTTP requests.
49+
#[cfg(feature = "async")]
50+
pub trait AsyncHttpRoundTripper {
51+
/// The type of the http::Response body.
52+
type ResponseBody: io::Read;
53+
/// The type for errors generated by the roundtripper.
54+
type Err: error::Error;
55+
56+
/// Make an asynchronous HTTP request. In practice only POST request will be made.
57+
fn request<'life>(
58+
&'life self,
59+
_request: http::Request<Vec<u8>>,
60+
) -> std::pin::Pin<
61+
Box<
62+
dyn std::future::Future<Output = Result<http::Response<Self::ResponseBody>, Self::Err>>
63+
+ Send
64+
+ 'life,
65+
>,
66+
>
67+
where
68+
Self: Sync + 'life;
69+
}
70+
4871
/// A handle to a remote JSONRPC server
49-
pub struct Client<R: HttpRoundTripper> {
72+
pub struct Client<R> {
5073
url: String,
5174
user: Option<String>,
5275
pass: Option<String>,
5376
roundtripper: R,
5477
nonce: Arc<Mutex<u64>>,
5578
}
5679

57-
impl<Rt: HttpRoundTripper + 'static> Client<Rt> {
80+
impl<R> Client<R> {
5881
/// Creates a new client
5982
pub fn new(
60-
roundtripper: Rt,
83+
roundtripper: R,
6184
url: String,
6285
user: Option<String>,
6386
pass: Option<String>,
64-
) -> Client<Rt> {
87+
) -> Client<R> {
6588
// Check that if we have a password, we have a username; other way around is ok
6689
debug_assert!(pass.is_none() || user.is_some());
6790

@@ -74,23 +97,25 @@ impl<Rt: HttpRoundTripper + 'static> Client<Rt> {
7497
}
7598
}
7699

77-
/// Make a request and deserialize the response
78-
pub fn do_rpc<T: for<'a> serde::de::Deserialize<'a>>(
100+
/// Builds a request
101+
pub fn build_request<'a, 'b>(
79102
&self,
80-
rpc_name: &str,
81-
args: &[serde_json::value::Value],
82-
) -> Result<T, Error> {
83-
let request = self.build_request(rpc_name, args);
84-
let response = self.send_request(&request)?;
85-
86-
Ok(response.into_result()?)
103+
name: &'a str,
104+
params: &'b [serde_json::Value],
105+
) -> Request<'a, 'b> {
106+
let mut nonce = self.nonce.lock().unwrap();
107+
*nonce += 1;
108+
Request {
109+
method: name,
110+
params: params,
111+
id: From::from(*nonce),
112+
jsonrpc: Some("2.0"),
113+
}
87114
}
88115

89-
/// The actual send logic used by both [send_request] and [send_batch].
90-
fn send_raw<B, R>(&self, body: &B) -> Result<R, Error>
116+
fn build_http_request<B>(&self, body: &B) -> Result<http::Request<Vec<u8>>, Error>
91117
where
92118
B: serde::ser::Serialize,
93-
R: for<'de> serde::de::Deserialize<'de>,
94119
{
95120
// Build request
96121
let request_raw = serde_json::to_vec(body)?;
@@ -111,41 +136,68 @@ impl<Rt: HttpRoundTripper + 'static> Client<Rt> {
111136
}
112137

113138
// Errors only on invalid header or builder reuse.
114-
let http_request = request_builder.body(&request_raw[..]).unwrap();
139+
Ok(request_builder.body(request_raw).unwrap())
140+
}
141+
142+
/// Accessor for the last-used nonce
143+
pub fn last_nonce(&self) -> u64 {
144+
*self.nonce.lock().unwrap()
145+
}
146+
}
147+
148+
impl<Rt: HttpRoundTripper + 'static> Client<Rt> {
149+
/// Make a request and deserialize the response
150+
pub fn do_rpc<T: for<'a> serde::de::Deserialize<'a>>(
151+
&self,
152+
rpc_name: &str,
153+
args: &[serde_json::value::Value],
154+
) -> Result<T, Error> {
155+
let request = self.build_request(rpc_name, args);
156+
let response = self.send_request(&request)?;
157+
Ok(response.into_result()?)
158+
}
159+
160+
/// The actual send logic used by both [send_request] and [send_batch].
161+
fn send_raw<B, R>(&self, body: &B) -> Result<R, Error>
162+
where
163+
B: serde::ser::Serialize,
164+
R: for<'de> serde::de::Deserialize<'de>,
165+
{
166+
let http_request = self.build_http_request(body)?;
115167

116-
let http_response =
117-
self.roundtripper.request(http_request).map_err(|e| Error::Http(Box::new(e)))?;
168+
let http_response = self
169+
.roundtripper
170+
.request(http_request)
171+
.map_err(|e| Error::Http(Box::new(e)))?;
118172

119173
// nb we ignore stream.status since we expect the body
120174
// to contain information about any error
121175
Ok(serde_json::from_reader(http_response.into_body())?)
122176
}
123177

124178
/// Sends a request to a client
125-
pub fn send_request(&self, request: &Request) -> Result<Response, Error> {
179+
pub fn send_request<'a, 'b>(&self, request: &Request<'a, 'b>) -> Result<Response, Error> {
126180
let response: Response = self.send_raw(&request)?;
127-
if response.jsonrpc != None && response.jsonrpc != Some(From::from("2.0")) {
128-
return Err(Error::VersionMismatch);
129-
}
130-
if response.id != request.id {
131-
return Err(Error::NonceMismatch);
132-
}
133-
Ok(response)
181+
validate_response(request, response)
134182
}
135183

136184
/// Sends a batch of requests to the client. The return vector holds the response
137185
/// for the request at the corresponding index. If no response was provided, it's [None].
138186
///
139187
/// Note that the requests need to have valid IDs, so it is advised to create the requests
140188
/// with [build_request].
141-
pub fn send_batch(&self, requests: &[Request]) -> Result<Vec<Option<Response>>, Error> {
189+
pub fn send_batch<'a, 'b>(
190+
&self,
191+
requests: &[Request<'a, 'b>],
192+
) -> Result<Vec<Option<Response>>, Error> {
142193
if requests.len() < 1 {
143194
return Err(Error::EmptyBatch);
144195
}
145196

146197
// If the request body is invalid JSON, the response is a single response object.
147198
// We ignore this case since we are confident we are producing valid JSON.
148199
let responses: Vec<Response> = self.send_raw(&requests)?;
200+
149201
if responses.len() > requests.len() {
150202
return Err(Error::WrongBatchResponseSize);
151203
}
@@ -162,8 +214,10 @@ impl<Rt: HttpRoundTripper + 'static> Client<Rt> {
162214
}
163215
}
164216
// Match responses to the requests.
165-
let results =
166-
requests.into_iter().map(|r| resp_by_id.remove(&HashableValue(&r.id))).collect();
217+
let results = requests
218+
.into_iter()
219+
.map(|r| resp_by_id.remove(&HashableValue(&r.id)))
220+
.collect();
167221

168222
// Since we're also just producing the first duplicate ID, we can also just produce the
169223
// first incorrect ID in case there are multiple.
@@ -173,27 +227,108 @@ impl<Rt: HttpRoundTripper + 'static> Client<Rt> {
173227

174228
Ok(results)
175229
}
230+
}
176231

177-
/// Builds a request
178-
pub fn build_request<'a, 'b>(
232+
#[cfg(feature = "async")]
233+
impl<Rt: AsyncHttpRoundTripper + 'static + Sync> Client<Rt> {
234+
/// Make a request and deserialize the response
235+
pub async fn do_rpc_async<T: for<'a> serde::de::Deserialize<'a>>(
179236
&self,
180-
name: &'a str,
181-
params: &'b [serde_json::Value],
182-
) -> Request<'a, 'b> {
183-
let mut nonce = self.nonce.lock().unwrap();
184-
*nonce += 1;
185-
Request {
186-
method: name,
187-
params: params,
188-
id: From::from(*nonce),
189-
jsonrpc: Some("2.0"),
237+
rpc_name: &str,
238+
args: &[serde_json::value::Value],
239+
) -> Result<T, Error> {
240+
let request = self.build_request(rpc_name, args);
241+
let response = self.send_request_async(&request).await?;
242+
Ok(response.into_result()?)
243+
}
244+
245+
/// The actual send logic used by both [send_request] and [send_batch].
246+
async fn send_raw_async<B, R>(&self, body: &B) -> Result<R, Error>
247+
where
248+
B: serde::ser::Serialize,
249+
R: for<'de> serde::de::Deserialize<'de>,
250+
{
251+
let http_request = self.build_http_request(body)?;
252+
253+
let http_response = self
254+
.roundtripper
255+
.request(http_request)
256+
.await
257+
.map_err(|e| Error::Http(Box::new(e)))?;
258+
259+
// nb we ignore stream.status since we expect the body
260+
// to contain information about any error
261+
Ok(serde_json::from_reader(http_response.into_body())?)
262+
}
263+
264+
/// Sends a request to a client
265+
pub async fn send_request_async<'a, 'b>(
266+
&self,
267+
request: &Request<'a, 'b>,
268+
) -> Result<Response, Error> {
269+
let response: Response = self.send_raw_async(&request).await?;
270+
validate_response(request, response)
271+
}
272+
273+
/// Sends a batch of requests to the client. The return vector holds the response
274+
/// for the request at the corresponding index. If no response was provided, it's [None].
275+
///
276+
/// Note that the requests need to have valid IDs, so it is advised to create the requests
277+
/// with [build_request].
278+
pub async fn send_batch_async<'a, 'b>(
279+
&self,
280+
requests: &[Request<'a, 'b>],
281+
) -> Result<Vec<Option<Response>>, Error> {
282+
if requests.len() < 1 {
283+
return Err(Error::EmptyBatch);
284+
}
285+
286+
// If the request body is invalid JSON, the response is a single response object.
287+
// We ignore this case since we are confident we are producing valid JSON.
288+
let responses: Vec<Response> = self.send_raw_async(&requests).await?;
289+
290+
if responses.len() > requests.len() {
291+
return Err(Error::WrongBatchResponseSize);
292+
}
293+
294+
// To prevent having to clone responses, we first copy all the IDs so we can reference
295+
// them easily. IDs can only be of JSON type String or Number (or Null), so cloning
296+
// should be inexpensive and require no allocations as Numbers are more common.
297+
let ids: Vec<serde_json::Value> = responses.iter().map(|r| r.id.clone()).collect();
298+
// First index responses by ID and catch duplicate IDs.
299+
let mut resp_by_id = HashMap::new();
300+
for (id, resp) in ids.iter().zip(responses.into_iter()) {
301+
if let Some(dup) = resp_by_id.insert(HashableValue(&id), resp) {
302+
return Err(Error::BatchDuplicateResponseId(dup.id));
303+
}
304+
}
305+
// Match responses to the requests.
306+
let results = requests
307+
.into_iter()
308+
.map(|r| resp_by_id.remove(&HashableValue(&r.id)))
309+
.collect();
310+
311+
// Since we're also just producing the first duplicate ID, we can also just produce the
312+
// first incorrect ID in case there are multiple.
313+
if let Some(incorrect) = resp_by_id.into_iter().nth(0) {
314+
return Err(Error::WrongBatchResponseId(incorrect.1.id));
190315
}
316+
317+
Ok(results)
191318
}
319+
}
192320

193-
/// Accessor for the last-used nonce
194-
pub fn last_nonce(&self) -> u64 {
195-
*self.nonce.lock().unwrap()
321+
fn validate_response<'a, 'b>(
322+
request: &Request<'a, 'b>,
323+
response: Response,
324+
) -> Result<Response, Error> {
325+
if response.jsonrpc != None && response.jsonrpc != Some(From::from("2.0")) {
326+
return Err(Error::VersionMismatch);
327+
}
328+
if response.id != request.id {
329+
return Err(Error::NonceMismatch);
196330
}
331+
Ok(response)
197332
}
198333

199334
#[cfg(test)]
@@ -208,7 +343,7 @@ mod tests {
208343

209344
fn request(
210345
&self,
211-
_: http::Request<&[u8]>,
346+
_: http::Request<Vec<u8>>,
212347
) -> Result<http::Response<Self::ResponseBody>, Self::Err> {
213348
Err(io::ErrorKind::Other.into())
214349
}

src/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::{error, fmt};
2121

2222
use serde_json;
2323

24-
use Response;
24+
use crate::Response;
2525

2626
/// A library error
2727
#[derive(Debug)]

src/util.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,18 +44,18 @@ impl<'a> Hash for HashableValue<'a> {
4444
} else {
4545
n.to_string().hash(state);
4646
}
47-
},
47+
}
4848
Value::String(ref s) => {
4949
"string".hash(state);
5050
s.hash(state);
51-
},
51+
}
5252
Value::Array(ref v) => {
5353
"array".hash(state);
5454
v.len().hash(state);
5555
for obj in v {
5656
HashableValue(obj).hash(state);
5757
}
58-
},
58+
}
5959
Value::Object(ref m) => {
6060
"object".hash(state);
6161
m.len().hash(state);
@@ -116,5 +116,3 @@ mod tests {
116116
assert!(coll.contains(&m));
117117
}
118118
}
119-
120-

0 commit comments

Comments
 (0)