Skip to content
This repository was archived by the owner on Sep 4, 2024. It is now read-only.

Commit bb8b196

Browse files
committed
conditional compilation for async roundtripper
Signed-off-by: Gregory Hill <[email protected]>
1 parent e651798 commit bb8b196

File tree

4 files changed

+199
-82
lines changed

4 files changed

+199
-82
lines changed

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ documentation = "https://docs.rs/jsonrpc/"
99
description = "Rust support for the JSON-RPC 2.0 protocol"
1010
keywords = [ "protocol", "json", "http", "jsonrpc" ]
1111
readme = "README.md"
12+
edition = "2018"
13+
14+
[features]
15+
async = []
1216

1317
[lib]
1418
name = "jsonrpc"

src/client.rs

Lines changed: 191 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,18 @@
1818
//! and parsing responses
1919
//!
2020
21-
use std::{error, io};
2221
use std::collections::HashMap;
2322
use std::sync::{Arc, Mutex};
23+
use std::{error, io};
2424

25-
use serde;
2625
use base64;
2726
use http;
27+
use serde;
2828
use serde_json;
2929

3030
use super::{Request, Response};
31-
use util::HashableValue;
32-
use error::Error;
31+
use crate::error::Error;
32+
use crate::util::HashableValue;
3333

3434
/// An interface for an HTTP roundtripper that handles HTTP requests.
3535
pub trait HttpRoundTripper {
@@ -38,30 +38,53 @@ pub trait HttpRoundTripper {
3838
/// The type for errors generated by the roundtripper.
3939
type Err: error::Error;
4040

41-
/// Make an HTTP request. In practice only POST request will be made.
41+
/// Make a synchronous HTTP request. In practice only POST request will be made.
4242
fn request(
4343
&self,
44-
http::Request<&[u8]>,
44+
_request: http::Request<Vec<u8>>,
4545
) -> Result<http::Response<Self::ResponseBody>, Self::Err>;
4646
}
4747

48+
/// An interface for an asynchronous HTTP roundtripper that handles HTTP requests.
49+
#[cfg(feature = "async")]
50+
pub trait AsyncHttpRoundTripper {
51+
/// The type of the http::Response body.
52+
type ResponseBody: io::Read;
53+
/// The type for errors generated by the roundtripper.
54+
type Err: error::Error;
55+
56+
/// Make an asynchronous HTTP request. In practice only POST request will be made.
57+
fn request<'life>(
58+
&'life self,
59+
_request: http::Request<Vec<u8>>,
60+
) -> std::pin::Pin<
61+
Box<
62+
dyn std::future::Future<Output = Result<http::Response<Self::ResponseBody>, Self::Err>>
63+
+ Send
64+
+ 'life,
65+
>,
66+
>
67+
where
68+
Self: Sync + 'life;
69+
}
70+
4871
/// A handle to a remote JSONRPC server
49-
pub struct Client<R: HttpRoundTripper> {
72+
pub struct Client<R> {
5073
url: String,
5174
user: Option<String>,
5275
pass: Option<String>,
5376
roundtripper: R,
5477
nonce: Arc<Mutex<u64>>,
5578
}
5679

57-
impl<Rt: HttpRoundTripper + 'static> Client<Rt> {
80+
impl<R> Client<R> {
5881
/// Creates a new client
5982
pub fn new(
60-
roundtripper: Rt,
83+
roundtripper: R,
6184
url: String,
6285
user: Option<String>,
6386
pass: Option<String>,
64-
) -> Client<Rt> {
87+
) -> Client<R> {
6588
// Check that if we have a password, we have a username; other way around is ok
6689
debug_assert!(pass.is_none() || user.is_some());
6790

@@ -74,23 +97,25 @@ impl<Rt: HttpRoundTripper + 'static> Client<Rt> {
7497
}
7598
}
7699

77-
/// Make a request and deserialize the response
78-
pub fn do_rpc<T: for<'a> serde::de::Deserialize<'a>>(
100+
/// Builds a request
101+
pub fn build_request<'a, 'b>(
79102
&self,
80-
rpc_name: &str,
81-
args: &[serde_json::value::Value],
82-
) -> Result<T, Error> {
83-
let request = self.build_request(rpc_name, args);
84-
let response = self.send_request(&request)?;
85-
86-
Ok(response.into_result()?)
103+
name: &'a str,
104+
params: &'b [serde_json::Value],
105+
) -> Request<'a, 'b> {
106+
let mut nonce = self.nonce.lock().unwrap();
107+
*nonce += 1;
108+
Request {
109+
method: name,
110+
params: params,
111+
id: From::from(*nonce),
112+
jsonrpc: Some("2.0"),
113+
}
87114
}
88115

89-
/// The actual send logic used by both [send_request] and [send_batch].
90-
fn send_raw<B, R>(&self, body: &B) -> Result<R, Error>
116+
fn build_http_request<B>(&self, body: &B) -> Result<http::Request<Vec<u8>>, Error>
91117
where
92118
B: serde::ser::Serialize,
93-
R: for<'de> serde::de::Deserialize<'de>,
94119
{
95120
// Build request
96121
let request_raw = serde_json::to_vec(body)?;
@@ -111,89 +136,179 @@ impl<Rt: HttpRoundTripper + 'static> Client<Rt> {
111136
}
112137

113138
// Errors only on invalid header or builder reuse.
114-
let http_request = request_builder.body(&request_raw[..]).unwrap();
139+
Ok(request_builder.body(request_raw).unwrap())
140+
}
115141

116-
let http_response =
117-
self.roundtripper.request(http_request).map_err(|e| Error::Http(Box::new(e)))?;
142+
/// Accessor for the last-used nonce
143+
pub fn last_nonce(&self) -> u64 {
144+
*self.nonce.lock().unwrap()
145+
}
146+
}
147+
148+
impl<Rt: HttpRoundTripper + 'static> Client<Rt> {
149+
/// Make a request and deserialize the response
150+
pub fn do_rpc<T: for<'a> serde::de::Deserialize<'a>>(
151+
&self,
152+
rpc_name: &str,
153+
args: &[serde_json::value::Value],
154+
) -> Result<T, Error> {
155+
let request = self.build_request(rpc_name, args);
156+
let response = self.send_request(&request)?;
157+
Ok(response.into_result()?)
158+
}
159+
160+
/// The actual send logic used by both [send_request] and [send_batch].
161+
fn send_raw<B, R>(&self, body: &B) -> Result<R, Error>
162+
where
163+
B: serde::ser::Serialize,
164+
R: for<'de> serde::de::Deserialize<'de>,
165+
{
166+
let http_request = self.build_http_request(body)?;
167+
168+
let http_response = self
169+
.roundtripper
170+
.request(http_request)
171+
.map_err(|e| Error::Http(Box::new(e)))?;
118172

119173
// nb we ignore stream.status since we expect the body
120174
// to contain information about any error
121175
Ok(serde_json::from_reader(http_response.into_body())?)
122176
}
123177

