Skip to content

Commit 4b726a3

Browse files
committed
extract sem/time mod
1 parent 99fb734 commit 4b726a3

File tree

6 files changed

+213
-47
lines changed

6 files changed

+213
-47
lines changed

src/blk.rs

+5-16
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,7 @@
11
use std::os::raw::c_void;
2-
use std::time::Duration;
32

43
use ffi::*;
5-
use {time_after_delay, Queue};
6-
7-
/// A type indicating whether a timed wait on a dispatch block returned due to a time out or not.
8-
#[derive(Clone, Copy, Debug, PartialEq)]
9-
pub struct WaitTimeout;
4+
use {Queue, Timeout, WaitTimeout};
105

116
/// Creates, synchronously executes, and releases a dispatch block from the specified block and flags.
127
pub fn perform<F>(flags: dispatch_block_flags_t, closure: F)
@@ -58,8 +53,8 @@ impl DispatchBlock {
5853
}
5954

6055
/// Waits synchronously until execution of the specified dispatch block has completed or until the specified timeout has elapsed.
61-
pub fn wait_timeout(&self, dur: Duration) -> Result<(), WaitTimeout> {
62-
let when = time_after_delay(dur);
56+
pub fn wait_timeout<T: Timeout>(&self, timeout: T) -> Result<(), WaitTimeout> {
57+
let when = timeout.as_raw();
6358

6459
if unsafe { dispatch_block_wait(self.ptr, when) } == 0 {
6560
Ok(())
@@ -142,19 +137,13 @@ mod tests {
142137

143138
assert!(!block.canceled());
144139

145-
assert_eq!(
146-
block.wait_timeout(Duration::from_millis(100)),
147-
Err(WaitTimeout)
148-
);
140+
assert_eq!(block.wait_timeout(100u32), Err(WaitTimeout));
149141

150142
block.cancel();
151143

152144
assert!(block.canceled());
153145

154-
assert_eq!(
155-
block.wait_timeout(Duration::from_millis(100)),
156-
Err(WaitTimeout)
157-
);
146+
assert_eq!(block.wait_timeout(100u32), Err(WaitTimeout));
158147

159148
assert!(!block.done());
160149
}

src/data.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ use Queue;
88

99
/// The destructor responsible for freeing the data when it is no longer needed.
1010
pub trait Destructor {
11+
/// Extracts the raw `dispatch_block_t`.
1112
fn as_raw(self) -> dispatch_block_t;
1213
}
1314

1415
impl Destructor for dispatch_block_t {
15-
/// Extracts the raw destructor object.
1616
fn as_raw(self) -> dispatch_block_t {
1717
self
1818
}
@@ -81,7 +81,7 @@ impl Data {
8181
Data { ptr }
8282
}
8383

84-
/// Extracts the raw dispatch data object.
84+
/// Extracts the raw `dispatch_data_t`.
8585
pub fn as_raw(&self) -> dispatch_data_t {
8686
self.ptr
8787
}
@@ -98,6 +98,7 @@ impl Data {
9898
unsafe { dispatch_data_get_size(self.ptr) }
9999
}
100100

101+
/// Returns `true` if the data has a length of 0.
101102
pub fn is_empty(&self) -> bool {
102103
self.ptr == unsafe { &_dispatch_data_empty as *const dispatch_object_s as dispatch_data_t }
103104
|| self.len() == 0

src/io.rs

-2
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ impl ChannelType {
2727
}
2828
}
2929

30-
pub type CleanupHandler = fn(i32);
31-
3230
pub struct Channel {
3331
ptr: dispatch_io_t,
3432
}

src/lib.rs

+12-27
Original file line numberDiff line numberDiff line change
@@ -73,16 +73,22 @@ mod blk;
7373
mod data;
7474
#[cfg(target_os = "macos")]
7575
mod io;
76+
#[cfg(target_os = "macos")]
77+
mod sem;
78+
mod time;
7679

7780
#[cfg(target_os = "macos")]
78-
pub use blk::{perform, DispatchBlock, WaitTimeout};
81+
pub use blk::{perform, DispatchBlock};
7982
#[cfg(target_os = "macos")]
8083
pub use data::{
8184
dispatch_data_destructor_default, dispatch_data_destructor_free,
8285
dispatch_data_destructor_munmap, Data, Destructor,
8386
};
8487
#[cfg(target_os = "macos")]
8588
pub use ffi::{DISPATCH_IO_STOP, DISPATCH_IO_STRICT_INTERVAL};
89+
#[cfg(target_os = "macos")]
90+
pub use sem::Semaphore;
91+
pub use time::{after, at, now, Timeout, WaitTimeout};
8692

8793
/// The type of a dispatch queue.
8894
#[derive(Clone, Debug, Hash, PartialEq)]
@@ -149,28 +155,6 @@ pub struct Queue {
149155
ptr: dispatch_queue_t,
150156
}
151157

152-
fn time_after_delay(delay: Duration) -> dispatch_time_t {
153-
delay
154-
.as_secs()
155-
.checked_mul(1_000_000_000)
156-
.and_then(|i| i.checked_add(delay.subsec_nanos() as u64))
157-
.and_then(|i| {
158-
if i < (i64::max_value() as u64) {
159-
Some(i as i64)
160-
} else {
161-
None
162-
}
163-
})
164-
.map_or(DISPATCH_TIME_FOREVER, |i| unsafe {
165-
dispatch_time(DISPATCH_TIME_NOW, i)
166-
})
167-
}
168-
169-
/// Returns a `dispatch_time_t` corresponding to the wall time.
170-
pub fn now() -> dispatch_time_t {
171-
unsafe { dispatch_walltime(ptr::null(), 0) }
172-
}
173-
174158
fn context_and_function<F>(closure: F) -> (*mut c_void, dispatch_function_t)
175159
where
176160
F: FnOnce(),
@@ -325,11 +309,12 @@ impl Queue {
325309

326310
/// After the specified delay, submits a closure for asynchronous execution
327311
/// on self.
328-
pub fn after<F>(&self, delay: Duration, work: F)
312+
pub fn after<F, T>(&self, delay: T, work: F)
329313
where
330314
F: 'static + Send + FnOnce(),
315+
T: Timeout,
331316
{
332-
let when = time_after_delay(delay);
317+
let when = delay.as_raw();
333318
let (context, work) = context_and_function(work);
334319
unsafe {
335320
dispatch_after_f(when, self.ptr, context, work);
@@ -581,8 +566,8 @@ impl Group {
581566
/// Waits for all tasks associated with self to complete within the
582567
/// specified duration.
583568
/// Returns true if the tasks completed or false if the timeout elapsed.
584-
pub fn wait_timeout(&self, timeout: Duration) -> bool {
585-
let when = time_after_delay(timeout);
569+
pub fn wait_timeout<T: Timeout>(&self, timeout: T) -> bool {
570+
let when = timeout.as_raw();
586571
let result = unsafe { dispatch_group_wait(self.ptr, when) };
587572
result == 0
588573
}

src/sem.rs

+97
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
use ffi::*;
2+
use {Timeout, WaitTimeout};
3+
4+
/// A counting semaphore.
5+
///
6+
/// Calls to `Semaphore::signal` must be balanced with calls to `Semaphore::wait`.
7+
/// Attempting to dispose of a semaphore with a count lower than value causes an EXC_BAD_INSTRUCTION exception.
8+
#[derive(Debug)]
9+
pub struct Semaphore {
10+
ptr: dispatch_semaphore_t,
11+
}
12+
13+
impl Semaphore {
14+
/// Creates new counting semaphore with an initial value.
15+
///
16+
/// Passing zero for the value is useful for
17+
/// when two threads need to reconcile the completion of a particular event.
18+
/// Passing a value greater than zero is useful for managing a finite pool of resources,
19+
/// where the pool size is equal to the value.
20+
pub fn new(n: u64) -> Self {
21+
let ptr = unsafe { dispatch_semaphore_create(n as i64) };
22+
23+
Semaphore { ptr }
24+
}
25+
26+
/// Wait (decrement) for a semaphore.
27+
///
28+
/// Decrement the counting semaphore.
29+
pub fn wait(&self) -> Result<(), WaitTimeout> {
30+
self.wait_timeout(DISPATCH_TIME_FOREVER)
31+
}
32+
33+
/// Wait (decrement) for a semaphoreor until the specified timeout has elapsed.
34+
///
35+
/// Decrement the counting semaphore.
36+
pub fn wait_timeout<T: Timeout>(&self, timeout: T) -> Result<(), WaitTimeout> {
37+
let when = timeout.as_raw();
38+
39+
let n = unsafe { dispatch_semaphore_wait(self.ptr, when) };
40+
41+
if n == 0 {
42+
Ok(())
43+
} else {
44+
Err(WaitTimeout)
45+
}
46+
}
47+
48+
/// Signal (increment) a semaphore.
49+
///
50+
/// Increment the counting semaphore.
51+
/// If the previous value was less than zero, this function wakes a waiting thread before returning.
52+
///
53+
/// This function returns `true` if a thread is woken. Otherwise, `false` is returned.
54+
pub fn signal(&self) -> bool {
55+
unsafe { dispatch_semaphore_signal(self.ptr) != 0 }
56+
}
57+
}
58+
59+
unsafe impl Sync for Semaphore {}
60+
unsafe impl Send for Semaphore {}
61+
62+
impl Clone for Semaphore {
63+
fn clone(&self) -> Self {
64+
unsafe {
65+
dispatch_retain(self.ptr);
66+
}
67+
Semaphore { ptr: self.ptr }
68+
}
69+
}
70+
71+
impl Drop for Semaphore {
72+
fn drop(&mut self) {
73+
unsafe {
74+
dispatch_release(self.ptr);
75+
}
76+
}
77+
}
78+
79+
#[cfg(test)]
80+
mod tests {
81+
use super::*;
82+
83+
#[test]
84+
fn test_semaphore() {
85+
let sem = Semaphore::new(1);
86+
87+
assert!(sem.wait().is_ok());
88+
assert_eq!(sem.wait_timeout(0).unwrap_err(), WaitTimeout);
89+
90+
assert!(!sem.signal());
91+
assert!(sem.wait_timeout(DISPATCH_TIME_FOREVER).is_ok());
92+
93+
// Calls to dispatch_semaphore_signal must be balanced with calls to wait().
94+
// Attempting to dispose of a semaphore with a count lower than value causes an EXC_BAD_INSTRUCTION exception.
95+
assert!(!sem.signal());
96+
}
97+
}

src/time.rs

+96
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
use std::ptr;
2+
use std::time::{Duration, Instant, SystemTime, SystemTimeError, UNIX_EPOCH};
3+
4+
use libc::timespec;
5+
6+
use ffi::*;
7+
8+
/// A type indicating whether a timed wait on a dispatch object returned due to a time out or not.
9+
#[derive(Clone, Copy, Debug, PartialEq)]
10+
pub struct WaitTimeout;
11+
12+
/// When to timeout.
13+
pub trait Timeout {
14+
/// Extracts the raw `dispatch_time_t`.
15+
fn as_raw(self) -> dispatch_time_t;
16+
}
17+
18+
impl<T: Timeout> Timeout for Option<T> {
19+
fn as_raw(self) -> dispatch_time_t {
20+
if let Some(timeout) = self {
21+
timeout.as_raw()
22+
} else {
23+
DISPATCH_TIME_NOW
24+
}
25+
}
26+
}
27+
28+
impl Timeout for i32 {
29+
fn as_raw(self) -> dispatch_time_t {
30+
Duration::from_millis(self as u64).as_raw()
31+
}
32+
}
33+
34+
impl Timeout for u32 {
35+
fn as_raw(self) -> dispatch_time_t {
36+
Duration::from_millis(self as u64).as_raw()
37+
}
38+
}
39+
40+
impl Timeout for Duration {
41+
fn as_raw(self) -> dispatch_time_t {
42+
after(self)
43+
}
44+
}
45+
46+
impl Timeout for dispatch_time_t {
47+
fn as_raw(self) -> dispatch_time_t {
48+
self
49+
}
50+
}
51+
52+
impl Timeout for Instant {
53+
fn as_raw(self) -> dispatch_time_t {
54+
self.duration_since(Instant::now()).as_raw()
55+
}
56+
}
57+
58+
impl Timeout for SystemTime {
59+
fn as_raw(self) -> dispatch_time_t {
60+
self.duration_since(SystemTime::now()).unwrap().as_raw()
61+
}
62+
}
63+
64+
/// Returns a `dispatch_time_t` corresponding to the given time.
65+
pub fn after(delay: Duration) -> dispatch_time_t {
66+
delay
67+
.as_secs()
68+
.checked_mul(1_000_000_000)
69+
.and_then(|i| i.checked_add(delay.subsec_nanos() as u64))
70+
.and_then(|i| {
71+
if i < (i64::max_value() as u64) {
72+
Some(i as i64)
73+
} else {
74+
None
75+
}
76+
})
77+
.map_or(DISPATCH_TIME_FOREVER, |i| unsafe {
78+
dispatch_time(DISPATCH_TIME_NOW, i)
79+
})
80+
}
81+
82+
/// Returns a `dispatch_time_t` corresponding to the wall time.
83+
pub fn now() -> dispatch_time_t {
84+
unsafe { dispatch_walltime(ptr::null(), 0) }
85+
}
86+
87+
/// Returns a `dispatch_time_t` corresponding to the given time.
88+
pub fn at(tm: SystemTime) -> Result<dispatch_time_t, SystemTimeError> {
89+
let dur = tm.duration_since(UNIX_EPOCH)?;
90+
let ts = timespec {
91+
tv_sec: dur.as_secs() as i64,
92+
tv_nsec: dur.subsec_nanos() as i64,
93+
};
94+
95+
Ok(unsafe { dispatch_walltime(&ts, 0) })
96+
}

0 commit comments

Comments
 (0)