Skip to content

Commit bfc9c40

Browse files
authored
Merge pull request #17 from rsocket/develop
Add some examples and support Unix Socket.
2 parents 7677730 + 9cf3f70 commit bfc9c40

File tree

24 files changed

+679
-35
lines changed

24 files changed

+679
-35
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
target/
22
**/*.rs.bk
33
Cargo.lock
4-
4+
.vscode/
55

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ members = [
55
"rsocket",
66
# transports
77
"rsocket-transport-tcp",
8+
"rsocket-transport-unix",
89
"rsocket-transport-websocket",
910
"rsocket-transport-wasm",
1011
# extra

examples/Cargo.toml

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,21 @@ log = "0.4.8"
1010
env_logger = "0.7.1"
1111
futures = "0.3.5"
1212
clap = "2.33.1"
13+
tokio-postgres = "0.5.4"
14+
redis = "0.16.0"
1315

1416
[dev-dependencies.rsocket_rust]
15-
version = "0.5.2"
17+
path = "../rsocket"
1618

1719
[dev-dependencies.rsocket_rust_transport_tcp]
18-
version = "0.5.2"
20+
path = "../rsocket-transport-tcp"
21+
22+
[dev-dependencies.rsocket_rust_transport_unix]
23+
path = "../rsocket-transport-unix"
1924

2025
[dev-dependencies.rsocket_rust_transport_websocket]
2126
version = "0.5.2"
27+
path = "../rsocket-transport-websocket"
2228

2329
[dev-dependencies.tokio]
2430
version = "0.2.21"
@@ -40,3 +46,19 @@ path = "cli.rs"
4046
[[example]]
4147
name = "qps"
4248
path = "qps.rs"
49+
50+
[[example]]
51+
name = "postgres"
52+
path = "postgres.rs"
53+
54+
[[example]]
55+
name = "redis"
56+
path = "redis.rs"
57+
58+
[[example]]
59+
name = "echo_uds"
60+
path = "echo_uds.rs"
61+
62+
[[example]]
63+
name = "echo_uds_client"
64+
path = "echo_uds_client.rs"

examples/docker-compose.yml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
version: '3.1'
2+
3+
services:
4+
postgres:
5+
image: postgres:9.6-alpine
6+
environment:
7+
POSTGRES_USER: postgres
8+
POSTGRES_PASSWORD: postgres
9+
ports:
10+
- 5432:5432
11+
volumes:
12+
- pgdata:/var/lib/postgresql/data
13+
redis:
14+
image: "redis:3-alpine"
15+
ports:
16+
- "6379:6379"
17+
command: ["redis-server", "--appendonly", "yes"]
18+
19+
volumes:
20+
pgdata:
21+
driver: local

examples/echo_uds.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
use log::info;
2+
use rsocket_rust::prelude::{EchoRSocket, RSocketFactory, ServerResponder};
3+
use rsocket_rust_transport_unix::UnixServerTransport;
4+
use std::error::Error;
5+
6+
#[tokio::main]
7+
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
8+
let transport: UnixServerTransport = UnixServerTransport::from("/tmp/rsocket-uds.sock");
9+
10+
let responder: ServerResponder = Box::new(|setup, _socket| {
11+
info!("accept setup: {:?}", setup);
12+
Ok(Box::new(EchoRSocket))
13+
// Or you can reject setup
14+
// Err(From::from("SETUP_NOT_ALLOW"))
15+
});
16+
17+
let on_start: Box<dyn FnMut() + Send + Sync> =
18+
Box::new(|| info!("+++++++ echo server started! +++++++"));
19+
20+
RSocketFactory::receive()
21+
.transport(transport)
22+
.acceptor(responder)
23+
.on_start(on_start)
24+
.serve()
25+
.await?;
26+
27+
Ok(())
28+
}

examples/echo_uds_client.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
use log::info;
2+
use rsocket_rust::prelude::{ClientResponder, EchoRSocket, Payload, RSocket, RSocketFactory};
3+
use rsocket_rust_transport_unix::UnixClientTransport;
4+
use std::error::Error;
5+
6+
#[tokio::main]
7+
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
8+
let responder: ClientResponder = Box::new(|| Box::new(EchoRSocket));
9+
10+
let client = RSocketFactory::connect()
11+
.acceptor(responder)
12+
.transport(UnixClientTransport::from("/tmp/rsocket-uds.sock"))
13+
.setup(Payload::from("READY!"))
14+
.mime_type("text/plain", "text/plain")
15+
.start()
16+
.await
17+
.unwrap();
18+
19+
let request_payload: Payload = Payload::builder()
20+
.set_data_utf8("Hello World!")
21+
.set_metadata_utf8("Rust")
22+
.build();
23+
24+
let res = client.request_response(request_payload).await.unwrap();
25+
26+
info!("got: {:?}", res);
27+
28+
client.close();
29+
30+
Ok(())
31+
}

examples/postgres.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
#[macro_use]
2+
extern crate log;
3+
4+
use rsocket_rust::{error::RSocketError, prelude::*};
5+
use rsocket_rust_transport_tcp::TcpServerTransport;
6+
use std::error::Error;
7+
use std::sync::Arc;
8+
use tokio_postgres::{Client as PgClient, NoTls};
9+
10+
#[tokio::main]
11+
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
12+
env_logger::builder().format_timestamp_millis().init();
13+
let dao = Dao::try_new().await?;
14+
RSocketFactory::receive()
15+
.acceptor(Box::new(move |_, _| Ok(Box::new(dao.clone()))))
16+
.on_start(Box::new(|| info!("server start success!!!")))
17+
.transport(TcpServerTransport::from("127.0.0.1:7878"))
18+
.serve()
19+
.await
20+
}
21+
22+
#[derive(Clone)]
23+
struct Dao {
24+
client: Arc<PgClient>,
25+
}
26+
27+
impl RSocket for Dao {
28+
fn request_response(&self, _: Payload) -> Mono<Result<Payload, RSocketError>> {
29+
let client = self.client.clone();
30+
Box::pin(async move {
31+
let row = client
32+
.query_one("SELECT 'world' AS hello", &[])
33+
.await
34+
.expect("Execute SQL failed!");
35+
let result: String = row.get("hello");
36+
Ok(Payload::builder().set_data_utf8(&result).build())
37+
})
38+
}
39+
40+
fn metadata_push(&self, _: Payload) -> Mono<()> {
41+
unimplemented!()
42+
}
43+
44+
fn fire_and_forget(&self, _: Payload) -> Mono<()> {
45+
unimplemented!()
46+
}
47+
48+
fn request_stream(&self, _: Payload) -> Flux<Result<Payload, RSocketError>> {
49+
unimplemented!()
50+
}
51+
52+
fn request_channel(
53+
&self,
54+
_: Flux<Result<Payload, RSocketError>>,
55+
) -> Flux<Result<Payload, RSocketError>> {
56+
unimplemented!()
57+
}
58+
}
59+
60+
impl Dao {
61+
async fn try_new() -> Result<Dao, Box<dyn Error + Sync + Send>> {
62+
let (client, connection) =
63+
tokio_postgres::connect("host=localhost user=postgres password=postgres", NoTls)
64+
.await?;
65+
66+
// The connection object performs the actual communication with the database,
67+
// so spawn it off to run on its own.
68+
tokio::spawn(async move {
69+
if let Err(e) = connection.await {
70+
eprintln!("connection error: {}", e);
71+
}
72+
});
73+
74+
info!("==> create postgres pool success!");
75+
Ok(Dao {
76+
client: Arc::new(client),
77+
})
78+
}
79+
}

examples/redis.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
use redis::Client as RedisClient;
2+
use rsocket_rust::{error::RSocketError, prelude::*};
3+
use rsocket_rust_transport_tcp::TcpServerTransport;
4+
use std::error::Error;
5+
use std::str::FromStr;
6+
7+
#[derive(Clone)]
8+
struct RedisDao {
9+
inner: RedisClient,
10+
}
11+
12+
#[tokio::main]
13+
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
14+
let dao = RedisDao::from_str("redis://127.0.0.1").expect("Connect redis failed!");
15+
RSocketFactory::receive()
16+
.acceptor(Box::new(move |_setup, _socket| Ok(Box::new(dao.clone()))))
17+
.transport(TcpServerTransport::from("127.0.0.1:7878"))
18+
.serve()
19+
.await
20+
}
21+
22+
impl FromStr for RedisDao {
23+
type Err = redis::RedisError;
24+
25+
fn from_str(s: &str) -> Result<Self, Self::Err> {
26+
let client = redis::Client::open(s)?;
27+
Ok(RedisDao { inner: client })
28+
}
29+
}
30+
31+
impl RSocket for RedisDao {
32+
fn request_response(&self, req: Payload) -> Mono<Result<Payload, RSocketError>> {
33+
let client = self.inner.clone();
34+
35+
Box::pin(async move {
36+
let mut conn: redis::aio::Connection = client
37+
.get_async_connection()
38+
.await
39+
.expect("Connect redis failed!");
40+
let value: String = redis::cmd("GET")
41+
.arg(&[req.data_utf8()])
42+
.query_async(&mut conn)
43+
.await
44+
.unwrap_or("<nil>".to_owned());
45+
Ok(Payload::builder().set_data_utf8(&value).build())
46+
})
47+
}
48+
49+
fn metadata_push(&self, _req: Payload) -> Mono<()> {
50+
unimplemented!()
51+
}
52+
fn fire_and_forget(&self, _req: Payload) -> Mono<()> {
53+
unimplemented!()
54+
}
55+
fn request_stream(&self, _req: Payload) -> Flux<Result<Payload, RSocketError>> {
56+
unimplemented!()
57+
}
58+
fn request_channel(
59+
&self,
60+
_reqs: Flux<Result<Payload, RSocketError>>,
61+
) -> Flux<Result<Payload, RSocketError>> {
62+
unimplemented!()
63+
}
64+
}

justfile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
default:
2+
echo 'Hello, world!'
3+
test:
4+
@cargo test -- --nocapture

rsocket-benchmark/Cargo.toml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,22 @@ env_logger = "0.7.1"
1212
bytes = "0.5.4"
1313
hex = "0.4.2"
1414
rand = "0.7.3"
15-
serde = "1.0.110"
16-
serde_derive = "1.0.110"
15+
serde = "1.0.111"
16+
serde_derive = "1.0.111"
1717
criterion = "0.3.2"
1818

1919
[dev-dependencies.rsocket_rust]
20-
version = "0.5.2"
20+
path = "../rsocket"
2121
features = ["frame"]
2222

2323
[dev-dependencies.rsocket_rust_transport_tcp]
24-
version = "0.5.2"
24+
path = "../rsocket-transport-tcp"
2525

2626
[dev-dependencies.rsocket_rust_transport_websocket]
27-
version = "0.5.2"
27+
path = "../rsocket-transport-websocket"
2828

2929
[dev-dependencies.rsocket_rust_messaging]
30-
version = "0.5.2"
30+
path = "../rsocket-messaging"
3131

3232
[dev-dependencies.tokio]
3333
version = "0.2.21"

0 commit comments

Comments
 (0)