Skip to content

Commit 7cde07e

Browse files
committed
Make it possible to time out after a deadline
1 parent b1596a3 commit 7cde07e

File tree

3 files changed

+80
-3
lines changed

3 files changed

+80
-3
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ jobs:
1717
matrix:
1818
rust:
1919
- stable
20-
- 1.56.1
20+
- 1.77.1
2121
steps:
2222
- name: Checkout sources
2323
uses: actions/checkout@v3

Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ name = "async-event"
88
version = "0.1.0"
99
authors = ["Serge Barral <[email protected]>"]
1010
edition = "2021"
11-
rust-version = "1.56"
11+
rust-version = "1.77"
1212
license = "MIT OR Apache-2.0"
1313
repository = "https://github.com/asynchronics/async-event"
1414
readme = "README.md"
@@ -18,6 +18,9 @@ An efficient async condition variable for lock-free algorithms.
1818
categories = ["asynchronous", "concurrency"]
1919
keywords = ["async", "event", "atomic", "futures"]
2020

21+
[dependencies]
22+
pin-project-lite = "0.2"
23+
2124
[dev-dependencies]
2225
tokio = { version = "1", features = ["full"] }
2326
futures-executor = "0.3"

src/lib.rs

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,12 @@
6969
//! });
7070
//! ```
7171
72+
// Temporary workaround until the `async_event_loom` flag can be whitelisted
73+
// without a `build.rs` [1].
74+
//
75+
// [1]: (https://github.com/rust-lang/rust/issues/124800).
76+
#![allow(unexpected_cfgs)]
77+
7278
mod loom_exports;
7379

7480
use std::future::Future;
@@ -81,6 +87,7 @@ use std::task::{Context, Poll, Waker};
8187
use loom_exports::cell::UnsafeCell;
8288
use loom_exports::sync::atomic::{self, AtomicBool};
8389
use loom_exports::sync::Mutex;
90+
use pin_project_lite::pin_project;
8491

8592
/// An object that can receive or send notifications.
8693
pub struct Event {
@@ -130,9 +137,29 @@ impl Event {
130137

131138
/// Returns a future that can be `await`ed until the provided predicate is
132139
/// satisfied.
133-
pub fn wait_until<F: FnMut() -> Option<T>, T>(&self, predicate: F) -> WaitUntil<F, T> {
140+
pub fn wait_until<F, T>(&self, predicate: F) -> WaitUntil<F, T>
141+
where
142+
F: FnMut() -> Option<T>,
143+
{
134144
WaitUntil::new(&self.wait_set, predicate)
135145
}
146+
147+
/// Returns a future that can be `await`ed until the provided predicate is
148+
/// satisfied or until the provided future completes.
149+
///
150+
/// The deadline is specified as a `Future` that is expected to resolves to
151+
/// `()` after some duration, such as a `tokio::time::Sleep` future.
152+
pub fn wait_until_or_timeout<F, T, D>(
153+
&self,
154+
predicate: F,
155+
deadline: D,
156+
) -> WaitUntilOrTimeout<F, T, D>
157+
where
158+
F: FnMut() -> Option<T>,
159+
D: Future<Output = ()>,
160+
{
161+
WaitUntilOrTimeout::new(&self.wait_set, predicate, deadline)
162+
}
136163
}
137164

138165
impl Default for Event {
@@ -378,6 +405,53 @@ enum WaitUntilState {
378405
Completed,
379406
}
380407

408+
pin_project! {
409+
/// A future that can be `await`ed until a predicate is satisfied or until a
410+
/// deadline elapses.
411+
pub struct WaitUntilOrTimeout<'a, F: FnMut() -> Option<T>, T, D: Future<Output = ()>> {
412+
wait_until: WaitUntil<'a, F, T>,
413+
#[pin]
414+
deadline: D,
415+
}
416+
}
417+
418+
impl<'a, F, T, D> WaitUntilOrTimeout<'a, F, T, D>
419+
where
420+
F: FnMut() -> Option<T>,
421+
D: Future<Output = ()>,
422+
{
423+
/// Creates a future associated with the specified event sink that can be
424+
/// `await`ed until the specified predicate is satisfied, or until the
425+
/// specified timeout future completes.
426+
fn new(wait_set: &'a WaitSet, predicate: F, deadline: D) -> Self {
427+
Self {
428+
wait_until: WaitUntil::new(wait_set, predicate),
429+
deadline,
430+
}
431+
}
432+
}
433+
434+
impl<'a, F, T, D> Future for WaitUntilOrTimeout<'a, F, T, D>
435+
where
436+
F: FnMut() -> Option<T>,
437+
D: Future<Output = ()>,
438+
{
439+
type Output = Option<T>;
440+
441+
#[inline]
442+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
443+
let this = self.project();
444+
445+
if let Poll::Ready(value) = Pin::new(this.wait_until).poll(cx) {
446+
Poll::Ready(Some(value))
447+
} else if this.deadline.poll(cx).is_ready() {
448+
Poll::Ready(None)
449+
} else {
450+
Poll::Pending
451+
}
452+
}
453+
}
454+
381455
/// A set of notifiers.
382456
///
383457
/// The set wraps a Mutex-protected list of notifiers and manages a flag for

0 commit comments

Comments
 (0)