Skip to content

Commit 3309357

Browse files
taiki-ecramertj
authored andcommitted
Add TryStreamExt::{inspect_ok, inspect_err}
1 parent 87a8780 commit 3309357

File tree

6 files changed

+236
-6
lines changed

6 files changed

+236
-6
lines changed

futures-util/src/stream/inspect.rs

+11-5
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,13 @@ impl<St: Stream + FusedStream, F> FusedStream for Inspect<St, F> {
6464
}
6565
}
6666

67+
// used by `TryStreamExt::{inspect_ok, inspect_err}`
68+
#[inline]
69+
pub(crate) fn inspect<T, F: FnMut(&T)>(x: T, mut f: F) -> T {
70+
f(&x);
71+
x
72+
}
73+
6774
impl<St, F> Stream for Inspect<St, F>
6875
where St: Stream,
6976
F: FnMut(&St::Item),
@@ -74,11 +81,10 @@ impl<St, F> Stream for Inspect<St, F>
7481
mut self: Pin<&mut Self>,
7582
cx: &mut Context<'_>,
7683
) -> Poll<Option<St::Item>> {
77-
let item = ready!(self.as_mut().stream().poll_next(cx));
78-
Poll::Ready(item.map(|e| {
79-
(self.as_mut().f())(&e);
80-
e
81-
}))
84+
self.as_mut()
85+
.stream()
86+
.poll_next(cx)
87+
.map(|opt| opt.map(|e| inspect(e, self.as_mut().f())))
8288
}
8389
}
8490

futures-util/src/stream/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ mod into_future;
5858
pub use self::into_future::StreamFuture;
5959

6060
mod inspect;
61+
pub(crate) use self::inspect::inspect; // used by `TryStreamExt::{inspect_ok, inspect_err}`
6162
pub use self::inspect::Inspect;
6263

6364
mod map;
+96
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
use crate::stream::inspect;
2+
use core::pin::Pin;
3+
use futures_core::stream::{FusedStream, Stream, TryStream};
4+
use futures_core::task::{Context, Poll};
5+
use futures_sink::Sink;
6+
use pin_utils::{unsafe_pinned, unsafe_unpinned};
7+
8+
/// Stream for the [`inspect_err`](super::TryStreamExt::inspect_err) method.
9+
#[derive(Debug)]
10+
#[must_use = "streams do nothing unless polled"]
11+
pub struct InspectErr<St, F> {
12+
stream: St,
13+
f: F,
14+
}
15+
16+
impl<St: TryStream + Unpin, F> Unpin for InspectErr<St, F> {}
17+
18+
impl<St, F> InspectErr<St, F>
19+
where
20+
St: TryStream,
21+
F: FnMut(&St::Error),
22+
{
23+
unsafe_pinned!(stream: St);
24+
unsafe_unpinned!(f: F);
25+
26+
pub(super) fn new(stream: St, f: F) -> Self {
27+
Self { stream, f }
28+
}
29+
30+
/// Acquires a reference to the underlying stream that this combinator is
31+
/// pulling from.
32+
pub fn get_ref(&self) -> &St {
33+
&self.stream
34+
}
35+
36+
/// Acquires a mutable reference to the underlying stream that this
37+
/// combinator is pulling from.
38+
///
39+
/// Note that care must be taken to avoid tampering with the state of the
40+
/// stream which may otherwise confuse this combinator.
41+
pub fn get_mut(&mut self) -> &mut St {
42+
&mut self.stream
43+
}
44+
45+
/// Acquires a pinned mutable reference to the underlying stream that this
46+
/// combinator is pulling from.
47+
///
48+
/// Note that care must be taken to avoid tampering with the state of the
49+
/// stream which may otherwise confuse this combinator.
50+
pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> {
51+
self.stream()
52+
}
53+
54+
/// Consumes this combinator, returning the underlying stream.
55+
///
56+
/// Note that this may discard intermediate state of this combinator, so
57+
/// care should be taken to avoid losing resources when this is called.
58+
pub fn into_inner(self) -> St {
59+
self.stream
60+
}
61+
}
62+
63+
impl<St: TryStream + FusedStream, F> FusedStream for InspectErr<St, F> {
64+
fn is_terminated(&self) -> bool {
65+
self.stream.is_terminated()
66+
}
67+
}
68+
69+
impl<St, F> Stream for InspectErr<St, F>
70+
where
71+
St: TryStream,
72+
F: FnMut(&St::Error),
73+
{
74+
type Item = Result<St::Ok, St::Error>;
75+
76+
fn poll_next(
77+
mut self: Pin<&mut Self>,
78+
cx: &mut Context<'_>,
79+
) -> Poll<Option<Self::Item>> {
80+
self.as_mut()
81+
.stream()
82+
.try_poll_next(cx)
83+
.map(|opt| opt.map(|res| res.map_err(|e| inspect(e, self.as_mut().f()))))
84+
}
85+
}
86+
87+
// Forwarding impl of Sink from the underlying stream
88+
impl<S, F, Item> Sink<Item> for InspectErr<S, F>
89+
where
90+
S: TryStream + Sink<Item>,
91+
F: FnMut(&S::Error),
92+
{
93+
type SinkError = S::SinkError;
94+
95+
delegate_sink!(stream, Item);
96+
}
+96
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
use crate::stream::inspect;
2+
use core::pin::Pin;
3+
use futures_core::stream::{FusedStream, Stream, TryStream};
4+
use futures_core::task::{Context, Poll};
5+
use futures_sink::Sink;
6+
use pin_utils::{unsafe_pinned, unsafe_unpinned};
7+
8+
/// Stream for the [`inspect_ok`](super::TryStreamExt::inspect_ok) method.
9+
#[derive(Debug)]
10+
#[must_use = "streams do nothing unless polled"]
11+
pub struct InspectOk<St, F> {
12+
stream: St,
13+
f: F,
14+
}
15+
16+
impl<St: TryStream + Unpin, F> Unpin for InspectOk<St, F> {}
17+
18+
impl<St, F> InspectOk<St, F>
19+
where
20+
St: TryStream,
21+
F: FnMut(&St::Ok),
22+
{
23+
unsafe_pinned!(stream: St);
24+
unsafe_unpinned!(f: F);
25+
26+
pub(super) fn new(stream: St, f: F) -> Self {
27+
Self { stream, f }
28+
}
29+
30+
/// Acquires a reference to the underlying stream that this combinator is
31+
/// pulling from.
32+
pub fn get_ref(&self) -> &St {
33+
&self.stream
34+
}
35+
36+
/// Acquires a mutable reference to the underlying stream that this
37+
/// combinator is pulling from.
38+
///
39+
/// Note that care must be taken to avoid tampering with the state of the
40+
/// stream which may otherwise confuse this combinator.
41+
pub fn get_mut(&mut self) -> &mut St {
42+
&mut self.stream
43+
}
44+
45+
/// Acquires a pinned mutable reference to the underlying stream that this
46+
/// combinator is pulling from.
47+
///
48+
/// Note that care must be taken to avoid tampering with the state of the
49+
/// stream which may otherwise confuse this combinator.
50+
pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> {
51+
self.stream()
52+
}
53+
54+
/// Consumes this combinator, returning the underlying stream.
55+
///
56+
/// Note that this may discard intermediate state of this combinator, so
57+
/// care should be taken to avoid losing resources when this is called.
58+
pub fn into_inner(self) -> St {
59+
self.stream
60+
}
61+
}
62+
63+
impl<St: TryStream + FusedStream, F> FusedStream for InspectOk<St, F> {
64+
fn is_terminated(&self) -> bool {
65+
self.stream.is_terminated()
66+
}
67+
}
68+
69+
impl<St, F> Stream for InspectOk<St, F>
70+
where
71+
St: TryStream,
72+
F: FnMut(&St::Ok),
73+
{
74+
type Item = Result<St::Ok, St::Error>;
75+
76+
fn poll_next(
77+
mut self: Pin<&mut Self>,
78+
cx: &mut Context<'_>,
79+
) -> Poll<Option<Self::Item>> {
80+
self.as_mut()
81+
.stream()
82+
.try_poll_next(cx)
83+
.map(|opt| opt.map(|res| res.map(|e| inspect(e, self.as_mut().f()))))
84+
}
85+
}
86+
87+
// Forwarding impl of Sink from the underlying stream
88+
impl<S, F, Item> Sink<Item> for InspectOk<S, F>
89+
where
90+
S: TryStream + Sink<Item>,
91+
F: FnMut(&S::Ok),
92+
{
93+
type SinkError = S::SinkError;
94+
95+
delegate_sink!(stream, Item);
96+
}

