Skip to content

Commit 9baff2a

Browse files
authored
[AsyncThrowingChannel] make the fail terminal event non async (#164)
* asyncThrowingChannel: make fail set a terminal state * asyncChannel: align naming on asyncThrowingChannel * asyncChannel: remove a test base on Task.sleep * guides: update Channel with non async fail(_:)
1 parent 6df7fcd commit 9baff2a

File tree

4 files changed

+109
-108
lines changed

4 files changed

+109
-108
lines changed

Sources/AsyncAlgorithms/AsyncAlgorithms.docc/Guides/Channel.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,14 @@ public final class AsyncThrowingChannel<Element: Sendable, Failure: Error>: Asyn
4444
public init(element elementType: Element.Type = Element.self, failure failureType: Failure.Type = Failure.self)
4545

4646
public func send(_ element: Element) async
47-
public func fail(_ error: Error) async where Failure == Error
47+
public func fail(_ error: Error) where Failure == Error
4848
public func finish()
4949

5050
public func makeAsyncIterator() -> Iterator
5151
}
5252
```
5353

54-
Channels are intended to be used as communication types between tasks. Particularly when one task produces values and another task consumes said values. On the one hand, the back pressure applied by `send(_:)` and `fail(_:)` via the suspension/resume ensure that the production of values does not exceed the consumption of values from iteration. Each of these methods suspend after enqueuing the event and are resumed when the next call to `next()` on the `Iterator` is made. On the other hand, the call to `finish()` immediately resumes all the pending operations for every producers and consumers. Thus, every suspended `send(_:)` operations instantly resume, so as every suspended `next()` operations by producing a nil value, indicating the termination of the iterations. Further calls to `send(_:)` will immediately resume.
54+
Channels are intended to be used as communication types between tasks. Particularly when one task produces values and another task consumes said values. On the one hand, the back pressure applied by `send(_:)` via the suspension/resume ensures that the production of values does not exceed the consumption of values from iteration. This method suspends after enqueuing the event and is resumed when the next call to `next()` on the `Iterator` is made. On the other hand, the call to `finish()` or `fail(_:)` immediately resumes all the pending operations for every producers and consumers. Thus, every suspended `send(_:)` operations instantly resume, so as every suspended `next()` operations by producing a nil value, or by throwing an error, indicating the termination of the iterations. Further calls to `send(_:)` will immediately resume.
5555

5656
```swift
5757
let channel = AsyncChannel<String>()

Sources/AsyncAlgorithms/AsyncChannel.swift

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@
1313
///
1414
/// The `AsyncChannel` class is intended to be used as a communication type between tasks,
1515
/// particularly when one task produces values and another task consumes those values. The back
16-
/// pressure applied by `send(_:)` and `finish()` via the suspension/resume ensures that
17-
/// the production of values does not exceed the consumption of values from iteration. Each of these
18-
/// methods suspends after enqueuing the event and is resumed when the next call to `next()`
19-
/// on the `Iterator` is made.
16+
/// pressure applied by `send(_:)` via the suspension/resume ensures that
17+
/// the production of values does not exceed the consumption of values from iteration. This method
18+
/// suspends after enqueuing the event and is resumed when the next call to `next()`
19+
/// on the `Iterator` is made, or when `finish()` is called from another Task.
20+
/// As `finish()` induces a terminal state, there is no need for a back pressure management.
21+
/// This function does not suspend and will finish all the pending iterations.
2022
public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
2123
/// The iterator for a `AsyncChannel` instance.
2224
public struct Iterator: AsyncIteratorProtocol, Sendable {
@@ -168,7 +170,7 @@ public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
168170
}
169171
}
170172

