Skip to content

Commit 7ced3c6

Browse files
committed
Support basic features.
1 parent c677f3f commit 7ced3c6

File tree

16 files changed

+406
-294
lines changed

16 files changed

+406
-294
lines changed

Cargo.toml

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ description = "rsocket-rust is an implementation of the RSocket protocol in Rust
1212
[dependencies]
1313
matches = "0.1.8"
1414
log = "0.4.8"
15-
bytes = "0.5.1"
15+
bytes = "0.5.2"
1616
futures = "0.3.1"
1717
lazy_static = "1.4.0"
1818
# reactor_rs = {git = "https://github.com/jjeffcaii/reactor-rust", branch = "develop"}
@@ -27,7 +27,6 @@ version = "0.2.0"
2727
default-features = false
2828
features = ["codec"]
2929

30-
3130
[dev-dependencies]
3231
env_logger = "0.7.1"
3332
hex = "0.4.0"
@@ -37,10 +36,6 @@ rand = "0.7.2"
3736
name = "echo"
3837
path = "examples/echo/main.rs"
3938

40-
[[example]]
41-
name = "echo_client"
42-
path = "examples/echo_client/main.rs"
43-
4439
# [[example]]
4540
# name = "proxy"
4641
# path = "examples/proxy/main.rs"

README.md

