Skip to content

Commit 7677730

Browse files
authored
Merge pull request #14 from rsocket/feature/messaging
Feature/messaging
2 parents 0cdf6d3 + da8bccc commit 7677730

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+1905
-808
lines changed

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1-
/target
1+
target/
22
**/*.rs.bk
33
Cargo.lock
4+
5+

Cargo.toml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
[workspace]
22

33
members = [
4+
# core
45
"rsocket",
6+
# transports
57
"rsocket-transport-tcp",
68
"rsocket-transport-websocket",
79
"rsocket-transport-wasm",
8-
9-
# Internal
10+
# extra
11+
"rsocket-messaging",
12+
# internal
1013
"examples",
1114
"rsocket-test",
15+
"rsocket-benchmark",
1216
]

examples/Cargo.toml

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,20 @@ publish = false
88
[dev-dependencies]
99
log = "0.4.8"
1010
env_logger = "0.7.1"
11-
futures = "0.3.4"
12-
clap = "2.33.0"
11+
futures = "0.3.5"
12+
clap = "2.33.1"
1313

1414
[dev-dependencies.rsocket_rust]
15-
path = "../rsocket"
15+
version = "0.5.2"
1616

1717
[dev-dependencies.rsocket_rust_transport_tcp]
18-
path = "../rsocket-transport-tcp"
18+
version = "0.5.2"
1919

2020
[dev-dependencies.rsocket_rust_transport_websocket]
21-
path = "../rsocket-transport-websocket"
21+
version = "0.5.2"
2222

2323
[dev-dependencies.tokio]
24-
version = "0.2.16"
24+
version = "0.2.21"
2525
default-features = false
2626
features = ["full"]
2727

@@ -35,4 +35,8 @@ path = "proxy.rs"
3535

3636
[[example]]
3737
name = "cli"
38-
path = "cli.rs"
38+
path = "cli.rs"
39+
40+
[[example]]
41+
name = "qps"
42+
path = "qps.rs"

examples/qps.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
#[macro_use]
2+
extern crate log;
3+
4+
use rsocket_rust::prelude::*;
5+
use rsocket_rust_transport_tcp::TcpClientTransport;
6+
use std::error::Error;
7+
use std::sync::{
8+
atomic::{AtomicU32, Ordering},
9+
Arc,
10+
};
11+
use std::time::SystemTime;
12+
use tokio::runtime::Runtime;
13+
use tokio::sync::Notify;
14+
15+
const TOTAL: u32 = 1_000_000;
16+
17+
fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
18+
env_logger::builder().format_timestamp_millis().init();
19+
20+
let mut rt = Runtime::new()?;
21+
let client = rt.block_on(async {
22+
RSocketFactory::connect()
23+
.transport(TcpClientTransport::from("127.0.0.1:7878"))
24+
.start()
25+
.await
26+
})?;
27+
// simulate 1KB payload.
28+
let req = Payload::builder()
29+
.set_data_utf8("X".repeat(1024).as_ref())
30+
.build();
31+
let counter = Arc::new(AtomicU32::new(0));
32+
let start_time = SystemTime::now();
33+
let notify = Arc::new(Notify::new());
34+
for _ in 0..TOTAL {
35+
let client = client.clone();
36+
let counter = counter.clone();
37+
let notify = notify.clone();
38+
let req = req.clone();
39+
rt.spawn(async move {
40+
client.request_response(req).await.expect("Request failed");
41+
let current = counter.fetch_add(1, Ordering::SeqCst) + 1;
42+
if current >= TOTAL {
43+
notify.notify();
44+
}
45+
});
46+
}
47+
rt.block_on(async move {
48+
notify.notified().await;
49+
});
50+
let costs = SystemTime::now()
51+
.duration_since(start_time)
52+
.unwrap()
53+
.as_millis();
54+
info!(
55+
"total={}, cost={}ms, qps={}",
56+
TOTAL,
57+
costs,
58+
1000f64 * (TOTAL as f64) / (costs as f64)
59+
);
60+
Ok(())
61+
}

rsocket-benchmark/Cargo.toml

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
[package]
2+
name = "rsocket_rust_benchmark"
3+
version = "0.0.0"
4+
authors = ["Jeffsky <[email protected]>"]
5+
edition = "2018"
6+
publish = false
7+
8+
[dev-dependencies]
9+
log = "0.4"
10+
futures = "0.3.5"
11+
env_logger = "0.7.1"
12+
bytes = "0.5.4"
13+
hex = "0.4.2"
14+
rand = "0.7.3"
15+
serde = "1.0.110"
16+
serde_derive = "1.0.110"
17+
criterion = "0.3.2"
18+
19+
[dev-dependencies.rsocket_rust]
20+
version = "0.5.2"
21+
features = ["frame"]
22+
23+
[dev-dependencies.rsocket_rust_transport_tcp]
24+
version = "0.5.2"
25+
26+
[dev-dependencies.rsocket_rust_transport_websocket]
27+
version = "0.5.2"
28+
29+
[dev-dependencies.rsocket_rust_messaging]
30+
version = "0.5.2"
31+
32+
[dev-dependencies.tokio]
33+
version = "0.2.21"
34+
default-features = false
35+
features = ["full"]
36+
37+
[[bench]]
38+
name = "bench_main"
39+
path = "benchmark_main.rs"
40+
harness = false

rsocket-benchmark/benchmark_main.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
use criterion::criterion_main;
2+
3+
mod benchmarks;
4+
5+
criterion_main! {
6+
benchmarks::fibonaccis::benches,
7+
benchmarks::frames::benches,
8+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
use criterion::{black_box, criterion_group, Criterion};
2+
3+
fn fibonacci(n: u64) -> u64 {
4+
match n {
5+
0 => 1,
6+
1 => 1,
7+
n => fibonacci(n - 1) + fibonacci(n - 2),
8+
}
9+
}
10+
11+
fn fibonaccis_benchmark(c: &mut Criterion) {
12+
c.bench_function("fib 20", |b| b.iter(|| fibonacci(black_box(20))));
13+
}
14+
15+
criterion_group!(benches, fibonaccis_benchmark);
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
use bytes::{Bytes, BytesMut};
2+
use criterion::{criterion_group, Criterion};
3+
use rsocket_rust::frame::*;
4+
use rsocket_rust::utils::Writeable;
5+
6+
fn bench_unmarshal_request_response(c: &mut Criterion) {
7+
c.bench_function("unmarshal request_response", |b| {
8+
b.iter(|| {
9+
let f = RequestResponse::builder(1234, 0)
10+
.set_data(Bytes::from("Hello World"))
11+
.set_metadata(Bytes::from("Foobar"))
12+
.build();
13+
let mut bf = BytesMut::with_capacity(f.len() as usize);
14+
f.write_to(&mut bf);
15+
})
16+
});
17+
}
18+
19+
criterion_group!(benches, bench_unmarshal_request_response);

rsocket-benchmark/benchmarks/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pub mod fibonaccis;
2+
pub mod frames;

rsocket-messaging/Cargo.toml

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
[package]
2+
name = "rsocket_rust_messaging"
3+
version = "0.5.2"
4+
authors = ["Jeffsky <[email protected]>"]
5+
edition = "2018"
6+
license = "Apache-2.0"
7+
readme = "README.md"
8+
repository = "https://github.com/rsocket/rsocket-rust"
9+
homepage = "https://github.com/rsocket/rsocket-rust"
10+
description = "Communicate with Spring RSocket Messaging."
11+
12+
[dependencies]
13+
futures = "0.3.5"
14+
bytes = "0.5.4"
15+
serde = "1.0.110"
16+
serde_json = "1.0.53"
17+
serde_cbor = "0.11.1"
18+
hex = "0.4.2"
19+
url = "2.1.1"
20+
21+
[dependencies.rsocket_rust]
22+
version = "0.5.2"
23+
features = ["frame"]
24+
25+
[dependencies.rsocket_rust_transport_tcp]
26+
version = "0.5.2"
27+
28+
[dependencies.rsocket_rust_transport_websocket]
29+
version = "0.5.2"

0 commit comments

Comments
 (0)