Skip to content

Commit 3029309

Browse files
authored
Improve the API of the WebSocket Client (#2)
Still in progress, but introduces the basics of supporting typed messages. * Wip * Wip * Minor * Experiments with parsing * Wip * Wip * Keep the requests ids to properly parse responses * More cleanup * More cleanup
1 parent f7d0875 commit 3029309

File tree

5 files changed

+93
-25
lines changed

5 files changed

+93
-25
lines changed

xrpl_api/src/api/subscribe.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,5 +46,14 @@ impl SubscribeRequest {
4646
}
4747
}
4848

49+
#[derive(Debug, Serialize, Deserialize)]
50+
pub struct LedgerClosedEvent {
51+
#[serde(rename = "type")]
52+
pub event_type: String,
53+
pub fee_base: u32,
54+
pub fee_ref: u32,
55+
pub txn_count: u32,
56+
}
57+
4958
#[derive(Debug, Deserialize)]
5059
pub struct SubscribeResponse {}

xrpl_sdk_ws/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ edition = "2021"
1212
thiserror = "1"
1313
serde = { version = "1", features = ["derive"]}
1414
serde_json = "1"
15+
futures = "0.3"
1516
futures-util = "0.3"
1617
tokio = { version = "1", features = ["full"] }
1718
tokio-stream = "0.1"

xrpl_sdk_ws/src/client.rs

Lines changed: 67 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
use crate::util::Result;
2+
use futures::{future, stream::SplitSink, StreamExt};
23
use futures_util::SinkExt;
3-
use serde::Serialize;
4+
use serde::{Deserialize, Serialize};
5+
use std::{cell::RefCell, collections::HashMap, pin::Pin, rc::Rc};
46
use tokio::net::TcpStream;
7+
use tokio_stream::Stream;
58
use tokio_tungstenite::{
69
connect_async, tungstenite::protocol::Message, MaybeTlsStream, WebSocketStream,
710
};
811
use uuid::Uuid;
9-
use xrpl_api::Request;
12+
use xrpl_api::{AccountInfoResponse, LedgerClosedEvent, Request};
1013

1114
// https://xrpl.org/public-servers.html
1215

@@ -21,15 +24,71 @@ pub const DEFAULT_WS_URL: &str = XRPL_CLUSTER_MAINNET_WS_URL;
2124

2225
// #TODO extract Connection
2326

27+
#[derive(Serialize, Deserialize, Debug)]
28+
pub enum TypedMessage {
29+
AccountInfo(AccountInfoResponse),
30+
LedgerClosed(LedgerClosedEvent),
31+
Other(String),
32+
}
33+
2434
/// A WebSocket client for the XRP Ledger.
2535
pub struct Client {
26-
pub stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
36+
sender: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
37+
requests: Rc<RefCell<HashMap<String, String>>>,
38+
pub messages: Pin<Box<dyn Stream<Item = Result<TypedMessage>>>>,
2739
}
2840

