Skip to content

Commit 4284055

Browse files
taiki-ecramertj
authored andcommitted
Re-add and_then and or_else to TryStreamExt
1 parent 151d68c commit 4284055

File tree

4 files changed

+274
-3
lines changed

4 files changed

+274
-3
lines changed
+97
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
use core::pin::Pin;
2+
use futures_core::future::TryFuture;
3+
use futures_core::stream::{Stream, TryStream};
4+
use futures_core::task::{Context, Poll};
5+
use futures_sink::Sink;
6+
use pin_utils::{unsafe_pinned, unsafe_unpinned};
7+
8+
/// Stream for the [`and_then`](super::TryStreamExt::and_then) method.
9+
#[derive(Debug)]
10+
#[must_use = "streams do nothing unless polled"]
11+
pub struct AndThen<St, Fut, F> {
12+
stream: St,
13+
future: Option<Fut>,
14+
f: F,
15+
}
16+
17+
impl<St, Fut, F> AndThen<St, Fut, F>
18+
where St: TryStream,
19+
F: FnMut(St::Ok) -> Fut,
20+
Fut: TryFuture<Error = St::Error>,
21+
{
22+
unsafe_pinned!(stream: St);
23+
unsafe_pinned!(future: Option<Fut>);
24+
unsafe_unpinned!(f: F);
25+
26+
pub(super) fn new(stream: St, f: F) -> Self {
27+
Self { stream, future: None, f }
28+
}
29+
30+
/// Acquires a reference to the underlying stream that this combinator is
31+
/// pulling from.
32+
pub fn get_ref(&self) -> &St {
33+
&self.stream
34+
}
35+
36+
/// Acquires a mutable reference to the underlying stream that this
37+
/// combinator is pulling from.
38+
///
39+
/// Note that care must be taken to avoid tampering with the state of the
40+
/// stream which may otherwise confuse this combinator.
41+
pub fn get_mut(&mut self) -> &mut St {
42+
&mut self.stream
43+
}
44+
45+
/// Consumes this combinator, returning the underlying stream.
46+
///
47+
/// Note that this may discard intermediate state of this combinator, so
48+
/// care should be taken to avoid losing resources when this is called.
49+
pub fn into_inner(self) -> St {
50+
self.stream
51+
}
52+
}
53+
54+
impl<St, Fut, F> Stream for AndThen<St, Fut, F>
55+
where St: TryStream,
56+
F: FnMut(St::Ok) -> Fut,
57+
Fut: TryFuture<Error = St::Error>,
58+
{
59+
type Item = Result<Fut::Ok, St::Error>;
60+
61+
fn poll_next(
62+
mut self: Pin<&mut Self>,
63+
cx: &mut Context<'_>,
64+
) -> Poll<Option<Self::Item>> {
65+
if self.future.is_none() {
66+
let item = match ready!(self.as_mut().stream().try_poll_next(cx)?) {
67+
None => return Poll::Ready(None),
68+
Some(e) => e,
69+
};
70+
let fut = (self.as_mut().f())(item);
71+
self.as_mut().future().set(Some(fut));
72+
}
73+
74+
assert!(self.future.is_some());
75+
match ready!(self.as_mut().future().as_pin_mut().unwrap().try_poll(cx)) {
76+
Ok(e) => {
77+
self.as_mut().future().set(None);
78+
Poll::Ready(Some(Ok(e)))
79+
}
80+
Err(e) => {
81+
self.as_mut().future().set(None);
82+
Poll::Ready(Some(Err(e)))
83+
}
84+
}
85+
}
86+
}
87+
88+
// Forwarding impl of Sink from the underlying stream
89+
impl<S, Fut, F, Item> Sink<Item> for AndThen<S, Fut, F>
90+
where S: TryStream + Sink<Item>,
91+
F: FnMut(S::Ok) -> Fut,
92+
Fut: TryFuture<Error = S::Error>,
93+
{
94+
type SinkError = S::SinkError;
95+
96+
delegate_sink!(stream, Item);
97+
}

futures-util/src/try_stream/mod.rs

