Skip to content

Commit d90ef92

Browse files
committed
1. Optimize some codes.
2. Add qps benchmark.
1 parent 0ebbc48 commit d90ef92

File tree

27 files changed

+162
-112
lines changed

27 files changed

+162
-112
lines changed

examples/Cargo.toml

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ publish = false
99
log = "0.4.8"
1010
env_logger = "0.7.1"
1111
futures = "0.3.5"
12-
clap = "2.33.0"
12+
clap = "2.33.1"
1313

1414
[dev-dependencies.rsocket_rust]
1515
path = "../rsocket"
@@ -21,7 +21,7 @@ path = "../rsocket-transport-tcp"
2121
path = "../rsocket-transport-websocket"
2222

2323
[dev-dependencies.tokio]
24-
version = "0.2.20"
24+
version = "0.2.21"
2525
default-features = false
2626
features = ["full"]
2727

@@ -35,4 +35,8 @@ path = "proxy.rs"
3535

3636
[[example]]
3737
name = "cli"
38-
path = "cli.rs"
38+
path = "cli.rs"
39+
40+
[[example]]
41+
name = "qps"
42+
path = "qps.rs"

examples/qps.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
#[macro_use]
2+
extern crate log;
3+
4+
use rsocket_rust::prelude::*;
5+
use rsocket_rust_transport_tcp::TcpClientTransport;
6+
use std::error::Error;
7+
use std::sync::{
8+
atomic::{AtomicU32, Ordering},
9+
Arc,
10+
};
11+
use std::time::SystemTime;
12+
use tokio::runtime::Runtime;
13+
use tokio::sync::Notify;
14+
15+
const TOTAL: u32 = 1_000_000;
16+
17+
fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
18+
env_logger::builder().format_timestamp_millis().init();
19+
20+
let mut rt = Runtime::new()?;
21+
let client = rt.block_on(async {
22+
RSocketFactory::connect()
23+
.transport(TcpClientTransport::from("127.0.0.1:7878"))
24+
.start()
25+
.await
26+
})?;
27+
// simulate 1KB payload.
28+
let req = Payload::builder()
29+
.set_data_utf8("X".repeat(1024).as_ref())
30+
.build();
31+
let counter = Arc::new(AtomicU32::new(0));
32+
let start_time = SystemTime::now();
33+
let notify = Arc::new(Notify::new());
34+
for _ in 0..TOTAL {
35+
let client = client.clone();
36+
let counter = counter.clone();
37+
let notify = notify.clone();
38+
let req = req.clone();
39+
rt.spawn(async move {
40+
client.request_response(req).await.expect("Request failed");
41+
let current = counter.fetch_add(1, Ordering::SeqCst) + 1;
42+
if current >= TOTAL {
43+
notify.notify();
44+
}
45+
});
46+
}
47+
rt.block_on(async move {
48+
notify.notified().await;
49+
});
50+
let costs = SystemTime::now()
51+
.duration_since(start_time)
52+
.unwrap()
53+
.as_millis();
54+
info!(
55+
"total={}, cost={}ms, qps={}",
56+
TOTAL,
57+
costs,
58+
1000f64 * (TOTAL as f64) / (costs as f64)
59+
);
60+
Ok(())
61+
}

rsocket-benchmark/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ path = "../rsocket-transport-websocket"
3030
path = "../rsocket-messaging"
3131

3232
[dev-dependencies.tokio]
33-
version = "0.2.20"
33+
version = "0.2.21"
3434
default-features = false
3535
features = ["full"]
3636

rsocket-messaging/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "rsocket_rust_messaging"
3-
version = "0.1.0"
3+
version = "0.5.2"
44
authors = ["Jeffsky <[email protected]>"]
55
edition = "2018"
66

