Skip to content

Make body stream writer sendable #835

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 29, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 46 additions & 42 deletions Sources/AsyncHTTPClient/HTTPHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,15 @@ extension HTTPClient {
/// A streaming uploader.
///
/// ``StreamWriter`` abstracts
public struct StreamWriter {
let closure: (IOData) -> EventLoopFuture<Void>
public struct StreamWriter: Sendable {
let closure: @Sendable (IOData) -> EventLoopFuture<Void>

/// Create new ``HTTPClient/Body/StreamWriter``
///
/// - parameters:
/// - closure: function that will be called to write actual bytes to the channel.
public init(closure: @escaping (IOData) -> EventLoopFuture<Void>) {
@preconcurrency
public init(closure: @escaping @Sendable (IOData) -> EventLoopFuture<Void>) {
self.closure = closure
}

Expand All @@ -55,8 +56,8 @@ extension HTTPClient {
func writeChunks<Bytes: Collection>(
of bytes: Bytes,
maxChunkSize: Int
) -> EventLoopFuture<Void> where Bytes.Element == UInt8 {
// `StreamWriter` is has design issues, for example
) -> EventLoopFuture<Void> where Bytes.Element == UInt8, Bytes: Sendable {
// `StreamWriter` has design issues, for example
// - https://github.com/swift-server/async-http-client/issues/194
// - https://github.com/swift-server/async-http-client/issues/264
// - We're not told the EventLoop the task runs on and the user is free to return whatever EL they
Expand All @@ -66,49 +67,52 @@ extension HTTPClient {
typealias Iterator = EnumeratedSequence<ChunksOfCountCollection<Bytes>>.Iterator
typealias Chunk = (offset: Int, element: ChunksOfCountCollection<Bytes>.Element)

func makeIteratorAndFirstChunk(
bytes: Bytes
) -> (
iterator: NIOLockedValueBox<Iterator>,
chunk: Chunk
)? {
var iterator = bytes.chunks(ofCount: maxChunkSize).enumerated().makeIterator()
guard let chunk = iterator.next() else {
return nil
// HACK (again, we're not told the right EventLoop): Let's write 0 bytes to make the user tell us...
return self.write(.byteBuffer(ByteBuffer())).flatMapWithEventLoop { (_, loop) in
func makeIteratorAndFirstChunk(
bytes: Bytes
) -> (iterator: Iterator, chunk: Chunk)? {
var iterator = bytes.chunks(ofCount: maxChunkSize).enumerated().makeIterator()
guard let chunk = iterator.next() else {
return nil
}

return (iterator, chunk)
}

return (NIOLockedValueBox(iterator), chunk)
}

guard let (iterator, chunk) = makeIteratorAndFirstChunk(bytes: bytes) else {
return self.write(IOData.byteBuffer(.init()))
}
guard let iteratorAndChunk = makeIteratorAndFirstChunk(bytes: bytes) else {
return loop.makeSucceededVoidFuture()
}

@Sendable // can't use closure here as we recursively call ourselves which closures can't do
func writeNextChunk(_ chunk: Chunk, allDone: EventLoopPromise<Void>) {
if let nextElement = iterator.withLockedValue({ $0.next() }) {
self.write(.byteBuffer(ByteBuffer(bytes: chunk.element))).map {
let index = nextElement.offset
if (index + 1) % 4 == 0 {
// Let's not stack-overflow if the futures insta-complete which they at least in HTTP/2
// mode.
// Also, we must frequently return to the EventLoop because we may get the pause signal
// from another thread. If we fail to do that promptly, we may balloon our body chunks
// into memory.
allDone.futureResult.eventLoop.execute {
writeNextChunk(nextElement, allDone: allDone)
var iterator = iteratorAndChunk.0
let chunk = iteratorAndChunk.1

// can't use closure here as we recursively call ourselves which closures can't do
func writeNextChunk(_ chunk: Chunk, allDone: EventLoopPromise<Void>) {
let loop = allDone.futureResult.eventLoop
loop.assertInEventLoop()

if let (index, element) = iterator.next() {
self.write(.byteBuffer(ByteBuffer(bytes: chunk.element))).hop(to: loop).assumeIsolated().map
{
if (index + 1) % 4 == 0 {
// Let's not stack-overflow if the futures insta-complete which they at least in HTTP/2
// mode.
// Also, we must frequently return to the EventLoop because we may get the pause signal
// from another thread. If we fail to do that promptly, we may balloon our body chunks
// into memory.
allDone.futureResult.eventLoop.assumeIsolated().execute {
writeNextChunk((offset: index, element: element), allDone: allDone)
}
} else {
writeNextChunk((offset: index, element: element), allDone: allDone)
}
} else {
writeNextChunk(nextElement, allDone: allDone)
}
}.cascadeFailure(to: allDone)
} else {
self.write(.byteBuffer(ByteBuffer(bytes: chunk.element))).cascade(to: allDone)
}.nonisolated().cascadeFailure(to: allDone)
} else {
self.write(.byteBuffer(ByteBuffer(bytes: chunk.element))).cascade(to: allDone)
}
}
}

// HACK (again, we're not told the right EventLoop): Let's write 0 bytes to make the user tell us...
return self.write(.byteBuffer(ByteBuffer())).flatMapWithEventLoop { (_, loop) in
let allDone = loop.makePromise(of: Void.self)
writeNextChunk(chunk, allDone: allDone)
return allDone.futureResult
Expand Down
Loading