Skip to content

Remove futures_[un]ordered functions #1482

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 2 additions & 21 deletions futures-util/src/stream/futures_ordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ impl<T> Future for OrderWrapper<T>
/// some of the later futures have already completed.
///
/// Note that you can create a ready-made `FuturesOrdered` via the
/// `futures_ordered` function in the `stream` module, or you can start with an
/// empty queue with the `FuturesOrdered::new` constructor.
/// [`collect`](Iterator::collect) method, or you can start with an empty queue
/// with the `FuturesOrdered::new` constructor.
#[must_use = "streams do nothing unless polled"]
pub struct FuturesOrdered<T: Future> {
in_progress_queue: FuturesUnordered<OrderWrapper<T>>,
Expand All @@ -97,25 +97,6 @@ pub struct FuturesOrdered<T: Future> {

impl<T: Future> Unpin for FuturesOrdered<T> {}

/// Converts a list of futures into a `Stream` of results from the futures.
///
/// This function will take a list of futures (e.g. a vector, an iterator,
/// etc), and return a stream. The stream will yield items as they become
/// available on the futures internally, in the order that their originating
/// futures were submitted to the queue. If the futures complete out of order,
/// items will be stored internally within `FuturesOrdered` until all preceding
/// items have been yielded.
///
/// Note that the returned queue can also be used to dynamically push more
/// futures into the queue as they become available.
pub fn futures_ordered<I>(futures: I) -> FuturesOrdered<I::Item>
where
I: IntoIterator,
I::Item: Future,
{
futures.into_iter().collect()
}

impl<Fut: Future> FuturesOrdered<Fut> {
/// Constructs a new, empty `FuturesOrdered`
///
Expand Down
25 changes: 3 additions & 22 deletions futures-util/src/stream/futures_unordered/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use alloc::sync::{Arc, Weak};
mod abort;

mod iter;
use self::iter::{IterMut, IterPinMut};
pub use self::iter::{IterMut, IterPinMut};

mod task;
use self::task::Task;
Expand Down Expand Up @@ -51,8 +51,8 @@ const TERMINATED_SENTINEL_LENGTH: usize = usize::max_value();
/// wake-ups for new futures.
///
/// Note that you can create a ready-made [`FuturesUnordered`] via the
/// [`futures_unordered`](futures_unordered()) function, or you can start with
/// an empty set with the [`FuturesUnordered::new`] constructor.
/// [`collect`](Iterator::collect) method, or you can start with an empty set
/// with the [`FuturesUnordered::new`] constructor.
#[must_use = "streams do nothing unless polled"]
pub struct FuturesUnordered<Fut> {
ready_to_run_queue: Arc<ReadyToRunQueue<Fut>>,
Expand Down Expand Up @@ -469,25 +469,6 @@ impl<Fut: Future> FromIterator<Fut> for FuturesUnordered<Fut> {
}
}

/// Converts a list of futures into a [`Stream`] of outputs from the futures.
///
/// This function will take a list of futures (e.g. a [`Vec`], an [`Iterator`],
/// etc), and return a stream. The stream will yield items as they become
/// available on the futures internally, in the order that they become
/// available. This function is similar to
/// [`buffer_unordered`](super::StreamExt::buffer_unordered) in that it may
/// return items in a different order than in the list specified.
///
/// Note that the returned set can also be used to dynamically push more
/// futures into the set as they become available.
pub fn futures_unordered<I>(futures: I) -> FuturesUnordered<I::Item>
where
I: IntoIterator,
I::Item: Future,
{
futures.into_iter().collect()
}

impl<Fut: Future> FusedStream for FuturesUnordered<Fut> {
fn is_terminated(&self) -> bool {
self.len == TERMINATED_SENTINEL_LENGTH
Expand Down
7 changes: 4 additions & 3 deletions futures-util/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,13 @@ cfg_target_has_atomic! {
#[cfg(feature = "alloc")]
mod futures_ordered;
#[cfg(feature = "alloc")]
pub use self::futures_ordered::{futures_ordered, FuturesOrdered};
pub use self::futures_ordered::FuturesOrdered;

#[cfg(feature = "alloc")]
mod futures_unordered;
pub mod futures_unordered;
#[cfg(feature = "alloc")]
pub use self::futures_unordered::{futures_unordered, FuturesUnordered};
#[doc(inline)]
pub use self::futures_unordered::FuturesUnordered;

#[cfg(feature = "alloc")]
mod split;
Expand Down
8 changes: 3 additions & 5 deletions futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,10 +322,8 @@ pub mod stream {
//! asynchronously produce a sequence of values.
//! - The [`StreamExt`](crate::stream::StreamExt) trait, which provides
//! adapters for chaining and composing streams.
//! - Top-level stream contructors like [`iter_ok`](crate::stream::iter)
//! which creates a stream from an iterator, and
//! [`futures_unordered`](crate::stream::futures_unordered()), which
//! constructs a stream from a collection of futures.
//! - Top-level stream contructors like [`iter`](crate::stream::iter)
//! which creates a stream from an iterator.

pub use futures_core::stream::{
Stream, TryStream, FusedStream,
Expand Down Expand Up @@ -358,7 +356,7 @@ pub mod stream {
)]
#[cfg(feature = "alloc")]
pub use futures_util::stream::{
futures_ordered, FuturesOrdered,
FuturesOrdered,
futures_unordered, FuturesUnordered,

// For StreamExt:
Expand Down
8 changes: 4 additions & 4 deletions futures/tests/futures_ordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use futures::channel::oneshot;
use futures::executor::{block_on, block_on_stream};
use futures::future::{self, FutureExt, FutureObj};
use futures::stream::{StreamExt, futures_ordered, FuturesOrdered};
use futures::stream::{StreamExt, FuturesOrdered};
use futures_test::task::noop_waker_ref;

#[test]
Expand All @@ -12,7 +12,7 @@ fn works_1() {
let (b_tx, b_rx) = oneshot::channel::<i32>();
let (c_tx, c_rx) = oneshot::channel::<i32>();

let mut stream = futures_ordered(vec![a_rx, b_rx, c_rx]);
let mut stream = vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesOrdered<_>>();

b_tx.send(99).unwrap();
assert!(stream.poll_next_unpin(&noop_waker_ref()).is_pending());
Expand All @@ -33,10 +33,10 @@ fn works_2() {
let (b_tx, b_rx) = oneshot::channel::<i32>();
let (c_tx, c_rx) = oneshot::channel::<i32>();

let mut stream = futures_ordered(vec![
let mut stream = vec![
FutureObj::new(Box::new(a_rx)),
FutureObj::new(Box::new(b_rx.join(c_rx).map(|(a, b)| Ok(a? + b?)))),
]);
].into_iter().collect::<FuturesOrdered<_>>();

let lw = &noop_waker_ref();
a_tx.send(33).unwrap();
Expand Down
16 changes: 8 additions & 8 deletions futures/tests/futures_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use futures::channel::oneshot;
use futures::executor::{block_on, block_on_stream};
use futures::future::{self, FutureExt, FutureObj};
use futures::stream::{StreamExt, futures_unordered, FuturesUnordered};
use futures::stream::{StreamExt, FuturesUnordered};
use futures::task::Poll;
use futures_test::{assert_stream_done, assert_stream_next};
use futures_test::future::FutureTestExt;
Expand All @@ -16,7 +16,7 @@ fn works_1() {
let (b_tx, b_rx) = oneshot::channel::<i32>();
let (c_tx, c_rx) = oneshot::channel::<i32>();

let mut iter = block_on_stream(futures_unordered(vec![a_rx, b_rx, c_rx]));
let mut iter = block_on_stream(vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesUnordered<_>>());

b_tx.send(99).unwrap();
assert_eq!(Some(Ok(99)), iter.next());
Expand All @@ -34,10 +34,10 @@ fn works_2() {
let (b_tx, b_rx) = oneshot::channel::<i32>();
let (c_tx, c_rx) = oneshot::channel::<i32>();

let mut stream = futures_unordered(vec![
let mut stream = vec![
FutureObj::new(Box::new(a_rx)),
FutureObj::new(Box::new(b_rx.join(c_rx).map(|(a, b)| Ok(a? + b?)))),
]);
].into_iter().collect::<FuturesUnordered<_>>();

a_tx.send(9).unwrap();
b_tx.send(10).unwrap();
Expand Down Expand Up @@ -91,7 +91,7 @@ fn iter_mut_cancel() {
let (b_tx, b_rx) = oneshot::channel::<i32>();
let (c_tx, c_rx) = oneshot::channel::<i32>();

let mut stream = futures_unordered(vec![a_rx, b_rx, c_rx]);
let mut stream = vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesUnordered<_>>();

for rx in stream.iter_mut() {
rx.close();
Expand All @@ -111,11 +111,11 @@ fn iter_mut_cancel() {

#[test]
fn iter_mut_len() {
let mut stream = futures_unordered(vec![
let mut stream = vec![
future::empty::<()>(),
future::empty::<()>(),
future::empty::<()>()
]);
].into_iter().collect::<FuturesUnordered<_>>();

let mut iter_mut = stream.iter_mut();
assert_eq!(iter_mut.len(), 3);
Expand All @@ -133,7 +133,7 @@ fn futures_not_moved_after_poll() {
// Future that will be ready after being polled twice,
// asserting that it does not move.
let fut = future::ready(()).pending_once().assert_unmoved();
let mut stream = futures_unordered(vec![fut; 3]);
let mut stream = vec![fut; 3].into_iter().collect::<FuturesUnordered<_>>();
assert_stream_next!(stream, ());
assert_stream_next!(stream, ());
assert_stream_next!(stream, ());
Expand Down