Skip to content

Commit cfdb8f3

Browse files
bors[bot]yoshuawuytsStjepan Glavina
authored
Merge #149 #150
149: update deps r=stjepang a=yoshuawuyts Updates all deps. Thanks! 150: split stream into multiple files r=stjepang a=yoshuawuyts This splits `stream/mod.rs`'s combinators into multiple files, making it easier to contribute combinators. Additionally we've renamed `MinBy` to `MinByFuture` to make it the same as the other private futures. Ref #146 #129. Thanks! Co-authored-by: Yoshua Wuyts <[email protected]> Co-authored-by: Stjepan Glavina <[email protected]>
3 parents 8d3d80a + ff440db + d7b3a55 commit cfdb8f3

File tree

8 files changed

+181
-158
lines changed

8 files changed

+181
-158
lines changed

Cargo.toml

+4-4
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,18 @@ crossbeam-channel = "0.3.9"
3030
futures-core-preview = "0.3.0-alpha.18"
3131
futures-io-preview = "0.3.0-alpha.18"
3232
futures-timer = "0.4.0"
33-
lazy_static = "1.3.0"
33+
lazy_static = "1.4.0"
3434
log = { version = "0.4.8", features = ["kv_unstable"] }
3535
memchr = "2.2.1"
3636
mio = "0.6.19"
3737
mio-uds = "0.6.7"
38-
num_cpus = "1.10.0"
38+
num_cpus = "1.10.1"
3939
pin-utils = "0.1.0-alpha.4"
4040
slab = "0.4.2"
4141

4242
[dev-dependencies]
43-
femme = "1.1.0"
44-
surf = "1.0.1"
43+
femme = "1.2.0"
44+
surf = "1.0.2"
4545
tempdir = "0.3.7"
4646

4747
[dev-dependencies.futures-preview]

src/stream/mod.rs

-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ pub use repeat::{repeat, Repeat};
2727
pub use stream::{Stream, Take};
2828

2929
mod empty;
30-
mod min_by;
3130
mod once;
3231
mod repeat;
3332
mod stream;

src/stream/stream/all.rs

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
use crate::future::Future;
2+
use crate::task::{Context, Poll};
3+
4+
use std::marker::PhantomData;
5+
use std::pin::Pin;
6+
7+
#[derive(Debug)]
8+
pub struct AllFuture<'a, S, F, T>
9+
where
10+
F: FnMut(T) -> bool,
11+
{
12+
pub(crate) stream: &'a mut S,
13+
pub(crate) f: F,
14+
pub(crate) result: bool,
15+
pub(crate) __item: PhantomData<T>,
16+
}
17+
18+
impl<'a, S, F, T> AllFuture<'a, S, F, T>
19+
where
20+
F: FnMut(T) -> bool,
21+
{
22+
pin_utils::unsafe_pinned!(stream: &'a mut S);
23+
pin_utils::unsafe_unpinned!(result: bool);
24+
pin_utils::unsafe_unpinned!(f: F);
25+
}
26+
27+
impl<S, F> Future for AllFuture<'_, S, F, S::Item>
28+
where
29+
S: futures_core::stream::Stream + Unpin + Sized,
30+
F: FnMut(S::Item) -> bool,
31+
{
32+
type Output = bool;
33+
34+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
35+
use futures_core::stream::Stream;
36+
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
37+
match next {
38+
Some(v) => {
39+
let result = (self.as_mut().f())(v);
40+
*self.as_mut().result() = result;
41+
if result {
42+
// don't forget to wake this task again to pull the next item from stream
43+
cx.waker().wake_by_ref();
44+
Poll::Pending
45+
} else {
46+
Poll::Ready(false)
47+
}
48+
}
49+
None => Poll::Ready(self.result),
50+
}
51+
}
52+
}

