Skip to content

Commit 233aea7

Browse files
committed
async thread safe
Signed-off-by: Sam Batschelet <[email protected]>
1 parent a707fbd commit 233aea7

File tree

6 files changed

+78
-53
lines changed

6 files changed

+78
-53
lines changed

spaces-cli/src/bin/spaces-cli/main.rs

+11-6
Original file line numberDiff line numberDiff line change
@@ -49,19 +49,22 @@ async fn main() -> Result<(), Box<dyn error::Error>> {
4949

5050
let private_key = get_or_create_pk(&cli.private_key_file)?;
5151
let uri = cli.endpoint.parse::<Uri>()?;
52-
let mut client = Client::new(uri).set_private_key(private_key);
52+
let client = Client::new(uri);
53+
client.set_private_key(private_key).await;
5354

5455
if let Command::Get { space, key } = &cli.command {
55-
let resp =
56-
futures::executor::block_on(client.resolve(space, key)).map_err(|e| e.to_string())?;
56+
let resp = client
57+
.resolve(space, key)
58+
.await
59+
.map_err(|e| e.to_string())?;
5760
log::debug!("resolve response: {:?}", resp);
5861

5962
println!("{}", serde_json::to_string(&resp)?);
6063
return Ok(());
6164
}
6265

6366
if let Command::Ping {} = &cli.command {
64-
let resp = futures::executor::block_on(client.ping()).map_err(|e| e.to_string())?;
67+
let resp = client.ping().await.map_err(|e| e.to_string())?;
6568

6669
println!("{}", serde_json::to_string(&resp)?);
6770
return Ok(());
@@ -74,8 +77,10 @@ async fn main() -> Result<(), Box<dyn error::Error>> {
7477
let typed_data = &resp.typed_data;
7578

7679
// issue tx
77-
let resp =
78-
futures::executor::block_on(client.issue_tx(typed_data)).map_err(|e| e.to_string())?;
80+
let resp = client
81+
.issue_tx(typed_data)
82+
.await
83+
.map_err(|e| e.to_string())?;
7984
println!("{}", serde_json::to_string(&resp)?);
8085

8186
Ok(())

spacesvm/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ name = "spacesvm"
1515
path = "src/bin/spaces/main.rs"
1616

1717
[dependencies]
18-
avalanche-types = { version = "0.0.140", features = ["subnet"] }
18+
avalanche-types = { version = "0.0.143", features = ["subnet"] }
1919
byteorder = "1.4.3"
2020
chrono = "0.4.22"
2121
crossbeam-channel = "0.5.6"

spacesvm/src/api/client.rs

+53-34
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::{
22
fs::File,
33
io::{Error, ErrorKind, Result, Write},
44
path::Path,
5+
sync::Arc,
56
};
67

78
use crate::{
@@ -25,9 +26,14 @@ use jsonrpc_core::{Call, Id, MethodCall, Params, Value, Version};
2526
use serde::de;
2627

2728
pub use http::Uri;
29+
use tokio::sync::RwLock;
2830

29-
/// HTTP client for interacting with the API, assumes single threaded use.
31+
/// HTTP client for interacting with the API.
3032
pub struct Client<C> {
33+
inner: Arc<RwLock<ClientInner<C>>>,
34+
}
35+
36+
pub struct ClientInner<C> {
3137
id: u64,
3238
client: HyperClient<C>,
3339
endpoint: Uri,
@@ -38,33 +44,37 @@ impl Client<HttpConnector> {
3844
pub fn new(endpoint: Uri) -> Self {
3945
let client = HyperClient::new();
4046
Self {
41-
id: 0,
42-
client,
43-
endpoint,
44-
private_key: None,
47+
inner: Arc::new(RwLock::new(ClientInner {
48+
id: 0,
49+
client,
50+
endpoint,
51+
private_key: None,
52+
})),
4553
}
4654
}
4755
}
4856

4957
impl Client<HttpConnector> {
50-
fn next_id(&mut self) -> Id {
51-
let id = self.id;
52-
self.id = id + 1;
58+
async fn next_id(&self) -> Id {
59+
let mut client = self.inner.write().await;
60+
let id = client.id;
61+
client.id = id + 1;
5362
Id::Num(id)
5463
}
5564

56-
pub fn set_endpoint(&mut self, endpoint: Uri) {
57-
self.endpoint = endpoint;
65+
pub async fn set_endpoint(&self, endpoint: Uri) {
66+
let mut inner = self.inner.write().await;
67+
inner.endpoint = endpoint;
5868
}
5969

60-
pub fn set_private_key(mut self, private_key: Key) -> Self {
61-
self.private_key = Some(private_key);
62-
self
70+
pub async fn set_private_key(&self, private_key: Key) {
71+
let mut inner = self.inner.write().await;
72+
inner.private_key = Some(private_key);
6373
}
6474

6575
/// Returns a serialized json request as string and the request id.
66-
pub fn raw_request(&mut self, method: &str, params: &Params) -> (Id, String) {
67-
let id = self.next_id();
76+
pub async fn raw_request(&self, method: &str, params: &Params) -> (Id, String) {
77+
let id = self.next_id().await;
6878
let request = jsonrpc_core::Request::Single(Call::MethodCall(MethodCall {
6979
jsonrpc: Some(Version::V2),
7080
method: method.to_owned(),
@@ -77,64 +87,73 @@ impl Client<HttpConnector> {
7787
)
7888
}
7989

80-
/// Returns a recoverable signature from bytes.
81-
pub fn sign_digest(&self, dh: &[u8]) -> Result<Sig> {
82-
if let Some(pk) = &self.private_key {
90+
/// Returns a recoverable signature from 32 byte SHA256 message.
91+
pub async fn sign_digest(&self, dh: &[u8]) -> Result<Sig> {
92+
let inner = self.inner.read().await;
93+
if let Some(pk) = &inner.private_key {
8394
return pk.sign_digest(dh);
8495
}
8596
Err(Error::new(ErrorKind::Other, "private key not set"))
8697
}
8798

8899
/// Returns a PingResponse from client request.
89-
pub async fn ping(&mut self) -> Result<PingResponse> {
90-
let (_id, json_request) = self.raw_request("ping", &Params::None);
100+
pub async fn ping(&self) -> Result<PingResponse> {
101+
let (_id, json_request) = self.raw_request("ping", &Params::None).await;
91102
let resp = self.post_de::<PingResponse>(&json_request).await?;
92103

93104
Ok(resp)
94105
}
95106

96107
/// Returns a DecodeTxResponse from client request.
97-
pub async fn decode_tx(&mut self, tx_data: TransactionData) -> Result<DecodeTxResponse> {
108+
pub async fn decode_tx(&self, tx_data: TransactionData) -> Result<DecodeTxResponse> {
98109
let arg_value = serde_json::to_value(&DecodeTxArgs { tx_data })?;
99-
let (_id, json_request) = self.raw_request("decodeTx", &Params::Array(vec![arg_value]));
110+
let (_id, json_request) = self
111+
.raw_request("decodeTx", &Params::Array(vec![arg_value]))
112+
.await;
100113
let resp = self.post_de::<DecodeTxResponse>(&json_request).await?;
101114

102115
Ok(resp)
103116
}
104117

105118
/// Returns a IssueTxResponse from client request.
106-
pub async fn issue_tx(&mut self, typed_data: &TypedData) -> Result<IssueTxResponse> {
119+
pub async fn issue_tx(&self, typed_data: &TypedData) -> Result<IssueTxResponse> {
107120
let dh = decoder::hash_structured_data(typed_data)?;
108-
let sig = self.sign_digest(&dh.as_bytes())?.to_bytes().to_vec();
121+
let sig = self.sign_digest(&dh.as_bytes()).await?.to_bytes().to_vec();
109122
log::debug!("signature: {:?}", sig);
110123

111124
let arg_value = serde_json::to_value(&IssueTxArgs {
112125
typed_data: typed_data.to_owned(),
113126
signature: sig,
114127
})?;
115-
let (_id, json_request) = self.raw_request("issueTx", &Params::Array(vec![arg_value]));
128+
let (_id, json_request) = self
129+
.raw_request("issueTx", &Params::Array(vec![arg_value]))
130+
.await;
116131
let resp = self.post_de::<IssueTxResponse>(&json_request).await?;
117132

118133
Ok(resp)
119134
}
120135

121136
/// Returns a ResolveResponse from client request.
122-
pub async fn resolve(&mut self, space: &str, key: &str) -> Result<ResolveResponse> {
137+
pub async fn resolve(&self, space: &str, key: &str) -> Result<ResolveResponse> {
123138
let arg_value = serde_json::to_value(&ResolveArgs {
124139
space: space.as_bytes().to_vec(),
125140
key: key.as_bytes().to_vec(),
126141
})?;
127-
let (_id, json_request) = self.raw_request("resolve", &Params::Array(vec![arg_value]));
142+
let (_id, json_request) = self
143+
.raw_request("resolve", &Params::Array(vec![arg_value]))
144+
.await;
128145
let resp = self.post_de::<ResolveResponse>(&json_request).await?;
129146

130147
Ok(resp)
131148
}
132149

133150
/// Returns a deserialized response from client request.
134151
pub async fn post_de<T: de::DeserializeOwned>(&self, json: &str) -> Result<T> {
152+
let inner = self.inner.read().await;
153+
135154
let req = Request::builder()
136155
.method(Method::POST)
137-
.uri(self.endpoint.to_string())
156+
.uri(inner.endpoint.to_string())
138157
.header("content-type", "application/json-rpc")
139158
.body(Body::from(json.to_owned()))
140159
.map_err(|e| {
@@ -144,7 +163,7 @@ impl Client<HttpConnector> {
144163
)
145164
})?;
146165

147-
let mut resp = self.client.request(req).await.map_err(|e| {
166+
let mut resp = inner.client.request(req).await.map_err(|e| {
148167
std::io::Error::new(
149168
std::io::ErrorKind::Other,
150169
format!("client post request failed: {}", e),
@@ -219,12 +238,12 @@ pub fn get_or_create_pk(path: &str) -> Result<key::secp256k1::private_key::Key>
219238
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))
220239
}
221240

222-
#[test]
223-
fn test_raw_request() {
224-
let mut cli = Client::new(Uri::from_static("http://test.url"));
225-
let (id, _) = cli.raw_request("ping", &Params::None);
241+
#[tokio::test]
242+
async fn test_raw_request() {
243+
let cli = Client::new(Uri::from_static("http://test.url"));
244+
let (id, _) = cli.raw_request("ping", &Params::None).await;
226245
assert_eq!(id, jsonrpc_core::Id::Num(0));
227-
let (id, req) = cli.raw_request("ping", &Params::None);
246+
let (id, req) = cli.raw_request("ping", &Params::None).await;
228247
assert_eq!(id, jsonrpc_core::Id::Num(1));
229248
assert_eq!(
230249
req,

spacesvm/src/bin/spaces/main.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ async fn main() -> Result<()> {
4545
) = tokio::sync::broadcast::channel(1);
4646

4747
info!("starting spacesvm-rs");
48-
let vm_server = subnet::rpc::vm::server::Server::new(Box::new(vm::ChainVm::new()), stop_ch_tx);
48+
let vm_server = subnet::rpc::vm::server::Server::new(vm::ChainVm::new(), stop_ch_tx);
4949

5050
subnet::rpc::plugin::serve(vm_server, stop_ch_rx)
5151
.await

spacesvm/tests/vm/mod.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,8 @@ async fn test_api() {
2626

2727
// setup stop channel for grpc services.
2828
let (stop_ch_tx, stop_ch_rx): (Sender<()>, Receiver<()>) = tokio::sync::broadcast::channel(1);
29-
let vm_server = avalanche_types::subnet::rpc::vm::server::Server::new(
30-
Box::new(vm::ChainVm::new()),
31-
stop_ch_tx,
32-
);
29+
let vm_server =
30+
avalanche_types::subnet::rpc::vm::server::Server::new(Box::new(vm::ChainVm::new()), stop_ch_tx);
3331

3432
// start Vm service
3533
let vm_addr = utils::new_socket_addr();
@@ -115,7 +113,7 @@ async fn test_api() {
115113
let mut client = spacesvm::api::client::Client::new(http::Uri::from_static("http://test.url"));
116114

117115
// ping
118-
let (_id, json_str) = client.raw_request("ping", &Params::None);
116+
let (_id, json_str) = client.raw_request("ping", &Params::None).await;
119117
let req = http::request::Builder::new()
120118
.body(json_str.as_bytes().to_vec())
121119
.unwrap();
@@ -141,7 +139,9 @@ async fn test_api() {
141139
let tx_data = claim_tx("test_claim");
142140
let arg_value = serde_json::to_value(&DecodeTxArgs { tx_data }).unwrap();
143141

144-
let (_id, json_str) = client.raw_request("decodeTx", &Params::Array(vec![arg_value]));
142+
let (_id, json_str) = client
143+
.raw_request("decodeTx", &Params::Array(vec![arg_value]))
144+
.await;
145145
log::info!("decodeTx request: {}", json_str);
146146
let req = http::request::Builder::new()
147147
.body(json_str.as_bytes().to_vec())

tests/e2e/src/tests/mod.rs

+6-5
Original file line numberDiff line numberDiff line change
@@ -194,22 +194,23 @@ async fn e2e() {
194194

195195
let private_key = get_or_create_pk("/tmp/.spacesvm-cli-pk").expect("generate new private key");
196196
let chain_url = format!("{}/ext/bc/{}/public", rpc_eps[0], blockchain_id);
197-
let mut scli =
198-
spacesvm::api::client::Client::new(chain_url.parse::<Uri>().expect("valid endpoint"))
199-
.set_private_key(private_key);
197+
let scli =
198+
spacesvm::api::client::Client::new(chain_url.parse::<Uri>().expect("valid endpoint"));
199+
scli.set_private_key(private_key).await;
200200
for ep in rpc_eps.iter() {
201201
let chain_url = format!("{}/ext/bc/{}/public", ep, blockchain_id)
202202
.parse::<Uri>()
203203
.expect("valid endpoint");
204-
scli.set_endpoint(chain_url);
204+
scli.set_endpoint(chain_url).await;
205205
let resp = scli.ping().await.unwrap();
206206
log::info!("ping response from {}: {:?}", ep, resp);
207207
assert!(resp.success);
208208

209209
thread::sleep(Duration::from_millis(300));
210210
}
211211

212-
scli.set_endpoint(chain_url.parse::<Uri>().expect("valid endpoint"));
212+
scli.set_endpoint(chain_url.parse::<Uri>().expect("valid endpoint"))
213+
.await;
213214

214215
log::info!("decode claim tx request...");
215216
let resp = scli

0 commit comments

Comments
 (0)