Skip to content

Commit cce3c07

Browse files
committed
Implement FuturesUnordered::iter_mut
1 parent 2f85d73 commit cce3c07

File tree

3 files changed

+89
-2
lines changed

3 files changed

+89
-2
lines changed

src/stream/futures_unordered.rs renamed to src/stream/futures_unordered_impl.rs

+40
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,15 @@ impl<T> FuturesUnordered<T> {
207207
self.inner.enqueue(ptr);
208208
}
209209

210+
/// Returns an iterator that allows modifying each future in the set.
211+
pub fn iter_mut(&mut self) -> IterMut<T> {
212+
IterMut {
213+
node: self.head_all,
214+
len: self.len,
215+
_set: self
216+
}
217+
}
218+
210219
fn release_node(&mut self, node: Arc<Node<T>>) {
211220
// The future is done, try to reset the queued flag. This will prevent
212221
// `notify` from doing any work in the future
@@ -440,6 +449,37 @@ impl<F: Future> FromIterator<F> for FuturesUnordered<F> {
440449
}
441450
}
442451

452+
#[derive(Debug)]
453+
/// Mutable iterator over all futures in the unordered set.
454+
pub struct IterMut<'a, F: 'a> {
455+
node: *const Node<F>,
456+
len: usize,
457+
_set: &'a mut FuturesUnordered<F>
458+
}
459+
460+
impl<'a, F> Iterator for IterMut<'a, F> {
461+
type Item = &'a mut F;
462+
463+
fn next(&mut self) -> Option<&'a mut F> {
464+
if self.node.is_null() {
465+
return None;
466+
}
467+
unsafe {
468+
let future = (*(*self.node).future.get()).as_mut().unwrap();
469+
let next = *(*self.node).next_all.get();
470+
self.node = next;
471+
self.len -= 1;
472+
return Some(future);
473+
}
474+
}
475+
476+
fn size_hint(&self) -> (usize, Option<usize>) {
477+
(self.len, Some(self.len))
478+
}
479+
}
480+
481+
impl<'a, F> ExactSizeIterator for IterMut<'a, F> {}
482+
443483
impl<T> Inner<T> {
444484
/// The enqueue function from the 1024cores intrusive MPSC queue algorithm.
445485
fn enqueue(&self, node: *const Node<T>) {

src/stream/mod.rs

+7-2
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ if_std! {
103103
mod wait;
104104
mod channel;
105105
mod split;
106-
mod futures_unordered;
106+
mod futures_unordered_impl;
107107
mod futures_ordered;
108108
pub use self::buffered::Buffered;
109109
pub use self::buffer_unordered::BufferUnordered;
@@ -112,9 +112,14 @@ if_std! {
112112
pub use self::collect::Collect;
113113
pub use self::wait::Wait;
114114
pub use self::split::{SplitStream, SplitSink};
115-
pub use self::futures_unordered::{futures_unordered, FuturesUnordered};
115+
pub use self::futures_unordered_impl::{futures_unordered, FuturesUnordered};
116116
pub use self::futures_ordered::{futures_ordered, FuturesOrdered};
117117

118+
pub mod futures_unordered {
119+
//! An unbounded set of futures.
120+
pub use super::futures_unordered_impl::IterMut;
121+
}
122+
118123
#[doc(hidden)]
119124
#[cfg(feature = "with-deprecated")]
120125
#[allow(deprecated)]

tests/futures_unordered.rs

+42
Original file line numberDiff line numberDiff line change
@@ -83,3 +83,45 @@ fn finished_future_ok() {
8383
assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready());
8484
assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready());
8585
}
86+
87+
#[test]
88+
fn iter_mut_cancel() {
89+
let (a_tx, a_rx) = oneshot::channel::<u32>();
90+
let (b_tx, b_rx) = oneshot::channel::<u32>();
91+
let (c_tx, c_rx) = oneshot::channel::<u32>();
92+
93+
let mut stream = futures_unordered(vec![a_rx, b_rx, c_rx]);
94+
95+
for rx in stream.iter_mut() {
96+
rx.close();
97+
}
98+
99+
assert!(a_tx.is_canceled());
100+
assert!(b_tx.is_canceled());
101+
assert!(c_tx.is_canceled());
102+
103+
let mut spawn = futures::executor::spawn(stream);
104+
assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream());
105+
assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream());
106+
assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream());
107+
assert_eq!(None, spawn.wait_stream());
108+
}
109+
110+
#[test]
111+
fn iter_mut_len() {
112+
let mut stream = futures_unordered(vec![
113+
futures::future::empty::<(),()>(),
114+
futures::future::empty::<(),()>(),
115+
futures::future::empty::<(),()>()
116+
]);
117+
118+
let mut iter_mut = stream.iter_mut();
119+
assert_eq!(iter_mut.len(), 3);
120+
assert!(iter_mut.next().is_some());
121+
assert_eq!(iter_mut.len(), 2);
122+
assert!(iter_mut.next().is_some());
123+
assert_eq!(iter_mut.len(), 1);
124+
assert!(iter_mut.next().is_some());
125+
assert_eq!(iter_mut.len(), 0);
126+
assert!(iter_mut.next().is_none());
127+
}

0 commit comments

Comments
 (0)