rsocket-test/tests/test_clients.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ async fn test_request_response_err() {
146146

147147
async fn exec_request_response<R>(socket: &Client<R>)
148148
where
149-
R: Send + Sync + Clone + Spawner + 'static,
149+
R: Send + Sync + Copy + Spawner + 'static,
150150
{
151151
// request response
152152
let sending = Payload::builder()
@@ -159,7 +159,7 @@ where
159159

160160
async fn exec_metadata_push<R>(socket: &Client<R>)
161161
where
162-
R: Send + Sync + Clone + Spawner + 'static,
162+
R: Send + Sync + Copy + Spawner + 'static,
163163
{
164164
let pa = Payload::builder().set_metadata_utf8("Hello World!").build();
165165
// metadata push
@@ -168,7 +168,7 @@ where
168168

169169
async fn exec_fire_and_forget<R>(socket: &Client<R>)
170170
where
171-
R: Send + Sync + Clone + Spawner + 'static,
171+
R: Send + Sync + Copy + Spawner + 'static,
172172
{
173173
// request fnf
174174
let fnf = Payload::from("Hello World!");
@@ -177,7 +177,7 @@ where
177177

178178
async fn exec_request_stream<R>(socket: &Client<R>)
179179
where
180-
R: Send + Sync + Clone + Spawner + 'static,
180+
R: Send + Sync + Copy + Spawner + 'static,
181181
{
182182
// request stream
183183
let sending = Payload::builder()
@@ -197,7 +197,7 @@ where
197197

198198
async fn exec_request_channel<R>(socket: &Client<R>)
199199
where
200-
R: Send + Sync + Clone + Spawner + 'static,
200+
R: Send + Sync + Copy + Spawner + 'static,
201201
{
202202
let sends: Vec<_> = (0..10)
203203
.map(|n| {

rsocket-transport-tcp/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ path = "../rsocket"
1919
features = ["frame"]
2020

2121
[dependencies.tokio]
22-
version = "0.2.20"
22+
version = "0.2.21"
2323
default-features = false
2424
features = [ "rt-core", "rt-threaded", "tcp", "sync", "stream" ]
2525

rsocket-transport-tcp/src/client.rs

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ use rsocket_rust::error::RSocketError;
44
use rsocket_rust::frame::Frame;
55
use rsocket_rust::runtime::{DefaultSpawner, Spawner};
66
use rsocket_rust::transport::{ClientTransport, Rx, Tx, TxOnce};
7-
use std::net::SocketAddr;
8-
use std::net::TcpStream as StdTcpStream;
7+
use std::net::{AddrParseError, SocketAddr, TcpStream as StdTcpStream};
8+
use std::str::FromStr;
99
use tokio::net::TcpStream;
1010
use tokio_util::codec::Framed;
1111

@@ -75,6 +75,19 @@ impl ClientTransport for TcpClientTransport {
7575
}
7676
}
7777

78+
impl FromStr for TcpClientTransport {
79+
type Err = AddrParseError;
80+
81+
fn from_str(addr: &str) -> Result<Self, Self::Err> {
82+
let socket_addr = if addr.starts_with("tcp://") || addr.starts_with("TCP://") {
83+
addr.chars().skip(6).collect::<String>().parse()?
84+
} else {
85+
addr.parse()?
86+
};
87+
Ok(TcpClientTransport::new(Connector::Lazy(socket_addr)))
88+
}
89+
}
90+
7891
impl From<SocketAddr> for TcpClientTransport {
7992
fn from(addr: SocketAddr) -> TcpClientTransport {
8093
TcpClientTransport::new(Connector::Lazy(addr))
@@ -83,12 +96,12 @@ impl From<SocketAddr> for TcpClientTransport {
8396

8497
impl From<&str> for TcpClientTransport {
8598
fn from(addr: &str) -> TcpClientTransport {
86-
let socket_addr: SocketAddr = if addr.starts_with("tcp://") {
87-
let ss: String = addr.chars().skip(6).collect();
88-
ss.parse().unwrap()
99+
let socket_addr: SocketAddr = if addr.starts_with("tcp://") || addr.starts_with("TCP://") {
100+
addr.chars().skip(6).collect::<String>().parse()
89101
} else {
90-
addr.parse().unwrap()
91-
};
102+
addr.parse()
103+
}
104+
.expect("Invalid transport string!");
92105
TcpClientTransport::new(Connector::Lazy(socket_addr))
93106
}
94107
}

rsocket-transport-wasm/src/runtime.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use rsocket_rust::runtime::Spawner;
22
use std::future::Future;
33
use wasm_bindgen_futures::spawn_local;
44

5-
#[derive(Clone)]
5+
#[derive(Clone, Copy, Debug)]
66
pub struct WASMSpawner;
77

88
impl Spawner for WASMSpawner {

rsocket-transport-websocket/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,6 @@ path = "../rsocket"
2121
features = ["frame"]
2222

2323
[dependencies.tokio]
24-
version = "0.2.20"
24+
version = "0.2.21"
2525
default-features = false
2626
features = [ "rt-core", "rt-threaded", "tcp", "sync", "stream" ]

rsocket/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ futures = "0.3.5"
1616
lazy_static = "1.4.0"
1717

1818
[dependencies.tokio]
19-
version = "0.2.20"
19+
version = "0.2.21"
2020
default-features = false
2121
features = [ "rt-core", "rt-threaded", "sync", "stream" ]
2222

0 commit comments

Comments
 (0)