Skip to content

Commit fc0fbb2

Browse files
KannenOlivier Kannengieserjkelleyrtp
authored
Fix dioxus-fullstack::Streaming<T,E> only deserializing one client value (#4975)
* dioxus-fullstack::Streaming: decode multiple client values from Bytes * small nits * remove the bidirectional stream since it doesn't work in the browser * final cleanups * nits --------- Co-authored-by: Olivier Kannengieser <[email protected]> Co-authored-by: Jonathan Kelley <[email protected]>
1 parent a220e9a commit fc0fbb2

File tree

3 files changed

+89
-122
lines changed

3 files changed

+89
-122
lines changed

examples/07-fullstack/streaming.rs

Lines changed: 0 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use dioxus::{
2121
fullstack::{JsonEncoding, Streaming, TextStream},
2222
prelude::*,
2323
};
24-
use futures::{StreamExt as _, TryStreamExt};
2524

2625
fn main() {
2726
dioxus::launch(app)
@@ -30,8 +29,6 @@ fn main() {
3029
fn app() -> Element {
3130
let mut text_responses = use_signal(String::new);
3231
let mut json_responses = use_signal(Vec::new);
33-
let mut echo_responses = use_signal(Vec::new);
34-
let mut transform_responses = use_signal(Vec::new);
3532

3633
let mut start_text_stream = use_action(move || async move {
3734
text_responses.clear();
@@ -56,64 +53,6 @@ fn app() -> Element {
5653
dioxus::Ok(())
5754
});
5855

59-
let mut continue_echo_stream = use_signal_sync(|| false);
60-
let mut start_echo_stream = use_action(move || async move {
61-
continue_echo_stream.set(true);
62-
echo_responses.clear();
63-
let stream = echo_stream(Streaming::new(futures::stream::unfold(
64-
0,
65-
move |index| async move {
66-
if !continue_echo_stream() {
67-
return None;
68-
}
69-
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
70-
let dog = Dog {
71-
name: format!("Dog {}", index),
72-
age: (index % 10) as u8,
73-
};
74-
Some((dog, index + 1))
75-
},
76-
)))
77-
.await?;
78-
stream
79-
.into_inner()
80-
.try_for_each(move |dog| async move {
81-
echo_responses.push(dog);
82-
Ok(())
83-
})
84-
.await?;
85-
dioxus::Ok(())
86-
});
87-
88-
let mut continue_transform_stream = use_signal_sync(|| false);
89-
let mut start_transform_stream = use_action(move || async move {
90-
continue_transform_stream.set(true);
91-
transform_responses.clear();
92-
let stream = transform_stream(Streaming::new(futures::stream::unfold(
93-
0,
94-
move |index| async move {
95-
if !continue_transform_stream() {
96-
return None;
97-
}
98-
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
99-
let dog = Dog {
100-
name: format!("Dog {}", index),
101-
age: (index % 10) as u8,
102-
};
103-
Some((dog, index + 1))
104-
},
105-
)))
106-
.await?;
107-
stream
108-
.into_inner()
109-
.try_for_each(move |text| async move {
110-
transform_responses.push(text);
111-
Ok(())
112-
})
113-
.await?;
114-
dioxus::Ok(())
115-
});
116-
11756
rsx! {
11857
div {
11958
button { onclick: move |_| start_text_stream.call(), "Start text stream" }
@@ -127,20 +66,6 @@ fn app() -> Element {
12766
pre { "{dog:?}" }
12867
}
12968
}
130-
div {
131-
button { onclick: move |_| start_echo_stream.call(), "Start echo stream" }
132-
button { onclick: move |_| continue_echo_stream.set(false), "Stop echo stream" }
133-
for dog in echo_responses.read().iter() {
134-
pre { "{dog:?}" }
135-
}
136-
}
137-
div {
138-
button { onclick: move |_| start_transform_stream.call(), "Start transform stream" }
139-
button { onclick: move |_| continue_transform_stream.set(false), "Stop transform stream" }
140-
for text in transform_responses.read().iter() {
141-
pre { "{text}" }
142-
}
143-
}
14469
}
14570
}
14671

@@ -220,20 +145,3 @@ async fn byte_stream() -> Result<Streaming<Bytes>> {
220145

221146
Ok(Streaming::new(rx))
222147
}
223-
224-
/// An example of echoing the stream back to the client.
225-
#[post("/api/echo_stream")]
226-
async fn echo_stream(stream: Streaming<Dog, JsonEncoding>) -> Result<Streaming<Dog, JsonEncoding>> {
227-
Ok(stream)
228-
}
229-
230-
/// An example of transforming the stream on the server.
231-
#[post("/api/transform_stream")]
232-
async fn transform_stream(stream: Streaming<Dog, JsonEncoding>) -> Result<TextStream> {
233-
Ok(Streaming::new(stream.into_inner().filter_map(
234-
|dog| async {
235-
dog.ok()
236-
.map(|dog| format!("name: {}, age: {}", dog.name, dog.age))
237-
},
238-
)))
239-
}

packages/fullstack/src/encoding.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use serde::{de::DeserializeOwned, Serialize};
44
/// A trait for encoding and decoding data.
55
///
66
/// This takes an owned self to make it easier for zero-copy encodings.
7-
pub trait Encoding {
7+
pub trait Encoding: 'static {
88
fn content_type() -> &'static str;
99
fn stream_content_type() -> &'static str;
1010
fn to_bytes(data: impl Serialize) -> Option<Bytes> {

packages/fullstack/src/payloads/stream.rs

Lines changed: 88 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::{
66
};
77
use axum::extract::{FromRequest, Request};
88
use axum_core::response::IntoResponse;
9-
use bytes::Bytes;
9+
use bytes::{Buf as _, Bytes};
1010
use dioxus_fullstack_core::{HttpError, RequestError};
1111
use futures::{Stream, StreamExt};
1212
#[cfg(feature = "server")]
@@ -276,18 +276,8 @@ impl<T: DeserializeOwned + Serialize + 'static + Send, E: Encoding> FromResponse
276276
{
277277
fn from_response(res: ClientResponse) -> impl Future<Output = Result<Self, ServerFnError>> {
278278
SendWrapper::new(async move {
279-
let client_stream = Box::pin(SendWrapper::new(res.bytes_stream().map(
280-
|byte| match byte {
281-
Ok(bytes) => match decode_stream_frame::<T, E>(bytes) {
282-
Some(res) => Ok(res),
283-
None => Err(StreamingError::Decoding),
284-
},
285-
Err(_) => Err(StreamingError::Failed),
286-
},
287-
)));
288-
289279
Ok(Self {
290-
stream: client_stream,
280+
stream: byte_stream_to_client_stream::<E, _, _, _>(res.bytes_stream()),
291281
encoding: PhantomData,
292282
})
293283
})
@@ -385,13 +375,7 @@ impl<T: DeserializeOwned + Serialize + 'static + Send, E: Encoding, S> FromReque
385375
let stream = body.into_data_stream();
386376

387377
Ok(Self {
388-
stream: Box::pin(stream.map(|byte| match byte {
389-
Ok(bytes) => match decode_stream_frame::<T, E>(bytes) {
390-
Some(res) => Ok(res),
391-
None => Err(StreamingError::Decoding),
392-
},
393-
Err(_) => Err(StreamingError::Failed),
394-
})),
378+
stream: byte_stream_to_client_stream::<E, _, _, _>(stream),
395379
encoding: PhantomData,
396380
})
397381
}
@@ -504,22 +488,98 @@ pub fn encode_stream_frame<T: Serialize, E: Encoding>(data: T) -> Option<Bytes>
504488
Some(Bytes::from(bytes).slice(offset..))
505489
}
506490

491+
fn byte_stream_to_client_stream<E, T, S, E1>(
492+
stream: S,
493+
) -> Pin<Box<dyn Stream<Item = Result<T, StreamingError>> + Send>>
494+
where
495+
S: Stream<Item = Result<Bytes, E1>> + 'static + Send,
496+
E: Encoding,
497+
T: DeserializeOwned + 'static,
498+
{
499+
Box::pin(stream.flat_map(|bytes| {
500+
enum DecodeIteratorState {
501+
Empty,
502+
Failed,
503+
Checked(Bytes),
504+
UnChecked(Bytes),
505+
}
506+
507+
let mut state = match bytes {
508+
Ok(bytes) => DecodeIteratorState::UnChecked(bytes),
509+
Err(_) => DecodeIteratorState::Failed,
510+
};
511+
512+
futures::stream::iter(std::iter::from_fn(move || {
513+
match std::mem::replace(&mut state, DecodeIteratorState::Empty) {
514+
DecodeIteratorState::Empty => None,
515+
DecodeIteratorState::Failed => Some(Err(StreamingError::Failed)),
516+
DecodeIteratorState::Checked(mut bytes) => {
517+
let r = decode_stream_frame_multi::<T, E>(&mut bytes);
518+
if r.is_some() {
519+
state = DecodeIteratorState::Checked(bytes)
520+
}
521+
r
522+
}
523+
DecodeIteratorState::UnChecked(mut bytes) => {
524+
let r = decode_stream_frame_multi::<T, E>(&mut bytes);
525+
if r.is_some() {
526+
state = DecodeIteratorState::Checked(bytes);
527+
r
528+
} else {
529+
Some(Err(StreamingError::Decoding))
530+
}
531+
}
532+
}
533+
}))
534+
}))
535+
}
536+
507537
/// Decode a websocket-framed streaming payload produced by [`encode_stream_frame`].
508538
///
509539
/// This function returns `None` if the frame is invalid or cannot be decoded.
510540
///
511541
/// It cannot handle masked frames, as those are not produced by our encoding function.
512-
pub fn decode_stream_frame<T, E>(frame: Bytes) -> Option<T>
542+
pub fn decode_stream_frame<T, E>(mut frame: Bytes) -> Option<T>
513543
where
514544
E: Encoding,
515545
T: DeserializeOwned,
516546
{
547+
decode_stream_frame_multi::<T, E>(&mut frame).and_then(|r| r.ok())
548+
}
549+
550+
/// Decode one value and advance the bytes pointer
551+
///
552+
/// If the frame is empty return None.
553+
///
554+
/// Otherwise, if the initial opcode is not the one expected for binary stream
555+
/// or the frame is not large enough return error StreamingError::Decoding
556+
fn decode_stream_frame_multi<T, E>(frame: &mut Bytes) -> Option<Result<T, StreamingError>>
557+
where
558+
E: Encoding,
559+
T: DeserializeOwned,
560+
{
561+
let (offset, payload_len) = match offset_payload_len(frame)? {
562+
Ok(r) => r,
563+
Err(e) => return Some(Err(e)),
564+
};
565+
566+
let r = E::decode(frame.slice(offset..offset + payload_len));
567+
frame.advance(offset + payload_len);
568+
r.map(|r| Ok(r))
569+
}
570+
571+
/// Compute (offset,len) for decoding data
572+
fn offset_payload_len(frame: &Bytes) -> Option<Result<(usize, usize), StreamingError>> {
517573
let data = frame.as_ref();
518574

519-
if data.len() < 2 {
575+
if data.is_empty() {
520576
return None;
521577
}
522578

579+
if data.len() < 2 {
580+
return Some(Err(StreamingError::Decoding));
581+
}
582+
523583
let first = data[0];
524584
let second = data[1];
525585

@@ -528,44 +588,43 @@ where
528588
let opcode = first & 0x0F;
529589
let rsv = first & 0x70;
530590
if !fin || opcode != 0x02 || rsv != 0 {
531-
return None;
591+
return Some(Err(StreamingError::Decoding));
532592
}
533593

534594
// Mask bit must be zero for our framing
535595
if second & 0x80 != 0 {
536-
return None;
596+
return Some(Err(StreamingError::Decoding));
537597
}
538598

539599
let mut offset = 2usize;
540600
let mut payload_len = (second & 0x7F) as usize;
541601

542602
if payload_len == 126 {
543603
if data.len() < offset + 2 {
544-
return None;
604+
return Some(Err(StreamingError::Decoding));
545605
}
546606

547607
payload_len = u16::from_be_bytes([data[offset], data[offset + 1]]) as usize;
548608
offset += 2;
549609
} else if payload_len == 127 {
550610
if data.len() < offset + 8 {
551-
return None;
611+
return Some(Err(StreamingError::Decoding));
552612
}
553613

554614
let mut len_bytes = [0u8; 8];
555615
len_bytes.copy_from_slice(&data[offset..offset + 8]);
556616
let len_u64 = u64::from_be_bytes(len_bytes);
557617

558618
if len_u64 > usize::MAX as u64 {
559-
return None;
619+
return Some(Err(StreamingError::Decoding));
560620
}
561621

562622
payload_len = len_u64 as usize;
563623
offset += 8;
564624
}
565625

566626
if data.len() < offset + payload_len {
567-
return None;
627+
return Some(Err(StreamingError::Decoding));
568628
}
569-
570-
E::decode(frame.slice(offset..offset + payload_len))
629+
Some(Ok((offset, payload_len)))
571630
}

0 commit comments

Comments
 (0)