src/stream/stream/any.rs

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
use crate::future::Future;
2+
use crate::task::{Context, Poll};
3+
4+
use std::marker::PhantomData;
5+
use std::pin::Pin;
6+
7+
#[derive(Debug)]
8+
pub struct AnyFuture<'a, S, F, T>
9+
where
10+
F: FnMut(T) -> bool,
11+
{
12+
pub(crate) stream: &'a mut S,
13+
pub(crate) f: F,
14+
pub(crate) result: bool,
15+
pub(crate) __item: PhantomData<T>,
16+
}
17+
18+
impl<'a, S, F, T> AnyFuture<'a, S, F, T>
19+
where
20+
F: FnMut(T) -> bool,
21+
{
22+
pin_utils::unsafe_pinned!(stream: &'a mut S);
23+
pin_utils::unsafe_unpinned!(result: bool);
24+
pin_utils::unsafe_unpinned!(f: F);
25+
}
26+
27+
impl<S, F> Future for AnyFuture<'_, S, F, S::Item>
28+
where
29+
S: futures_core::stream::Stream + Unpin + Sized,
30+
F: FnMut(S::Item) -> bool,
31+
{
32+
type Output = bool;
33+
34+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
35+
use futures_core::stream::Stream;
36+
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
37+
match next {
38+
Some(v) => {
39+
let result = (self.as_mut().f())(v);
40+
*self.as_mut().result() = result;
41+
if result {
42+
Poll::Ready(true)
43+
} else {
44+
// don't forget to wake this task again to pull the next item from stream
45+
cx.waker().wake_by_ref();
46+
Poll::Pending
47+
}
48+
}
49+
None => Poll::Ready(self.result),
50+
}
51+
}
52+
}

src/stream/min_by.rs renamed to src/stream/stream/min_by.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,31 @@
11
use std::cmp::Ordering;
22
use std::pin::Pin;
33

4-
use super::stream::Stream;
54
use crate::future::Future;
5+
use crate::stream::Stream;
66
use crate::task::{Context, Poll};
77

