Skip to content

Commit 7a6b64c

Browse files
authored
Merge pull request #550 from alexcrichton/expose-atomic-task
Expose AtomicTask
2 parents cbc2e3b + c727e17 commit 7a6b64c

File tree

3 files changed

+66
-31
lines changed

3 files changed

+66
-31
lines changed

src/stream/futures_unordered.rs

+2-5
Original file line numberDiff line numberDiff line change
@@ -279,11 +279,8 @@ impl<T> Stream for FuturesUnordered<T>
279279
type Error = T::Error;
280280

281281
fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
282-
// Ensure `parent` is correctly set. Note that the `unsafe` here is
283-
// because the `park` method underneath needs mutual exclusion from
284-
// other calls to `park`, which we guarantee with `&mut self` above and
285-
// this is the only method which calls park.
286-
unsafe { self.inner.parent.park() };
282+
// Ensure `parent` is correctly set.
283+
self.inner.parent.register();
287284

288285
loop {
289286
let node = match unsafe { self.inner.dequeue() } {

src/task.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
#[allow(deprecated)]
3434
pub use task_impl::{Spawn, spawn, Unpark, Executor, Run, park};
3535

36-
pub use task_impl::{Task, current, init};
36+
pub use task_impl::{Task, AtomicTask, current, init};
3737

3838
#[allow(deprecated)]
3939
#[cfg(feature = "use_std")]

src/task_impl/atomic_task.rs

+63-25
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,25 @@ use core::cell::UnsafeCell;
77
use core::sync::atomic::AtomicUsize;
88
use core::sync::atomic::Ordering::{Acquire, Release};
99

10-
/// A coordinated `Task` handle enabling concurrent operations on a task.
10+
/// A synchronization primitive for task notification.
11+
///
12+
/// `AtomicTask` will coordinate concurrent notifications with the consumer
13+
/// potentially "updating" the underlying task to notify. This is useful in
14+
/// scenarios where a computation completes in another thread and wants to
15+
/// notify the consumer, but the consumer is in the process of being migrated to
16+
/// a new logical task.
17+
///
18+
/// Consumers should call `register` before checking the result of a computation
19+
/// and producers should call `notify` after producing the computation (this
20+
/// differs from the usual `thread::park` pattern). It is also permitted for
21+
/// `notify` to be called **before** `register`. This results in a no-op.
22+
///
23+
/// A single `AtomicTask` may be reused for any number of calls to `register` or
24+
/// `notify`.
25+
///
26+
/// `AtomicTask` does not provide any memory ordering guarantees, as such the
27+
/// user should use caution and use other synchronization primitives to guard
28+
/// the result of the underlying computation.
1129
pub struct AtomicTask {
1230
state: AtomicUsize,
1331
task: UnsafeCell<Option<Task>>,
@@ -21,15 +39,16 @@ pub struct AtomicTask {
2139
/// a lock.
2240
const WAITING: usize = 2;
2341

24-
/// The `park` function has determined that the task is no longer current. This
25-
/// implies that `AtomicTask::park` is being called from a different task than
26-
/// is represented by the currently stored task. The write lock is obtained to
27-
/// update the task cell.
42+
/// The `register` function has determined that the task is no longer current.
43+
/// This implies that `AtomicTask::register` is being called from a different
44+
/// task than is represented by the currently stored task. The write lock is
45+
/// obtained to update the task cell.
2846
const LOCKED_WRITE: usize = 0;
2947

30-
/// At least one call to `notify` happened concurrently to `park` updating the
31-
/// task cell. This state is detected when `park` exits the mutation code and
32-
/// signals to `park` that it is responsible for notifying its own task.
48+
/// At least one call to `notify` happened concurrently to `register` updating
49+
/// the task cell. This state is detected when `register` exits the mutation
50+
/// code and signals to `register` that it is responsible for notifying its own
51+
/// task.
3352
const LOCKED_WRITE_NOTIFIED: usize = 1;
3453

3554

@@ -52,30 +71,46 @@ impl AtomicTask {
5271
}
5372
}
5473

55-
/// The caller must ensure mutual exclusion
56-
pub unsafe fn park(&self) {
57-
if let Some(ref task) = *self.task.get() {
58-
if task.will_notify_current() {
59-
// Nothing more to do
60-
return
61-
}
62-
}
63-
74+
/// Registers the current task to be notified on calls to `notify`.
75+
///
76+
/// The new task will take place of any previous tasks that were registered
77+
/// by previous calls to `register`. Any calls to `notify` that happen after
78+
/// a call to `register` (as defined by the memory ordering rules), will
79+
/// notify the `register` caller's task.
80+
///
81+
/// It is safe to call `register` with multiple other threads concurrently
82+
/// calling `notify`. This will result in the `register` caller's current
83+
/// task being notified once.
84+
///
85+
/// This function is safe to call concurrently, but this is generally a bad
86+
/// idea. Concurrent calls to `register` will attempt to register different
87+
/// tasks to be notified. One of the callers will win and have its task set,
88+
/// but there is no guarantee as to which caller will succeed.
89+
pub fn register(&self) {
6490
// Get a new task handle
6591
let task = super::current();
6692

6793
match self.state.compare_and_swap(WAITING, LOCKED_WRITE, Acquire) {
6894
WAITING => {
69-
// Locked acquired, update the task cell
70-
*self.task.get() = Some(task);
71-
72-
// Release the lock. If the state transitioned to
73-
// `LOCKED_NOTIFIED`, this means that an notify has been
74-
// signaled, so notify the task.
75-
if LOCKED_WRITE_NOTIFIED == self.state.swap(WAITING, Release) {
76-
(*self.task.get()).as_ref().unwrap().notify();
95+
unsafe {
96+
// Locked acquired, update the task cell
97+
*self.task.get() = Some(task);
98+
99+
// Release the lock. If the state transitioned to
100+
// `LOCKED_NOTIFIED`, this means that an notify has been
101+
// signaled, so notify the task.
102+
if LOCKED_WRITE_NOTIFIED == self.state.swap(WAITING, Release) {
103+
(*self.task.get()).as_ref().unwrap().notify();
104+
}
77105
}
78106
}
107+
LOCKED_WRITE | LOCKED_WRITE_NOTIFIED => {
108+
// A thread is concurrently calling `register`. This shouldn't
109+
// happen as it doesn't really make much sense, but it isn't
110+
// unsafe per se. Since two threads are concurrently trying to
111+
// update the task, it's undefined which one "wins" (no ordering
112+
// guarantees), so we can just do nothing.
113+
}
79114
state => {
80115
debug_assert!(state != LOCKED_WRITE, "unexpected state LOCKED_WRITE");
81116
debug_assert!(state != LOCKED_WRITE_NOTIFIED, "unexpected state LOCKED_WRITE_NOTIFIED");
@@ -88,6 +123,9 @@ impl AtomicTask {
88123
}
89124
}
90125

126+
/// Notifies the task that last called `register`.
127+
///
128+
/// If `register` has not been called yet, then this does nothing.
91129
pub fn notify(&self) {
92130
let mut curr = WAITING;
93131

0 commit comments

Comments
 (0)