Skip to content

Commit 223f44a

Browse files
committed
Make body stream writer sendable
Motivation: The body stream writer can be sent across isolation domains so should be sendable. Modifications: - Make it explicitly sendable - Add appropriate preconcurrency annotations - Wrap an iterator from swift-algorithms as it hasn't yet annotated its types with Sendable Result: Body stream writer is sendable
1 parent 086524f commit 223f44a

File tree

1 file changed

+35
-12
lines changed

1 file changed

+35
-12
lines changed

Sources/AsyncHTTPClient/HTTPHandler.swift

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,15 @@ extension HTTPClient {
3232
/// A streaming uploader.
3333
///
3434
/// ``StreamWriter`` abstracts
35-
public struct StreamWriter {
36-
let closure: (IOData) -> EventLoopFuture<Void>
35+
public struct StreamWriter: Sendable {
36+
let closure: @Sendable (IOData) -> EventLoopFuture<Void>
3737

3838
/// Create new ``HTTPClient/Body/StreamWriter``
3939
///
4040
/// - parameters:
4141
/// - closure: function that will be called to write actual bytes to the channel.
42-
public init(closure: @escaping (IOData) -> EventLoopFuture<Void>) {
42+
@preconcurrency
43+
public init(closure: @escaping @Sendable (IOData) -> EventLoopFuture<Void>) {
4344
self.closure = closure
4445
}
4546

@@ -55,15 +56,15 @@ extension HTTPClient {
5556
func writeChunks<Bytes: Collection>(
5657
of bytes: Bytes,
5758
maxChunkSize: Int
58-
) -> EventLoopFuture<Void> where Bytes.Element == UInt8 {
59-
// `StreamWriter` is has design issues, for example
59+
) -> EventLoopFuture<Void> where Bytes.Element == UInt8, Bytes: Sendable, Bytes.SubSequence: Sendable {
60+
// `StreamWriter` has design issues, for example
6061
// - https://github.com/swift-server/async-http-client/issues/194
6162
// - https://github.com/swift-server/async-http-client/issues/264
6263
// - We're not told the EventLoop the task runs on and the user is free to return whatever EL they
6364
// want.
6465
// One important consideration then is that we must lock around the iterator because we could be hopping
6566
// between threads.
66-
typealias Iterator = EnumeratedSequence<ChunksOfCountCollection<Bytes>>.Iterator
67+
typealias Iterator = BodyStreamIterator<Bytes>
6768
typealias Chunk = (offset: Int, element: ChunksOfCountCollection<Bytes>.Element)
6869

6970
func makeIteratorAndFirstChunk(
@@ -77,7 +78,7 @@ extension HTTPClient {
7778
return nil
7879
}
7980

80-
return (NIOLockedValueBox(iterator), chunk)
81+
return (NIOLockedValueBox(BodyStreamIterator(iterator)), chunk)
8182
}
8283

8384
guard let (iterator, chunk) = makeIteratorAndFirstChunk(bytes: bytes) else {
@@ -86,20 +87,19 @@ extension HTTPClient {
8687

8788
@Sendable // can't use closure here as we recursively call ourselves which closures can't do
8889
func writeNextChunk(_ chunk: Chunk, allDone: EventLoopPromise<Void>) {
89-
if let nextElement = iterator.withLockedValue({ $0.next() }) {
90+
if let (index, element) = iterator.withLockedValue({ $0.next() }) {
9091
self.write(.byteBuffer(ByteBuffer(bytes: chunk.element))).map {
91-
let index = nextElement.offset
9292
if (index + 1) % 4 == 0 {
9393
// Let's not stack-overflow if the futures insta-complete which they at least in HTTP/2
9494
// mode.
9595
// Also, we must frequently return to the EventLoop because we may get the pause signal
9696
// from another thread. If we fail to do that promptly, we may balloon our body chunks
9797
// into memory.
9898
allDone.futureResult.eventLoop.execute {
99-
writeNextChunk(nextElement, allDone: allDone)
99+
writeNextChunk((offset: index, element: element), allDone: allDone)
100100
}
101101
} else {
102-
writeNextChunk(nextElement, allDone: allDone)
102+
writeNextChunk((offset: index, element: element), allDone: allDone)
103103
}
104104
}.cascadeFailure(to: allDone)
105105
} else {
@@ -188,7 +188,7 @@ extension HTTPClient {
188188
@preconcurrency
189189
@inlinable
190190
public static func bytes<Bytes>(_ bytes: Bytes) -> Body
191-
where Bytes: RandomAccessCollection, Bytes: Sendable, Bytes.Element == UInt8 {
191+
where Bytes: RandomAccessCollection, Bytes: Sendable, Bytes.SubSequence: Sendable, Bytes.Element == UInt8 {
192192
Body(contentLength: Int64(bytes.count)) { writer in
193193
if bytes.count <= bagOfBytesToByteBufferConversionChunkSize {
194194
return writer.write(.byteBuffer(ByteBuffer(bytes: bytes)))
@@ -1080,3 +1080,26 @@ extension RequestBodyLength {
10801080
self = .known(length)
10811081
}
10821082
}
1083+
1084+
@usableFromInline
1085+
struct BodyStreamIterator<
1086+
Bytes: Collection
1087+
>: IteratorProtocol, @unchecked Sendable where Bytes.Element == UInt8, Bytes: Sendable {
1088+
// @unchecked: swift-algorithms hasn't adopted Sendable yet. By inspection, the iterator
1089+
// is safe to annotate as sendable.
1090+
@usableFromInline
1091+
typealias Element = (offset: Int, element: Bytes.SubSequence)
1092+
1093+
@usableFromInline
1094+
var _backing: EnumeratedSequence<ChunksOfCountCollection<Bytes>>.Iterator
1095+
1096+
@inlinable
1097+
init(_ backing: EnumeratedSequence<ChunksOfCountCollection<Bytes>>.Iterator) {
1098+
self._backing = backing
1099+
}
1100+
1101+
@inlinable
1102+
mutating func next() -> Element? {
1103+
self._backing.next()
1104+
}
1105+
}

0 commit comments

Comments
 (0)