88
/// A future that yields the minimum item in a stream by a given comparison function.
99
#[derive(Clone, Debug)]
10-
pub struct MinBy<S: Stream, F> {
10+
pub struct MinByFuture<S: Stream, F> {
1111
stream: S,
1212
compare: F,
1313
min: Option<S::Item>,
1414
}
1515

16-
impl<S: Stream + Unpin, F> Unpin for MinBy<S, F> {}
16+
impl<S: Stream + Unpin, F> Unpin for MinByFuture<S, F> {}
1717

18-
impl<S: Stream + Unpin, F> MinBy<S, F> {
18+
impl<S: Stream + Unpin, F> MinByFuture<S, F> {
1919
pub(super) fn new(stream: S, compare: F) -> Self {
20-
MinBy {
20+
MinByFuture {
2121
stream,
2222
compare,
2323
min: None,
2424
}
2525
}
2626
}
2727

28-
impl<S, F> Future for MinBy<S, F>
28+
impl<S, F> Future for MinByFuture<S, F>
2929
where
3030
S: futures_core::stream::Stream + Unpin,
3131
S::Item: Copy,

src/stream/stream.rs renamed to src/stream/stream/mod.rs

+16-147
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,24 @@
2121
//! # }) }
2222
//! ```
2323
24-
use std::cmp::Ordering;
25-
use std::pin::Pin;
24+
mod all;
25+
mod any;
26+
mod min_by;
27+
mod next;
28+
mod take;
2629

27-
use cfg_if::cfg_if;
30+
pub use take::Take;
31+
32+
use all::AllFuture;
33+
use any::AnyFuture;
34+
use min_by::MinByFuture;
35+
use next::NextFuture;
2836

29-
use super::min_by::MinBy;
30-
use crate::future::Future;
31-
use crate::task::{Context, Poll};
37+
use std::cmp::Ordering;
3238
use std::marker::PhantomData;
3339

40+
use cfg_if::cfg_if;
41+
3442
cfg_if! {
3543
if #[cfg(feature = "docs")] {
3644
#[doc(hidden)]
@@ -145,12 +153,12 @@ pub trait Stream {
145153
/// #
146154
/// # }) }
147155
/// ```
148-
fn min_by<F>(self, compare: F) -> MinBy<Self, F>
156+
fn min_by<F>(self, compare: F) -> MinByFuture<Self, F>
149157
where
150158
Self: Sized + Unpin,
151159
F: FnMut(&Self::Item, &Self::Item) -> Ordering,
152160
{
153-
MinBy::new(self, compare)
161+
MinByFuture::new(self, compare)
154162
}
155163

156164
/// Tests if every element of the stream matches a predicate.
@@ -278,142 +286,3 @@ impl<T: futures_core::stream::Stream + Unpin + ?Sized> Stream for T {
278286
NextFuture { stream: self }
279287
}
280288
}
281-
282-
#[doc(hidden)]
283-
#[allow(missing_debug_implementations)]
284-
pub struct NextFuture<'a, T: Unpin + ?Sized> {
285-
stream: &'a mut T,
286-
}
287-
288-
impl<T: futures_core::stream::Stream + Unpin + ?Sized> Future for NextFuture<'_, T> {
289-
type Output = Option<T::Item>;
290-
291-
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
292-
Pin::new(&mut *self.stream).poll_next(cx)
293-
}
294-
}
295-
296-
/// A stream that yields the first `n` items of another stream.
297-
#[derive(Clone, Debug)]
298-
pub struct Take<S> {
299-
stream: S,
300-
remaining: usize,
301-
}
302-
303-
impl<S: Unpin> Unpin for Take<S> {}
304-
305-
impl<S: futures_core::stream::Stream> Take<S> {
306-
pin_utils::unsafe_pinned!(stream: S);
307-
pin_utils::unsafe_unpinned!(remaining: usize);
308-
}
309-
310-
impl<S: futures_core::stream::Stream> futures_core::stream::Stream for Take<S> {
311-
type Item = S::Item;
312-
313-
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
314-
if self.remaining == 0 {
315-
Poll::Ready(None)
316-
} else {
317-
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
318-
match next {
319-
Some(_) => *self.as_mut().remaining() -= 1,
320-
None => *self.as_mut().remaining() = 0,
321-
}
322-
Poll::Ready(next)
323-
}
324-
}
325-
}
326-
327-
#[derive(Debug)]
328-
pub struct AllFuture<'a, S, F, T>
329-
where
330-
F: FnMut(T) -> bool,
331-
{
332-
stream: &'a mut S,
333-
f: F,
334-
result: bool,
335-
__item: PhantomData<T>,
336-
}
337-
338-
impl<'a, S, F, T> AllFuture<'a, S, F, T>
339-
where
340-
F: FnMut(T) -> bool,
341-
{
342-
pin_utils::unsafe_pinned!(stream: &'a mut S);
343-
pin_utils::unsafe_unpinned!(result: bool);
344-
pin_utils::unsafe_unpinned!(f: F);
345-
}
346-
347-
impl<S, F> Future for AllFuture<'_, S, F, S::Item>
348-
where
349-
S: futures_core::stream::Stream + Unpin + Sized,
350-
F: FnMut(S::Item) -> bool,
351-
{
352-
type Output = bool;
353-
354-
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
355-
use futures_core::stream::Stream;
356-
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
357-
match next {
358-
Some(v) => {
359-
let result = (self.as_mut().f())(v);
360-
*self.as_mut().result() = result;
361-
if result {
362-
// don't forget to wake this task again to pull the next item from stream
363-
cx.waker().wake_by_ref();
364-
Poll::Pending
365-
} else {
366-
Poll::Ready(false)
367-
}
368-
}
369-
None => Poll::Ready(self.result),
370-
}
371-
}
372-
}
373-
374-
#[derive(Debug)]
375-
pub struct AnyFuture<'a, S, F, T>
376-
where
377-
F: FnMut(T) -> bool,
378-
{
379-
stream: &'a mut S,
380-
f: F,
381-
result: bool,
382-
__item: PhantomData<T>,
383-
}
384-
385-
impl<'a, S, F, T> AnyFuture<'a, S, F, T>
386-
where
387-
F: FnMut(T) -> bool,
388-
{
389-
pin_utils::unsafe_pinned!(stream: &'a mut S);
390-
pin_utils::unsafe_unpinned!(result: bool);
391-
pin_utils::unsafe_unpinned!(f: F);
392-
}
393-
394-
impl<S, F> Future for AnyFuture<'_, S, F, S::Item>
395-
where
396-
S: futures_core::stream::Stream + Unpin + Sized,
397-
F: FnMut(S::Item) -> bool,
398-
{
399-
type Output = bool;
400-
401-
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
402-
use futures_core::stream::Stream;
403-
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
404-
match next {
405-
Some(v) => {
406-
let result = (self.as_mut().f())(v);
407-
*self.as_mut().result() = result;
408-
if result {
409-
Poll::Ready(true)
410-
} else {
411-
// don't forget to wake this task again to pull the next item from stream
412-
cx.waker().wake_by_ref();
413-
Poll::Pending
414-
}
415-
}
416-
None => Poll::Ready(self.result),
417-
}
418-
}
419-
}

src/stream/stream/next.rs

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
use crate::future::Future;
2+
use crate::task::{Context, Poll};
3+
use std::pin::Pin;
4+
5+
#[doc(hidden)]
6+
#[allow(missing_debug_implementations)]
7+
pub struct NextFuture<'a, T: Unpin + ?Sized> {
8+
pub(crate) stream: &'a mut T,
9+
}
10+
11+
impl<T: futures_core::stream::Stream + Unpin + ?Sized> Future for NextFuture<'_, T> {
12+
type Output = Option<T::Item>;
13+
14+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
15+
Pin::new(&mut *self.stream).poll_next(cx)
16+
}
17+
}

0 commit comments

Comments
 (0)