Skip to content

Commit 17afb54

Browse files
committed
Create platform specific AsyncIO
- Darwin: based on DispatchIO - Linux: based on epoll - Windows (not included in this commit): based on IOCP with OVERLAPPED
1 parent 4211d5f commit 17afb54

File tree

13 files changed

+927
-402
lines changed

13 files changed

+927
-402
lines changed

Sources/Subprocess/AsyncBufferSequence.swift

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
2323
public typealias Failure = any Swift.Error
2424
public typealias Element = Buffer
2525

26-
#if os(Windows)
27-
internal typealias DiskIO = FileDescriptor
28-
#else
26+
#if canImport(Darwin)
2927
internal typealias DiskIO = DispatchIO
28+
#else
29+
internal typealias DiskIO = FileDescriptor
3030
#endif
3131

3232
@_nonSendable
@@ -47,15 +47,16 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
4747
return self.buffer.removeFirst()
4848
}
4949
// Read more data
50-
let data = try await self.diskIO.read(
51-
upToLength: readBufferSize
50+
let data = try await AsyncIO.shared.read(
51+
from: self.diskIO,
52+
upTo: readBufferSize
5253
)
5354
guard let data else {
5455
// We finished reading. Close the file descriptor now
55-
#if os(Windows)
56-
try self.diskIO.close()
57-
#else
56+
#if canImport(Darwin)
5857
self.diskIO.close()
58+
#else
59+
try self.diskIO.close()
5960
#endif
6061
return nil
6162
}
@@ -130,17 +131,7 @@ extension AsyncBufferSequence {
130131
self.eofReached = true
131132
return nil
132133
}
133-
#if os(Windows)
134-
// Cast data to CodeUnit type
135-
let result = buffer.withUnsafeBytes { ptr in
136-
return Array(
137-
UnsafeBufferPointer<Encoding.CodeUnit>(
138-
start: ptr.bindMemory(to: Encoding.CodeUnit.self).baseAddress!,
139-
count: ptr.count / MemoryLayout<Encoding.CodeUnit>.size
140-
)
141-
)
142-
}
143-
#else
134+
#if canImport(Darwin)
144135
// Unfortunately here we _have to_ copy the bytes out because
145136
// DispatchIO (rightfully) reuses buffer, which means `buffer.data`
146137
// has the same address on all iterations, therefore we can't directly
@@ -155,7 +146,16 @@ extension AsyncBufferSequence {
155146
UnsafeBufferPointer(start: ptr.baseAddress?.assumingMemoryBound(to: Encoding.CodeUnit.self), count: elementCount)
156147
)
157148
}
158-
149+
#else
150+
// Cast data to CodeUnit type
151+
let result = buffer.withUnsafeBytes { ptr in
152+
return Array(
153+
UnsafeBufferPointer<Encoding.CodeUnit>(
154+
start: ptr.bindMemory(to: Encoding.CodeUnit.self).baseAddress!,
155+
count: ptr.count / MemoryLayout<Encoding.CodeUnit>.size
156+
)
157+
)
158+
}
159159
#endif
160160
return result.isEmpty ? nil : result
161161
}

Sources/Subprocess/Buffer.swift

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,8 @@
1717
extension AsyncBufferSequence {
1818
/// A immutable collection of bytes
1919
public struct Buffer: Sendable {
20-
#if os(Windows)
21-
internal let data: [UInt8]
22-
23-
internal init(data: [UInt8]) {
24-
self.data = data
25-
}
26-
27-
internal static func createFrom(_ data: [UInt8]) -> [Buffer] {
28-
return [.init(data: data)]
29-
}
30-
#else
31-
// We need to keep the backingData alive while _ContiguousBufferView is alive
20+
#if canImport(Darwin)
21+
// We need to keep the backingData alive while Slice is alive
3222
internal let backingData: DispatchData
3323
internal let data: DispatchData._ContiguousBufferView
3424

@@ -45,7 +35,17 @@ extension AsyncBufferSequence {
4535
}
4636
return slices.map{ .init(data: $0, backingData: data) }
4737
}
48-
#endif
38+
#else
39+
internal let data: [UInt8]
40+
41+
internal init(data: [UInt8]) {
42+
self.data = data
43+
}
44+
45+
internal static func createFrom(_ data: [UInt8]) -> [Buffer] {
46+
return [.init(data: data)]
47+
}
48+
#endif // canImport(Darwin)
4949
}
5050
}
5151