Lines changed: 38 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
[![License](https://img.shields.io/github/license/rsocket/rsocket-rust.svg)](https://github.com/rsocket/rsocket-rust/blob/master/LICENSE)
66
[![GitHub Release](https://img.shields.io/github/release-pre/rsocket/rsocket-rust.svg)](https://github.com/rsocket/rsocket-rust/releases)
77

8-
> rsocket-rust is an implementation of the RSocket protocol in Rust.
8+
> rsocket-rust is an implementation of the RSocket protocol in Rust(1.39+).
99
It's an **alpha** version and still under active development. **Do not use it in a production environment!**
1010

1111
## Example
@@ -15,66 +15,57 @@ It's an **alpha** version and still under active development. **Do not use it in
1515
### Server
1616

1717
```rust
18-
extern crate bytes;
19-
extern crate futures;
2018
extern crate rsocket_rust;
21-
22-
use bytes::Bytes;
23-
use futures::prelude::*;
19+
extern crate tokio;
20+
#[macro_use]
21+
extern crate log;
2422
use rsocket_rust::prelude::*;
25-
26-
#[test]
27-
fn test_serve() {
28-
RSocketFactory::receive()
29-
.transport(URI::Tcp("127.0.0.1:7878"))
30-
.acceptor(|setup, sending_socket| {
31-
println!("accept setup: {:?}", setup);
32-
// TODO: use tokio runtime?
33-
std::thread::spawn(move || {
34-
let resp = sending_socket
35-
.request_response(
36-
Payload::builder()
37-
.set_data(Bytes::from("Hello Client!"))
38-
.build(),
39-
)
40-
.wait()
41-
.unwrap();
42-
println!(">>>>> response success: {:?}", resp);
43-
});
44-
Box::new(MockResponder)
45-
})
46-
.serve()
47-
.wait()
48-
.unwrap();
23+
use std::env;
24+
use std::error::Error;
25+
26+
#[tokio::main]
27+
async fn main() -> Result<(), Box<dyn Error>> {
28+
env_logger::builder().init();
29+
let addr = env::args().nth(1).unwrap_or("127.0.0.1:7878".to_string());
30+
31+
RSocketFactory::receive()
32+
.transport(URI::Tcp(addr))
33+
.acceptor(|setup, _socket| {
34+
info!("accept setup: {:?}", setup);
35+
Box::new(EchoRSocket)
36+
})
37+
.serve()
38+
.await
4939
}
50-
5140
```
5241

5342
### Client
5443

5544
```rust
56-
extern crate futures;
5745
extern crate rsocket_rust;
5846

59-
use futures::prelude::*;
6047
use rsocket_rust::prelude::*;
6148

49+
#[tokio::main]
6250
#[test]
63-
fn test_client() {
64-
let cli = RSocketFactory::connect()
65-
.acceptor(||Box::new(MockResponder))
66-
.transport(URI::Tcp("127.0.0.1:7878"))
67-
.setup(Payload::from("READY!"))
68-
.mime_type("text/plain", "text/plain")
69-
.start()
70-
.unwrap();
71-
let pa = Payload::builder()
72-
.set_data_utf8("Hello World!")
73-
.set_metadata_utf8("Rust!")
74-
.build();
75-
let resp = cli.request_response(pa).wait().unwrap();
76-
println!("******* response: {:?}", resp);
51+
async fn test() {
52+
let cli = RSocketFactory::connect()
53+
.acceptor(|| Box::new(EchoRSocket))
54+
.transport(URI::Tcp("127.0.0.1:7878".to_string()))
55+
.setup(Payload::from("READY!"))
56+
.mime_type("text/plain", "text/plain")
57+
.start()
58+
.await
59+
.unwrap();
60+
let req = Payload::builder()
61+
.set_data_utf8("Hello World!")
62+
.set_metadata_utf8("Rust")
63+
.build();
64+
let res = cli.request_response(req).await.unwrap();
65+
println!("got: {:?}", res);
66+
cli.close();
7767
}
68+
7869
```
7970

8071
## Dependencies

examples/echo/main.rs

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,30 +13,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
1313

1414
RSocketFactory::receive()
1515
.transport(URI::Tcp(addr))
16-
.acceptor(|setup, sending_socket| {
16+
.acceptor(|setup, _socket| {
1717
info!("accept setup: {:?}", setup);
1818
Box::new(EchoRSocket)
1919
})
2020
.serve()
2121
.await
22-
23-
// let addr = env::args().nth(1).unwrap_or("127.0.0.1:7878".to_string());
24-
// let mut listener = TcpListener::bind(&addr).await?;
25-
// println!("Listening on: {}", addr);
26-
27-
// loop {
28-
// let (mut socket, _) = listener.accept().await?;
29-
// let (rcv_tx, mut rcv_rx) = mpsc::unbounded_channel::<Frame>();
30-
// let (snd_tx, snd_rx) = mpsc::unbounded_channel::<Frame>();
31-
32-
// tokio::spawn(
33-
// async move { rsocket_rust::transport::tcp::process(socket, snd_rx, rcv_tx).await },
34-
// );
35-
36-
// let ds = DuplexSocket::new(0, snd_tx.clone());
37-
// tokio::spawn(async move {
38-
// let acceptor = Acceptor::Generate(Arc::new(|setup, socket| Box::new(EchoRSocket)));
39-
// ds.event_loop(acceptor, rcv_rx).await;
40-
// });
41-
// }
4222
}

examples/echo_client/main.rs

Lines changed: 0 additions & 21 deletions
This file was deleted.

src/frame/utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ pub struct U24;
55

66
impl U24 {
77
pub fn max() -> usize {
8-
0x00FFFFFF
8+
0x00FF_FFFF
99
}
1010

1111
pub fn write(n: u32, bf: &mut BytesMut) {

src/mime.rs

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -201,51 +201,57 @@ impl WellKnownMIME {
201201

202202
pub fn raw(&self) -> u8 {
203203
let (v, _) = MIME_MAP.get(self).unwrap();
204-
v.clone()
204+
*v
205205
}
206206
}
207207

208208
impl From<String> for WellKnownMIME {
209209
fn from(s: String) -> WellKnownMIME {
210+
let mut result = WellKnownMIME::Unknown;
210211
for (k, v) in MIME_MAP.iter() {
211212
let (_, vv) = v;
212-
if vv.to_string() == s {
213-
return k.clone();
213+
if *vv == s {
214+
result = k.clone();
215+
break;
214216
}
215217
}
216-
return WellKnownMIME::Unknown;
218+
result
217219
}
218220
}
219221

220222
impl From<&str> for WellKnownMIME {
221223
fn from(s: &str) -> WellKnownMIME {
224+
let mut result = WellKnownMIME::Unknown;
222225
for (k, v) in MIME_MAP.iter() {
223226
let (_, vv) = v;
224-
if vv.to_string() == s.to_string() {
225-
return k.clone();
227+
if *vv == s {
228+
result = k.clone();
229+
break;
226230
}
227231
}
228-
return WellKnownMIME::Unknown;
232+
result
229233
}
230234
}
231235

232236
impl From<u8> for WellKnownMIME {
233237
fn from(n: u8) -> WellKnownMIME {
238+
let mut result = WellKnownMIME::Unknown;
234239
for (k, v) in MIME_MAP.iter() {
235240
let (a, _) = v;
236241
if *a == n {
237-
return k.clone();
242+
result = k.clone();
243+
break;
238244
}
239245
}
240-
return WellKnownMIME::Unknown;
246+
result
241247
}
242248
}
243249

244250
impl fmt::Display for WellKnownMIME {
245251
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
246-
return match MIME_MAP.get(self) {
252+
match MIME_MAP.get(self) {
247253
Some((_, v)) => write!(f, "{}", v),
248-
None => write!(f, "{}", "unknown"),
249-
};
254+
None => write!(f, "unknown"),
255+
}
250256
}
251257
}

src/payload/default.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::frame;
22
use bytes::Bytes;
33

4-
#[derive(Debug)]
4+
#[derive(Debug, Clone)]
55
pub struct Payload {
66
m: Option<Bytes>,
77
d: Option<Bytes>,

src/prelude.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ pub use crate::payload::*;
33
pub use crate::result::*;
44
pub use crate::spi::*;
55
pub use crate::x::{Client, RSocketFactory, URI};
6+
pub use futures::{Sink, SinkExt, Stream, StreamExt};

src/spi.rs

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,48 @@
1-
use crate::{
2-
errors::{ErrorKind, RSocketError},
3-
frame,
4-
payload::{Payload, SetupPayload},
5-
result::RSocketResult,
6-
};
1+
use crate::errors::{ErrorKind, RSocketError};
2+
use crate::frame;
3+
use crate::payload::{Payload, SetupPayload};
4+
use crate::result::RSocketResult;
75
use futures::future;
6+
use futures::{Sink, SinkExt, Stream, StreamExt};
87
use std::future::Future;
98
use std::pin::Pin;
109
use std::sync::Arc;
1110

12-
pub type Single<T> = Pin<Box<dyn Send + Sync + Future<Output = RSocketResult<T>>>>;
11+
// TODO: switch to reactor-rust.
12+
pub type Mono<T> = Pin<Box<dyn Send + Sync + Future<Output = RSocketResult<T>>>>;
13+
pub type Flux<T> = Pin<Box<dyn Send + Sync + Stream<Item = RSocketResult<T>>>>;
1314

1415
pub trait RSocket: Sync + Send {
15-
fn metadata_push(&self, req: Payload) -> Single<()>;
16-
fn fire_and_forget(&self, req: Payload) -> Single<()>;
17-
fn request_response(&self, req: Payload) -> Single<Payload>;
16+
fn metadata_push(&self, req: Payload) -> Mono<()>;
17+
fn fire_and_forget(&self, req: Payload) -> Mono<()>;
18+
fn request_response(&self, req: Payload) -> Mono<Payload>;
19+
fn request_stream(&self, req: Payload) -> Flux<Payload>;
20+
// fn request_channel(&self, reqs: Flux<Payload>) -> Flux<Payload>;
1821
}
1922

2023
pub struct EchoRSocket;
2124

2225
impl RSocket for EchoRSocket {
23-
fn metadata_push(&self, req: Payload) -> Single<()> {
26+
fn metadata_push(&self, req: Payload) -> Mono<()> {
27+
info!("echo metadata_push: {:?}", req);
2428
Box::pin(future::ok::<(), RSocketError>(()))
2529
}
26-
fn fire_and_forget(&self, req: Payload) -> Single<()> {
30+
fn fire_and_forget(&self, req: Payload) -> Mono<()> {
2731
info!("echo fire_and_forget: {:?}", req);
2832
Box::pin(future::ok::<(), RSocketError>(()))
2933
}
30-
31-
fn request_response(&self, req: Payload) -> Single<Payload> {
34+
fn request_response(&self, req: Payload) -> Mono<Payload> {
3235
info!("echo request_response: {:?}", req);
3336
Box::pin(future::ok::<Payload, RSocketError>(req))
3437
}
38+
fn request_stream(&self, req: Payload) -> Flux<Payload> {
39+
info!("echo request_stream: {:?}", req);
40+
Box::pin(futures::stream::iter(vec![
41+
Ok(req.clone()),
42+
Ok(req.clone()),
43+
Ok(req),
44+
]))
45+
}
3546
}
3647

3748
pub struct EmptyRSocket;
@@ -43,17 +54,21 @@ impl EmptyRSocket {
4354
}
4455

4556
impl RSocket for EmptyRSocket {
46-
fn metadata_push(&self, _req: Payload) -> Single<()> {
57+
fn metadata_push(&self, _req: Payload) -> Mono<()> {
4758
Box::pin(future::err(self.must_failed()))
4859
}
4960

50-
fn fire_and_forget(&self, _req: Payload) -> Single<()> {
61+
fn fire_and_forget(&self, _req: Payload) -> Mono<()> {
5162
Box::pin(future::err(self.must_failed()))
5263
}
5364

54-
fn request_response(&self, _req: Payload) -> Single<Payload> {
65+
fn request_response(&self, _req: Payload) -> Mono<Payload> {
5566
Box::pin(future::err(self.must_failed()))
5667
}
68+
69+
fn request_stream(&self, req: Payload) -> Flux<Payload> {
70+
Box::pin(futures::stream::empty())
71+
}
5772
}
5873

5974
pub type AcceptorGenerator = Arc<fn(SetupPayload, Box<dyn RSocket>) -> Box<dyn RSocket>>;

0 commit comments

Comments
 (0)