Skip to content

Commit 1f8339f

Browse files
committed
Implement FuturesUnordered::iter_mut
1 parent 2f85d73 commit 1f8339f

File tree

3 files changed

+85
-1
lines changed

3 files changed

+85
-1
lines changed

src/stream/futures_unordered.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
//! An unbounded set of futures.
2+
13
use std::cell::UnsafeCell;
24
use std::fmt::{self, Debug};
35
use std::iter::FromIterator;
@@ -207,6 +209,15 @@ impl<T> FuturesUnordered<T> {
207209
self.inner.enqueue(ptr);
208210
}
209211

212+
/// Returns an iterator that allows modifying each future in the set.
213+
pub fn iter_mut(&mut self) -> IterMut<T> {
214+
IterMut {
215+
node: self.head_all,
216+
len: self.len,
217+
_marker: PhantomData
218+
}
219+
}
220+
210221
fn release_node(&mut self, node: Arc<Node<T>>) {
211222
// The future is done, try to reset the queued flag. This will prevent
212223
// `notify` from doing any work in the future
@@ -440,6 +451,37 @@ impl<F: Future> FromIterator<F> for FuturesUnordered<F> {
440451
}
441452
}
442453

454+
#[derive(Debug)]
455+
/// Mutable iterator over all futures in the unordered set.
456+
pub struct IterMut<'a, F: 'a> {
457+
node: *const Node<F>,
458+
len: usize,
459+
_marker: PhantomData<&'a F>
460+
}
461+
462+
impl<'a, F> Iterator for IterMut<'a, F> {
463+
type Item = &'a mut F;
464+
465+
fn next(&mut self) -> Option<&'a mut F> {
466+
if self.node.is_null() {
467+
return None;
468+
}
469+
unsafe {
470+
let future = (*(*self.node).future.get()).as_mut().unwrap();
471+
let next = *(*self.node).next_all.get();
472+
self.node = next;
473+
self.len -= 1;
474+
return Some(future);
475+
}
476+
}
477+
478+
fn size_hint(&self) -> (usize, Option<usize>) {
479+
(self.len, Some(self.len))
480+
}
481+
}
482+
483+
impl<'a, F> ExactSizeIterator for IterMut<'a, F> {}
484+
443485
impl<T> Inner<T> {
444486
/// The enqueue function from the 1024cores intrusive MPSC queue algorithm.
445487
fn enqueue(&self, node: *const Node<T>) {

src/stream/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ if_std! {
103103
mod wait;
104104
mod channel;
105105
mod split;
106-
mod futures_unordered;
106+
pub mod futures_unordered;
107107
mod futures_ordered;
108108
pub use self::buffered::Buffered;
109109
pub use self::buffer_unordered::BufferUnordered;

tests/futures_unordered.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,3 +83,45 @@ fn finished_future_ok() {
8383
assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready());
8484
assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready());
8585
}
86+
87+
#[test]
88+
fn iter_mut_cancel() {
89+
let (a_tx, a_rx) = oneshot::channel::<u32>();
90+
let (b_tx, b_rx) = oneshot::channel::<u32>();
91+
let (c_tx, c_rx) = oneshot::channel::<u32>();
92+
93+
let mut stream = futures_unordered(vec![a_rx, b_rx, c_rx]);
94+
95+
for rx in stream.iter_mut() {
96+
rx.close();
97+
}
98+
99+
assert!(a_tx.is_canceled());
100+
assert!(b_tx.is_canceled());
101+
assert!(c_tx.is_canceled());
102+
103+
let mut spawn = futures::executor::spawn(stream);
104+
assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream());
105+
assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream());
106+
assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream());
107+
assert_eq!(None, spawn.wait_stream());
108+
}
109+
110+
#[test]
111+
fn iter_mut_len() {
112+
let mut stream = futures_unordered(vec![
113+
futures::future::empty::<(),()>(),
114+
futures::future::empty::<(),()>(),
115+
futures::future::empty::<(),()>()
116+
]);
117+
118+
let mut iter_mut = stream.iter_mut();
119+
assert_eq!(iter_mut.len(), 3);
120+
assert!(iter_mut.next().is_some());
121+
assert_eq!(iter_mut.len(), 2);
122+
assert!(iter_mut.next().is_some());
123+
assert_eq!(iter_mut.len(), 1);
124+
assert!(iter_mut.next().is_some());
125+
assert_eq!(iter_mut.len(), 0);
126+
assert!(iter_mut.next().is_none());
127+
}

0 commit comments

Comments
 (0)