futures-util/src/try_stream/mod.rs

+31
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@ pub use self::and_then::AndThen;
1717
mod err_into;
1818
pub use self::err_into::ErrInto;
1919

20+
mod inspect_ok;
21+
pub use self::inspect_ok::InspectOk;
22+
23+
mod inspect_err;
24+
pub use self::inspect_err::InspectErr;
25+
2026
mod into_stream;
2127
pub use self::into_stream::IntoStream;
2228

@@ -223,6 +229,31 @@ pub trait TryStreamExt: TryStream {
223229
OrElse::new(self, f)
224230
}
225231

232+
/// Do something with the success value of this stream, afterwards passing
233+
/// it on.
234+
///
235+
/// This is similar to the `StreamExt::inspect` method where it allows
236+
/// easily inspecting the success value as it passes through the stream, for
237+
/// example to debug what's going on.
238+
fn inspect_ok<F>(self, f: F) -> InspectOk<Self, F>
239+
where F: FnMut(&Self::Ok),
240+
Self: Sized,
241+
{
242+
InspectOk::new(self, f)
243+
}
244+
245+
/// Do something with the error value of this stream, afterwards passing it on.
246+
///
247+
/// This is similar to the `StreamExt::inspect` method where it allows
248+
/// easily inspecting the error value as it passes through the stream, for
249+
/// example to debug what's going on.
250+
fn inspect_err<F>(self, f: F) -> InspectErr<Self, F>
251+
where F: FnMut(&Self::Error),
252+
Self: Sized,
253+
{
254+
InspectErr::new(self, f)
255+
}
256+
226257
/// Wraps a [`TryStream`] into a type that implements
227258
/// [`Stream`](futures_core::Stream)
228259
///

futures/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -404,10 +404,10 @@ pub mod stream {
404404
pub use futures_util::try_stream::{
405405
TryStreamExt,
406406
AndThen, ErrInto, MapOk, MapErr, OrElse,
407+
InspectOk, InspectErr,
407408
TryNext, TryForEach, TryFilterMap,
408409
TryCollect, TryFold, TrySkipWhile,
409410
IntoStream,
410-
// ToDo: InspectErr
411411
};
412412

413413
#[cfg_attr(

0 commit comments

Comments
 (0)