Expose AtomicTask #550


Merged
merged 3 commits on Aug 16, 2017
7 changes: 2 additions & 5 deletions src/stream/futures_unordered.rs
@@ -279,11 +279,8 @@ impl<T> Stream for FuturesUnordered<T>
type Error = T::Error;

fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
// Ensure `parent` is correctly set. Note that the `unsafe` here is
// because the `park` method underneath needs mutual exclusion from
// other calls to `park`, which we guarantee with `&mut self` above and
// this is the only method which calls park.
unsafe { self.inner.parent.park() };
// Ensure `parent` is correctly set.
self.inner.parent.register();

loop {
let node = match unsafe { self.inner.dequeue() } {
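The register-before-check pattern shown above generalizes beyond `FuturesUnordered`. Below is a minimal consumer-side sketch, not part of this PR, of how a futures 0.1 `Stream` might use the now-safe `register` in `poll`; the `Receiver` type and its fields are hypothetical:

```rust
extern crate futures;

use std::sync::{Arc, Mutex};

use futures::{Async, Poll, Stream};
use futures::task::AtomicTask;

struct Receiver {
    task: Arc<AtomicTask>,
    slot: Arc<Mutex<Option<u32>>>,
}

impl Stream for Receiver {
    type Item = u32;
    type Error = ();

    fn poll(&mut self) -> Poll<Option<u32>, ()> {
        // Register the current task *before* checking for a value, so a
        // concurrent `notify` cannot slip in between the check and the
        // registration. Unlike the old `park`, no `unsafe` block is needed.
        self.task.register();

        match self.slot.lock().unwrap().take() {
            Some(v) => Ok(Async::Ready(Some(v))),
            None => Ok(Async::NotReady),
        }
    }
}
```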
2 changes: 1 addition & 1 deletion src/task.rs
@@ -33,7 +33,7 @@
#[allow(deprecated)]
pub use task_impl::{Spawn, spawn, Unpark, Executor, Run, park};

pub use task_impl::{Task, current, init};
pub use task_impl::{Task, AtomicTask, current, init};

#[allow(deprecated)]
#[cfg(feature = "use_std")]
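With this re-export in place, downstream code can name the type from the public `task` module. A tiny sketch, assuming a crate built against this branch:

```rust
extern crate futures;

use futures::task::AtomicTask;

fn main() {
    let task = AtomicTask::new();

    // Per the docs added below, calling `notify` before anything has
    // registered is simply a no-op.
    task.notify();
}
```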
88 changes: 63 additions & 25 deletions src/task_impl/atomic_task.rs
@@ -7,7 +7,25 @@ use core::cell::UnsafeCell;
use core::sync::atomic::AtomicUsize;
use core::sync::atomic::Ordering::{Acquire, Release};

/// A coordinated `Task` handle enabling concurrent operations on a task.
/// A synchronization primitive for task notification.
///
/// `AtomicTask` will coordinate concurrent notifications with the consumer
/// potentially "updating" the underlying task to notify. This is useful in
/// scenarios where a computation completes in another thread and wants to
/// notify the consumer, but the consumer is in the process of being migrated to
/// a new logical task.
///
/// Consumers should call `register` before checking the result of a computation
/// and producers should call `notify` after producing the computation (this
/// differs from the usual `thread::park` pattern). It is also permitted for
/// `notify` to be called **before** `register`. This results in a no-op.
///
/// A single `AtomicTask` may be reused for any number of calls to `register` or
/// `notify`.
///
/// `AtomicTask` does not provide any memory ordering guarantees; as such, the
/// user should use caution and rely on other synchronization primitives to
/// guard the result of the underlying computation.
pub struct AtomicTask {
state: AtomicUsize,
task: UnsafeCell<Option<Task>>,
@@ -21,15 +39,16 @@ pub struct AtomicTask {
/// a lock.
const WAITING: usize = 2;

/// The `park` function has determined that the task is no longer current. This
/// implies that `AtomicTask::park` is being called from a different task than
/// is represented by the currently stored task. The write lock is obtained to
/// update the task cell.
/// The `register` function has determined that the task is no longer current.
/// This implies that `AtomicTask::register` is being called from a different
/// task than is represented by the currently stored task. The write lock is
/// obtained to update the task cell.
const LOCKED_WRITE: usize = 0;

/// At least one call to `notify` happened concurrently to `park` updating the
/// task cell. This state is detected when `park` exits the mutation code and
/// signals to `park` that it is responsible for notifying its own task.
/// At least one call to `notify` happened concurrently to `register` updating
/// the task cell. This state is detected when `register` exits the mutation
/// code and signals to `register` that it is responsible for notifying its own
/// task.
const LOCKED_WRITE_NOTIFIED: usize = 1;
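The three constants encode a small locking protocol. A standalone sketch of the register-side transition, using only an `AtomicUsize` (simplified; the real type also stores the `Task` in the guarded cell, and this mirrors the `register` body shown further down rather than adding anything new):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

const LOCKED_WRITE: usize = 0;
const LOCKED_WRITE_NOTIFIED: usize = 1;
const WAITING: usize = 2;

/// Register-side protocol: take the write lock, mutate, then release.
/// Returns `true` if a concurrent notify raced with the update, in which
/// case the caller must notify its own task.
fn register_side(state: &AtomicUsize) -> bool {
    // Try to take the write lock: WAITING -> LOCKED_WRITE.
    if state.compare_and_swap(WAITING, LOCKED_WRITE, Ordering::Acquire) == WAITING {
        // ... the task cell would be updated here ...

        // Release the lock. If a concurrent notify observed LOCKED_WRITE and
        // bumped the state to LOCKED_WRITE_NOTIFIED, the wake-up is ours to do.
        return state.swap(WAITING, Ordering::Release) == LOCKED_WRITE_NOTIFIED;
    }
    false
}
```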


@@ -52,30 +71,46 @@ impl AtomicTask {
}
}

/// The caller must ensure mutual exclusion
pub unsafe fn park(&self) {
if let Some(ref task) = *self.task.get() {
Member:

Wasn't this one of the big use cases of this cell? Is the perf impact not measurable?

Member Author:

It was never measured. The main point was managing the logic around atomically swapping the consumer task while the producer is notifying. The original API was unsafe mostly because it was for a specific use case (FuturesUnordered) where we knew there was only a single consumer and it had to be &self.

Either way, if it proves to be a perf issue, we can always bring it back as park_unchecked.

Member:

Sounds reasonable to me!

if task.will_notify_current() {
// Nothing more to do
return
}
}

/// Registers the current task to be notified on calls to `notify`.
///
/// The new task will take the place of any previous tasks that were registered
/// by previous calls to `register`. Any calls to `notify` that happen after
/// a call to `register` (as defined by the memory ordering rules) will
/// notify the `register` caller's task.
///
/// It is safe to call `register` with multiple other threads concurrently
/// calling `notify`. This will result in the `register` caller's current
/// task being notified once.
///
/// This function is safe to call concurrently, but this is generally a bad
/// idea. Concurrent calls to `register` will attempt to register different
/// tasks to be notified. One of the callers will win and have its task set,
/// but there is no guarantee as to which caller will succeed.
pub fn register(&self) {
// Get a new task handle
let task = super::current();

match self.state.compare_and_swap(WAITING, LOCKED_WRITE, Acquire) {
WAITING => {
// Lock acquired, update the task cell
*self.task.get() = Some(task);

// Release the lock. If the state transitioned to
// `LOCKED_WRITE_NOTIFIED`, this means that a notify has been
// signaled, so notify the task.
if LOCKED_WRITE_NOTIFIED == self.state.swap(WAITING, Release) {
(*self.task.get()).as_ref().unwrap().notify();
unsafe {
// Lock acquired, update the task cell
*self.task.get() = Some(task);

// Release the lock. If the state transitioned to
// `LOCKED_WRITE_NOTIFIED`, this means that a notify has been
// signaled, so notify the task.
if LOCKED_WRITE_NOTIFIED == self.state.swap(WAITING, Release) {
(*self.task.get()).as_ref().unwrap().notify();
}
}
}
LOCKED_WRITE | LOCKED_WRITE_NOTIFIED => {
// A thread is concurrently calling `register`. This shouldn't
// happen as it doesn't really make much sense, but it isn't
// unsafe per se. Since two threads are concurrently trying to
// update the task, it's undefined which one "wins" (no ordering
// guarantees), so we can just do nothing.
}
state => {
debug_assert!(state != LOCKED_WRITE, "unexpected state LOCKED_WRITE");
debug_assert!(state != LOCKED_WRITE_NOTIFIED, "unexpected state LOCKED_WRITE_NOTIFIED");
@@ -88,6 +123,9 @@ impl AtomicTask {
}
}

/// Notifies the task that last called `register`.
///
/// If `register` has not been called yet, then this does nothing.
pub fn notify(&self) {
let mut curr = WAITING;

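For completeness, a hedged sketch of the producer side that pairs with the consumer `poll` shown earlier: the result is published first (here through a `Mutex`, since `AtomicTask` intentionally provides no ordering for the data itself), and `notify` is called afterwards. The function and field names are hypothetical.

```rust
extern crate futures;

use std::sync::{Arc, Mutex};
use std::thread;

use futures::task::AtomicTask;

fn complete_in_background(task: Arc<AtomicTask>, slot: Arc<Mutex<Option<u32>>>) {
    thread::spawn(move || {
        // Publish the result before notifying; the Mutex supplies the
        // memory ordering that AtomicTask deliberately leaves to the caller.
        *slot.lock().unwrap() = Some(42);

        // Wake whichever task most recently called `register`. If nothing
        // has registered yet, this is a no-op.
        task.notify();
    });
}
```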