Skip to content

Commit 19827c2

Browse files
committed
Simplify implementation of Dispatch-async bridge
Stress testing this approach over use of a long-lived DispatchIO channel appears to resolve the nondeterministic failures with the file descriptor being destroyed, and is easier to reason about. Closes #21
1 parent 3be34c2 commit 19827c2

File tree

2 files changed

+10
-86
lines changed

2 files changed

+10
-86
lines changed

Sources/SWBUtil/Dispatch+Async.swift

Lines changed: 10 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -68,22 +68,9 @@ extension AsyncThrowingStream where Element == UInt8, Failure == any Error {
6868
@available(visionOS, deprecated: 2.0, message: "Use the AsyncSequence-returning overload.")
6969
public static func _dataStream(reading fileDescriptor: DispatchFD, on queue: SWBQueue) -> AsyncThrowingStream<Element, any Error> {
7070
AsyncThrowingStream { continuation in
71-
let newFD: DispatchFD
72-
do {
73-
newFD = try fileDescriptor._duplicate()
74-
} catch {
75-
continuation.finish(throwing: error)
76-
return
77-
}
78-
79-
let io = SWBDispatchIO.stream(fileDescriptor: newFD, queue: queue) { error in
80-
do {
81-
try newFD._close()
82-
if error != 0 {
83-
continuation.finish(throwing: POSIXError(error, context: "dataStream(reading: \(fileDescriptor))#1"))
84-
}
85-
} catch {
86-
continuation.finish(throwing: error)
71+
let io = SWBDispatchIO.stream(fileDescriptor: fileDescriptor, queue: queue) { error in
72+
if error != 0 {
73+
continuation.finish(throwing: POSIXError(error, context: "dataStream(reading: \(fileDescriptor))#1"))
8774
}
8875
}
8976
io.setLimit(lowWater: 0)
@@ -120,51 +107,15 @@ extension AsyncThrowingStream where Element == UInt8, Failure == any Error {
120107
extension AsyncSequence where Element == UInt8, Failure == any Error {
121108
/// Returns an async stream which reads bytes from the specified file descriptor. Unlike `FileHandle.bytes`, it does not block the caller.
122109
public static func dataStream(reading fileDescriptor: DispatchFD, on queue: SWBQueue) -> any AsyncSequence<Element, any Error> {
123-
AsyncThrowingStream<SWBDispatchData, any Error> { continuation in
124-
let newFD: DispatchFD
125-
do {
126-
newFD = try fileDescriptor._duplicate()
127-
} catch {
128-
continuation.finish(throwing: error)
129-
return
130-
}
131-
132-
let io = SWBDispatchIO.stream(fileDescriptor: newFD, queue: queue) { error in
133-
do {
134-
try newFD._close()
135-
if error != 0 {
136-
let context = "dataStream(reading: \(fileDescriptor) \"\(Result { try fileDescriptor._filePath() })\")#1"
137-
continuation.finish(throwing: POSIXError(error, context: context))
138-
}
139-
} catch {
140-
continuation.finish(throwing: error)
141-
}
142-
}
143-
io.setLimit(lowWater: 0)
144-
io.setLimit(highWater: 4096)
145-
146-
continuation.onTermination = { termination in
147-
if case .cancelled = termination {
148-
io.close(flags: .stop)
149-
} else {
150-
io.close()
151-
}
152-
}
153-
154-
io.read(offset: 0, length: .max, queue: queue) { done, data, error in
155-
guard error == 0 else {
156-
let context = "dataStream(reading: \(fileDescriptor) \"\(Result { try fileDescriptor._filePath() })\")#2"
157-
continuation.finish(throwing: POSIXError(error, context: context))
158-
return
159-
}
160-
161-
let data = data ?? .empty
162-
continuation.yield(data)
163-
164-
if done {
165-
continuation.finish()
110+
AsyncThrowingStream<SWBDispatchData, any Error> {
111+
while !Task.isCancelled {
112+
let chunk = try await fileDescriptor.readChunk(upToLength: 4096)
113+
if chunk.isEmpty {
114+
return nil
166115
}
116+
return chunk
167117
}
118+
throw CancellationError()
168119
}.flattened
169120
}
170121
}

Sources/SWBUtil/SWBDispatch.swift

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -49,33 +49,6 @@ public struct DispatchFD {
4949
rawValue = fileHandle.fileDescriptor
5050
#endif
5151
}
52-
53-
internal func _duplicate() throws -> DispatchFD {
54-
#if os(Windows)
55-
return self
56-
#else
57-
return try DispatchFD(fileDescriptor: FileDescriptor(rawValue: rawValue).duplicate())
58-
#endif
59-
}
60-
61-
internal func _close() throws {
62-
#if !os(Windows)
63-
try FileDescriptor(rawValue: rawValue).close()
64-
#endif
65-
}
66-
67-
// Only exists to help debug a rare concurrency issue where the file descriptor goes invalid
68-
internal func _filePath() throws -> String {
69-
#if canImport(Darwin)
70-
var buffer = [CChar](repeating: 0, count: Int(MAXPATHLEN))
71-
if fcntl(rawValue, F_GETPATH, &buffer) == -1 {
72-
throw POSIXError(errno, "fcntl", String(rawValue), "F_GETPATH")
73-
}
74-
return String(cString: buffer)
75-
#else
76-
return String()
77-
#endif
78-
}
7952
}
8053

8154
// @unchecked: rdar://130051790 (DispatchData should be Sendable)

0 commit comments

Comments
 (0)