124178
/// Sends a request to a client
125-
pub fn send_request(&self, request: &Request) -> Result<Response, Error> {
179+
pub fn send_request<'a, 'b>(&self, request: &Request<'a, 'b>) -> Result<Response, Error> {
126180
let response: Response = self.send_raw(&request)?;
127-
if response.jsonrpc != None && response.jsonrpc != Some(From::from("2.0")) {
128-
return Err(Error::VersionMismatch);
129-
}
130-
if response.id != request.id {
131-
return Err(Error::NonceMismatch);
132-
}
133-
Ok(response)
181+
validate_response(request, response)
134182
}
135183

136-
/// Sends a batch of requests to the client. The return vector holds the response
137-
/// for the request at the corresponding index. If no response was provided, it's [None].
184+
/// Sends a batch of requests to the client. The return vector holds the response
185+
/// for the request at the corresponding index. If no response was provided, it's [None].
138186
///
139187
/// Note that the requests need to have valid IDs, so it is advised to create the requests
140188
/// with [build_request].
141-
pub fn send_batch(&self, requests: &[Request]) -> Result<Vec<Option<Response>>, Error> {
189+
pub fn send_batch<'a, 'b>(
190+
&self,
191+
requests: &[Request<'a, 'b>],
192+
) -> Result<Vec<Option<Response>>, Error> {
142193
if requests.len() < 1 {
143194
return Err(Error::EmptyBatch);
144195
}
145196

146197
// If the request body is invalid JSON, the response is a single response object.
147198
// We ignore this case since we are confident we are producing valid JSON.
148199
let responses: Vec<Response> = self.send_raw(&requests)?;
149-
if responses.len() > requests.len() {
150-
return Err(Error::WrongBatchResponseSize);
151-
}
152200

153-
// To prevent having to clone responses, we first copy all the IDs so we can reference
154-
// them easily. IDs can only be of JSON type String or Number (or Null), so cloning
155-
// should be inexpensive and require no allocations as Numbers are more common.
156-
let ids: Vec<serde_json::Value> = responses.iter().map(|r| r.id.clone()).collect();
157-
// First index responses by ID and catch duplicate IDs.
158-
let mut resp_by_id = HashMap::new();
159-
for (id, resp) in ids.iter().zip(responses.into_iter()) {
160-
if let Some(dup) = resp_by_id.insert(HashableValue(&id), resp) {
161-
return Err(Error::BatchDuplicateResponseId(dup.id));
162-
}
163-
}
164-
// Match responses to the requests.
165-
let results =
166-
requests.into_iter().map(|r| resp_by_id.remove(&HashableValue(&r.id))).collect();
167-
168-
// Since we're also just producing the first duplicate ID, we can also just produce the
169-
// first incorrect ID in case there are multiple.
170-
if let Some(incorrect) = resp_by_id.into_iter().nth(0) {
171-
return Err(Error::WrongBatchResponseId(incorrect.1.id));
172-
}
201+
validate_batch_response(requests, responses)
202+
}
203+
}
173204

174-
Ok(results)
205+
#[cfg(feature = "async")]
206+
impl<Rt: AsyncHttpRoundTripper + 'static + Sync> Client<Rt> {
207+
/// Make a request and deserialize the response
208+
pub async fn do_rpc_async<T: for<'a> serde::de::Deserialize<'a>>(
209+
&self,
210+
rpc_name: &str,
211+
args: &[serde_json::value::Value],
212+
) -> Result<T, Error> {
213+
let request = self.build_request(rpc_name, args);
214+
let response = self.send_request_async(&request).await?;
215+
Ok(response.into_result()?)
175216
}
176217

177-
/// Builds a request
178-
pub fn build_request<'a, 'b>(
218+
/// The actual send logic used by both [send_request] and [send_batch].
219+
async fn send_raw_async<B, R>(&self, body: &B) -> Result<R, Error>
220+
where
221+
B: serde::ser::Serialize,
222+
R: for<'de> serde::de::Deserialize<'de>,
223+
{
224+
let http_request = self.build_http_request(body)?;
225+
226+
let http_response = self
227+
.roundtripper
228+
.request(http_request)
229+
.await
230+
.map_err(|e| Error::Http(Box::new(e)))?;
231+
232+
// nb we ignore stream.status since we expect the body
233+
// to contain information about any error
234+
Ok(serde_json::from_reader(http_response.into_body())?)
235+
}
236+
237+
/// Sends a request to a client
238+
pub async fn send_request_async<'a, 'b>(
179239
&self,
180-
name: &'a str,
181-
params: &'b [serde_json::Value],
182-
) -> Request<'a, 'b> {
183-
let mut nonce = self.nonce.lock().unwrap();
184-
*nonce += 1;
185-
Request {
186-
method: name,
187-
params: params,
188-
id: From::from(*nonce),
189-
jsonrpc: Some("2.0"),
240+
request: &Request<'a, 'b>,
241+
) -> Result<Response, Error> {
242+
let response: Response = self.send_raw_async(&request).await?;
243+
validate_response(request, response)
244+
}
245+
246+
/// Sends a batch of requests to the client. The return vector holds the response
247+
/// for the request at the corresponding index. If no response was provided, it's [None].
248+
///
249+
/// Note that the requests need to have valid IDs, so it is advised to create the requests
250+
/// with [build_request].
251+
pub async fn send_batch_async<'a, 'b>(
252+
&self,
253+
requests: &[Request<'a, 'b>],
254+
) -> Result<Vec<Option<Response>>, Error> {
255+
if requests.len() < 1 {
256+
return Err(Error::EmptyBatch);
257+
}
258+
259+
// If the request body is invalid JSON, the response is a single response object.
260+
// We ignore this case since we are confident we are producing valid JSON.
261+
let responses: Vec<Response> = self.send_raw_async(&requests).await?;
262+
263+
validate_batch_response(requests, responses)
264+
}
265+
}
266+
267+
fn validate_response<'a, 'b>(
268+
request: &Request<'a, 'b>,
269+
response: Response,
270+
) -> Result<Response, Error> {
271+
if response.jsonrpc != None && response.jsonrpc != Some(From::from("2.0")) {
272+
return Err(Error::VersionMismatch);
273+
}
274+
if response.id != request.id {
275+
return Err(Error::NonceMismatch);
276+
}
277+
Ok(response)
278+
}
279+
280+
fn validate_batch_response<'a, 'b>(
281+
requests: &[Request<'a, 'b>],
282+
responses: Vec<Response>,
283+
) -> Result<Vec<Option<Response>>, Error> {
284+
if responses.len() > requests.len() {
285+
return Err(Error::WrongBatchResponseSize);
286+
}
287+
288+
// To prevent having to clone responses, we first copy all the IDs so we can reference
289+
// them easily. IDs can only be of JSON type String or Number (or Null), so cloning
290+
// should be inexpensive and require no allocations as Numbers are more common.
291+
let ids: Vec<serde_json::Value> = responses.iter().map(|r| r.id.clone()).collect();
292+
// First index responses by ID and catch duplicate IDs.
293+
let mut resp_by_id = HashMap::new();
294+
for (id, resp) in ids.iter().zip(responses.into_iter()) {
295+
if let Some(dup) = resp_by_id.insert(HashableValue(&id), resp) {
296+
return Err(Error::BatchDuplicateResponseId(dup.id));
190297
}
191298
}
299+
// Match responses to the requests.
300+
let results = requests
301+
.into_iter()
302+
.map(|r| resp_by_id.remove(&HashableValue(&r.id)))
303+
.collect();
192304

193-
/// Accessor for the last-used nonce
194-
pub fn last_nonce(&self) -> u64 {
195-
*self.nonce.lock().unwrap()
305+
// Since we're also just producing the first duplicate ID, we can also just produce the
306+
// first incorrect ID in case there are multiple.
307+
if let Some(incorrect) = resp_by_id.into_iter().nth(0) {
308+
return Err(Error::WrongBatchResponseId(incorrect.1.id));
196309
}
310+
311+
Ok(results)
197312
}
198313

199314
#[cfg(test)]
@@ -208,7 +323,7 @@ mod tests {
208323

209324
fn request(
210325
&self,
211-
_: http::Request<&[u8]>,
326+
_: http::Request<Vec<u8>>,
212327
) -> Result<http::Response<Self::ResponseBody>, Self::Err> {
213328
Err(io::ErrorKind::Other.into())
214329
}

src/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::{error, fmt};
2121

2222
use serde_json;
2323

24-
use Response;
24+
use crate::Response;
2525

2626
/// A library error
2727
#[derive(Debug)]

0 commit comments

Comments
 (0)