171-
func finishAll() {
173+
func terminateAll() {
172174
let (sends, nexts) = state.withCriticalRegion { state -> ([UnsafeContinuation<UnsafeContinuation<Element?, Never>?, Never>], Set<Awaiting>) in
173175
if state.terminal {
174176
return ([], [])
@@ -195,7 +197,7 @@ public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
195197

196198
func _send(_ element: Element) async {
197199
await withTaskCancellationHandler {
198-
finishAll()
200+
terminateAll()
199201
} operation: {
200202
let continuation: UnsafeContinuation<Element?, Never>? = await withUnsafeContinuation { continuation in
201203
state.withCriticalRegion { state -> UnsafeResumption<UnsafeContinuation<Element?, Never>?, Never>? in
@@ -225,15 +227,17 @@ public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
225227
}
226228
}
227229

228-
/// Send an element to an awaiting iteration. This function will resume when the next call to `next()` is made.
230+
/// Send an element to an awaiting iteration. This function will resume when the next call to `next()` is made
231+
/// or when a call to `finish()` is made from another Task.
229232
/// If the channel is already finished then this returns immediately
230233
public func send(_ element: Element) async {
231234
await _send(element)
232235
}
233236

234237
/// Send a finish to all awaiting iterations.
238+
/// All subsequent calls to `next(_:)` will resume immediately.
235239
public func finish() {
236-
finishAll()
240+
terminateAll()
237241
}
238242

239243
/// Create an `Iterator` for iteration of an `AsyncChannel`

Sources/AsyncAlgorithms/AsyncThrowingChannel.swift

Lines changed: 79 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,13 @@
1111

1212
/// An error-throwing channel for sending elements from on task to another with back pressure.
1313
///
14-
/// The `AsyncThrowingChannel` class is intended to be used as a communication types between tasks, particularly when one task produces values and another task consumes those values. The back pressure applied by `send(_:)`, `fail(_:)` and `finish()` via suspension/resume ensures that the production of values does not exceed the consumption of values from iteration. Each of these methods suspends after enqueuing the event and is resumed when the next call to `next()` on the `Iterator` is made.
14+
/// The `AsyncThrowingChannel` class is intended to be used as a communication types between tasks,
15+
/// particularly when one task produces values and another task consumes those values. The back
16+
/// pressure applied by `send(_:)` via suspension/resume ensures that the production of values does
17+
/// not exceed the consumption of values from iteration. This method suspends after enqueuing the event
18+
/// and is resumed when the next call to `next()` on the `Iterator` is made, or when `finish()`/`fail(_:)` is called
19+
/// from another Task. As `finish()` and `fail(_:)` induce a terminal state, there is no need for a back pressure management.
20+
/// Those functions do not suspend and will finish all the pending iterations.
1521
public final class AsyncThrowingChannel<Element: Sendable, Failure: Error>: AsyncSequence, Sendable {
1622
/// The iterator for an `AsyncThrowingChannel` instance.
1723
public struct Iterator: AsyncIteratorProtocol, Sendable {
@@ -78,12 +84,23 @@ public final class AsyncThrowingChannel<Element: Sendable, Failure: Error>: Asyn
7884
return lhs.generation == rhs.generation
7985
}
8086
}
87+
88+
enum Termination {
89+
case finished
90+
case failed(Error)
91+
}
8192

8293
enum Emission {
8394
case idle
8495
case pending([UnsafeContinuation<UnsafeContinuation<Element?, Error>?, Never>])
8596
case awaiting(Set<Awaiting>)
86-
97+
case terminated(Termination)
98+
99+
var isTerminated: Bool {
100+
guard case .terminated = self else { return false }
101+
return true
102+
}
103+
87104
mutating func cancel(_ generation: Int) -> UnsafeContinuation<Element?, Error>? {
88105
switch self {
89106
case .awaiting(var awaiting):
@@ -106,9 +123,8 @@ public final class AsyncThrowingChannel<Element: Sendable, Failure: Error>: Asyn
106123
struct State {
107124
var emission: Emission = .idle
108125
var generation = 0
109-
var terminal = false
110126
}
111-
127+
112128
let state = ManagedCriticalState(State())
113129

114130
public init(_ elementType: Element.Type = Element.self) { }
@@ -129,12 +145,9 @@ public final class AsyncThrowingChannel<Element: Sendable, Failure: Error>: Asyn
129145
func next(_ generation: Int) async throws -> Element? {
130146
return try await withUnsafeThrowingContinuation { continuation in
131147
var cancelled = false
132-
var terminal = false
148+
var potentialTermination: Termination?
149+
133150
state.withCriticalRegion { state -> UnsafeResumption<UnsafeContinuation<Element?, Error>?, Never>? in
134-
if state.terminal {
135-
terminal = true
136-
return nil
137-
}
138151
switch state.emission {
139152
case .idle:
140153
state.emission = .awaiting([Awaiting(generation: generation, continuation: continuation)])
@@ -158,53 +171,78 @@ public final class AsyncThrowingChannel<Element: Sendable, Failure: Error>: Asyn
158171
state.emission = .awaiting(nexts)
159172
}
160173
return nil
174+
case .terminated(let termination):
175+
potentialTermination = termination
176+
state.emission = .terminated(.finished)
177+
return nil
161178
}
162179
}?.resume()
163-
if cancelled || terminal {
180+
181+
if cancelled {
164182
continuation.resume(returning: nil)
183+
return
184+
}
185+
186+
switch potentialTermination {
187+
case .none:
188+
return
189+
case .failed(let error):
190+
continuation.resume(throwing: error)
191+
return
192+
case .finished:
193+
continuation.resume(returning: nil)
194+
return
165195
}
166196
}
167197
}
168-
169-
func finishAll() {
198+
199+
func terminateAll(error: Failure? = nil) {
170200
let (sends, nexts) = state.withCriticalRegion { state -> ([UnsafeContinuation<UnsafeContinuation<Element?, Error>?, Never>], Set<Awaiting>) in
171-
if state.terminal {
172-
return ([], [])
201+
202+
let nextState: Emission
203+
if let error = error {
204+
nextState = .terminated(.failed(error))
205+
} else {
206+
nextState = .terminated(.finished)
173207
}
174-
state.terminal = true
208+
175209
switch state.emission {
176210
case .idle:
211+
state.emission = nextState
177212
return ([], [])
178213
case .pending(let nexts):
179-
state.emission = .idle
214+
state.emission = nextState
180215
return (nexts, [])
181216
case .awaiting(let nexts):
182-
state.emission = .idle
217+
state.emission = nextState
183218
return ([], nexts)
219+
case .terminated:
220+
return ([], [])
184221
}
185222
}
223+
186224
for send in sends {
187225
send.resume(returning: nil)
188226
}
189-
for next in nexts {
190-
next.continuation?.resume(returning: nil)
227+
228+
if let error = error {
229+
for next in nexts {
230+
next.continuation?.resume(throwing: error)
231+
}
232+
} else {
233+
for next in nexts {
234+
next.continuation?.resume(returning: nil)
235+
}
191236
}
237+
192238
}
193239

194-
func _send(_ result: Result<Element, Error>) async {
240+
func _send(_ element: Element) async {
195241
await withTaskCancellationHandler {
196-
finishAll()
242+
terminateAll()
197243
} operation: {
198244
let continuation: UnsafeContinuation<Element?, Error>? = await withUnsafeContinuation { continuation in
199245
state.withCriticalRegion { state -> UnsafeResumption<UnsafeContinuation<Element?, Error>?, Never>? in
200-
if state.terminal {
201-
return UnsafeResumption(continuation: continuation, success: nil)
202-
}
203-
204-
if case .failure = result {
205-
state.terminal = true
206-
}
207-
208246
switch state.emission {
209247
case .idle:
210248
state.emission = .pending([continuation])
@@ -221,28 +259,32 @@ public final class AsyncThrowingChannel<Element: Sendable, Failure: Error>: Asyn
221259
state.emission = .awaiting(nexts)
222260
}
223261
return UnsafeResumption(continuation: continuation, success: next)
262+
case .terminated:
263+
return UnsafeResumption(continuation: continuation, success: nil)
224264
}
225265
}?.resume()
226266
}
227-
continuation?.resume(with: result.map { $0 as Element? })
267+
continuation?.resume(returning: element)
228268
}
229269
}
230270

231-
/// Send an element to an awaiting iteration. This function will resume when the next call to `next()` is made.
271+
/// Send an element to an awaiting iteration. This function will resume when the next call to `next()` is made
272+
/// or when a call to `finish()`/`fail(_:)` is made from another Task.
232273
/// If the channel is already finished then this returns immediately
233274
public func send(_ element: Element) async {
234-
await _send(.success(element))
275+
await _send(element)
235276
}
236277

237-
/// Send an error to an awaiting iteration. This function will resume when the next call to `next()` is made.
238-
/// If the channel is already finished then this returns immediately
239-
public func fail(_ error: Error) async where Failure == Error {
240-
await _send(.failure(error))
278+
/// Send an error to all awaiting iterations.
279+
/// All subsequent calls to `next(_:)` will resume immediately.
280+
public func fail(_ error: Error) where Failure == Error {
281+
terminateAll(error: error)
241282
}
242283

243284
/// Send a finish to all awaiting iterations.
285+
/// All subsequent calls to `next(_:)` will resume immediately.
244286
public func finish() {
245-
finishAll()
287+
terminateAll()
246288
}
247289

248290
public func makeAsyncIterator() -> Iterator {

Tests/AsyncAlgorithmsTests/TestChannel.swift

Lines changed: 16 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -63,18 +63,30 @@ final class TestChannel: XCTestCase {
6363
XCTAssertEqual(collected, expected)
6464
}
6565

66-
func test_asyncThrowingChannel_throws_when_fail_is_called() async {
66+
func test_asyncThrowingChannel_throws_and_discards_additional_sent_values_when_fail_is_called() async {
67+
let sendImmediatelyResumes = expectation(description: "Send immediately resumes after fail")
68+
6769
let channel = AsyncThrowingChannel<String, Error>()
68-
Task {
69-
await channel.fail(Failure())
70-
}
70+
channel.fail(Failure())
71+
7172
var iterator = channel.makeAsyncIterator()
7273
do {
7374
let _ = try await iterator.next()
7475
XCTFail("The AsyncThrowingChannel should have thrown")
7576
} catch {
7677
XCTAssertEqual(error as? Failure, Failure())
7778
}
79+
80+
do {
81+
let pastFailure = try await iterator.next()
82+
XCTAssertNil(pastFailure)
83+
} catch {
84+
XCTFail("The AsyncThrowingChannel should not fail when failure has already been fired")
85+
}
86+
87+
await channel.send("send")
88+
sendImmediatelyResumes.fulfill()
89+
wait(for: [sendImmediatelyResumes], timeout: 1.0)
7890
}
7991

8092
func test_asyncChannel_ends_alls_iterators_and_discards_additional_sent_values_when_finish_is_called() async {
@@ -132,63 +144,6 @@ final class TestChannel: XCTestCase {
132144
wait(for: [additionalSend], timeout: 1.0)
133145
}
134146

135-
func test_asyncChannel_ends_alls_iterators_and_discards_additional_sent_values_when_finish_is_called2() async throws {
136-
let channel = AsyncChannel<String>()
137-
let complete = ManagedCriticalState(false)
138-
let finished = expectation(description: "finished")
139-
140-
let valueFromConsumer1 = ManagedCriticalState<String?>(nil)
141-
let valueFromConsumer2 = ManagedCriticalState<String?>(nil)
142-
143-
let received = expectation(description: "received")
144-
received.expectedFulfillmentCount = 2
145-
146-
let pastEnd = expectation(description: "pastEnd")
147-
pastEnd.expectedFulfillmentCount = 2
148-
149-
Task(priority: .high) {
150-
var iterator = channel.makeAsyncIterator()
151-
let ending = await iterator.next()
152-
valueFromConsumer1.withCriticalRegion { $0 = ending }
153-
received.fulfill()
154-
let item = await iterator.next()
155-
XCTAssertNil(item)
156-
pastEnd.fulfill()
157-
}
158-
159-
Task(priority: .high) {
160-
var iterator = channel.makeAsyncIterator()
161-
let ending = await iterator.next()
162-
valueFromConsumer2.withCriticalRegion { $0 = ending }
163-
received.fulfill()
164-
let item = await iterator.next()
165-
XCTAssertNil(item)
166-
pastEnd.fulfill()
167-
}
168-
169-
try await Task.sleep(nanoseconds: 1_000_000_000)
170-
171-
Task(priority: .low) {
172-
channel.finish()
173-
complete.withCriticalRegion { $0 = true }
174-
finished.fulfill()
175-
}
176-
177-
wait(for: [finished, received], timeout: 1.0)
178-
179-
XCTAssertTrue(complete.withCriticalRegion { $0 })
180-
XCTAssertEqual(valueFromConsumer1.withCriticalRegion { $0 }, nil)
181-
XCTAssertEqual(valueFromConsumer2.withCriticalRegion { $0 }, nil)
182-
183-
wait(for: [pastEnd], timeout: 1.0)
184-
let additionalSend = expectation(description: "additional send")
185-
Task {
186-
await channel.send("test")
187-
additionalSend.fulfill()
188-
}
189-
wait(for: [additionalSend], timeout: 1.0)
190-
}
191-
192147
func test_asyncThrowingChannel_ends_alls_iterators_and_discards_additional_sent_values_when_finish_is_called() async {
193148
let channel = AsyncThrowingChannel<String, Error>()
194149
let complete = ManagedCriticalState(false)

0 commit comments

Comments
 (0)