Skip to content

Commit f273d3b

Browse files
authored
FuturesUnordered: Do not poll the same future twice per iteration (#2333)
1 parent 4959ecc commit f273d3b

File tree

2 files changed

+60
-18
lines changed

2 files changed

+60
-18
lines changed

futures-util/src/stream/futures_unordered/mod.rs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -30,22 +30,6 @@ use self::task::Task;
3030
mod ready_to_run_queue;
3131
use self::ready_to_run_queue::{ReadyToRunQueue, Dequeue};
3232

33-
/// Constant used for a `FuturesUnordered` to determine how many times it is
34-
/// allowed to poll underlying futures without yielding.
35-
///
36-
/// A single call to `poll_next` may potentially do a lot of work before
37-
/// yielding. This happens in particular if the underlying futures are awoken
38-
/// frequently but continue to return `Pending`. This is problematic if other
39-
/// tasks are waiting on the executor, since they do not get to run. This value
40-
/// caps the number of calls to `poll` on underlying futures a single call to
41-
/// `poll_next` is allowed to make.
42-
///
43-
/// The value itself is chosen somewhat arbitrarily. It needs to be high enough
44-
/// that amortize wakeup and scheduling costs, but low enough that we do not
45-
/// starve other tasks for long.
46-
///
47-
/// See also https://github.com/rust-lang/futures-rs/issues/2047.
48-
const YIELD_EVERY: usize = 32;
4933

5034
/// A set of futures which may complete in any order.
5135
///
@@ -414,6 +398,22 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
414398
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
415399
-> Poll<Option<Self::Item>>
416400
{
401+
// Variable to determine how many times it is allowed to poll underlying
402+
// futures without yielding.
403+
//
404+
// A single call to `poll_next` may potentially do a lot of work before
405+
// yielding. This happens in particular if the underlying futures are awoken
406+
// frequently but continue to return `Pending`. This is problematic if other
407+
// tasks are waiting on the executor, since they do not get to run. This value
408+
// caps the number of calls to `poll` on underlying futures a single call to
409+
// `poll_next` is allowed to make.
410+
//
411+
// The value is the length of FuturesUnordered. This ensures that each
412+
// future is polled only once at most per iteration.
413+
//
414+
// See also https://github.com/rust-lang/futures-rs/issues/2047.
415+
let yield_every = self.len();
416+
417417
// Keep track of how many child futures we have polled,
418418
// in case we want to forcibly yield.
419419
let mut polled = 0;
@@ -548,7 +548,7 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
548548
let task = bomb.task.take().unwrap();
549549
bomb.queue.link(task);
550550

551-
if polled == YIELD_EVERY {
551+
if polled == yield_every {
552552
// We have polled a large number of futures in a row without yielding.
553553
// To ensure we do not starve other tasks waiting on the executor,
554554
// we yield here, but immediately wake ourselves up to continue.

futures/tests/futures_unordered.rs

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
use futures::future::Future;
2+
use futures::stream::{FuturesUnordered, StreamExt};
3+
use futures::task::{Context, Poll};
4+
use futures_test::task::noop_context;
5+
use std::iter::FromIterator;
6+
use std::pin::Pin;
7+
18
#[test]
29
fn is_terminated() {
310
use futures::future;
@@ -270,12 +277,13 @@ fn futures_not_moved_after_poll() {
270277
use futures::future;
271278
use futures::stream::FuturesUnordered;
272279
use futures_test::future::FutureTestExt;
273-
use futures_test::{assert_stream_done, assert_stream_next};
280+
use futures_test::{assert_stream_done, assert_stream_next, assert_stream_pending};
274281

275282
// Future that will be ready after being polled twice,
276283
// asserting that it does not move.
277284
let fut = future::ready(()).pending_once().assert_unmoved();
278285
let mut stream = vec![fut; 3].into_iter().collect::<FuturesUnordered<_>>();
286+
assert_stream_pending!(stream);
279287
assert_stream_next!(stream, ());
280288
assert_stream_next!(stream, ());
281289
assert_stream_next!(stream, ());
@@ -326,3 +334,37 @@ fn len_valid_during_out_of_order_completion() {
326334
assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(7))));
327335
assert_eq!(stream.len(), 0);
328336
}
337+
338+
#[test]
339+
fn polled_only_once_at_most_per_iteration() {
340+
#[derive(Debug, Clone, Copy, Default)]
341+
struct F {
342+
polled: bool,
343+
}
344+
345+
impl Future for F {
346+
type Output = ();
347+
348+
fn poll(mut self: Pin<&mut Self>, _: &mut Context) -> Poll<Self::Output> {
349+
if self.polled {
350+
panic!("polled twice")
351+
} else {
352+
self.polled = true;
353+
Poll::Pending
354+
}
355+
}
356+
}
357+
358+
let cx = &mut noop_context();
359+
360+
let mut tasks = FuturesUnordered::from_iter(vec![F::default(); 10]);
361+
assert!(tasks.poll_next_unpin(cx).is_pending());
362+
assert_eq!(10, tasks.iter().filter(|f| f.polled).count());
363+
364+
let mut tasks = FuturesUnordered::from_iter(vec![F::default(); 33]);
365+
assert!(tasks.poll_next_unpin(cx).is_pending());
366+
assert_eq!(33, tasks.iter().filter(|f| f.polled).count());
367+
368+
let mut tasks = FuturesUnordered::<F>::new();
369+
assert_eq!(Poll::Ready(None), tasks.poll_next_unpin(cx));
370+
}

0 commit comments

Comments
 (0)