Skip to content

Commit edf4e0e

Browse files
committed
Add recursive mutex type, remove protect
1 parent 796f046 commit edf4e0e

File tree

5 files changed

+64
-41
lines changed

5 files changed

+64
-41
lines changed

Flow/Disposable.swift

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,10 @@ public final class DisposeBag: Disposable {
8484

8585
/// Returns true if there is currently no disposables to dispose.
8686
public var isEmpty: Bool {
87-
return mutex.protect { disposables.isEmpty }
87+
mutex.lock()
88+
defer { mutex.unlock() }
89+
90+
return disposables.isEmpty
8891
}
8992

9093
public func dispose() {

Flow/Future.swift

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,10 @@ func memPrint(_ str: String, _ count: Int32) {
329329
private extension Future {
330330

331331
private var protectedState: State {
332-
return mutex.protect { state }
332+
mutex.lock()
333+
defer { mutex.unlock() }
334+
335+
return state
333336
}
334337

335338
func lock() {

Flow/FutureQueue.swift

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public final class FutureQueue<Resource> {
1818
private let queueScheduler: Scheduler
1919
private var _closedError: Error?
2020
private let isEmptyCallbacker = Callbacker<Bool>()
21-
private var _mutex = pthread_mutex_t()
21+
private var mutex = pthread_mutex_t()
2222

2323
// enqueued items.
2424
private var items: [Executable] = [] {
@@ -41,9 +41,11 @@ public final class FutureQueue<Resource> {
4141
queueScheduler = executeOn
4242
OSAtomicIncrement32(&futureQueueUnitTestAliveCount)
4343
memPrint("Queue init", futureQueueUnitTestAliveCount)
44+
mutex.initialize(as: .recursive)
4445
}
4546

4647
deinit {
48+
mutex.deinitialize()
4749
OSAtomicDecrement32(&futureQueueUnitTestAliveCount)
4850
memPrint("Queue deinit", futureQueueUnitTestAliveCount)
4951
}
@@ -61,9 +63,9 @@ public extension FutureQueue {
6163
return Future { completion in
6264
let item = QueueItem<Output>(operation: operation, completion: completion)
6365

64-
self.mutex.protect {
65-
self.items.append(item)
66-
}
66+
self.mutex.lock()
67+
self.items.append(item)
68+
self.mutex.unlock()
6769

6870
self.executeNextItem()
6971

@@ -119,7 +121,10 @@ public extension FutureQueue {
119121
public extension FutureQueue {
120122
/// Do we have any enqueued operations?
121123
var isEmpty: Bool {
122-
return mutex.protect { items.isEmpty }
124+
mutex.lock()
125+
defer { mutex.unlock() }
126+
127+
return items.isEmpty
123128
}
124129

125130
/// Returns a signal that will signal when `isEmpty` is changed.
@@ -164,19 +169,21 @@ public extension FutureQueue {
164169

165170
/// The error passed to `abortQueuedExecutionWithError()` if called with `shouldCloseQueue` as true.
166171
var closedError: Error? {
167-
return mutex.protect { _closedError }
172+
mutex.lock()
173+
defer { mutex.unlock() }
174+
175+
return _closedError
168176
}
169177
}
170178

171179
private extension FutureQueue {
172-
var mutex: PThreadMutex { return PThreadMutex(&_mutex) }
173180
func lock() { mutex.lock() }
174181
func unlock() { mutex.unlock() }
175182

176183
func removeItem(_ item: Executable) {
177-
mutex.protect {
178-
_ = items.firstIndex { $0 === item }.map { items.remove(at: $0) }
179-
}
184+
mutex.lock()
185+
defer { mutex.unlock() }
186+
_ = items.firstIndex { $0 === item }.map { items.remove(at: $0) }
180187
}
181188

182189
func executeNextItem() {
@@ -188,9 +195,9 @@ private extension FutureQueue {
188195
unlock()
189196

190197
item.execute(on: queueScheduler) {
191-
self.mutex.protect {
192-
self.concurrentCount -= 1
193-
}
198+
self.mutex.lock()
199+
self.concurrentCount -= 1
200+
self.mutex.unlock()
194201
self.removeItem(item)
195202
self.executeNextItem()
196203
}

Flow/Locking.swift

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,22 @@ public final class Mutex {
3939
}
4040
}
4141

42+
@usableFromInline
43+
enum MutexType {
44+
case normal
45+
case recursive
46+
47+
var attrType: Int32 {
48+
switch self {
49+
case .normal: return PTHREAD_MUTEX_NORMAL
50+
case .recursive: return PTHREAD_MUTEX_RECURSIVE
51+
}
52+
}
53+
}
54+
4255
internal extension pthread_mutex_t {
4356

44-
@inlinable mutating func initialize() {
57+
@inlinable mutating func initialize(as type: MutexType = .normal) {
4558
withUnsafeMutablePointer(to: &self) {
4659
$0.initialize()
4760
}
@@ -65,12 +78,6 @@ internal extension pthread_mutex_t {
6578
}
6679
}
6780

68-
@inlinable mutating func protect<T>(_ block: () throws -> T) rethrows -> T {
69-
try withUnsafeMutablePointer(to: &self) {
70-
return try $0.protect(block)
71-
}
72-
}
73-
7481
}
7582

7683
typealias PThreadMutex = UnsafeMutablePointer<pthread_mutex_t>
@@ -79,14 +86,14 @@ typealias PThreadMutex = UnsafeMutablePointer<pthread_mutex_t>
7986
/// - Note: You have to explicity call `initialize()` before use (typically in a class init) and `deinitialize()` when done (typically in a class deinit)
8087
extension UnsafeMutablePointer where Pointee == pthread_mutex_t {
8188
@usableFromInline
82-
func initialize() {
89+
func initialize(as type: MutexType = .normal) {
8390
var attr = pthread_mutexattr_t()
8491
defer { pthread_mutexattr_destroy(&attr) }
8592
guard pthread_mutexattr_init(&attr) == 0 else {
8693
preconditionFailure()
8794
}
8895

89-
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL)
96+
pthread_mutexattr_settype(&attr, type.attrType)
9097

9198
guard pthread_mutex_init(self, &attr) == 0 else {
9299
preconditionFailure()
@@ -110,13 +117,6 @@ extension UnsafeMutablePointer where Pointee == pthread_mutex_t {
110117
pthread_mutex_unlock(self)
111118
}
112119

113-
/// Will lock `self`, call `block`, then unlock `self`
114-
@discardableResult @usableFromInline
115-
func protect<T>(_ block: () throws -> T) rethrows -> T {
116-
pthread_mutex_lock(self)
117-
defer { pthread_mutex_unlock(self) }
118-
return try block()
119-
}
120120
}
121121

122122
/// Internal helper to help manage state in stateful transforms.

Flow/OrderedCallbacker.swift

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,30 +26,40 @@ public final class OrderedCallbacker<OrderedValue, CallbackValue> {
2626

2727
/// - Returns: True if no callbacks has been registered.
2828
public var isEmpty: Bool {
29-
return mutex.protect { callbacks.isEmpty }
29+
mutex.lock()
30+
defer { mutex.unlock() }
31+
32+
return callbacks.isEmpty
3033
}
3134

3235
/// Register a callback and orderedValue to be called when `callAll` is executed.
3336
/// - Parameter callback: The next callback won't be called until `callback` return `Future` completes
3437
/// - Parameter orderedValue: The value used to order this callback
3538
/// - Returns: A `Disposable` to be disposed to unregister the callback.
3639
public func addCallback(_ callback: @escaping (CallbackValue) -> Future<()>, orderedBy orderedValue: OrderedValue) -> Disposable {
37-
return mutex.protect {
38-
let key = generateKey()
39-
callbacks[key] = (orderedValue, callback)
40-
return Disposer {
41-
self.mutex.protect { self.callbacks[key] = nil }
42-
}
40+
mutex.lock()
41+
defer { mutex.unlock() }
42+
43+
let key = generateKey()
44+
callbacks[key] = (orderedValue, callback)
45+
return Disposer {
46+
self.mutex.lock()
47+
self.callbacks[key] = nil
48+
self.mutex.unlock()
4349
}
4450
}
4551

4652
/// Will call all registered callbacks with `value` in the order set by `isOrderedBefore`
4753
/// - Returns: A `Future` that will complete when all callbacks has been called.
4854
@discardableResult
4955
public func callAll(with value: CallbackValue, isOrderedBefore: (OrderedValue, OrderedValue) -> Bool) -> Future<()> {
50-
return mutex.protect {
51-
callbacks.values.sorted { isOrderedBefore($0.0, $1.0) }.map { $1 }
52-
}.mapToFuture { $0(value) }.toVoid()
56+
57+
mutex.lock()
58+
defer { mutex.unlock() }
59+
return callbacks.values
60+
.sorted { isOrderedBefore($0.0, $1.0) }
61+
.map { $1 }
62+
.mapToFuture { $0(value) }.toVoid()
5363
}
5464
}
5565

0 commit comments

Comments
 (0)