Skip to content

Commit 1ec5b30

Browse files
committed
Simplify implementation of Dispatch-async bridge
Stress testing this approach over use of a long-lived DispatchIO channel appears to resolve the nondeterministic failures with the file descriptor being destroyed, and is easier to reason about. Closes #21
1 parent ed66ddc commit 1ec5b30

File tree

9 files changed

+67
-152
lines changed

9 files changed

+67
-152
lines changed

Sources/SWBUtil/Dispatch+Async.swift

Lines changed: 18 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -57,114 +57,38 @@ extension DispatchFD {
5757
}
5858
}
5959
}
60-
}
6160

62-
extension AsyncThrowingStream where Element == UInt8, Failure == any Error {
6361
/// Returns an async stream which reads bytes from the specified file descriptor. Unlike `FileHandle.bytes`, it does not block the caller.
6462
@available(macOS, deprecated: 15.0, message: "Use the AsyncSequence-returning overload.")
6563
@available(iOS, deprecated: 18.0, message: "Use the AsyncSequence-returning overload.")
6664
@available(tvOS, deprecated: 18.0, message: "Use the AsyncSequence-returning overload.")
6765
@available(watchOS, deprecated: 11.0, message: "Use the AsyncSequence-returning overload.")
6866
@available(visionOS, deprecated: 2.0, message: "Use the AsyncSequence-returning overload.")
69-
public static func _dataStream(reading fileDescriptor: DispatchFD, on queue: SWBQueue) -> AsyncThrowingStream<Element, any Error> {
70-
AsyncThrowingStream { continuation in
71-
let newFD: DispatchFD
72-
do {
73-
newFD = try fileDescriptor._duplicate()
74-
} catch {
75-
continuation.finish(throwing: error)
76-
return
77-
}
78-
79-
let io = SWBDispatchIO.stream(fileDescriptor: newFD, queue: queue) { error in
80-
do {
81-
try newFD._close()
82-
if error != 0 {
83-
continuation.finish(throwing: POSIXError(error, context: "dataStream(reading: \(fileDescriptor))#1"))
84-
}
85-
} catch {
86-
continuation.finish(throwing: error)
87-
}
88-
}
89-
io.setLimit(lowWater: 0)
90-
io.setLimit(highWater: 4096)
91-
92-
continuation.onTermination = { termination in
93-
if case .cancelled = termination {
94-
io.close(flags: .stop)
95-
} else {
96-
io.close()
97-
}
98-
}
99-
100-
io.read(offset: 0, length: .max, queue: queue) { done, data, error in
101-
guard error == 0 else {
102-
continuation.finish(throwing: POSIXError(error, context: "dataStream(reading: \(fileDescriptor))#2"))
103-
return
104-
}
105-
106-
let data = data ?? .empty
107-
for element in data {
108-
continuation.yield(element)
109-
}
110-
111-
if done {
112-
continuation.finish()
67+
public func _dataStream() -> AsyncThrowingStream<SWBDispatchData, any Error> {
68+
AsyncThrowingStream<SWBDispatchData, any Error> {
69+
while !Task.isCancelled {
70+
let chunk = try await readChunk(upToLength: 4096)
71+
if chunk.isEmpty {
72+
return nil
11373
}
74+
return chunk
11475
}
76+
throw CancellationError()
11577
}
11678
}
117-
}
11879

119-
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *)
120-
extension AsyncSequence where Element == UInt8, Failure == any Error {
12180
/// Returns an async stream which reads bytes from the specified file descriptor. Unlike `FileHandle.bytes`, it does not block the caller.
122-
public static func dataStream(reading fileDescriptor: DispatchFD, on queue: SWBQueue) -> any AsyncSequence<Element, any Error> {
123-
AsyncThrowingStream<SWBDispatchData, any Error> { continuation in
124-
let newFD: DispatchFD
125-
do {
126-
newFD = try fileDescriptor._duplicate()
127-
} catch {
128-
continuation.finish(throwing: error)
129-
return
130-
}
131-
132-
let io = SWBDispatchIO.stream(fileDescriptor: newFD, queue: queue) { error in
133-
do {
134-
try newFD._close()
135-
if error != 0 {
136-
let context = "dataStream(reading: \(fileDescriptor) \"\(Result { try fileDescriptor._filePath() })\")#1"
137-
continuation.finish(throwing: POSIXError(error, context: context))
138-
}
139-
} catch {
140-
continuation.finish(throwing: error)
141-
}
142-
}
143-
io.setLimit(lowWater: 0)
144-
io.setLimit(highWater: 4096)
145-
146-
continuation.onTermination = { termination in
147-
if case .cancelled = termination {
148-
io.close(flags: .stop)
149-
} else {
150-
io.close()
151-
}
152-
}
153-
154-
io.read(offset: 0, length: .max, queue: queue) { done, data, error in
155-
guard error == 0 else {
156-
let context = "dataStream(reading: \(fileDescriptor) \"\(Result { try fileDescriptor._filePath() })\")#2"
157-
continuation.finish(throwing: POSIXError(error, context: context))
158-
return
159-
}
160-
161-
let data = data ?? .empty
162-
continuation.yield(data)
163-
164-
if done {
165-
continuation.finish()
81+
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *)
82+
public func dataStream() -> some AsyncSequence<SWBDispatchData, any Error> {
83+
AsyncThrowingStream<SWBDispatchData, any Error> {
84+
while !Task.isCancelled {
85+
let chunk = try await readChunk(upToLength: 4096)
86+
if chunk.isEmpty {
87+
return nil
16688
}
89+
return chunk
16790
}
168-
}.flattened
91+
throw CancellationError()
92+
}
16993
}
17094
}

Sources/SWBUtil/FileHandle+Async.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@ extension FileHandle {
1919
@available(tvOS, deprecated: 18.0, message: "Use the AsyncSequence-returning overload.")
2020
@available(watchOS, deprecated: 11.0, message: "Use the AsyncSequence-returning overload.")
2121
@available(visionOS, deprecated: 2.0, message: "Use the AsyncSequence-returning overload.")
22-
public func _bytes(on queue: SWBQueue) -> AsyncThrowingStream<UInt8, any Error> {
23-
._dataStream(reading: DispatchFD(fileHandle: self), on: queue)
22+
public func _bytes() -> AsyncThrowingStream<SWBDispatchData, any Error> {
23+
DispatchFD(fileHandle: self)._dataStream()
2424
}
2525

2626
/// Replacement for `bytes` which uses DispatchIO to avoid blocking the caller.
2727
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *)
28-
public func bytes(on queue: SWBQueue) -> any AsyncSequence<UInt8, any Error> {
29-
AsyncThrowingStream.dataStream(reading: DispatchFD(fileHandle: self), on: queue)
28+
public func bytes() -> some AsyncSequence<SWBDispatchData, any Error> {
29+
DispatchFD(fileHandle: self).dataStream()
3030
}
3131
}

Sources/SWBUtil/Misc+Async.swift

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,18 @@ extension AsyncSequence {
2323
}
2424
}
2525

26+
extension AsyncSequence where Element: RandomAccessCollection {
27+
@inlinable
28+
public func collect() async rethrows -> [Element.Element] {
29+
var items = [Element.Element]()
30+
var it = makeAsyncIterator()
31+
while let e = try await it.next() {
32+
items.append(contentsOf: e)
33+
}
34+
return items
35+
}
36+
}
37+
2638
extension TaskGroup where Element == Void {
2739
/// Concurrency-friendly replacement for `DispatchQueue.concurrentPerform(iterations:execute:)`.
2840
public static func concurrentPerform(iterations: Int, maximumParallelism: Int, execute work: @Sendable @escaping (Int) async -> Element) async {

Sources/SWBUtil/Process+Async.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,20 +35,20 @@ extension Process {
3535
@available(tvOS, deprecated: 18.0, message: "Use the AsyncSequence-returning overload.")
3636
@available(watchOS, deprecated: 11.0, message: "Use the AsyncSequence-returning overload.")
3737
@available(visionOS, deprecated: 2.0, message: "Use the AsyncSequence-returning overload.")
38-
public func _makeStream(for keyPath: ReferenceWritableKeyPath<Process, Pipe?>, using pipe: Pipe) -> AsyncThrowingStream<UInt8, any Error> {
38+
public func _makeStream(for keyPath: ReferenceWritableKeyPath<Process, Pipe?>, using pipe: Pipe) -> AsyncThrowingStream<SWBDispatchData, any Error> {
3939
precondition(!isRunning) // the pipe setters will raise `NSInvalidArgumentException` anyways
4040
self[keyPath: keyPath] = pipe
41-
return pipe.fileHandleForReading._bytes(on: .global())
41+
return pipe.fileHandleForReading._bytes()
4242
}
4343

4444
/// Returns an ``AsyncStream`` configured to read the standard output or error stream of the process.
4545
///
4646
/// - note: This method will mutate the `standardOutput` or `standardError` property of the Process object, replacing any existing `Pipe` or `FileHandle` which may be set. It must be called before the process is started.
4747
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *)
48-
public func makeStream(for keyPath: ReferenceWritableKeyPath<Process, Pipe?>, using pipe: Pipe) -> any AsyncSequence<UInt8, any Error> {
48+
public func makeStream(for keyPath: ReferenceWritableKeyPath<Process, Pipe?>, using pipe: Pipe) -> some AsyncSequence<SWBDispatchData, any Error> {
4949
precondition(!isRunning) // the pipe setters will raise `NSInvalidArgumentException` anyways
5050
self[keyPath: keyPath] = pipe
51-
return pipe.fileHandleForReading.bytes(on: .global())
51+
return pipe.fileHandleForReading.bytes()
5252
}
5353
}
5454

Sources/SWBUtil/Process.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ extension Process {
103103
let (exitStatus, output) = try await _getOutput(url: url, arguments: arguments, currentDirectoryURL: currentDirectoryURL, environment: environment, interruptible: interruptible) { process in
104104
process.standardOutputPipe = pipe
105105
process.standardErrorPipe = pipe
106-
return pipe.fileHandleForReading.bytes(on: .global())
106+
return pipe.fileHandleForReading.bytes()
107107
} collect: { stream in
108108
try await stream.collect()
109109
}
@@ -115,7 +115,7 @@ extension Process {
115115
let (exitStatus, output) = try await _getOutput(url: url, arguments: arguments, currentDirectoryURL: currentDirectoryURL, environment: environment, interruptible: interruptible) { process in
116116
process.standardOutputPipe = pipe
117117
process.standardErrorPipe = pipe
118-
return pipe.fileHandleForReading._bytes(on: .global())
118+
return pipe.fileHandleForReading._bytes()
119119
} collect: { stream in
120120
try await stream.collect()
121121
}

Sources/SWBUtil/SWBDispatch.swift

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -49,33 +49,6 @@ public struct DispatchFD {
4949
rawValue = fileHandle.fileDescriptor
5050
#endif
5151
}
52-
53-
internal func _duplicate() throws -> DispatchFD {
54-
#if os(Windows)
55-
return self
56-
#else
57-
return try DispatchFD(fileDescriptor: FileDescriptor(rawValue: rawValue).duplicate())
58-
#endif
59-
}
60-
61-
internal func _close() throws {
62-
#if !os(Windows)
63-
try FileDescriptor(rawValue: rawValue).close()
64-
#endif
65-
}
66-
67-
// Only exists to help debug a rare concurrency issue where the file descriptor goes invalid
68-
internal func _filePath() throws -> String {
69-
#if canImport(Darwin)
70-
var buffer = [CChar](repeating: 0, count: Int(MAXPATHLEN))
71-
if fcntl(rawValue, F_GETPATH, &buffer) == -1 {
72-
throw POSIXError(errno, "fcntl", String(rawValue), "F_GETPATH")
73-
}
74-
return String(cString: buffer)
75-
#else
76-
return String()
77-
#endif
78-
}
7952
}
8053

8154
// @unchecked: rdar://130051790 (DispatchData should be Sendable)

Tests/SWBUtilTests/FileHandleTests.swift

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -38,21 +38,21 @@ import SystemPackage
3838
let fh = FileHandle(fileDescriptor: fd.rawValue, closeOnDealloc: false)
3939
try await fd.closeAfter {
4040
if #available(macOS 15, iOS 18, tvOS 18, watchOS 11, visionOS 2, *) {
41-
var it = fh.bytes(on: .global()).makeAsyncIterator()
41+
var it = fh.bytes().makeAsyncIterator()
4242
var bytesOfFile: [UInt8] = []
4343
await #expect(throws: Never.self) {
44-
while let byte = try await it.next() {
45-
bytesOfFile.append(byte)
44+
while let chunk = try await it.next() {
45+
bytesOfFile.append(contentsOf: chunk)
4646
}
4747
}
4848
#expect(bytesOfFile.count == 1448)
4949
#expect(plist.bytes == bytesOfFile)
5050
} else {
51-
var it = fh._bytes(on: .global()).makeAsyncIterator()
51+
var it = fh._bytes().makeAsyncIterator()
5252
var bytesOfFile: [UInt8] = []
5353
await #expect(throws: Never.self) {
54-
while let byte = try await it.next() {
55-
bytesOfFile.append(byte)
54+
while let chunk = try await it.next() {
55+
bytesOfFile.append(contentsOf: chunk)
5656
}
5757
}
5858
#expect(bytesOfFile.count == 1448)
@@ -72,15 +72,15 @@ import SystemPackage
7272
let fh = FileHandle(fileDescriptor: fd.rawValue, closeOnDealloc: false)
7373

7474
if #available(macOS 15, iOS 18, tvOS 18, watchOS 11, visionOS 2, *) {
75-
var it = fh.bytes(on: .global()).makeAsyncIterator()
75+
var it = fh.bytes().makeAsyncIterator()
7676
try fd.close()
7777

7878
await #expect(throws: (any Error).self) {
7979
while let _ = try await it.next() {
8080
}
8181
}
8282
} else {
83-
var it = fh._bytes(on: .global()).makeAsyncIterator()
83+
var it = fh._bytes().makeAsyncIterator()
8484
try fd.close()
8585

8686
await #expect(throws: (any Error).self) {
@@ -99,21 +99,21 @@ import SystemPackage
9999
try await fd.closeAfter {
100100
let fh = FileHandle(fileDescriptor: fd.rawValue, closeOnDealloc: false)
101101
if #available(macOS 15, iOS 18, tvOS 18, watchOS 11, visionOS 2, *) {
102-
var it = fh.bytes(on: .global()).makeAsyncIterator()
102+
var it = fh.bytes().makeAsyncIterator()
103103
var bytes: [UInt8] = []
104-
while let byte = try await it.next() {
105-
bytes.append(byte)
106-
if bytes.count == 100 {
104+
while let chunk = try await it.next() {
105+
bytes.append(contentsOf: chunk)
106+
if bytes.count >= 100 {
107107
condition.signal()
108108
throw CancellationError()
109109
}
110110
}
111111
} else {
112-
var it = fh._bytes(on: .global()).makeAsyncIterator()
112+
var it = fh._bytes().makeAsyncIterator()
113113
var bytes: [UInt8] = []
114-
while let byte = try await it.next() {
115-
bytes.append(byte)
116-
if bytes.count == 100 {
114+
while let chunk = try await it.next() {
115+
bytes.append(contentsOf: chunk)
116+
if bytes.count >= 100 {
117117
condition.signal()
118118
throw CancellationError()
119119
}

Tests/SwiftBuildTests/ConsoleCommands/CLIConnection.swift

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ final class CLIConnection {
3232
private let monitorHandle: FileHandle
3333
private let temporaryDirectory: NamedTemporaryDirectory
3434
private let exitPromise: Promise<Processes.ExitStatus, any Error>
35-
private let outputStream: AsyncThrowingStream<UInt8, any Error>
36-
private var outputStreamIterator: AsyncCLIConnectionResponseSequence<AsyncThrowingStream<UInt8, any Error>>.AsyncIterator
35+
private let outputStream: AsyncThrowingStream<SWBDispatchData, any Error>
36+
private var outputStreamIterator: AsyncCLIConnectionResponseSequence<AsyncFlatteningSequence<AsyncThrowingStream<SWBDispatchData, any Error>>>.AsyncIterator
3737

3838
static var swiftbuildToolSearchPaths: [URL] {
3939
var searchPaths: [URL] = []
@@ -138,8 +138,8 @@ final class CLIConnection {
138138
// Close the session handle, so the FD will close once the service stops.
139139
try sessionHandle.close()
140140

141-
outputStream = monitorHandle._bytes(on: .global())
142-
outputStreamIterator = outputStream.cliResponses.makeAsyncIterator()
141+
outputStream = monitorHandle._bytes()
142+
outputStreamIterator = outputStream.flattened.cliResponses.makeAsyncIterator()
143143
#endif
144144
}
145145

@@ -253,6 +253,9 @@ public struct AsyncCLIConnectionResponseSequence<Base: AsyncSequence>: AsyncSequ
253253
// BSDs send EOF, Linux raises EIO...
254254
#if os(Linux) || os(Android)
255255
if error.code == EIO {
256+
if reply.isEmpty {
257+
return nil
258+
}
256259
break
257260
}
258261
#endif
@@ -282,6 +285,9 @@ public struct AsyncCLIConnectionResponseSequence<Base: AsyncSequence>: AsyncSequ
282285
// BSDs send EOF, Linux raises EIO...
283286
#if os(Linux) || os(Android)
284287
if error.code == EIO {
288+
if reply.isEmpty {
289+
return nil
290+
}
285291
break
286292
}
287293
#endif

Tests/SwiftBuildTests/ConsoleCommands/ServiceConsoleTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ fileprivate struct ServiceConsoleTests {
3333
let standardOutput = task._makeStream(for: \.standardOutputPipe, using: outputPipe)
3434
let promise: Promise<Processes.ExitStatus, any Error> = try task.launch()
3535

36-
let data = try await standardOutput.reduce(into: [], { $0.append($1) })
36+
let data = try await standardOutput.reduce(into: [], { $0.append(contentsOf: $1) })
3737
let output = String(decoding: data, as: UTF8.self)
3838

3939
// Verify there were no errors.

0 commit comments

Comments
 (0)