Skip to content

Commit d022efb

Browse files
committed
Implement FuturesUnordered::iter_mut
1 parent 2b3f0f0 commit d022efb

File tree

3 files changed

+83
-1
lines changed

3 files changed

+83
-1
lines changed

src/stream/futures_unordered.rs

+40
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,15 @@ impl<T> FuturesUnordered<T> {
206206
self.inner.enqueue(ptr);
207207
}
208208

209+
/// Returns an iterator that allows modifying each future in the set.
210+
pub fn iter_mut(&mut self) -> FuturesUnorderedIterMut<T> {
211+
FuturesUnorderedIterMut {
212+
node: self.head_all,
213+
len: self.len,
214+
set: self
215+
}
216+
}
217+
209218
fn release_node(&mut self, node: Arc<Node<T>>) {
210219
// The future is done, try to reset the queued flag. This will prevent
211220
// `notify` from doing any work in the future
@@ -427,6 +436,37 @@ impl<T> Drop for FuturesUnordered<T> {
427436
}
428437
}
429438

439+
#[derive(Debug)]
440+
/// Mutable iterator over all futures in the unordered set.
441+
pub struct FuturesUnorderedIterMut<'a, F: 'a> {
442+
set: &'a mut FuturesUnordered<F>,
443+
len: usize,
444+
node: *const Node<F>
445+
}
446+
447+
impl<'a, F> Iterator for FuturesUnorderedIterMut<'a, F> {
448+
type Item = &'a mut F;
449+
450+
fn next(&mut self) -> Option<&'a mut F> {
451+
if self.node.is_null() {
452+
return None;
453+
}
454+
unsafe {
455+
let future = (*(*self.node).future.get()).as_mut().unwrap();
456+
let next = *(*self.node).next_all.get();
457+
self.node = next;
458+
self.len -= 1;
459+
return Some(future);
460+
}
461+
}
462+
463+
fn size_hint(&self) -> (usize, Option<usize>) {
464+
(self.len, Some(self.len))
465+
}
466+
}
467+
468+
impl<'a, F> ExactSizeIterator for FuturesUnorderedIterMut<'a, F> {}
469+
430470
impl<T> Inner<T> {
431471
/// The enqueue function from the 1024cores intrusive MPSC queue algorithm.
432472
fn enqueue(&self, node: *const Node<T>) {

src/stream/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ if_std! {
112112
pub use self::collect::Collect;
113113
pub use self::wait::Wait;
114114
pub use self::split::{SplitStream, SplitSink};
115-
pub use self::futures_unordered::{futures_unordered, FuturesUnordered};
115+
pub use self::futures_unordered::{futures_unordered, FuturesUnordered, FuturesUnorderedIterMut};
116116
pub use self::futures_ordered::{futures_ordered, FuturesOrdered};
117117

118118
#[doc(hidden)]

tests/futures_unordered.rs

+42
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,45 @@ fn finished_future_ok() {
6969
assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready());
7070
assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready());
7171
}
72+
73+
#[test]
74+
fn iter_mut_cancel() {
75+
let (a_tx, a_rx) = oneshot::channel::<u32>();
76+
let (b_tx, b_rx) = oneshot::channel::<u32>();
77+
let (c_tx, c_rx) = oneshot::channel::<u32>();
78+
79+
let mut stream = futures_unordered(vec![a_rx, b_rx, c_rx]);
80+
81+
for rx in stream.iter_mut() {
82+
rx.close();
83+
}
84+
85+
assert!(a_tx.is_canceled());
86+
assert!(b_tx.is_canceled());
87+
assert!(c_tx.is_canceled());
88+
89+
let mut spawn = futures::executor::spawn(stream);
90+
assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream());
91+
assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream());
92+
assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream());
93+
assert_eq!(None, spawn.wait_stream());
94+
}
95+
96+
#[test]
97+
fn iter_mut_len() {
98+
let mut stream = futures_unordered(vec![
99+
futures::future::empty::<(),()>(),
100+
futures::future::empty::<(),()>(),
101+
futures::future::empty::<(),()>()
102+
]);
103+
104+
let mut iter_mut = stream.iter_mut();
105+
assert_eq!(iter_mut.len(), 3);
106+
assert!(iter_mut.next().is_some());
107+
assert_eq!(iter_mut.len(), 2);
108+
assert!(iter_mut.next().is_some());
109+
assert_eq!(iter_mut.len(), 1);
110+
assert!(iter_mut.next().is_some());
111+
assert_eq!(iter_mut.len(), 0);
112+
assert!(iter_mut.next().is_none());
113+
}

0 commit comments

Comments
 (0)