Skip to content

Commit 4516102

Browse files
committed
optimize setup data and metadata for messaging requester.
1 parent d90ef92 commit 4516102

File tree

5 files changed

+60
-87
lines changed

5 files changed

+60
-87
lines changed

rsocket-messaging/Cargo.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,8 @@ authors = ["Jeffsky <[email protected]>"]
55
edition = "2018"
66

77
[dependencies]
8-
log = "0.4.8"
98
futures = "0.3.5"
109
bytes = "0.5.4"
11-
lazy_static = "1.4.0"
1210
serde = "1.0.110"
1311
serde_json = "1.0.53"
1412
serde_cbor = "0.11.1"

rsocket-messaging/src/lib.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,3 @@
1-
#[macro_use]
2-
extern crate log;
3-
// #[macro_use]
4-
// extern crate lazy_static;
5-
61
mod misc;
72
mod requester;
83

rsocket-messaging/src/misc.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@ use serde::{de::DeserializeOwned, Serialize};
22
use std::error::Error;
33

44
pub trait SerDe {
5-
fn marshal<T>(&self, data: &T) -> Result<Vec<u8>, Box<dyn Error>>
5+
fn marshal<T>(&self, data: &T) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>>
66
where
77
Self: Sized,
88
T: Sized + Serialize;
99

10-
fn unmarshal<T>(&self, raw: &[u8]) -> Result<T, Box<dyn Error>>
10+
fn unmarshal<T>(&self, raw: &[u8]) -> Result<T, Box<dyn Error + Send + Sync>>
1111
where
1212
Self: Sized,
1313
T: Sized + DeserializeOwned;
@@ -17,14 +17,14 @@ pub trait SerDe {
1717
struct JsonSerDe {}
1818

1919
impl SerDe for JsonSerDe {
20-
fn marshal<T>(&self, data: &T) -> Result<Vec<u8>, Box<dyn Error>>
20+
fn marshal<T>(&self, data: &T) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>>
2121
where
2222
T: Sized + Serialize,
2323
{
2424
Ok(serde_json::to_vec(data)?)
2525
}
2626

27-
fn unmarshal<T>(&self, raw: &[u8]) -> Result<T, Box<dyn Error>>
27+
fn unmarshal<T>(&self, raw: &[u8]) -> Result<T, Box<dyn Error + Send + Sync>>
2828
where
2929
T: Sized + DeserializeOwned,
3030
{
@@ -43,29 +43,29 @@ pub fn cbor() -> impl SerDe {
4343
struct CborSerDe {}
4444

4545
impl SerDe for CborSerDe {
46-
fn marshal<T>(&self, data: &T) -> Result<Vec<u8>, Box<dyn Error>>
46+
fn marshal<T>(&self, data: &T) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>>
4747
where
4848
T: Sized + Serialize,
4949
{
5050
Ok(serde_cbor::to_vec(data)?)
5151
}
5252

53-
fn unmarshal<T>(&self, raw: &[u8]) -> Result<T, Box<dyn Error>>
53+
fn unmarshal<T>(&self, raw: &[u8]) -> Result<T, Box<dyn Error + Send + Sync>>
5454
where
5555
T: Sized + DeserializeOwned,
5656
{
5757
Ok(serde_cbor::from_slice(raw)?)
5858
}
5959
}
6060

61-
pub(crate) fn marshal<T>(ser: impl SerDe, data: &T) -> Result<Vec<u8>, Box<dyn Error>>
61+
pub(crate) fn marshal<T>(ser: impl SerDe, data: &T) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>>
6262
where
6363
T: Sized + Serialize,
6464
{
6565
ser.marshal(data)
6666
}
6767

68-
pub(crate) fn unmarshal<T>(de: impl SerDe, raw: &[u8]) -> Result<T, Box<dyn Error>>
68+
pub(crate) fn unmarshal<T>(de: impl SerDe, raw: &[u8]) -> Result<T, Box<dyn Error + Send + Sync>>
6969
where
7070
T: Sized + DeserializeOwned,
7171
{

rsocket-messaging/src/requester.rs

Lines changed: 48 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
use super::misc::{self, marshal, unmarshal};
22
use bytes::{Bytes, BytesMut};
33
use rsocket_rust::error::RSocketError;
4-
use rsocket_rust::extension::{
5-
CompositeMetadata, CompositeMetadataEntry, MimeType, RoutingMetadata,
6-
};
4+
use rsocket_rust::extension::{CompositeMetadata, MimeType, RoutingMetadata};
75
use rsocket_rust::prelude::*;
86
use rsocket_rust::utils::Writeable;
97
use rsocket_rust_transport_tcp::TcpClientTransport;
@@ -16,11 +14,13 @@ use std::result::Result;
1614
use std::sync::Arc;
1715
use url::Url;
1816

19-
type FnMetadata = Box<dyn FnMut() -> Result<(MimeType, Vec<u8>), Box<dyn Error>>>;
20-
type FnData = Box<dyn FnMut(&MimeType) -> Result<Vec<u8>, Box<dyn Error>>>;
21-
type PreflightResult = Result<(Payload, MimeType, Arc<Box<dyn RSocket>>), Box<dyn Error>>;
22-
type UnpackerResult = Result<(MimeType, Payload), RSocketError>;
23-
type UnpackersResult = Result<(MimeType, Flux<Result<Payload, RSocketError>>), Box<dyn Error>>;
17+
type FnMetadata = Box<dyn FnMut() -> Result<(MimeType, Vec<u8>), Box<dyn Error + Sync + Send>>>;
18+
type FnData = Box<dyn FnMut(&MimeType) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>>>;
19+
type PreflightResult =
20+
Result<(Payload, MimeType, Arc<Box<dyn RSocket>>), Box<dyn Error + Sync + Send>>;
21+
type UnpackerResult = Result<(MimeType, Payload), Box<dyn Error + Sync + Send>>;
22+
type UnpackersResult =
23+
Result<(MimeType, Flux<Result<Payload, RSocketError>>), Box<dyn Error + Sync + Send>>;
2424

2525
enum TransportKind {
2626
TCP(String, u16),
@@ -41,8 +41,8 @@ pub struct RequestSpec {
4141
pub struct RequesterBuilder {
4242
data_mime_type: Option<MimeType>,
4343
route: Option<String>,
44-
metadata: Vec<CompositeMetadataEntry>,
45-
data: Option<Vec<u8>>,
44+
metadata: LinkedList<FnMetadata>,
45+
data: Option<FnData>,
4646
tp: Option<TransportKind>,
4747
}
4848

@@ -83,40 +83,26 @@ impl RequesterBuilder {
8383
self
8484
}
8585

86-
pub fn setup_data<D>(mut self, data: &D) -> Self
86+
pub fn setup_data<D>(mut self, data: D) -> Self
8787
where
88-
D: Sized + Serialize,
88+
D: Sized + Serialize + 'static,
8989
{
90-
// TODO: lazy set
91-
let result = match &self.data_mime_type {
92-
Some(m) => do_marshal(m, data),
93-
None => do_marshal(&MimeType::APPLICATION_JSON, data),
94-
};
95-
match result {
96-
Ok(raw) => {
97-
self.data = Some(raw);
98-
}
99-
Err(e) => {
100-
error!("marshal failed: {:?}", e);
101-
}
102-
}
90+
self.data = Some(Box::new(move |mime_type: &MimeType| {
91+
do_marshal(mime_type, &data)
92+
}));
10393
self
10494
}
10595

106-
pub fn setup_metadata<M, T>(mut self, metadata: &M, mime_type: T) -> Self
96+
pub fn setup_metadata<M, T>(mut self, metadata: M, mime_type: T) -> Self
10797
where
108-
M: Sized + Serialize,
98+
M: Sized + Serialize + 'static,
10999
T: Into<MimeType>,
110100
{
111-
// TODO: lazy set
112101
let mime_type = mime_type.into();
113-
match do_marshal(&mime_type, metadata) {
114-
Ok(raw) => {
115-
let entry = CompositeMetadataEntry::new(mime_type, Bytes::from(raw));
116-
self.metadata.push(entry);
117-
}
118-
Err(e) => error!("marshal failed: {:?}", e),
119-
}
102+
self.metadata.push_back(Box::new(move || {
103+
let raw = do_marshal(&mime_type, &metadata)?;
104+
Ok((mime_type.clone(), raw))
105+
}));
120106
self
121107
}
122108

@@ -148,8 +134,10 @@ impl RequesterBuilder {
148134
composite_builder.push(MimeType::MESSAGE_X_RSOCKET_ROUTING_V0, routing.bytes());
149135
added += 1;
150136
}
151-
for it in self.metadata.into_iter() {
152-
composite_builder = composite_builder.push_entry(it);
137+
138+
for mut gen in self.metadata.into_iter() {
139+
let (mime_type, raw) = gen()?;
140+
composite_builder = composite_builder.push(mime_type, raw);
153141
added += 1;
154142
}
155143

@@ -159,8 +147,8 @@ impl RequesterBuilder {
159147
payload_builder = payload_builder.set_metadata(composite_builder.build());
160148
}
161149

162-
if let Some(raw) = self.data {
163-
payload_builder = payload_builder.set_data(raw);
150+
if let Some(mut gen) = self.data {
151+
payload_builder = payload_builder.set_data(gen(&data_mime_type)?);
164152
}
165153

166154
let setup = payload_builder.build();
@@ -291,16 +279,12 @@ impl RequestSpec {
291279
Ok(v) => Unpacker {
292280
inner: Ok((mime_type, v)),
293281
},
294-
Err(e) => Unpacker { inner: Err(e) },
295-
}
296-
}
297-
Err(e) => {
298-
// TODO: better error
299-
let msg = format!("{}", e);
300-
Unpacker {
301-
inner: Err(RSocketError::from(msg)),
282+
Err(e) => Unpacker {
283+
inner: Err(e.into()),
284+
},
302285
}
303286
}
287+
Err(e) => Unpacker { inner: Err(e) },
304288
}
305289
}
306290

@@ -337,7 +321,7 @@ impl RequestSpec {
337321
}
338322

339323
impl Unpackers {
340-
pub async fn block<T>(self) -> Result<Vec<T>, Box<dyn Error>>
324+
pub async fn block<T>(self) -> Result<Vec<T>, Box<dyn Error + Sync + Send>>
341325
where
342326
T: Sized + DeserializeOwned,
343327
{
@@ -353,12 +337,13 @@ impl Unpackers {
353337
}
354338
}
355339
}
356-
Err(e) => return Err(format!("{}", e).into()),
340+
Err(e) => return Err(e.into()),
357341
}
358342
}
359343
Ok(res)
360344
}
361-
pub async fn foreach<T>(self, callback: impl Fn(T)) -> Result<(), Box<dyn Error>>
345+
346+
pub async fn foreach<T>(self, callback: impl Fn(T)) -> Result<(), Box<dyn Error + Send + Sync>>
362347
where
363348
T: Sized + DeserializeOwned,
364349
{
@@ -381,39 +366,34 @@ impl Unpackers {
381366
}
382367

383368
impl Unpacker {
384-
pub fn block<T>(self) -> Result<Option<T>, Box<dyn Error>>
369+
pub fn block<T>(self) -> Result<Option<T>, Box<dyn Error + Send + Sync>>
385370
where
386371
T: Sized + DeserializeOwned,
387372
{
388-
match self.inner {
389-
Ok((mime_type, inner)) => match inner.data() {
390-
// TODO: support more mime types.
391-
Some(raw) => do_unmarshal(&mime_type, raw),
392-
None => Ok(None),
393-
},
394-
Err(e) => Err(format!("{}", e).into()),
373+
let (mime_type, inner) = self.inner?;
374+
match inner.data() {
375+
Some(raw) => do_unmarshal(&mime_type, raw),
376+
None => Ok(None),
395377
}
396378
}
397379
}
398380

399-
fn do_unmarshal<T>(mime_type: &MimeType, raw: &Bytes) -> Result<Option<T>, Box<dyn Error>>
381+
fn do_unmarshal<T>(
382+
mime_type: &MimeType,
383+
raw: &Bytes,
384+
) -> Result<Option<T>, Box<dyn Error + Send + Sync>>
400385
where
401386
T: Sized + DeserializeOwned,
402387
{
388+
// TODO: support more mime types
403389
match *mime_type {
404-
MimeType::APPLICATION_JSON => {
405-
let t = unmarshal(misc::json(), &raw.as_ref())?;
406-
Ok(Some(t))
407-
}
408-
MimeType::APPLICATION_CBOR => {
409-
let t = unmarshal(misc::cbor(), &raw.as_ref())?;
410-
Ok(Some(t))
411-
}
390+
MimeType::APPLICATION_JSON => Ok(Some(unmarshal(misc::json(), &raw.as_ref())?)),
391+
MimeType::APPLICATION_CBOR => Ok(Some(unmarshal(misc::cbor(), &raw.as_ref())?)),
412392
_ => Err("unsupported mime type!".into()),
413393
}
414394
}
415395

416-
fn do_marshal<T>(mime_type: &MimeType, data: &T) -> Result<Vec<u8>, Box<dyn Error>>
396+
fn do_marshal<T>(mime_type: &MimeType, data: &T) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>>
417397
where
418398
T: Sized + Serialize,
419399
{

rsocket-test/tests/test_messaging.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ pub struct Token {
1919
access: String,
2020
}
2121

22-
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
22+
#[derive(Serialize, Deserialize, Debug, Default)]
2323
pub struct Student {
2424
id: i64,
2525
name: String,
@@ -44,13 +44,13 @@ pub struct Response<T> {
4444
#[ignore]
4545
async fn test_messaging() {
4646
init();
47-
let token = Token {
47+
let token = || Token {
4848
app: "xxx".to_owned(),
4949
access: "yyy".to_owned(),
5050
};
5151
let requester = Requester::builder()
52-
.setup_metadata(&token, MimeType::APPLICATION_JSON)
53-
.setup_data(&token)
52+
.setup_metadata(token(), MimeType::APPLICATION_JSON)
53+
.setup_data(token())
5454
.connect_tcp("127.0.0.1", 7878)
5555
.build()
5656
.await

0 commit comments

Comments
 (0)