+77
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ use futures_core::task::{Context, Poll};
1111
#[cfg(feature = "compat")]
1212
use crate::compat::Compat;
1313

14+
mod and_then;
15+
pub use self::and_then::AndThen;
16+
1417
mod err_into;
1518
pub use self::err_into::ErrInto;
1619

@@ -23,6 +26,9 @@ pub use self::map_ok::MapOk;
2326
mod map_err;
2427
pub use self::map_err::MapErr;
2528

29+
mod or_else;
30+
pub use self::or_else::OrElse;
31+
2632
mod try_next;
2733
pub use self::try_next::TryNext;
2834

@@ -145,6 +151,77 @@ pub trait TryStreamExt: TryStream {
145151
MapErr::new(self, f)
146152
}
147153

154+
/// Chain on a computation for when a value is ready, passing the successful
155+
/// results to the provided closure `f`.
156+
///
157+
/// This function can be used to run a unit of work when the next successful
158+
/// value on a stream is ready. The closure provided will be yielded a value
159+
/// when ready, and the returned future will then be run to completion to
160+
/// produce the next value on this stream.
161+
///
162+
/// Any errors produced by this stream will not be passed to the closure,
163+
/// and will be passed through.
164+
///
165+
/// The returned value of the closure must implement the `TryFuture` trait
166+
/// and can represent some more work to be done before the composed stream
167+
/// is finished.
168+
///
169+
/// Note that this function consumes the receiving stream and returns a
170+
/// wrapped version of it.
171+
///
172+
/// To process the entire stream and return a single future representing
173+
/// success or error, use `try_for_each` instead.
174+
///
175+
/// # Examples
176+
///
177+
/// ```
178+
/// use futures::channel::mpsc;
179+
/// use futures::future;
180+
/// use futures::stream::TryStreamExt;
181+
///
182+
/// let (_tx, rx) = mpsc::channel::<Result<i32, ()>>(1);
183+
///
184+
/// let rx = rx.and_then(|result| {
185+
/// future::ok(if result % 2 == 0 {
186+
/// Some(result)
187+
/// } else {
188+
/// None
189+
/// })
190+
/// });
191+
/// ```
192+
fn and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F>
193+
where F: FnMut(Self::Ok) -> Fut,
194+
Fut: TryFuture<Error = Self::Error>,
195+
Self: Sized,
196+
{
197+
AndThen::new(self, f)
198+
}
199+
200+
/// Chain on a computation for when an error happens, passing the
201+
/// erroneous result to the provided closure `f`.
202+
///
203+
/// This function can be used to run a unit of work and attempt to recover from
204+
/// an error if one happens. The closure provided will be yielded an error
205+
/// when one appears, and the returned future will then be run to completion
206+
/// to produce the next value on this stream.
207+
///
208+
/// Any successful values produced by this stream will not be passed to the
209+
/// closure, and will be passed through.
210+
///
211+
/// The returned value of the closure must implement the [`TryFuture`] trait
212+
/// and can represent some more work to be done before the composed stream
213+
/// is finished.
214+
///
215+
/// Note that this function consumes the receiving stream and returns a
216+
/// wrapped version of it.
217+
fn or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F>
218+
where F: FnMut(Self::Error) -> Fut,
219+
Fut: TryFuture<Ok = Self::Ok>,
220+
Self: Sized,
221+
{
222+
OrElse::new(self, f)
223+
}
224+
148225
/// Wraps a [`TryStream`] into a type that implements
149226
/// [`Stream`](futures_core::Stream)
150227
///
+98
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
use core::pin::Pin;
2+
use futures_core::future::TryFuture;
3+
use futures_core::stream::{Stream, TryStream};
4+
use futures_core::task::{Context, Poll};
5+
use futures_sink::Sink;
6+
use pin_utils::{unsafe_pinned, unsafe_unpinned};
7+
8+
/// Stream for the [`or_else`](super::TryStreamExt::or_else) method.
9+
#[derive(Debug)]
10+
#[must_use = "streams do nothing unless polled"]
11+
pub struct OrElse<St, Fut, F> {
12+
stream: St,
13+
future: Option<Fut>,
14+
f: F,
15+
}
16+
17+
impl<St, Fut, F> OrElse<St, Fut, F>
18+
where St: TryStream,
19+
F: FnMut(St::Error) -> Fut,
20+
Fut: TryFuture<Ok = St::Ok>,
21+
{
22+
unsafe_pinned!(stream: St);
23+
unsafe_pinned!(future: Option<Fut>);
24+
unsafe_unpinned!(f: F);
25+
26+
pub(super) fn new(stream: St, f: F) -> Self {
27+
Self { stream, future: None, f }
28+
}
29+
30+
/// Acquires a reference to the underlying stream that this combinator is
31+
/// pulling from.
32+
pub fn get_ref(&self) -> &St {
33+
&self.stream
34+
}
35+
36+
/// Acquires a mutable reference to the underlying stream that this
37+
/// combinator is pulling from.
38+
///
39+
/// Note that care must be taken to avoid tampering with the state of the
40+
/// stream which may otherwise confuse this combinator.
41+
pub fn get_mut(&mut self) -> &mut St {
42+
&mut self.stream
43+
}
44+
45+
/// Consumes this combinator, returning the underlying stream.
46+
///
47+
/// Note that this may discard intermediate state of this combinator, so
48+
/// care should be taken to avoid losing resources when this is called.
49+
pub fn into_inner(self) -> St {
50+
self.stream
51+
}
52+
}
53+
54+
impl<St, Fut, F> Stream for OrElse<St, Fut, F>
55+
where St: TryStream,
56+
F: FnMut(St::Error) -> Fut,
57+
Fut: TryFuture<Ok = St::Ok>,
58+
{
59+
type Item = Result<St::Ok, Fut::Error>;
60+
61+
fn poll_next(
62+
mut self: Pin<&mut Self>,
63+
cx: &mut Context<'_>,
64+
) -> Poll<Option<Self::Item>> {
65+
if self.future.is_none() {
66+
let item = match ready!(self.as_mut().stream().try_poll_next(cx)) {
67+
None => return Poll::Ready(None),
68+
Some(Ok(e)) => return Poll::Ready(Some(Ok(e))),
69+
Some(Err(e)) => e,
70+
};
71+
let fut = (self.as_mut().f())(item);
72+
self.as_mut().future().set(Some(fut));
73+
}
74+
75+
assert!(self.future.is_some());
76+
match ready!(self.as_mut().future().as_pin_mut().unwrap().try_poll(cx)) {
77+
Ok(e) => {
78+
self.as_mut().future().set(None);
79+
Poll::Ready(Some(Ok(e)))
80+
}
81+
Err(e) => {
82+
self.as_mut().future().set(None);
83+
Poll::Ready(Some(Err(e)))
84+
}
85+
}
86+
}
87+
}
88+
89+
// Forwarding impl of Sink from the underlying stream
90+
impl<S, Fut, F, Item> Sink<Item> for OrElse<S, Fut, F>
91+
where S: TryStream + Sink<Item>,
92+
F: FnMut(S::Error) -> Fut,
93+
Fut: TryFuture<Ok = S::Ok>,
94+
{
95+
type SinkError = S::SinkError;
96+
97+
delegate_sink!(stream, Item);
98+
}

futures/src/lib.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -383,11 +383,11 @@ pub mod stream {
383383

384384
pub use futures_util::try_stream::{
385385
TryStreamExt,
386-
ErrInto, MapOk, MapErr,
386+
AndThen, ErrInto, MapOk, MapErr, OrElse,
387387
TryNext, TryForEach, TryFilterMap,
388388
TryCollect, TryFold, TrySkipWhile,
389389
IntoStream,
390-
// ToDo: AndThen, InspectErr, OrElse
390+
// ToDo: InspectErr
391391
};
392392

393393
#[cfg_attr(
@@ -398,7 +398,6 @@ pub mod stream {
398398
pub use futures_util::try_stream::{
399399
// For TryStreamExt:
400400
TryBufferUnordered, TryForEachConcurrent,
401-
// ToDo: AndThen, InspectErr, OrElse
402401
};
403402

404403
#[cfg(feature = "std")]

0 commit comments

Comments
 (0)