Skip to content

Commit 9d0eeaa

Browse files
committed
Fixed hang when cat long files
1 parent fb53fe9 commit 9d0eeaa

File tree

7 files changed

+77
-65
lines changed

7 files changed

+77
-65
lines changed

Sources/FoundationEssentials/Subprocess/Platforms/Subprocess+Unix.swift

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import SystemPackage
2626
package import _CShims
2727
#endif
2828

29+
import Dispatch
30+
2931

3032
// MARK: - Signals
3133
extension Subprocess {
@@ -270,17 +272,40 @@ extension Subprocess.Configuration {
270272
@Sendable
271273
internal func monitorProcessTermination(
272274
forProcessWithIdentifier pid: Subprocess.ProcessIdentifier
273-
) -> Subprocess.TerminationStatus {
274-
var status: Int32 = -1
275-
// Block and wait
276-
waitpid(pid.value, &status, 0)
277-
if _was_process_exited(status) != 0 {
278-
return .exit(_get_exit_code(status))
275+
) async -> Subprocess.TerminationStatus {
276+
return await withCheckedContinuation { continuation in
277+
let source = DispatchSource.makeProcessSource(
278+
identifier: pid.value,
279+
eventMask: [.exit, .signal]
280+
)
281+
source.setEventHandler {
282+
source.cancel()
283+
var status: Int32 = -1
284+
waitpid(pid.value, &status, WNOHANG)
285+
if _was_process_exited(status) != 0 {
286+
continuation.resume(returning: .exit(_get_exit_code(status)))
287+
return
288+
}
289+
if _was_process_signaled(status) != 0 {
290+
continuation.resume(returning: .unhandledException(_get_signal_code(status)))
291+
return
292+
}
293+
fatalError("Unexpected exit status type: \(status)")
294+
}
295+
source.resume()
279296
}
280-
if _was_process_signaled(status) != 0 {
281-
return .unhandledException(_get_signal_code(status))
297+
}
298+
299+
// MARK: - Read Buffer Size
300+
extension Subprocess {
301+
@inline(__always)
302+
internal static var readBufferSize: Int {
303+
#if canImport(Darwin)
304+
return 16384
305+
#else
306+
return Platform.pageSize
307+
#endif // canImport(Darwin)
282308
}
283-
fatalError("Unexpected exit status type: \(status)")
284309
}
285310

286311
#endif // canImport(Darwin) || canImport(Glibc)

Sources/FoundationEssentials/Subprocess/Subprocess+AsyncBytes.swift

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,6 @@ import Dispatch
1414

1515
extension Subprocess {
1616
public struct AsyncBytes: AsyncSequence, Sendable {
17-
@inline(__always) static var bufferSize: Int {
18-
16384
19-
}
2017
public typealias Element = UInt8
2118

2219
@_nonSendable
@@ -42,9 +39,9 @@ extension Subprocess {
4239
try Task.checkCancellation()
4340
do {
4441
self.buffer = try await self.fileDescriptor.read(
45-
upToLength: AsyncBytes.bufferSize)
42+
upToLength: Subprocess.readBufferSize)
4643
self.currentPosition = 0
47-
if self.buffer.count < AsyncBytes.bufferSize {
44+
if self.buffer.count < Subprocess.readBufferSize {
4845
self.finished = true
4946
}
5047
} catch {

Sources/FoundationEssentials/Subprocess/Subprocess+Configuration.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -201,13 +201,13 @@ extension Subprocess {
201201
return try await withTaskCancellationHandler {
202202
return try await withThrowingTaskGroup(of: RunState<R>.self) { group in
203203
group.addTask {
204-
let status = monitorProcessTermination(
204+
let status = await monitorProcessTermination(
205205
forProcessWithIdentifier: process.processIdentifier)
206206
return .monitorChildProcess(status)
207207
}
208208
group.addTask {
209209
do {
210-
let result = try await body(process, .init(fileDescriptor: writeFd))
210+
let result = try await body(process, .init(input: executionInput))
211211
try self.cleanup(
212212
process: process,
213213
childSide: false,
@@ -276,7 +276,7 @@ extension Subprocess {
276276
return try await withTaskCancellationHandler {
277277
return try await withThrowingTaskGroup(of: RunState<R>.self) { group in
278278
group.addTask {
279-
let status = monitorProcessTermination(
279+
let status = await monitorProcessTermination(
280280
forProcessWithIdentifier: process.processIdentifier)
281281
return .monitorChildProcess(status)
282282
}

Sources/FoundationEssentials/Subprocess/Subprocess+IO.swift

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -130,9 +130,9 @@ extension Subprocess {
130130

131131
// MARK: - Execution IO
132132
extension Subprocess {
133-
internal final class ExecutionInput {
134-
135-
internal enum Storage {
133+
internal final class ExecutionInput: Sendable {
134+
135+
internal enum Storage: Sendable {
136136
case noInput(FileDescriptor?)
137137
case customWrite(FileDescriptor?, FileDescriptor?)
138138
case fileDescriptor(FileDescriptor?, Bool)
@@ -145,7 +145,7 @@ extension Subprocess {
145145
}
146146

147147
internal func getReadFileDescriptor() -> FileDescriptor? {
148-
return self.storage.withLock { $0
148+
return self.storage.withLock {
149149
switch $0 {
150150
case .noInput(let readFd):
151151
return readFd
@@ -196,8 +196,8 @@ extension Subprocess {
196196
// The parent fd should have been closed
197197
// in the `body` when writer.finish() is called
198198
// But in case it isn't call it agian
199-
try readFd?.close()
200-
$0 = .customWrite(nil, writeFd)
199+
try writeFd?.close()
200+
$0 = .customWrite(readFd, nil)
201201
}
202202
}
203203
}

Sources/FoundationEssentials/Subprocess/Subprocess.swift

Lines changed: 29 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -63,57 +63,38 @@ public struct Subprocess: Sendable {
6363

6464
// MARK: - StandardInputWriter
6565
extension Subprocess {
66-
internal actor StandardInputWriterActor {
67-
private let fileDescriptor: FileDescriptor
66+
@_nonSendable
67+
public struct StandardInputWriter {
6868

69-
internal init(fileDescriptor: FileDescriptor) {
70-
self.fileDescriptor = fileDescriptor
71-
}
72-
73-
@discardableResult
74-
public func write<S>(_ sequence: S) async throws -> Int where S : Sequence, S.Element == UInt8 {
75-
return try self.fileDescriptor.writeAll(sequence)
76-
}
77-
78-
@discardableResult
79-
public func write<S: AsyncSequence>(_ asyncSequence: S) async throws -> Int where S.Element == UInt8 {
80-
let sequence = try await Array(asyncSequence)
81-
return try self.fileDescriptor.writeAll(sequence)
82-
}
83-
84-
public func finish() async throws {
85-
try self.fileDescriptor.close()
86-
}
87-
}
69+
private let input: ExecutionInput
8870

89-
public struct StandardInputWriter: Sendable {
90-
91-
private let actor: StandardInputWriterActor
92-
93-
init(fileDescriptor: FileDescriptor) {
94-
self.actor = StandardInputWriterActor(fileDescriptor: fileDescriptor)
71+
init(input: ExecutionInput) {
72+
self.input = input
9573
}
9674

9775
public func write<S>(_ sequence: S) async throws where S : Sequence, S.Element == UInt8 {
98-
try await self.actor.write(sequence)
76+
guard let fd: FileDescriptor = self.input.getWriteFileDescriptor() else {
77+
fatalError("Attempting to write to a file descriptor that's already closed")
78+
}
79+
try await fd.write(sequence)
9980
}
10081

10182
public func write<S>(_ sequence: S) async throws where S : Sequence, S.Element == CChar {
102-
try await self.actor.write(sequence.map { UInt8($0) })
83+
try await self.write(sequence.map { UInt8($0) })
10384
}
10485

10586
public func write<S: AsyncSequence>(_ asyncSequence: S) async throws where S.Element == CChar {
10687
let sequence = try await Array(asyncSequence).map { UInt8($0) }
107-
try await self.actor.write(sequence)
88+
try await self.write(sequence)
10889
}
10990

11091
public func write<S: AsyncSequence>(_ asyncSequence: S) async throws where S.Element == UInt8 {
11192
let sequence = try await Array(asyncSequence)
112-
try await self.actor.write(sequence)
93+
try await self.write(sequence)
11394
}
11495

11596
public func finish() async throws {
116-
try await self.actor.finish()
97+
try self.input.closeParentSide()
11798
}
11899
}
119100
}
@@ -153,23 +134,33 @@ extension Subprocess.Result: Equatable where T : Equatable {}
153134

154135
extension Subprocess.Result: Hashable where T : Hashable {}
155136

156-
extension POSIXError : Swift.Error {}
157-
158137
// MARK: Internal
159138
extension Subprocess {
160139
internal enum OutputCapturingState {
161140
case standardOutputCaptured(Data?)
162141
case standardErrorCaptured(Data?)
163142
}
164-
143+
144+
private func capture(fileDescriptor: FileDescriptor, maxLength: Int) async throws -> Data{
145+
let chunkSize: Int = min(Subprocess.readBufferSize, maxLength)
146+
var buffer: [UInt8] = []
147+
while buffer.count < maxLength {
148+
let captured = try await fileDescriptor.read(upToLength: chunkSize)
149+
buffer += captured
150+
if captured.count < chunkSize {
151+
break
152+
}
153+
}
154+
return Data(buffer)
155+
}
156+
165157
internal func captureStandardOutput() async throws -> Data? {
166158
guard let (limit, readFd) = self.executionOutput
167159
.consumeCollectedFileDescriptor(),
168160
let readFd = readFd else {
169161
return nil
170162
}
171-
let captured = try await readFd.read(upToLength: limit)
172-
return Data(captured)
163+
return try await self.capture(fileDescriptor: readFd, maxLength: limit)
173164
}
174165

175166
internal func captureStandardError() async throws -> Data? {
@@ -178,8 +169,7 @@ extension Subprocess {
178169
let readFd = readFd else {
179170
return nil
180171
}
181-
let captured = try await readFd.read(upToLength: limit)
182-
return Data(captured)
172+
return try await self.capture(fileDescriptor: readFd, maxLength: limit)
183173
}
184174

185175
internal func captureIOs() async throws -> (standardOut: Data?, standardError: Data?) {

Sources/TestSupport/TestSupport.swift

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ public typealias FloatingPointFormatStyle = Foundation.FloatingPointFormatStyle
4949
public typealias NumberFormatStyleConfiguration = Foundation.NumberFormatStyleConfiguration
5050
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
5151
public typealias CurrencyFormatStyleConfiguration = Foundation.CurrencyFormatStyleConfiguration
52-
public typealias DiscreteFormatStyle = Foundation.DiscreteFormatStyle
5352

5453
@available(FoundationPreview 0.4, *)
5554
public typealias DiscreteFormatStyle = Foundation.DiscreteFormatStyle
@@ -148,7 +147,6 @@ public typealias FloatingPointFormatStyle = FoundationInternationalization.Float
148147
public typealias NumberFormatStyleConfiguration = FoundationInternationalization.NumberFormatStyleConfiguration
149148
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
150149
public typealias CurrencyFormatStyleConfiguration = FoundationInternationalization.CurrencyFormatStyleConfiguration
151-
public typealias DiscreteFormatStyle = FoundationEssentials.DiscreteFormatStyle
152150

153151
@available(FoundationPreview 0.4, *)
154152
public typealias DiscreteFormatStyle = FoundationEssentials.DiscreteFormatStyle

Tests/FoundationEssentialsTests/SubprocessTests.swift

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,15 @@ final class SubprocessTests: XCTestCase {
2525
let result = String(data: ls.standardOutput!, encoding: .utf8)!
2626
XCTAssert(ls.terminationStatus.isSuccess)
2727
XCTAssert(!result.isEmpty)
28+
print(result)
2829
}
2930

3031
func testLongText() async throws {
3132
let cat = try await Subprocess.run(
3233
executing: .named("cat"),
3334
arguments: ["/Users/icharleshu/Downloads/PaP.txt"],
34-
output: .collect(limit: 1024 * 1024)
35+
output: .collect(limit: 1024 * 1024),
36+
error: .discard
3537
)
3638
print("after")
3739
print("Result: \(cat.standardOutput?.count ?? -1)")

0 commit comments

Comments
 (0)