Skip to content

Commit 265f1ff

Browse files
bors[bot]tirr-c
andauthored
Merge #204
204: Add Stream::zip r=stjepang a=tirr-c Co-authored-by: Wonwoo Choi <[email protected]>
2 parents dd92d8d + 73db46c commit 265f1ff

File tree

3 files changed

+100
-1
lines changed

3 files changed

+100
-1
lines changed

src/stream/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ pub use double_ended_stream::DoubleEndedStream;
2525
pub use empty::{empty, Empty};
2626
pub use once::{once, Once};
2727
pub use repeat::{repeat, Repeat};
28-
pub use stream::{Scan, Stream, Take};
28+
pub use stream::{Scan, Stream, Take, Zip};
2929

3030
mod double_ended_stream;
3131
mod empty;

src/stream/stream/mod.rs

+45
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,11 @@ mod next;
3131
mod nth;
3232
mod scan;
3333
mod take;
34+
mod zip;
3435

3536
pub use scan::Scan;
3637
pub use take::Take;
38+
pub use zip::Zip;
3739

3840
use all::AllFuture;
3941
use any::AnyFuture;
@@ -545,6 +547,49 @@ pub trait Stream {
545547
{
546548
Scan::new(self, initial_state, f)
547549
}
550+
551+
/// 'Zips up' two streams into a single stream of pairs.
552+
///
553+
/// `zip()` returns a new stream that will iterate over two other streams, returning a tuple
554+
/// where the first element comes from the first stream, and the second element comes from the
555+
/// second stream.
556+
///
557+
/// In other words, it zips two streams together, into a single one.
558+
///
559+
/// If either stream returns [`None`], [`poll_next`] from the zipped stream will return
560+
/// [`None`]. If the first stream returns [`None`], `zip` will short-circuit and `poll_next`
561+
/// will not be called on the second stream.
562+
///
563+
/// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
564+
/// [`poll_next`]: #tymethod.poll_next
565+
///
566+
/// ## Examples
567+
///
568+
/// ```
569+
/// # fn main() { async_std::task::block_on(async {
570+
/// #
571+
/// use std::collections::VecDeque;
572+
/// use async_std::stream::Stream;
573+
///
574+
/// let l: VecDeque<isize> = vec![1, 2, 3].into_iter().collect();
575+
/// let r: VecDeque<isize> = vec![4, 5, 6, 7].into_iter().collect();
576+
/// let mut s = l.zip(r);
577+
///
578+
/// assert_eq!(s.next().await, Some((1, 4)));
579+
/// assert_eq!(s.next().await, Some((2, 5)));
580+
/// assert_eq!(s.next().await, Some((3, 6)));
581+
/// assert_eq!(s.next().await, None);
582+
/// #
583+
/// # }) }
584+
/// ```
585+
#[inline]
586+
fn zip<U>(self, other: U) -> Zip<Self, U>
587+
where
588+
Self: Sized,
589+
U: Stream,
590+
{
591+
Zip::new(self, other)
592+
}
548593
}
549594

550595
impl<T: futures_core::stream::Stream + ?Sized> Stream for T {

src/stream/stream/zip.rs

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
use std::fmt;
2+
use std::pin::Pin;
3+
4+
use crate::stream::Stream;
5+
use crate::task::{Context, Poll};
6+
7+
/// An iterator that iterates two other iterators simultaneously.
8+
pub struct Zip<A: Stream, B> {
9+
item_slot: Option<A::Item>,
10+
first: A,
11+
second: B,
12+
}
13+
14+
impl<A: fmt::Debug + Stream, B: fmt::Debug> fmt::Debug for Zip<A, B> {
15+
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
16+
fmt.debug_struct("Zip")
17+
.field("first", &self.first)
18+
.field("second", &self.second)
19+
.finish()
20+
}
21+
}
22+
23+
impl<A: Unpin + Stream, B: Unpin> Unpin for Zip<A, B> {}
24+
25+
impl<A: Stream, B> Zip<A, B> {
26+
pub(crate) fn new(first: A, second: B) -> Self {
27+
Zip {
28+
item_slot: None,
29+
first,
30+
second,
31+
}
32+
}
33+
34+
pin_utils::unsafe_unpinned!(item_slot: Option<A::Item>);
35+
pin_utils::unsafe_pinned!(first: A);
36+
pin_utils::unsafe_pinned!(second: B);
37+
}
38+
39+
impl<A: Stream, B: Stream> futures_core::stream::Stream for Zip<A, B> {
40+
type Item = (A::Item, B::Item);
41+
42+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
43+
if self.as_mut().item_slot().is_none() {
44+
match self.as_mut().first().poll_next(cx) {
45+
Poll::Pending => return Poll::Pending,
46+
Poll::Ready(None) => return Poll::Ready(None),
47+
Poll::Ready(Some(item)) => *self.as_mut().item_slot() = Some(item),
48+
}
49+
}
50+
let second_item = futures_core::ready!(self.as_mut().second().poll_next(cx));
51+
let first_item = self.as_mut().item_slot().take().unwrap();
52+
Poll::Ready(second_item.map(|second_item| (first_item, second_item)))
53+
}
54+
}

0 commit comments

Comments
 (0)