Skip to content

Commit a220e9a

Browse files
authored
fix(fullstack): Streaming::into_inner() returning empty stream (#4961)
* fix(fullstack): Streaming::into_inner() returning empty stream The Streaming struct previously maintained separate input_stream and output_stream fields, but into_inner() always returned input_stream. This caused it to return an empty stream on both client and server sides where data was placed in output_stream. Consolidate to a single stream field that properly returns the actual stream data when into_inner() is called. Also add examples demonstrating bidirectional streaming: - echo_stream: passes client stream through server unchanged - transform_stream: transforms client stream on server using filter_map - Use stream::unfold for cleaner client-side stream generation Reviewed-by: Claude <[email protected]> Commit-message-by: Claude <[email protected]> * fmt
1 parent 946a925 commit a220e9a

File tree

2 files changed

+110
-28
lines changed

2 files changed

+110
-28
lines changed

examples/07-fullstack/streaming.rs

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use dioxus::{
2121
fullstack::{JsonEncoding, Streaming, TextStream},
2222
prelude::*,
2323
};
24+
use futures::{StreamExt as _, TryStreamExt};
2425

2526
fn main() {
2627
dioxus::launch(app)
@@ -29,6 +30,8 @@ fn main() {
2930
fn app() -> Element {
3031
let mut text_responses = use_signal(String::new);
3132
let mut json_responses = use_signal(Vec::new);
33+
let mut echo_responses = use_signal(Vec::new);
34+
let mut transform_responses = use_signal(Vec::new);
3235

3336
let mut start_text_stream = use_action(move || async move {
3437
text_responses.clear();
@@ -53,6 +56,64 @@ fn app() -> Element {
5356
dioxus::Ok(())
5457
});
5558

59+
let mut continue_echo_stream = use_signal_sync(|| false);
60+
let mut start_echo_stream = use_action(move || async move {
61+
continue_echo_stream.set(true);
62+
echo_responses.clear();
63+
let stream = echo_stream(Streaming::new(futures::stream::unfold(
64+
0,
65+
move |index| async move {
66+
if !continue_echo_stream() {
67+
return None;
68+
}
69+
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
70+
let dog = Dog {
71+
name: format!("Dog {}", index),
72+
age: (index % 10) as u8,
73+
};
74+
Some((dog, index + 1))
75+
},
76+
)))
77+
.await?;
78+
stream
79+
.into_inner()
80+
.try_for_each(move |dog| async move {
81+
echo_responses.push(dog);
82+
Ok(())
83+
})
84+
.await?;
85+
dioxus::Ok(())
86+
});
87+
88+
let mut continue_transform_stream = use_signal_sync(|| false);
89+
let mut start_transform_stream = use_action(move || async move {
90+
continue_transform_stream.set(true);
91+
transform_responses.clear();
92+
let stream = transform_stream(Streaming::new(futures::stream::unfold(
93+
0,
94+
move |index| async move {
95+
if !continue_transform_stream() {
96+
return None;
97+
}
98+
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
99+
let dog = Dog {
100+
name: format!("Dog {}", index),
101+
age: (index % 10) as u8,
102+
};
103+
Some((dog, index + 1))
104+
},
105+
)))
106+
.await?;
107+
stream
108+
.into_inner()
109+
.try_for_each(move |text| async move {
110+
transform_responses.push(text);
111+
Ok(())
112+
})
113+
.await?;
114+
dioxus::Ok(())
115+
});
116+
56117
rsx! {
57118
div {
58119
button { onclick: move |_| start_text_stream.call(), "Start text stream" }
@@ -66,6 +127,20 @@ fn app() -> Element {
66127
pre { "{dog:?}" }
67128
}
68129
}
130+
div {
131+
button { onclick: move |_| start_echo_stream.call(), "Start echo stream" }
132+
button { onclick: move |_| continue_echo_stream.set(false), "Stop echo stream" }
133+
for dog in echo_responses.read().iter() {
134+
pre { "{dog:?}" }
135+
}
136+
}
137+
div {
138+
button { onclick: move |_| start_transform_stream.call(), "Start transform stream" }
139+
button { onclick: move |_| continue_transform_stream.set(false), "Stop transform stream" }
140+
for text in transform_responses.read().iter() {
141+
pre { "{text}" }
142+
}
143+
}
69144
}
70145
}
71146

@@ -145,3 +220,20 @@ async fn byte_stream() -> Result<Streaming<Bytes>> {
145220

146221
Ok(Streaming::new(rx))
147222
}
223+
224+
/// An example of echoing the stream back to the client.
225+
#[post("/api/echo_stream")]
226+
async fn echo_stream(stream: Streaming<Dog, JsonEncoding>) -> Result<Streaming<Dog, JsonEncoding>> {
227+
Ok(stream)
228+
}
229+
230+
/// An example of transforming the stream on the server.
231+
#[post("/api/transform_stream")]
232+
async fn transform_stream(stream: Streaming<Dog, JsonEncoding>) -> Result<TextStream> {
233+
Ok(Streaming::new(stream.into_inner().filter_map(
234+
|dog| async {
235+
dog.ok()
236+
.map(|dog| format!("name: {}, age: {}", dog.name, dog.age))
237+
},
238+
)))
239+
}

packages/fullstack/src/payloads/stream.rs

Lines changed: 18 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,7 @@ pub type ChunkedTextStream = Streaming<String, CborEncoding>;
9898
///
9999
/// Also note that not all browsers support streaming bodies to servers.
100100
pub struct Streaming<T = String, E = ()> {
101-
output_stream: Pin<Box<dyn Stream<Item = Result<T, StreamingError>> + Send>>,
102-
input_stream: Pin<Box<dyn Stream<Item = Result<T, StreamingError>> + Send>>,
101+
stream: Pin<Box<dyn Stream<Item = Result<T, StreamingError>> + Send>>,
103102
encoding: PhantomData<E>,
104103
}
105104

@@ -123,9 +122,8 @@ impl<T: 'static + Send, E> Streaming<T, E> {
123122
pub fn new(value: impl Stream<Item = T> + Send + 'static) -> Self {
124123
// Box and pin the incoming stream and store as a trait object
125124
Self {
126-
input_stream: Box::pin(value.map(|item| Ok(item)))
125+
stream: Box::pin(value.map(|item| Ok(item)))
127126
as Pin<Box<dyn Stream<Item = Result<T, StreamingError>> + Send>>,
128-
output_stream: Box::pin(futures::stream::empty()) as _,
129127
encoding: PhantomData,
130128
}
131129
}
@@ -148,21 +146,20 @@ impl<T: 'static + Send, E> Streaming<T, E> {
148146

149147
/// Returns the next item in the stream, or `None` if the stream has ended.
150148
pub async fn next(&mut self) -> Option<Result<T, StreamingError>> {
151-
self.output_stream.as_mut().next().await
149+
self.stream.as_mut().next().await
152150
}
153151

154152
/// Consumes the wrapper, returning the inner stream.
155153
pub fn into_inner(self) -> impl Stream<Item = Result<T, StreamingError>> + Send {
156-
self.input_stream
154+
self.stream
157155
}
158156

159157
/// Creates a streaming payload from an existing stream of bytes.
160158
///
161159
/// This uses the internal framing mechanism to decode the stream into items of type `T`.
162160
fn from_bytes(stream: impl Stream<Item = Result<T, StreamingError>> + Send + 'static) -> Self {
163161
Self {
164-
input_stream: Box::pin(stream),
165-
output_stream: Box::pin(futures::stream::empty()) as _,
162+
stream: Box::pin(stream),
166163
encoding: PhantomData,
167164
}
168165
}
@@ -184,8 +181,7 @@ where
184181
{
185182
fn from(value: S) -> Self {
186183
Self {
187-
input_stream: Box::pin(value.map(|data| data.map_err(|_| StreamingError::Failed))),
188-
output_stream: Box::pin(futures::stream::empty()) as _,
184+
stream: Box::pin(value.map(|data| data.map_err(|_| StreamingError::Failed))),
189185
encoding: PhantomData,
190186
}
191187
}
@@ -207,7 +203,7 @@ impl IntoResponse for Streaming<String> {
207203
fn into_response(self) -> axum_core::response::Response {
208204
axum::response::Response::builder()
209205
.header("Content-Type", "text/plain; charset=utf-8")
210-
.body(axum::body::Body::from_stream(self.input_stream))
206+
.body(axum::body::Body::from_stream(self.stream))
211207
.unwrap()
212208
}
213209
}
@@ -216,14 +212,14 @@ impl IntoResponse for Streaming<Bytes> {
216212
fn into_response(self) -> axum_core::response::Response {
217213
axum::response::Response::builder()
218214
.header("Content-Type", "application/octet-stream")
219-
.body(axum::body::Body::from_stream(self.input_stream))
215+
.body(axum::body::Body::from_stream(self.stream))
220216
.unwrap()
221217
}
222218
}
223219

224220
impl<T: DeserializeOwned + Serialize + 'static, E: Encoding> IntoResponse for Streaming<T, E> {
225221
fn into_response(self) -> axum_core::response::Response {
226-
let res = self.input_stream.map(|r| match r {
222+
let res = self.stream.map(|r| match r {
227223
Ok(res) => match encode_stream_frame::<T, E>(res) {
228224
Some(bytes) => Ok(bytes),
229225
None => Err(StreamingError::Failed),
@@ -250,8 +246,7 @@ impl FromResponse for Streaming<String> {
250246
}));
251247

252248
Ok(Self {
253-
output_stream: client_stream,
254-
input_stream: Box::pin(futures::stream::empty()),
249+
stream: client_stream,
255250
encoding: PhantomData,
256251
})
257252
})
@@ -269,8 +264,7 @@ impl FromResponse for Streaming<Bytes> {
269264
)));
270265

271266
Ok(Self {
272-
output_stream: client_stream,
273-
input_stream: Box::pin(futures::stream::empty()),
267+
stream: client_stream,
274268
encoding: PhantomData,
275269
})
276270
}
@@ -293,8 +287,7 @@ impl<T: DeserializeOwned + Serialize + 'static + Send, E: Encoding> FromResponse
293287
)));
294288

295289
Ok(Self {
296-
output_stream: client_stream,
297-
input_stream: Box::pin(futures::stream::empty()),
290+
stream: client_stream,
298291
encoding: PhantomData,
299292
})
300293
})
@@ -323,8 +316,7 @@ impl<S> FromRequest<S> for Streaming<String> {
323316
let stream = body.into_data_stream();
324317

325318
Ok(Self {
326-
input_stream: Box::pin(futures::stream::empty()),
327-
output_stream: Box::pin(stream.map(|byte| match byte {
319+
stream: Box::pin(stream.map(|byte| match byte {
328320
Ok(bytes) => match String::from_utf8(bytes.to_vec()) {
329321
Ok(string) => Ok(string),
330322
Err(_) => Err(StreamingError::Decoding),
@@ -359,8 +351,7 @@ impl<S> FromRequest<S> for ByteStream {
359351
let stream = body.into_data_stream();
360352

361353
Ok(Self {
362-
input_stream: Box::pin(futures::stream::empty()),
363-
output_stream: Box::pin(stream.map(|byte| match byte {
354+
stream: Box::pin(stream.map(|byte| match byte {
364355
Ok(bytes) => Ok(bytes),
365356
Err(_) => Err(StreamingError::Failed),
366357
})),
@@ -394,8 +385,7 @@ impl<T: DeserializeOwned + Serialize + 'static + Send, E: Encoding, S> FromReque
394385
let stream = body.into_data_stream();
395386

396387
Ok(Self {
397-
input_stream: Box::pin(futures::stream::empty()),
398-
output_stream: Box::pin(stream.map(|byte| match byte {
388+
stream: Box::pin(stream.map(|byte| match byte {
399389
Ok(bytes) => match decode_stream_frame::<T, E>(bytes) {
400390
Some(res) => Ok(res),
401391
None => Err(StreamingError::Decoding),
@@ -416,7 +406,7 @@ impl IntoRequest for Streaming<String> {
416406
async move {
417407
builder
418408
.header("Content-Type", "text/plain; charset=utf-8")?
419-
.send_body_stream(self.input_stream.map(|e| e.map(Bytes::from)))
409+
.send_body_stream(self.stream.map(|e| e.map(Bytes::from)))
420410
.await
421411
}
422412
}
@@ -430,7 +420,7 @@ impl IntoRequest for ByteStream {
430420
async move {
431421
builder
432422
.header(ContentType::name(), "application/octet-stream")?
433-
.send_body_stream(self.input_stream)
423+
.send_body_stream(self.stream)
434424
.await
435425
}
436426
}
@@ -446,7 +436,7 @@ impl<T: DeserializeOwned + Serialize + 'static + Send, E: Encoding> IntoRequest
446436
async move {
447437
builder
448438
.header("Content-Type", E::stream_content_type())?
449-
.send_body_stream(self.input_stream.map(|r| {
439+
.send_body_stream(self.stream.map(|r| {
450440
r.and_then(|item| {
451441
encode_stream_frame::<T, E>(item).ok_or(StreamingError::Failed)
452442
})

0 commit comments

Comments
 (0)