2941
impl Client {
3042
pub async fn connect(url: &str) -> Result<Self> {
3143
let (stream, _response) = connect_async(url).await?;
32-
Ok(Self { stream })
44+
let (sender, receiver) = stream.split();
45+
let requests: Rc<RefCell<HashMap<String, String>>> = Rc::new(RefCell::new(HashMap::new()));
46+
47+
let cloned_requests = requests.clone();
48+
let receiver = receiver
49+
.map(move |msg| {
50+
if let Message::Text(string) = msg.unwrap() {
51+
let mut value: serde_json::Value = serde_json::from_str(&string).unwrap();
52+
53+
if let Some(id) = value["id"].as_str() {
54+
// If the message contains an id field it's a response to
55+
// an RPC request.
56+
if let Some(method) = requests.borrow_mut().get(id) {
57+
let result = value["result"].take();
58+
match method.as_str() {
59+
"account_info" => Ok(Some(TypedMessage::AccountInfo(
60+
serde_json::from_value(result)?,
61+
))),
62+
_ => Ok(Some(TypedMessage::Other(string))),
63+
}
64+
} else {
65+
Ok(Some(TypedMessage::Other(string)))
66+
}
67+
} else {
68+
// If the message has no id field, it's a subscription event.
69+
70+
if let Some(event_type) = value["type"].as_str() {
71+
match event_type {
72+
"ledgerClosed" => Ok(Some(TypedMessage::LedgerClosed(
73+
serde_json::from_value(value)?,
74+
))),
75+
_ => Ok(Some(TypedMessage::Other(string))),
76+
}
77+
} else {
78+
Ok(Some(TypedMessage::Other(string)))
79+
}
80+
}
81+
} else {
82+
Ok(None)
83+
}
84+
})
85+
.filter_map(|res| future::ready(res.transpose()));
86+
87+
Ok(Self {
88+
sender,
89+
messages: Box::pin(receiver),
90+
requests: cloned_requests,
91+
})
3392
}
3493

3594
pub async fn call<Req>(&mut self, req: Req) -> Result<()>
@@ -43,14 +102,16 @@ impl Client {
43102
// #TODO, this is temp code, add error-handling!
44103

45104
if let serde_json::Value::Object(mut map) = msg {
46-
map.insert("id".to_owned(), serde_json::Value::String(id));
105+
map.insert("id".to_owned(), serde_json::Value::String(id.clone()));
47106
map.insert(
48107
"command".to_owned(),
49108
serde_json::Value::String(req.method()),
50109
);
51110
let msg = serde_json::to_string(&map).unwrap();
52111

53-
self.stream.send(Message::Text(msg.to_string())).await?;
112+
self.sender.send(Message::Text(msg.to_string())).await?;
113+
114+
self.requests.borrow_mut().insert(id, req.method());
54115
}
55116

56117
Ok(())

xrpl_sdk_ws/src/client_tests.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,7 @@ mod tests {
1515

1616
client.call(req).await.expect("cannot send request");
1717

18-
let (_, rx) = client.stream.split();
19-
20-
tokio::pin!(rx);
21-
22-
while let Some(msg) = rx.next().await {
18+
if let Some(msg) = client.messages.next().await {
2319
dbg!(&msg);
2420
}
2521
}
@@ -31,14 +27,17 @@ mod tests {
3127
.expect("cannot connect");
3228

3329
let req = SubscribeRequest::streams(&["ledger"]);
34-
client.call(req).await.expect("cannot subscribe");
3530

36-
let (_, rx) = client.stream.split();
31+
client.call(req).await.expect("cannot subscribe");
3732

38-
tokio::pin!(rx);
33+
let mut count = 0;
3934

40-
while let Some(msg) = rx.next().await {
35+
while let Some(msg) = client.messages.next().await {
36+
if count > 2 {
37+
break;
38+
}
4139
dbg!(&msg);
40+
count += 1;
4241
}
4342
}
4443
}

xrpl_sdk_ws/src/error.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,18 @@ use thiserror::Error;
77
pub enum Error {
88
#[error("internal error: {0}")]
99
Internal(String),
10-
// #[error("failed request: {err}")]
11-
// FailedRequest { err: String, status: Option<u16> },
12-
// #[error("not authorized: missing api_credentials")]
13-
// Unauthorized,
14-
// #[error("api error: {0}")]
15-
// Api(String),
10+
#[error("malformed JSON payload: {0}")]
11+
MalformedJSON(String),
1612
}
1713

1814
impl From<tokio_tungstenite::tungstenite::Error> for Error {
1915
fn from(e: tokio_tungstenite::tungstenite::Error) -> Self {
2016
Self::Internal(e.to_string())
21-
// Self::Internal {
22-
// err: e.to_string(),
23-
// status: e.status().map(|c| c.as_u16()),
24-
// }
17+
}
18+
}
19+
20+
impl From<serde_json::Error> for Error {
21+
fn from(e: serde_json::Error) -> Self {
22+
Self::MalformedJSON(e.to_string())
2523
}
2624
}

0 commit comments

Comments
 (0)