Skip to content

Commit 777d554

Browse files
authored
Merge pull request #3 from rsocket/feature/routing_metadata
Feature/routing metadata
2 parents b4154b3 + 566a770 commit 777d554

File tree

12 files changed

+314
-65
lines changed

12 files changed

+314
-65
lines changed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "rsocket_rust"
3-
version = "0.2.0"
3+
version = "0.3.0"
44
authors = ["Jeffsky <[email protected]>"]
55
edition = "2018"
66
license = "Apache-2.0"
@@ -18,7 +18,7 @@ lazy_static = "1.4.0"
1818
# reactor_rs = {git = "https://github.com/jjeffcaii/reactor-rust", branch = "develop"}
1919

2020
[dependencies.tokio]
21-
version = "0.2.1"
21+
version = "0.2.2"
2222
default-features = false
2323
features = ["full"]
2424

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,12 @@ async fn test() {
8080
- [x] REQUEST_FNF
8181
- [x] REQUEST_RESPONSE
8282
- [x] REQUEST_STREAM
83-
- [ ] REQUEST_CHANNEL
83+
- [x] REQUEST_CHANNEL
8484
- Transport
8585
- [x] TCP
8686
- [ ] Websocket
8787
- Reactor
8888
- [ ] ...
8989
- High Level APIs
90-
- [ ] Client
91-
- [ ] Server
90+
- [x] Client
91+
- [x] Server

src/errors.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::io;
55
#[derive(Debug)]
66
pub enum ErrorKind {
77
Internal(u32, &'static str),
8-
WithDescription(&'static str),
8+
WithDescription(String),
99
IO(io::Error),
1010
Cancelled(),
1111
Send(),
@@ -41,11 +41,18 @@ impl From<ErrorKind> for RSocketError {
4141
RSocketError { kind }
4242
}
4343
}
44+
impl From<String> for RSocketError {
45+
fn from(e: String) -> RSocketError {
46+
RSocketError {
47+
kind: ErrorKind::WithDescription(e),
48+
}
49+
}
50+
}
4451

4552
impl From<&'static str> for RSocketError {
4653
fn from(e: &'static str) -> RSocketError {
4754
RSocketError {
48-
kind: ErrorKind::WithDescription(e),
55+
kind: ErrorKind::WithDescription(String::from(e)),
4956
}
5057
}
5158
}

src/extension.rs

Lines changed: 82 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,92 @@ use crate::result::RSocketResult;
55
use bytes::{Buf, BufMut, Bytes, BytesMut};
66

77
const MAX_MIME_LEN: usize = 0x7F;
8+
const MAX_ROUTING_TAG_LEN: usize = 0xFF;
89

910
#[derive(Debug, Clone, Eq, PartialEq)]
1011
pub struct CompositeMetadata {
1112
mime: String,
1213
payload: Bytes,
1314
}
1415

16+
#[derive(Debug, Clone)]
17+
pub struct RoutingMetadata {
18+
tags: Vec<String>,
19+
}
20+
21+
pub struct RoutingMetadataBuilder {
22+
inner: RoutingMetadata,
23+
}
24+
25+
impl RoutingMetadataBuilder {
26+
pub fn push_str(self, tag: &str) -> Self {
27+
self.push(String::from(tag))
28+
}
29+
30+
pub fn push(mut self, tag: String) -> Self {
31+
if tag.len() > MAX_ROUTING_TAG_LEN {
32+
panic!("exceeded maximum routing tag length!");
33+
}
34+
self.inner.tags.push(tag);
35+
self
36+
}
37+
pub fn build(self) -> RoutingMetadata {
38+
self.inner
39+
}
40+
}
41+
42+
impl RoutingMetadata {
43+
pub fn builder() -> RoutingMetadataBuilder {
44+
RoutingMetadataBuilder {
45+
inner: RoutingMetadata { tags: vec![] },
46+
}
47+
}
48+
49+
pub fn decode(bf: &mut BytesMut) -> RSocketResult<RoutingMetadata> {
50+
let mut bu = RoutingMetadata::builder();
51+
loop {
52+
match Self::decode_once(bf) {
53+
Ok(v) => match v {
54+
Some(tag) => bu = bu.push(tag),
55+
None => break,
56+
},
57+
Err(e) => return Err(e),
58+
}
59+
}
60+
Ok(bu.build())
61+
}
62+
63+
fn decode_once(bf: &mut BytesMut) -> RSocketResult<Option<String>> {
64+
if bf.is_empty() {
65+
return Ok(None);
66+
}
67+
let size = bf.get_u8() as usize;
68+
if bf.len() < size {
69+
return Err(RSocketError::from("require more bytes!"));
70+
}
71+
let tag = String::from_utf8(bf.split_to(size).to_vec()).unwrap();
72+
Ok(Some(tag))
73+
}
74+
75+
pub fn get_tags(&self) -> &Vec<String> {
76+
&self.tags
77+
}
78+
79+
pub fn write_to(&self, bf: &mut BytesMut) {
80+
for tag in &self.tags {
81+
let size = tag.len() as u8;
82+
bf.put_u8(size);
83+
bf.put_slice(tag.as_bytes());
84+
}
85+
}
86+
87+
pub fn bytes(&self) -> Bytes {
88+
let mut bf = BytesMut::new();
89+
self.write_to(&mut bf);
90+
bf.freeze()
91+
}
92+
}
93+
1594
impl CompositeMetadata {
1695
pub fn new(mime: String, payload: Bytes) -> CompositeMetadata {
1796
if mime.len() > MAX_MIME_LEN {
@@ -50,24 +129,18 @@ impl CompositeMetadata {
50129
// Bad
51130
let mime_len = first as usize;
52131
if bs.len() < mime_len {
53-
return Err(RSocketError::from(ErrorKind::WithDescription(
54-
"bad COMPOSITE_METADATA bytes: missing required bytes!",
55-
)));
132+
return Err(RSocketError::from("broken COMPOSITE_METADATA bytes!"));
56133
}
57134
let front = bs.split_to(mime_len);
58135
String::from_utf8(front.to_vec()).unwrap()
59136
};
60137

61138
if bs.len() < 3 {
62-
return Err(RSocketError::from(ErrorKind::WithDescription(
63-
"bad COMPOSITE_METADATA bytes: missing required bytes!",
64-
)));
139+
return Err(RSocketError::from("broken COMPOSITE_METADATA bytes!"));
65140
}
66141
let payload_size = U24::read_advance(bs) as usize;
67142
if bs.len() < payload_size {
68-
return Err(RSocketError::from(ErrorKind::WithDescription(
69-
"bad COMPOSITE_METADATA bytes: missing required bytes!",
70-
)));
143+
return Err(RSocketError::from("broken COMPOSITE_METADATA bytes!"));
71144
}
72145
let p = bs.split_to(payload_size).freeze();
73146
Ok(Some(CompositeMetadata::new(m, p)))

src/payload/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
mod default;
22
mod setup;
33

4-
pub use default::{Payload,PayloadBuilder};
5-
pub use setup::{SetupPayload,SetupPayloadBuilder};
4+
pub use default::{Payload, PayloadBuilder};
5+
pub use setup::{SetupPayload, SetupPayloadBuilder};

src/spi.rs

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use futures::{Sink, SinkExt, Stream, StreamExt};
77
use std::future::Future;
88
use std::pin::Pin;
99
use std::sync::Arc;
10+
use tokio::sync::mpsc;
1011

1112
// TODO: switch to reactor-rust.
1213
pub type Mono<T> = Pin<Box<dyn Send + Sync + Future<Output = RSocketResult<T>>>>;
@@ -17,32 +18,45 @@ pub trait RSocket: Sync + Send {
1718
fn fire_and_forget(&self, req: Payload) -> Mono<()>;
1819
fn request_response(&self, req: Payload) -> Mono<Payload>;
1920
fn request_stream(&self, req: Payload) -> Flux<Payload>;
20-
// fn request_channel(&self, reqs: Flux<Payload>) -> Flux<Payload>;
21+
fn request_channel(&self, reqs: Flux<Payload>) -> Flux<Payload>;
2122
}
2223

2324
pub struct EchoRSocket;
2425

2526
impl RSocket for EchoRSocket {
2627
fn metadata_push(&self, req: Payload) -> Mono<()> {
27-
info!("echo metadata_push: {:?}", req);
28+
info!("{:?}", req);
2829
Box::pin(future::ok::<(), RSocketError>(()))
2930
}
3031
fn fire_and_forget(&self, req: Payload) -> Mono<()> {
31-
info!("echo fire_and_forget: {:?}", req);
32+
info!("{:?}", req);
3233
Box::pin(future::ok::<(), RSocketError>(()))
3334
}
3435
fn request_response(&self, req: Payload) -> Mono<Payload> {
35-
info!("echo request_response: {:?}", req);
36+
info!("{:?}", req);
3637
Box::pin(future::ok::<Payload, RSocketError>(req))
3738
}
3839
fn request_stream(&self, req: Payload) -> Flux<Payload> {
39-
info!("echo request_stream: {:?}", req);
40+
info!("{:?}", req);
4041
Box::pin(futures::stream::iter(vec![
4142
Ok(req.clone()),
4243
Ok(req.clone()),
4344
Ok(req),
4445
]))
4546
}
47+
fn request_channel(&self, mut reqs: Flux<Payload>) -> Flux<Payload> {
48+
let (sender, receiver) = mpsc::unbounded_channel::<RSocketResult<Payload>>();
49+
tokio::spawn(async move {
50+
while let Some(it) = reqs.next().await {
51+
let pa = it.unwrap();
52+
info!("{:?}", pa);
53+
sender.send(Ok(pa)).unwrap();
54+
}
55+
});
56+
Box::pin(receiver)
57+
// or returns directly
58+
// reqs
59+
}
4660
}
4761

4862
pub struct EmptyRSocket;
@@ -66,8 +80,11 @@ impl RSocket for EmptyRSocket {
6680
Box::pin(future::err(self.must_failed()))
6781
}
6882

69-
fn request_stream(&self, req: Payload) -> Flux<Payload> {
70-
Box::pin(futures::stream::empty())
83+
fn request_stream(&self, _req: Payload) -> Flux<Payload> {
84+
Box::pin(futures::stream::iter(vec![Err(self.must_failed())]))
85+
}
86+
fn request_channel(&self, _reqs: Flux<Payload>) -> Flux<Payload> {
87+
Box::pin(futures::stream::iter(vec![Err(self.must_failed())]))
7188
}
7289
}
7390

src/transport/misc.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use std::{
99
collections::HashMap,
1010
future::Future,
1111
sync::{
12-
atomic::{AtomicU32, Ordering},
12+
atomic::{AtomicI64, AtomicU32, Ordering},
1313
Arc, Mutex, RwLock,
1414
},
1515
};
@@ -37,3 +37,30 @@ impl From<u32> for StreamID {
3737
StreamID::new(v)
3838
}
3939
}
40+
41+
#[derive(Debug, Clone)]
42+
pub(crate) struct Counter {
43+
inner: Arc<AtomicI64>,
44+
}
45+
46+
impl Counter {
47+
pub(crate) fn new(value: i64) -> Counter {
48+
Counter {
49+
inner: Arc::new(AtomicI64::new(value)),
50+
}
51+
}
52+
53+
pub(crate) fn count_down(&self) -> i64 {
54+
let c = self.inner.clone();
55+
c.fetch_add(-1, Ordering::SeqCst)
56+
}
57+
}
58+
59+
#[inline]
60+
pub(crate) fn debug_frame(snd: bool, f: &frame::Frame) {
61+
if snd {
62+
debug!("===> SND: {:?}", f);
63+
} else {
64+
debug!("<=== RCV: {:?}", f);
65+
}
66+
}

0 commit comments

Comments
 (0)