@@ -4,13 +4,12 @@ use std::{
44 collections:: hash_map,
55 ffi:: OsString ,
66 fmt:: Display ,
7- fs:: { self , File } ,
7+ fs,
88 hash:: Hasher ,
9- io:: { self , BufRead , BufReader , Read , Write } ,
9+ io:: { self , Read , Write } ,
1010 path:: Path ,
11- process:: { Child , Command , Stdio } ,
11+ process:: { Child , ChildStderr , Command , Stdio } ,
1212 sync:: Arc ,
13- thread:: { self , JoinHandle } ,
1413} ;
1514
1615use crate :: { Error , ErrorKind , Object } ;
@@ -41,83 +40,175 @@ impl CargoOutput {
4140 }
4241 }
4342
44- pub ( crate ) fn print_thread ( & self ) -> Result < Option < PrintThread > , Error > {
45- self . warnings . then ( PrintThread :: new) . transpose ( )
43+ fn stdio_for_warnings ( & self ) -> Stdio {
44+ if self . warnings {
45+ Stdio :: piped ( )
46+ } else {
47+ Stdio :: null ( )
48+ }
4649 }
4750}
4851
49- pub ( crate ) struct PrintThread {
50- handle : Option < JoinHandle < ( ) > > ,
51- pipe_writer : Option < File > ,
52+ pub ( crate ) struct StderrForwarder {
53+ inner : Option < ( ChildStderr , Vec < u8 > ) > ,
54+ #[ cfg( feature = "parallel" ) ]
55+ is_non_blocking : bool ,
5256}
5357
54- impl PrintThread {
55- pub ( crate ) fn new ( ) -> Result < Self , Error > {
56- let ( pipe_reader, pipe_writer) = crate :: os_pipe:: pipe ( ) ?;
57-
58- // Capture the standard error coming from compilation, and write it out
59- // with cargo:warning= prefixes. Note that this is a bit wonky to avoid
60- // requiring the output to be UTF-8, we instead just ship bytes from one
61- // location to another.
62- let print = thread:: spawn ( move || {
63- let mut stderr = BufReader :: with_capacity ( 4096 , pipe_reader) ;
64- let mut line = Vec :: with_capacity ( 20 ) ;
65- let stdout = io:: stdout ( ) ;
58+ #[ cfg( all( feature = "parallel" , not( unix) , not( windows) ) ) ]
59+ compile_error ! ( "Only unix and windows support non-blocking pipes! For other OSes, disable the parallel feature." ) ;
60+
61+ const MIN_BUFFER_CAPACITY : usize = 100 ;
62+
63+ impl StderrForwarder {
64+ pub ( crate ) fn new ( child : & mut Child ) -> Self {
65+ Self {
66+ inner : child
67+ . stderr
68+ . take ( )
69+ . map ( |stderr| ( stderr, Vec :: with_capacity ( MIN_BUFFER_CAPACITY ) ) ) ,
70+ #[ cfg( feature = "parallel" ) ]
71+ is_non_blocking : false ,
72+ }
73+ }
6674
67- // read_until returns 0 on Eof
68- while stderr. read_until ( b'\n' , & mut line) . unwrap ( ) != 0 {
69- {
70- let mut stdout = stdout. lock ( ) ;
75+ fn forward_available ( & mut self ) -> bool {
76+ if let Some ( ( stderr, buffer) ) = self . inner . as_mut ( ) {
77+ let stdout = io:: stdout ( ) ;
78+ let write_warning = move |line : & [ u8 ] | {
79+ let mut stdout = stdout. lock ( ) ;
80+ stdout. write_all ( b"cargo:warning=" ) . unwrap ( ) ;
81+ stdout. write_all ( line) . unwrap ( ) ;
82+ stdout. write_all ( b"\n " ) . unwrap ( ) ;
83+ } ;
7184
72- stdout. write_all ( b"cargo:warning=" ) . unwrap ( ) ;
73- stdout. write_all ( & line) . unwrap ( ) ;
74- stdout. write_all ( b"\n " ) . unwrap ( ) ;
85+ #[ cfg( all( windows, feature = "parallel" ) ) ]
86+ let is_non_blocking = self . is_non_blocking ;
87+ let mut read_stderr = move |buf : & mut [ u8 ] | -> Result < usize , io:: Error > {
88+ // On Unix, the pipe is non-blocking, so we can just read.
89+ // On Windows, take a peek at the pipe to see if there's data.
90+ #[ cfg( all( windows, feature = "parallel" ) ) ]
91+ if is_non_blocking {
92+ use crate :: windows:: windows_sys:: PeekNamedPipe ;
93+ use std:: os:: windows:: io:: AsRawHandle ;
94+ use std:: ptr:: null_mut;
95+ let mut bytes_available = 0 ;
96+ unsafe {
97+ if PeekNamedPipe (
98+ stderr. as_raw_handle ( ) ,
99+ null_mut ( ) ,
100+ 0 ,
101+ null_mut ( ) ,
102+ & mut bytes_available,
103+ null_mut ( ) ,
104+ ) == 0
105+ {
106+ return Err ( io:: Error :: last_os_error ( ) ) ;
107+ }
108+ }
109+ if bytes_available == 0 {
110+ return Err ( io:: Error :: new (
111+ io:: ErrorKind :: WouldBlock ,
112+ "The pipe is empty" ,
113+ ) ) ;
114+ }
75115 }
76116
77- // read_until does not clear the buffer
78- line. clear ( ) ;
79- }
80- } ) ;
117+ stderr. read ( buf)
118+ } ;
81119
82- Ok ( Self {
83- handle : Some ( print) ,
84- pipe_writer : Some ( pipe_writer) ,
85- } )
120+ loop {
121+ buffer. reserve ( MIN_BUFFER_CAPACITY ) ;
122+
123+ let old_data_end = buffer. len ( ) ;
124+ buffer. resize ( buffer. capacity ( ) , 0 ) ;
125+ match read_stderr ( & mut buffer[ old_data_end..] ) {
126+ Err ( err) if err. kind ( ) == std:: io:: ErrorKind :: WouldBlock => {
127+ // No data currently, yield back.
128+ buffer. truncate ( old_data_end) ;
129+ return false ;
130+ }
131+ Err ( err) if err. kind ( ) == std:: io:: ErrorKind :: Interrupted => {
132+ // Interrupted, try again.
133+ buffer. truncate ( old_data_end) ;
134+ }
135+ Ok ( 0 ) | Err ( _) => {
136+ // End of stream: flush remaining data and bail.
137+ if old_data_end > 0 {
138+ write_warning ( & buffer[ ..old_data_end] ) ;
139+ }
140+ return true ;
141+ }
142+ Ok ( bytes_read) => {
143+ buffer. truncate ( old_data_end + bytes_read) ;
144+ let mut consumed = 0 ;
145+ for line in buffer. split_inclusive ( |& b| b == b'\n' ) {
146+ // Only forward complete lines, leave the rest in the buffer.
147+ if let Some ( ( b'\n' , line) ) = line. split_last ( ) {
148+ consumed += line. len ( ) + 1 ;
149+ write_warning ( line) ;
150+ }
151+ }
152+ buffer. drain ( ..consumed) ;
153+ }
154+ }
155+ }
156+ } else {
157+ true
158+ }
86159 }
87160
88- /// # Panics
89- ///
90- /// Will panic if the pipe writer has already been taken.
91- pub ( crate ) fn take_pipe_writer ( & mut self ) -> File {
92- self . pipe_writer . take ( ) . unwrap ( )
93- }
161+ #[ cfg( feature = "parallel" ) ]
162+ pub ( crate ) fn set_non_blocking ( & mut self ) -> Result < ( ) , Error > {
163+ assert ! ( !self . is_non_blocking) ;
164+
165+ // On Unix, switch the pipe to non-blocking mode.
166+ // On Windows, we have a different way to be non-blocking.
167+ #[ cfg( unix) ]
168+ if let Some ( ( stderr, _) ) = self . inner . as_mut ( ) {
169+ use std:: os:: unix:: io:: AsRawFd ;
170+ let fd = stderr. as_raw_fd ( ) ;
171+ let flags = unsafe { libc:: fcntl ( fd, libc:: F_GETFL , 0 ) } ;
172+ if flags < 0 {
173+ return Err ( Error :: new (
174+ ErrorKind :: IOError ,
175+ format ! (
176+ "Failed to get flags for child stderr: {}" ,
177+ io:: Error :: last_os_error( )
178+ ) ,
179+ ) ) ;
180+ }
94181
95- /// # Panics
96- ///
97- /// Will panic if the pipe writer has already been taken.
98- pub ( crate ) fn clone_pipe_writer ( & self ) -> Result < File , Error > {
99- self . try_clone_pipe_writer ( ) . map ( Option :: unwrap)
100- }
182+ if unsafe { libc:: fcntl ( fd, libc:: F_SETFL , flags | libc:: O_NONBLOCK ) } != 0 {
183+ return Err ( Error :: new (
184+ ErrorKind :: IOError ,
185+ format ! (
186+ "Failed to set flags for child stderr: {}" ,
187+ io:: Error :: last_os_error( )
188+ ) ,
189+ ) ) ;
190+ }
191+ }
101192
102- pub ( crate ) fn try_clone_pipe_writer ( & self ) -> Result < Option < File > , Error > {
103- self . pipe_writer
104- . as_ref ( )
105- . map ( File :: try_clone)
106- . transpose ( )
107- . map_err ( From :: from)
193+ self . is_non_blocking = true ;
194+ Ok ( ( ) )
108195 }
109- }
110196
111- impl Drop for PrintThread {
112- fn drop ( & mut self ) {
113- // Drop pipe_writer first to avoid deadlock
114- self . pipe_writer . take ( ) ;
197+ # [ cfg ( feature = "parallel" ) ]
198+ fn forward_all ( & mut self ) {
199+ while ! self . forward_available ( ) { }
200+ }
115201
116- self . handle . take ( ) . unwrap ( ) . join ( ) . unwrap ( ) ;
202+ #[ cfg( not( feature = "parallel" ) ) ]
203+ fn forward_all ( & mut self ) {
204+ let forward_result = self . forward_available ( ) ;
205+ assert ! ( forward_result, "Should have consumed all data" ) ;
117206 }
118207}
119208
120209fn wait_on_child ( cmd : & Command , program : & str , child : & mut Child ) -> Result < ( ) , Error > {
210+ StderrForwarder :: new ( child) . forward_all ( ) ;
211+
121212 let status = match child. wait ( ) {
122213 Ok ( s) => s,
123214 Err ( e) => {
@@ -193,20 +284,13 @@ pub(crate) fn objects_from_files(files: &[Arc<Path>], dst: &Path) -> Result<Vec<
193284 Ok ( objects)
194285}
195286
196- fn run_inner ( cmd : & mut Command , program : & str , pipe_writer : Option < File > ) -> Result < ( ) , Error > {
197- let mut child = spawn ( cmd, program, pipe_writer) ?;
198- wait_on_child ( cmd, program, & mut child)
199- }
200-
201287pub ( crate ) fn run (
202288 cmd : & mut Command ,
203289 program : & str ,
204- print : Option < & PrintThread > ,
290+ cargo_output : & CargoOutput ,
205291) -> Result < ( ) , Error > {
206- let pipe_writer = print. map ( PrintThread :: clone_pipe_writer) . transpose ( ) ?;
207- run_inner ( cmd, program, pipe_writer) ?;
208-
209- Ok ( ( ) )
292+ let mut child = spawn ( cmd, program, cargo_output) ?;
293+ wait_on_child ( cmd, program, & mut child)
210294}
211295
212296pub ( crate ) fn run_output (
@@ -216,12 +300,7 @@ pub(crate) fn run_output(
216300) -> Result < Vec < u8 > , Error > {
217301 cmd. stdout ( Stdio :: piped ( ) ) ;
218302
219- let mut print = cargo_output. print_thread ( ) ?;
220- let mut child = spawn (
221- cmd,
222- program,
223- print. as_mut ( ) . map ( PrintThread :: take_pipe_writer) ,
224- ) ?;
303+ let mut child = spawn ( cmd, program, cargo_output) ?;
225304
226305 let mut stdout = vec ! [ ] ;
227306 child
@@ -239,7 +318,7 @@ pub(crate) fn run_output(
239318pub ( crate ) fn spawn (
240319 cmd : & mut Command ,
241320 program : & str ,
242- pipe_writer : Option < File > ,
321+ cargo_output : & CargoOutput ,
243322) -> Result < Child , Error > {
244323 struct ResetStderr < ' cmd > ( & ' cmd mut Command ) ;
245324
@@ -254,10 +333,7 @@ pub(crate) fn spawn(
254333 println ! ( "running: {:?}" , cmd) ;
255334
256335 let cmd = ResetStderr ( cmd) ;
257- let child = cmd
258- . 0
259- . stderr ( pipe_writer. map_or_else ( Stdio :: null, Stdio :: from) )
260- . spawn ( ) ;
336+ let child = cmd. 0 . stderr ( cargo_output. stdio_for_warnings ( ) ) . spawn ( ) ;
261337 match child {
262338 Ok ( child) => Ok ( child) ,
263339 Err ( ref e) if e. kind ( ) == io:: ErrorKind :: NotFound => {
@@ -307,9 +383,14 @@ pub(crate) fn try_wait_on_child(
307383 program : & str ,
308384 child : & mut Child ,
309385 stdout : & mut dyn io:: Write ,
386+ stderr_forwarder : & mut StderrForwarder ,
310387) -> Result < Option < ( ) > , Error > {
388+ stderr_forwarder. forward_available ( ) ;
389+
311390 match child. try_wait ( ) {
312391 Ok ( Some ( status) ) => {
392+ stderr_forwarder. forward_all ( ) ;
393+
313394 let _ = writeln ! ( stdout, "{}" , status) ;
314395
315396 if status. success ( ) {
@@ -325,12 +406,15 @@ pub(crate) fn try_wait_on_child(
325406 }
326407 }
327408 Ok ( None ) => Ok ( None ) ,
328- Err ( e) => Err ( Error :: new (
329- ErrorKind :: ToolExecError ,
330- format ! (
331- "Failed to wait on spawned child process, command {:?} with args {:?}: {}." ,
332- cmd, program, e
333- ) ,
334- ) ) ,
409+ Err ( e) => {
410+ stderr_forwarder. forward_all ( ) ;
411+ Err ( Error :: new (
412+ ErrorKind :: ToolExecError ,
413+ format ! (
414+ "Failed to wait on spawned child process, command {:?} with args {:?}: {}." ,
415+ cmd, program, e
416+ ) ,
417+ ) )
418+ }
335419 }
336420}
0 commit comments