@@ -92,26 +92,23 @@ extension AsyncBufferSequence.Buffer {
9292

9393
// MARK: - Hashable, Equatable
9494
extension AsyncBufferSequence.Buffer: Equatable, Hashable {
95-
#if os(Windows)
96-
// Compiler generated conformances
97-
#else
95+
#if canImport(Darwin)
9896
public static func == (lhs: AsyncBufferSequence.Buffer, rhs: AsyncBufferSequence.Buffer) -> Bool {
99-
return lhs.data.elementsEqual(rhs.data)
97+
return lhs.data == rhs.data
10098
}
10199

102100
public func hash(into hasher: inout Hasher) {
103-
self.data.withUnsafeBytes { ptr in
104-
hasher.combine(bytes: ptr)
105-
}
101+
hasher.combine(self.data)
106102
}
107103
#endif
104+
// else Compiler generated conformances
108105
}
109106

110107
// MARK: - DispatchData.Block
111108
#if canImport(Darwin) || canImport(Glibc) || canImport(Android) || canImport(Musl)
112109
extension DispatchData {
113110
/// Unfortunately `DispatchData.Region` is not available on Linux, hence our own wrapper
114-
internal struct _ContiguousBufferView: @unchecked Sendable, RandomAccessCollection {
111+
internal struct _ContiguousBufferView: @unchecked Sendable, RandomAccessCollection, Hashable {
115112
typealias Element = UInt8
116113

117114
internal let bytes: UnsafeBufferPointer<UInt8>
@@ -127,6 +124,14 @@ extension DispatchData {
127124
return try body(UnsafeRawBufferPointer(self.bytes))
128125
}
129126

127+
internal func hash(into hasher: inout Hasher) {
128+
hasher.combine(bytes: UnsafeRawBufferPointer(self.bytes))
129+
}
130+
131+
internal static func == (lhs: DispatchData._ContiguousBufferView, rhs: DispatchData._ContiguousBufferView) -> Bool {
132+
return lhs.bytes.elementsEqual(rhs.bytes)
133+
}
134+
130135
subscript(position: Int) -> UInt8 {
131136
_read {
132137
yield self.bytes[position]

Sources/Subprocess/Configuration.swift

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -589,15 +589,13 @@ internal struct TrackedFileDescriptor: ~Copyable {
589589
self.closeWhenDone = closeWhenDone
590590
}
591591

592-
#if os(Windows)
593592
consuming func consumeDiskIO() -> FileDescriptor {
594593
let result = self.fileDescriptor
595594
// Transfer the ownership out and therefor
596595
// don't perform close on deinit
597596
self.closeWhenDone = false
598597
return result
599598
}
600-
#endif
601599

602600
internal mutating func safelyClose() throws {
603601
guard self.closeWhenDone else {

Sources/Subprocess/Error.swift

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ extension SubprocessError {
4141
case failedToWriteToSubprocess
4242
case failedToMonitorProcess
4343
case streamOutputExceedsLimit(Int)
44+
case asyncIOFailed(String)
4445
// Signal
4546
case failedToSendSignal(Int32)
4647
// Windows Only
@@ -67,18 +68,20 @@ extension SubprocessError {
6768
return 5
6869
case .streamOutputExceedsLimit(_):
6970
return 6
70-
case .failedToSendSignal(_):
71+
case .asyncIOFailed(_):
7172
return 7
72-
case .failedToTerminate:
73+
case .failedToSendSignal(_):
7374
return 8
74-
case .failedToSuspend:
75+
case .failedToTerminate:
7576
return 9
76-
case .failedToResume:
77+
case .failedToSuspend:
7778
return 10
78-
case .failedToCreatePipe:
79+
case .failedToResume:
7980
return 11
80-
case .invalidWindowsPath(_):
81+
case .failedToCreatePipe:
8182
return 12
83+
case .invalidWindowsPath(_):
84+
return 13
8285
}
8386
}
8487

@@ -108,6 +111,8 @@ extension SubprocessError: CustomStringConvertible, CustomDebugStringConvertible
108111
return "Failed to monitor the state of child process with underlying error: \(self.underlyingError!)"
109112
case .streamOutputExceedsLimit(let limit):
110113
return "Failed to create output from current buffer because the output limit (\(limit)) was reached."
114+
case .asyncIOFailed(let reason):
115+
return "An error occurred within the AsyncIO subsystem: \(reason). Underlying error: \(self.underlyingError!)"
111116
case .failedToSendSignal(let signal):
112117
return "Failed to send signal \(signal) to the child process."
113118
case .failedToTerminate:

0 commit comments

Comments
 (0)