Skip to content

Commit f153b91

Browse files
committed
Update AsyncProcess output streaming on Windows
1 parent ba4bc64 commit f153b91

File tree

1 file changed

+70
-12
lines changed

1 file changed

+70
-12
lines changed

Sources/Basics/Concurrency/AsyncProcess.swift

Lines changed: 70 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -514,15 +514,18 @@ package final class AsyncProcess {
514514
if self.outputRedirection.redirectsOutput {
515515
let stdoutPipe = Pipe()
516516
let stderrPipe = Pipe()
517+
let stdoutStream = DispatchFD(fileHandle: stdoutPipe.fileHandleForReading).dataStream()
518+
let stderrStream = DispatchFD(fileHandle: stderrPipe.fileHandleForReading).dataStream()
517519

518520
group.enter()
519-
stdoutPipe.fileHandleForReading.readabilityHandler = { (fh: FileHandle) in
520-
let data = (try? fh.read(upToCount: Int.max)) ?? Data()
521-
if data.count == 0 {
522-
stdoutPipe.fileHandleForReading.readabilityHandler = nil
521+
Task {
522+
defer {
523+
print("--- finished consuming stdout ---")
523524
group.leave()
524-
} else {
525-
let contents = data.withUnsafeBytes { [UInt8]($0) }
525+
}
526+
print("--- started consuming stdout ---")
527+
for try await data in stdoutStream {
528+
let contents = [UInt8](data)
526529
self.outputRedirection.outputClosures?.stdoutClosure(contents)
527530
stdoutLock.withLock {
528531
stdout += contents
@@ -531,13 +534,14 @@ package final class AsyncProcess {
531534
}
532535

533536
group.enter()
534-
stderrPipe.fileHandleForReading.readabilityHandler = { (fh: FileHandle) in
535-
let data = (try? fh.read(upToCount: Int.max)) ?? Data()
536-
if data.count == 0 {
537-
stderrPipe.fileHandleForReading.readabilityHandler = nil
537+
Task {
538+
defer {
539+
print("--- finished consuming stderr ---")
538540
group.leave()
539-
} else {
540-
let contents = data.withUnsafeBytes { [UInt8]($0) }
541+
}
542+
print("--- started consuming stderr ---")
543+
for try await data in stderrStream {
544+
let contents = [UInt8](data)
541545
self.outputRedirection.outputClosures?.stderrClosure(contents)
542546
stderrLock.withLock {
543547
stderr += contents
@@ -557,6 +561,7 @@ package final class AsyncProcess {
557561
}
558562

559563
group.notify(queue: self.completionQueue) {
564+
print("--- notified that output is ready ---")
560565
self.stateLock.withLock {
561566
self.state = .outputReady(stdout: .success(stdout), stderr: .success(stderr))
562567
}
@@ -820,6 +825,7 @@ package final class AsyncProcess {
820825
/// Executes the process I/O state machine, calling completion block when finished.
821826
private func waitUntilExit(_ completion: @escaping (Result<AsyncProcessResult, Swift.Error>) -> Void) {
822827
self.stateLock.lock()
828+
print("--- waitUntilExit called: \(self.state) ---")
823829
switch self.state {
824830
case .idle:
825831
defer { self.stateLock.unlock() }
@@ -832,7 +838,9 @@ package final class AsyncProcess {
832838
completion(.failure(error))
833839
case .readingOutput(let sync):
834840
self.stateLock.unlock()
841+
print("--- queing up waitUntilExit block ---")
835842
sync.notify(queue: self.completionQueue) {
843+
print("--- was notified we should enter waitUntilExit again ---")
836844
self.waitUntilExit(completion)
837845
}
838846
case .outputReady(let stdoutResult, let stderrResult):
@@ -841,7 +849,9 @@ package final class AsyncProcess {
841849
#if os(Windows)
842850
precondition(self._process != nil, "The process is not yet launched.")
843851
let p = self._process!
852+
self.stateLock.unlock()
844853
p.waitUntilExit()
854+
self.stateLock.lock()
845855
let exitStatusCode = p.terminationStatus
846856
let normalExit = p.terminationReason == .exit
847857
#else
@@ -1354,3 +1364,51 @@ extension FileHandle: WritableByteStream {
13541364
}
13551365
}
13561366
#endif
1367+
1368+
extension DispatchFD {
1369+
public func readChunk(upToLength maxLength: Int) async throws -> DispatchData {
1370+
return try await withCheckedThrowingContinuation { continuation in
1371+
DispatchIO.read(fromFileDescriptor: numericCast(self.rawValue), maxLength: maxLength, runningHandlerOn: DispatchQueue.global())
1372+
{ data, error in
1373+
if error != 0 {
1374+
continuation.resume(throwing: StringError("POSIX error: \(error)"))
1375+
return
1376+
}
1377+
continuation.resume(returning: data)
1378+
}
1379+
}
1380+
1381+
}
1382+
1383+
/// Returns an async stream which reads bytes from the specified file descriptor. Unlike `FileHandle.bytes`, it does not block the caller.
1384+
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *)
1385+
public func dataStream() -> some AsyncSequence<DispatchData, any Error> {
1386+
AsyncThrowingStream<DispatchData, any Error> {
1387+
while !Task.isCancelled {
1388+
let chunk = try await readChunk(upToLength: 4096)
1389+
if chunk.isEmpty {
1390+
return nil
1391+
}
1392+
return chunk
1393+
}
1394+
throw CancellationError()
1395+
}
1396+
}
1397+
}
1398+
1399+
public struct DispatchFD {
1400+
#if os(Windows)
1401+
fileprivate let rawValue: Int
1402+
#else
1403+
fileprivate let rawValue: Int32
1404+
#endif
1405+
1406+
init(fileHandle: FileHandle) {
1407+
#if os(Windows)
1408+
// This may look unsafe, but is how swift-corelibs-dispatch works. Basically, dispatch_fd_t directly represents either a POSIX file descriptor OR a Windows HANDLE pointer address, meaning that the fileDescriptor parameter of various Dispatch APIs is actually NOT a file descriptor on Windows but rather a HANDLE. This means that the handle should NOT be converted using _open_osfhandle, and the return value of this function should ONLY be passed to Dispatch functions where the fileDescriptor parameter is masquerading as a HANDLE in this manner. Use with extreme caution.
1409+
rawValue = .init(bitPattern: fileHandle._handle)
1410+
#else
1411+
rawValue = fileHandle.fileDescriptor
1412+
#endif
1413+
}
1414+
}

0 commit comments

Comments
 (0)