|
| 1 | +//! Opt-in yield points for improved cooperative scheduling. |
| 2 | +//! |
| 3 | +//! A single call to [`poll`] on a top-level task may potentially do a lot of work before it |
| 4 | +//! returns `Poll::Pending`. If a task runs for a long period of time without yielding back to the |
| 5 | +//! executor, it can starve other tasks waiting on that executor to execute them, or drive |
| 6 | +//! underlying resources. Since Rust does not have a runtime, it is difficult to forcibly preempt a |
| 7 | +//! long-running task. Instead, this module provides an opt-in mechanism for futures to collaborate |
| 8 | +//! with the executor to avoid starvation. |
| 9 | +//! |
| 10 | +//! Consider a future like this one: |
| 11 | +//! |
| 12 | +//! ``` |
| 13 | +//! # use tokio::stream::{Stream, StreamExt}; |
| 14 | +//! async fn drop_all<I: Stream + Unpin>(mut input: I) { |
| 15 | +//! while let Some(_) = input.next().await {} |
| 16 | +//! } |
| 17 | +//! ``` |
| 18 | +//! |
| 19 | +//! It may look harmless, but consider what happens under heavy load if the input stream is |
| 20 | +//! _always_ ready. If we spawn `drop_all`, the task will never yield, and will starve other tasks |
| 21 | +//! and resources on the same executor. With opt-in yield points, this problem is alleviated: |
| 22 | +//! |
| 23 | +//! ```ignore |
| 24 | +//! # use tokio::stream::{Stream, StreamExt}; |
| 25 | +//! async fn drop_all<I: Stream + Unpin>(mut input: I) { |
| 26 | +//! while let Some(_) = input.next().await { |
| 27 | +//! tokio::coop::proceed().await; |
| 28 | +//! } |
| 29 | +//! } |
| 30 | +//! ``` |
| 31 | +//! |
| 32 | +//! The `proceed` future will coordinate with the executor to make sure that every so often control |
| 33 | +//! is yielded back to the executor so it can run other tasks. |
| 34 | +//! |
| 35 | +//! # Placing yield points |
| 36 | +//! |
| 37 | +//! Voluntary yield points should be placed _after_ at least some work has been done. If they are |
| 38 | +//! not, a future sufficiently deep in the task hierarchy may end up _never_ getting to run because |
| 39 | +//! of the number of yield points that inevitably appear before it is reached. In general, you will |
| 40 | +//! want yield points to only appear in "leaf" futures -- those that do not themselves poll other |
| 41 | +//! futures. By doing this, you avoid double-counting each iteration of the outer future against |
| 42 | +//! the cooperating budget. |
| 43 | +//! |
| 44 | +//! [`poll`]: https://doc.rust-lang.org/std/future/trait.Future.html#tymethod.poll |
| 45 | +
|
| 46 | +// NOTE: The doctests in this module are ignored since the whole module is (currently) private. |
| 47 | + |
| 48 | +use std::cell::Cell; |
| 49 | +use std::task::{Context, Poll}; |
| 50 | + |
| 51 | +/// Constant used to determine how much "work" a task is allowed to do without yielding. |
| 52 | +/// |
| 53 | +/// The value itself is chosen somewhat arbitrarily. It needs to be high enough to amortize wakeup |
| 54 | +/// and scheduling costs, but low enough that we do not starve other tasks for too long. The value |
| 55 | +/// also needs to be high enough that particularly deep tasks are able to do at least some useful |
| 56 | +/// work at all. |
| 57 | +/// |
| 58 | +/// Note that as more yield points are added in the ecosystem, this value will probably also have |
| 59 | +/// to be raised. |
| 60 | +const BUDGET: usize = 128; |
| 61 | + |
| 62 | +/// Constant used to determine if budgeting has been disabled. |
| 63 | +const UNCONSTRAINED: usize = usize::max_value(); |
| 64 | + |
| 65 | +thread_local! { |
| 66 | + static HITS: Cell<usize> = Cell::new(UNCONSTRAINED); |
| 67 | +} |
| 68 | + |
| 69 | +/// Run the given closure with a cooperative task budget. |
| 70 | +/// |
| 71 | +/// Enabling budgeting when it is already enabled is a no-op. |
| 72 | +#[inline(always)] |
| 73 | +pub(crate) fn budget<F, R>(f: F) -> R |
| 74 | +where |
| 75 | + F: FnOnce() -> R, |
| 76 | +{ |
| 77 | + HITS.with(move |hits| { |
| 78 | + if hits.get() != UNCONSTRAINED { |
| 79 | + // We are already being budgeted. |
| 80 | + // |
| 81 | + // Arguably this should be an error, but it can happen "correctly" |
| 82 | + // such as with block_on + LocalSet, so we make it a no-op. |
| 83 | + return f(); |
| 84 | + } |
| 85 | + |
| 86 | + struct Guard<'a>(&'a Cell<usize>); |
| 87 | + impl<'a> Drop for Guard<'a> { |
| 88 | + fn drop(&mut self) { |
| 89 | + self.0.set(UNCONSTRAINED); |
| 90 | + } |
| 91 | + } |
| 92 | + |
| 93 | + hits.set(BUDGET); |
| 94 | + let _guard = Guard(hits); |
| 95 | + f() |
| 96 | + }) |
| 97 | +} |
| 98 | + |
| 99 | +cfg_blocking_impl! { |
| 100 | + /// Forcibly remove the budgeting constraints early. |
| 101 | + pub(crate) fn stop() { |
| 102 | + HITS.with(|hits| { |
| 103 | + hits.set(UNCONSTRAINED); |
| 104 | + }); |
| 105 | + } |
| 106 | +} |
| 107 | + |
| 108 | +/// Invoke `f` with a subset of the remaining budget. |
| 109 | +/// |
| 110 | +/// This is useful if you have sub-futures that you need to poll, but that you want to restrict |
| 111 | +/// from using up your entire budget. For example, imagine the following future: |
| 112 | +/// |
| 113 | +/// ```rust |
| 114 | +/// # use std::{future::Future, pin::Pin, task::{Context, Poll}}; |
| 115 | +/// use futures::stream::FuturesUnordered; |
| 116 | +/// struct MyFuture<F1, F2> { |
| 117 | +/// big: FuturesUnordered<F1>, |
| 118 | +/// small: F2, |
| 119 | +/// } |
| 120 | +/// |
| 121 | +/// use tokio::stream::Stream; |
| 122 | +/// impl<F1, F2> Future for MyFuture<F1, F2> |
| 123 | +/// where F1: Future, F2: Future |
| 124 | +/// # , F1: Unpin, F2: Unpin |
| 125 | +/// { |
| 126 | +/// type Output = F2::Output; |
| 127 | +/// |
| 128 | +/// // fn poll(...) |
| 129 | +/// # fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F2::Output> { |
| 130 | +/// # let this = &mut *self; |
| 131 | +/// let mut big = // something to pin self.big |
| 132 | +/// # Pin::new(&mut this.big); |
| 133 | +/// let small = // something to pin self.small |
| 134 | +/// # Pin::new(&mut this.small); |
| 135 | +/// |
| 136 | +/// // see if any of the big futures have finished |
| 137 | +/// while let Some(e) = futures::ready!(big.as_mut().poll_next(cx)) { |
| 138 | +/// // do something with e |
| 139 | +/// # let _ = e; |
| 140 | +/// } |
| 141 | +/// |
| 142 | +/// // see if the small future has finished |
| 143 | +/// small.poll(cx) |
| 144 | +/// } |
| 145 | +/// # } |
| 146 | +/// ``` |
| 147 | +/// |
| 148 | +/// It could be that every time `poll` gets called, `big` ends up spending the entire budget, and |
| 149 | +/// `small` never gets polled. That would be sad. If you want to stick up for the little future, |
| 150 | +/// that's what `limit` is for. It lets you portion out a smaller part of the yield budget to a |
| 151 | +/// particular segment of your code. In the code above, you would write |
| 152 | +/// |
| 153 | +/// ```rust,ignore |
| 154 | +/// # use std::{future::Future, pin::Pin, task::{Context, Poll}}; |
| 155 | +/// # use futures::stream::FuturesUnordered; |
| 156 | +/// # struct MyFuture<F1, F2> { |
| 157 | +/// # big: FuturesUnordered<F1>, |
| 158 | +/// # small: F2, |
| 159 | +/// # } |
| 160 | +/// # |
| 161 | +/// # use tokio::stream::Stream; |
| 162 | +/// # impl<F1, F2> Future for MyFuture<F1, F2> |
| 163 | +/// # where F1: Future, F2: Future |
| 164 | +/// # , F1: Unpin, F2: Unpin |
| 165 | +/// # { |
| 166 | +/// # type Output = F2::Output; |
| 167 | +/// # fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F2::Output> { |
| 168 | +/// # let this = &mut *self; |
| 169 | +/// # let mut big = Pin::new(&mut this.big); |
| 170 | +/// # let small = Pin::new(&mut this.small); |
| 171 | +/// # |
| 172 | +/// // see if any of the big futures have finished |
| 173 | +/// while let Some(e) = futures::ready!(tokio::coop::limit(64, || big.as_mut().poll_next(cx))) { |
| 174 | +/// # // do something with e |
| 175 | +/// # let _ = e; |
| 176 | +/// # } |
| 177 | +/// # small.poll(cx) |
| 178 | +/// # } |
| 179 | +/// # } |
| 180 | +/// ``` |
| 181 | +/// |
| 182 | +/// Now, even if `big` spends its entire budget, `small` will likely be left with some budget left |
| 183 | +/// to also do useful work. In particular, if the remaining budget was `N` at the start of `poll`, |
| 184 | +/// `small` will have at least a budget of `N - 64`. It may be more if `big` did not spend its |
| 185 | +/// entire budget. |
| 186 | +/// |
| 187 | +/// Note that you cannot _increase_ your budget by calling `limit`. The budget provided to the code |
| 188 | +/// inside the buget is the _minimum_ of the _current_ budget and the bound. |
| 189 | +/// |
| 190 | +#[allow(unreachable_pub, dead_code)] |
| 191 | +pub fn limit<R>(bound: usize, f: impl FnOnce() -> R) -> R { |
| 192 | + HITS.with(|hits| { |
| 193 | + let budget = hits.get(); |
| 194 | + // with_bound cannot _increase_ the remaining budget |
| 195 | + let bound = std::cmp::min(budget, bound); |
| 196 | + // When f() exits, how much should we add to what is left? |
| 197 | + let floor = budget.saturating_sub(bound); |
| 198 | + // Make sure we restore the remaining budget even on panic |
| 199 | + struct RestoreBudget<'a>(&'a Cell<usize>, usize); |
| 200 | + impl<'a> Drop for RestoreBudget<'a> { |
| 201 | + fn drop(&mut self) { |
| 202 | + let left = self.0.get(); |
| 203 | + self.0.set(self.1 + left); |
| 204 | + } |
| 205 | + } |
| 206 | + // Time to restrict! |
| 207 | + hits.set(bound); |
| 208 | + let _restore = RestoreBudget(&hits, floor); |
| 209 | + f() |
| 210 | + }) |
| 211 | +} |
| 212 | + |
| 213 | +/// Returns `Poll::Pending` if the current task has exceeded its budget and should yield. |
| 214 | +#[allow(unreachable_pub, dead_code)] |
| 215 | +#[inline] |
| 216 | +pub fn poll_proceed(cx: &mut Context<'_>) -> Poll<()> { |
| 217 | + HITS.with(|hits| { |
| 218 | + let n = hits.get(); |
| 219 | + if n == UNCONSTRAINED { |
| 220 | + // opted out of budgeting |
| 221 | + Poll::Ready(()) |
| 222 | + } else if n == 0 { |
| 223 | + cx.waker().wake_by_ref(); |
| 224 | + Poll::Pending |
| 225 | + } else { |
| 226 | + hits.set(n.saturating_sub(1)); |
| 227 | + Poll::Ready(()) |
| 228 | + } |
| 229 | + }) |
| 230 | +} |
| 231 | + |
| 232 | +/// Resolves immediately unless the current task has already exceeded its budget. |
| 233 | +/// |
| 234 | +/// This should be placed after at least some work has been done. Otherwise a future sufficiently |
| 235 | +/// deep in the task hierarchy may end up never getting to run because of the number of yield |
| 236 | +/// points that inevitably appear before it is even reached. For example: |
| 237 | +/// |
| 238 | +/// ```ignore |
| 239 | +/// # use tokio::stream::{Stream, StreamExt}; |
| 240 | +/// async fn drop_all<I: Stream + Unpin>(mut input: I) { |
| 241 | +/// while let Some(_) = input.next().await { |
| 242 | +/// tokio::coop::proceed().await; |
| 243 | +/// } |
| 244 | +/// } |
| 245 | +/// ``` |
| 246 | +#[allow(unreachable_pub, dead_code)] |
| 247 | +#[inline] |
| 248 | +pub async fn proceed() { |
| 249 | + use crate::future::poll_fn; |
| 250 | + poll_fn(|cx| poll_proceed(cx)).await; |
| 251 | +} |
| 252 | + |
| 253 | +#[cfg(all(test, not(loom)))] |
| 254 | +mod test { |
| 255 | + use super::*; |
| 256 | + |
| 257 | + fn get() -> usize { |
| 258 | + HITS.with(|hits| hits.get()) |
| 259 | + } |
| 260 | + |
| 261 | + #[test] |
| 262 | + fn bugeting() { |
| 263 | + use tokio_test::*; |
| 264 | + |
| 265 | + assert_eq!(get(), UNCONSTRAINED); |
| 266 | + assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); |
| 267 | + assert_eq!(get(), UNCONSTRAINED); |
| 268 | + budget(|| { |
| 269 | + assert_eq!(get(), BUDGET); |
| 270 | + assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); |
| 271 | + assert_eq!(get(), BUDGET - 1); |
| 272 | + assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); |
| 273 | + assert_eq!(get(), BUDGET - 2); |
| 274 | + }); |
| 275 | + assert_eq!(get(), UNCONSTRAINED); |
| 276 | + |
| 277 | + budget(|| { |
| 278 | + limit(3, || { |
| 279 | + assert_eq!(get(), 3); |
| 280 | + assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); |
| 281 | + assert_eq!(get(), 2); |
| 282 | + limit(4, || { |
| 283 | + assert_eq!(get(), 2); |
| 284 | + assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); |
| 285 | + assert_eq!(get(), 1); |
| 286 | + }); |
| 287 | + assert_eq!(get(), 1); |
| 288 | + assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); |
| 289 | + assert_eq!(get(), 0); |
| 290 | + assert_pending!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); |
| 291 | + assert_eq!(get(), 0); |
| 292 | + assert_pending!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); |
| 293 | + assert_eq!(get(), 0); |
| 294 | + }); |
| 295 | + assert_eq!(get(), BUDGET - 3); |
| 296 | + assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); |
| 297 | + assert_eq!(get(), BUDGET - 4); |
| 298 | + assert_ready!(task::spawn(proceed()).poll()); |
| 299 | + assert_eq!(get(), BUDGET - 5); |
| 300 | + }); |
| 301 | + } |
| 302 | +} |
0 commit comments