Skip to content

Commit 39b7b33

Browse files
authored
Merge pull request #16 from seal90/develop
support for unix domain socket
2 parents 80e2d16 + d609c6d commit 39b7b33

File tree

13 files changed

+473
-1
lines changed

13 files changed

+473
-1
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
target/
22
**/*.rs.bk
33
Cargo.lock
4-
4+
.vscode/
55

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ members = [
55
"rsocket",
66
# transports
77
"rsocket-transport-tcp",
8+
"rsocket-transport-unix",
89
"rsocket-transport-websocket",
910
"rsocket-transport-wasm",
1011
# extra

examples/Cargo.toml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ path = "../rsocket"
1919
[dev-dependencies.rsocket_rust_transport_tcp]
2020
path = "../rsocket-transport-tcp"
2121

22+
[dev-dependencies.rsocket_rust_transport_unix]
23+
path = "../rsocket-transport-unix"
24+
2225
[dev-dependencies.rsocket_rust_transport_websocket]
2326
version = "0.5.2"
2427
path = "../rsocket-transport-websocket"
@@ -51,3 +54,11 @@ path = "postgres.rs"
5154
[[example]]
5255
name = "redis"
5356
path = "redis.rs"
57+
58+
[[example]]
59+
name = "echo-uds"
60+
path = "echo-uds.rs"
61+
62+
[[example]]
63+
name = "echo-uds-client"
64+
path = "echo-uds-client.rs"

examples/echo-uds-client.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
use log::info;
2+
use rsocket_rust::prelude::{ClientResponder, EchoRSocket, Payload, RSocket, RSocketFactory};
3+
use rsocket_rust_transport_unix::UnixClientTransport;
4+
use std::error::Error;
5+
6+
#[tokio::main]
7+
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
8+
9+
let responder: ClientResponder = Box::new(|| Box::new(EchoRSocket));
10+
11+
let client = RSocketFactory::connect()
12+
.acceptor(responder)
13+
.transport(UnixClientTransport::from("/tmp/rsocket-uds.sock"))
14+
.setup(Payload::from("READY!"))
15+
.mime_type("text/plain", "text/plain")
16+
.start()
17+
.await
18+
.unwrap();
19+
20+
let request_payload: Payload = Payload::builder()
21+
.set_data_utf8("Hello World!")
22+
.set_metadata_utf8("Rust")
23+
.build();
24+
25+
let res = client.request_response(request_payload).await.unwrap();
26+
27+
info!("got: {:?}", res);
28+
29+
client.close();
30+
31+
Ok(())
32+
}

examples/echo-uds.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
use log::info;
2+
use rsocket_rust::prelude::{EchoRSocket, RSocketFactory, ServerResponder};
3+
use rsocket_rust_transport_unix::UnixServerTransport;
4+
use std::error::Error;
5+
6+
#[tokio::main]
7+
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
8+
9+
let transport: UnixServerTransport = UnixServerTransport::from("/tmp/rsocket-uds.sock");
10+
11+
let responder: ServerResponder = Box::new(|setup, _socket| {
12+
info!("accept setup: {:?}", setup);
13+
Ok(Box::new(EchoRSocket))
14+
// Or you can reject setup
15+
// Err(From::from("SETUP_NOT_ALLOW"))
16+
});
17+
18+
let on_start: Box<dyn FnMut() + Send + Sync> =
19+
Box::new(|| info!("+++++++ echo server started! +++++++"));
20+
21+
RSocketFactory::receive()
22+
.transport(transport)
23+
.acceptor(responder)
24+
.on_start(on_start)
25+
.serve()
26+
.await?;
27+
28+
Ok(())
29+
}

rsocket-test/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ features = ["frame"]
2222
[dev-dependencies.rsocket_rust_transport_tcp]
2323
path = "../rsocket-transport-tcp"
2424

25+
[dev-dependencies.rsocket_rust_transport_unix]
26+
path = "../rsocket-transport-unix"
27+
2528
[dev-dependencies.rsocket_rust_transport_websocket]
2629
path = "../rsocket-transport-websocket"
2730

rsocket-test/tests/test_clients.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ extern crate log;
44
use futures::stream;
55
use rsocket_rust::prelude::*;
66
use rsocket_rust_transport_tcp::{TcpClientTransport, TcpServerTransport};
7+
use rsocket_rust_transport_unix::{UnixClientTransport, UnixServerTransport};
78
use rsocket_rust_transport_websocket::{WebsocketClientTransport, WebsocketServerTransport};
89
use std::thread::sleep;
910
use std::time::Duration;
@@ -120,6 +121,52 @@ fn test_tcp() {
120121
});
121122
}
122123

124+
#[test]
125+
fn test_unix() {
126+
init();
127+
128+
let addr = "/tmp/rsocket-uds.sock";
129+
130+
let server_runtime = Runtime::new().unwrap();
131+
132+
// spawn a server
133+
server_runtime.spawn(async move {
134+
RSocketFactory::receive()
135+
.transport(UnixServerTransport::from(addr))
136+
.acceptor(Box::new(|setup, _socket| {
137+
info!("accept setup: {:?}", setup);
138+
Ok(Box::new(EchoRSocket))
139+
}))
140+
.on_start(Box::new(|| {
141+
info!("+++++++ unix echo server started! +++++++")
142+
}))
143+
.serve()
144+
.await
145+
});
146+
147+
sleep(Duration::from_millis(500));
148+
149+
let mut client_runtime = Runtime::new().unwrap();
150+
151+
client_runtime.block_on(async {
152+
let cli = RSocketFactory::connect()
153+
.acceptor(Box::new(|| Box::new(EchoRSocket)))
154+
.transport(UnixClientTransport::from(addr))
155+
.setup(Payload::from("READY!"))
156+
.mime_type("text/plain", "text/plain")
157+
.start()
158+
.await
159+
.unwrap();
160+
161+
exec_metadata_push(&cli).await;
162+
exec_fire_and_forget(&cli).await;
163+
exec_request_response(&cli).await;
164+
exec_request_stream(&cli).await;
165+
exec_request_channel(&cli).await;
166+
cli.close();
167+
});
168+
}
169+
123170
#[tokio::main]
124171
#[test]
125172
#[ignore]

rsocket-transport-unix/Cargo.toml

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
[package]
2+
name = "rsocket_rust_transport_unix"
3+
version = "0.5.3"
4+
authors = ["seal <[email protected]>"]
5+
edition = "2018"
6+
license = "Apache-2.0"
7+
readme = "README.md"
8+
repository = "https://github.com/rsocket/rsocket-rust"
9+
homepage = "https://github.com/rsocket/rsocket-rust"
10+
description = "Unix Domain Socket RSocket transport implementation."
11+
12+
[dependencies]
13+
log = "0.4.8"
14+
futures = "0.3.5"
15+
bytes = "0.5.4"
16+
17+
[dependencies.rsocket_rust]
18+
path = "../rsocket"
19+
features = ["frame"]
20+
21+
[dependencies.tokio]
22+
version = "0.2.21"
23+
default-features = false
24+
features = [ "rt-core", "rt-threaded", "uds", "sync", "stream" ]
25+
26+
[dependencies.tokio-util]
27+
version = "0.3.1"
28+
default-features = false
29+
features = ["codec"]

rsocket-transport-unix/README.md

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
# RSocket Transport For Unix Domain Socket
2+
3+
## Example
4+
5+
Add dependencies in your `Cargo.toml`.
6+
7+
```toml
8+
[dependencies]
9+
tokio = "0.2.21"
10+
rsocket_rust = "0.5.3"
11+
rsocket_rust_transport_unix = "0.5.3"
12+
```
13+
14+
### Server
15+
16+
```rust
17+
use log::info;
18+
use rsocket_rust::prelude::{EchoRSocket, RSocketFactory, ServerResponder};
19+
use rsocket_rust_transport_unix::UnixServerTransport;
20+
use std::error::Error;
21+
22+
#[tokio::main]
23+
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
24+
25+
let transport: UnixServerTransport = UnixServerTransport::from("/tmp/rsocket-uds.sock");
26+
27+
let responder: ServerResponder = Box::new(|setup, _socket| {
28+
info!("accept setup: {:?}", setup);
29+
Ok(Box::new(EchoRSocket))
30+
// Or you can reject setup
31+
// Err(From::from("SETUP_NOT_ALLOW"))
32+
});
33+
34+
let on_start: Box<dyn FnMut() + Send + Sync> =
35+
Box::new(|| info!("+++++++ echo server started! +++++++"));
36+
37+
RSocketFactory::receive()
38+
.transport(transport)
39+
.acceptor(responder)
40+
.on_start(on_start)
41+
.serve()
42+
.await?;
43+
44+
Ok(())
45+
}
46+
47+
```
48+
49+
### Client
50+
51+
```rust
52+
use log::info;
53+
use rsocket_rust::prelude::{ClientResponder, EchoRSocket, Payload, RSocket, RSocketFactory};
54+
use rsocket_rust_transport_unix::UnixClientTransport;
55+
use std::error::Error;
56+
57+
#[tokio::main]
58+
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
59+
60+
let responder: ClientResponder = Box::new(|| Box::new(EchoRSocket));
61+
62+
let client = RSocketFactory::connect()
63+
.acceptor(responder)
64+
.transport(UnixClientTransport::from("/tmp/rsocket-uds.sock"))
65+
.setup(Payload::from("READY!"))
66+
.mime_type("text/plain", "text/plain")
67+
.start()
68+
.await
69+
.unwrap();
70+
71+
let request_payload: Payload = Payload::builder()
72+
.set_data_utf8("Hello World!")
73+
.set_metadata_utf8("Rust")
74+
.build();
75+
76+
let res = client.request_response(request_payload).await.unwrap();
77+
78+
info!("got: {:?}", res);
79+
80+
client.close();
81+
82+
Ok(())
83+
}
84+
85+
```

0 commit comments

Comments
 (0)