Skip to content

Commit a79206d

Browse files
fabianfettglbrntt
andauthored
Add an AsyncWriter (#519)
This PR adds an AsyncWriter that we can be used to test our new async upload streaming API. Co-authored-by: George Barnett <[email protected]>
1 parent cc17c9c commit a79206d

File tree

1 file changed

+194
-0
lines changed

1 file changed

+194
-0
lines changed
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
#if swift(>=5.5) && canImport(_Concurrency)
16+
import NIOConcurrencyHelpers
17+
import NIOCore
18+
19+
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
20+
class AsyncSequenceWriter<Element>: AsyncSequence {
21+
typealias AsyncIterator = Iterator
22+
23+
struct Iterator: AsyncIteratorProtocol {
24+
private let writer: AsyncSequenceWriter<Element>
25+
26+
init(_ writer: AsyncSequenceWriter<Element>) {
27+
self.writer = writer
28+
}
29+
30+
mutating func next() async throws -> Element? {
31+
try await self.writer.next()
32+
}
33+
}
34+
35+
func makeAsyncIterator() -> Iterator {
36+
return Iterator(self)
37+
}
38+
39+
private enum State {
40+
case buffering(CircularBuffer<Element?>, CheckedContinuation<Void, Never>?)
41+
case finished
42+
case waiting(CheckedContinuation<Element?, Error>)
43+
case failed(Error, CheckedContinuation<Void, Never>?)
44+
}
45+
46+
private var _state = State.buffering(.init(), nil)
47+
private let lock = Lock()
48+
49+
public var hasDemand: Bool {
50+
self.lock.withLock {
51+
switch self._state {
52+
case .failed, .finished, .buffering:
53+
return false
54+
case .waiting:
55+
return true
56+
}
57+
}
58+
}
59+
60+
/// Wait until a downstream consumer has issued more demand by calling `next`.
61+
public func demand() async {
62+
self.lock.lock()
63+
64+
switch self._state {
65+
case .buffering(let buffer, .none):
66+
await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
67+
self._state = .buffering(buffer, continuation)
68+
self.lock.unlock()
69+
}
70+
71+
case .waiting:
72+
self.lock.unlock()
73+
return
74+
75+
case .buffering(_, .some), .failed(_, .some):
76+
let state = self._state
77+
self.lock.unlock()
78+
preconditionFailure("Already waiting for demand. Invalid state: \(state)")
79+
80+
case .finished, .failed:
81+
let state = self._state
82+
self.lock.unlock()
83+
preconditionFailure("Invalid state: \(state)")
84+
}
85+
}
86+
87+
private func next() async throws -> Element? {
88+
self.lock.lock()
89+
switch self._state {
90+
case .buffering(let buffer, let demandContinuation) where buffer.isEmpty:
91+
return try await withCheckedThrowingContinuation { continuation in
92+
self._state = .waiting(continuation)
93+
self.lock.unlock()
94+
demandContinuation?.resume(returning: ())
95+
}
96+
97+
case .buffering(var buffer, let demandContinuation):
98+
let first = buffer.removeFirst()
99+
if first != nil {
100+
self._state = .buffering(buffer, demandContinuation)
101+
} else {
102+
self._state = .finished
103+
}
104+
self.lock.unlock()
105+
return first
106+
107+
case .failed(let error, let demandContinuation):
108+
self._state = .finished
109+
self.lock.unlock()
110+
demandContinuation?.resume()
111+
throw error
112+
113+
case .finished:
114+
self.lock.unlock()
115+
return nil
116+
117+
case .waiting:
118+
let state = self._state
119+
self.lock.unlock()
120+
preconditionFailure("Expected that there is always only one concurrent call to next. Invalid state: \(state)")
121+
}
122+
}
123+
124+
public func write(_ element: Element) {
125+
self.writeBufferOrEnd(element)
126+
}
127+
128+
public func end() {
129+
self.writeBufferOrEnd(nil)
130+
}
131+
132+
private enum WriteAction {
133+
case succeedContinuation(CheckedContinuation<Element?, Error>, Element?)
134+
case none
135+
}
136+
137+
private func writeBufferOrEnd(_ element: Element?) {
138+
let writeAction = self.lock.withLock { () -> WriteAction in
139+
switch self._state {
140+
case .buffering(var buffer, let continuation):
141+
buffer.append(element)
142+
self._state = .buffering(buffer, continuation)
143+
return .none
144+
145+
case .waiting(let continuation):
146+
self._state = .buffering(.init(), nil)
147+
return .succeedContinuation(continuation, element)
148+
149+
case .finished, .failed:
150+
preconditionFailure("Invalid state: \(self._state)")
151+
}
152+
}
153+
154+
switch writeAction {
155+
case .succeedContinuation(let continuation, let element):
156+
continuation.resume(returning: element)
157+
158+
case .none:
159+
break
160+
}
161+
}
162+
163+
private enum ErrorAction {
164+
case failContinuation(CheckedContinuation<Element?, Error>, Error)
165+
case none
166+
}
167+
168+
/// Drops all buffered writes and emits an error on the waiting `next`. If there is no call to `next`
169+
/// waiting, will emit the error on the next call to `next`.
170+
public func fail(_ error: Error) {
171+
let errorAction = self.lock.withLock { () -> ErrorAction in
172+
switch self._state {
173+
case .buffering(_, let demandContinuation):
174+
self._state = .failed(error, demandContinuation)
175+
return .none
176+
177+
case .failed, .finished:
178+
return .none
179+
180+
case .waiting(let continuation):
181+
self._state = .finished
182+
return .failContinuation(continuation, error)
183+
}
184+
}
185+
186+
switch errorAction {
187+
case .failContinuation(let checkedContinuation, let error):
188+
checkedContinuation.resume(throwing: error)
189+
case .none:
190+
break
191+
}
192+
}
193+
}
194+
#endif

0 commit comments

Comments
 (0)