Skip to content

Commit 850b8ae

Browse files
authored
Merge pull request #543 from k-nasa/stream_unzip
Add stream unzip
2 parents ac7a796 + d146d95 commit 850b8ae

File tree

2 files changed

+97
-0
lines changed

2 files changed

+97
-0
lines changed

src/stream/stream/mod.rs

+40
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ cfg_unstable! {
123123

124124
use count::CountFuture;
125125
use partition::PartitionFuture;
126+
use unzip::UnzipFuture;
126127

127128
pub use merge::Merge;
128129
pub use flatten::Flatten;
@@ -137,6 +138,7 @@ cfg_unstable! {
137138
mod partition;
138139
mod timeout;
139140
mod throttle;
141+
mod unzip;
140142
}
141143

142144
extension_trait! {
@@ -1748,6 +1750,44 @@ extension_trait! {
17481750
Zip::new(self, other)
17491751
}
17501752

1753+
#[doc = r#"
1754+
Converts an stream of pairs into a pair of containers.
1755+
1756+
`unzip()` consumes an entire stream of pairs, producing two collections: one from the left elements of the pairs, and one from the right elements.
1757+
1758+
This function is, in some sense, the opposite of [`zip`].
1759+
1760+
[`zip`]: trait.Stream.html#method.zip
1761+
1762+
# Example
1763+
1764+
```
1765+
# fn main() { async_std::task::block_on(async {
1766+
#
1767+
use async_std::prelude::*;
1768+
use async_std::stream;
1769+
1770+
let s = stream::from_iter(vec![(1,2), (3,4)]);
1771+
1772+
let (left, right): (Vec<_>, Vec<_>) = s.unzip().await;
1773+
1774+
assert_eq!(left, [1, 3]);
1775+
assert_eq!(right, [2, 4]);
1776+
#
1777+
# }) }
1778+
```
1779+
"#]
1780+
#[cfg(feature = "unstable")]
1781+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1782+
fn unzip<A, B, FromA, FromB>(self) -> impl Future<Output = (FromA, FromB)> [UnzipFuture<Self, FromA, FromB>]
1783+
where
1784+
FromA: Default + Extend<A>,
1785+
FromB: Default + Extend<B>,
1786+
Self: Stream<Item = (A, B)> + Sized,
1787+
{
1788+
UnzipFuture::new(self)
1789+
}
1790+
17511791
#[doc = r#"
17521792
Transforms a stream into a collection.
17531793

src/stream/stream/unzip.rs

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
use std::future::Future;
2+
use std::pin::Pin;
3+
4+
use pin_project_lite::pin_project;
5+
6+
use crate::stream::Stream;
7+
use crate::task::{Context, Poll};
8+
9+
pin_project! {
10+
#[derive(Clone, Debug)]
11+
#[cfg(all(feature = "default", feature = "unstable"))]
12+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
13+
pub struct UnzipFuture<S, FromA, FromB> {
14+
#[pin]
15+
stream: S,
16+
res: Option<(FromA, FromB)>,
17+
}
18+
}
19+
20+
impl<S: Stream, FromA, FromB> UnzipFuture<S, FromA, FromB>
21+
where
22+
FromA: Default,
23+
FromB: Default,
24+
{
25+
pub(super) fn new(stream: S) -> Self {
26+
UnzipFuture {
27+
stream,
28+
res: Some((FromA::default(), FromB::default())),
29+
}
30+
}
31+
}
32+
33+
impl<S, A, B, FromA, FromB> Future for UnzipFuture<S, FromA, FromB>
34+
where
35+
S: Stream<Item = (A, B)>,
36+
FromA: Default + Extend<A>,
37+
FromB: Default + Extend<B>,
38+
{
39+
type Output = (FromA, FromB);
40+
41+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
42+
let mut this = self.project();
43+
44+
loop {
45+
let next = futures_core::ready!(this.stream.as_mut().poll_next(cx));
46+
47+
match next {
48+
Some((a, b)) => {
49+
let res = this.res.as_mut().unwrap();
50+
res.0.extend(Some(a));
51+
res.1.extend(Some(b));
52+
}
53+
None => return Poll::Ready(this.res.take().unwrap()),
54+
}
55+
}
56+
}
57+
}

0 commit comments

Comments
 (0)