Skip to content

Commit 6435bc4

Browse files
committed
Implement stream::inspect
1 parent 2ec9d2d commit 6435bc4

File tree

2 files changed

+34
-0
lines changed

2 files changed

+34
-0
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ Stream
4949
- [x] stream::flatten
5050
- [x] stream::fold
5151
- [x] stream::for_each
52+
- [x] stream::inspect
5253
- [x] stream::into_future
5354
- [x] stream::iter
5455
- [x] stream::map

src/stream.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,26 @@ where
219219
})
220220
}
221221

222+
/// Do something with each item of this stream, afterwards passing it on.
223+
///
224+
/// This is similar to the `Iterator::inspect` method in the standard
225+
/// library where it allows easily inspecting each value as it passes
226+
/// through the stream, for example to debug what's going on.
227+
pub fn inspect<St, F>(stream: St, f: F) -> impl Stream<Item = St::Item>
228+
where
229+
St: Stream,
230+
F: FnMut(&St::Item),
231+
{
232+
let stream = Box::pin(stream);
233+
crate::stream::unfold((stream, f), |(mut stream, mut f)| async {
234+
let item = next(&mut stream).await;
235+
item.map(|item| {
236+
f(&item);
237+
(item, (stream, f))
238+
})
239+
})
240+
}
241+
222242
/// Converts this stream into a future of `(next_item, tail_of_stream)`.
223243
/// If the stream terminates, then the next item is [`None`].
224244
///
@@ -932,6 +952,19 @@ mod tests {
932952
);
933953
}
934954

955+
#[test]
956+
fn test_inspect() {
957+
let mut seen = Vec::new();
958+
let stream = iter(1..=3);
959+
let stream = inspect(stream, |x| seen.push(*x));
960+
961+
assert_eq!(
962+
vec![1, 2, 3],
963+
executor::block_on(collect::<_, Vec<_>>(stream))
964+
);
965+
assert_eq!(seen, [1, 2, 3]);
966+
}
967+
935968
#[test]
936969
fn test_into_future() {
937970
let stream = iter(1..=2);

0 commit comments

Comments
 (0)