Skip to content

Commit 872871d

Browse files
: stdio redirection (#900)
Summary: Rust startup code (code that runs before `main`) changes the disposition of `SIGPIPE` so that it is silently ignored (that is, it runs `signal(Signal::SIGPIPE, SigHandler::SigIgn)` or equivalent). This behavior, introduced in 2014, is poorly documented; see rust-lang/rust#62569. A task spawned in `hyperactor::signal_handler::GlobalSignalManager::new` creates an async signal listener using the `signal-hook-tokio` crate. It watches for `SIGINT` and `SIGTERM` and, on receiving one, executes cleanup code before removing the hooks and re-raising the signal in order to restore and execute the default behavior (process termination). That signal-handling code includes logging calls via `tracing::info!()` and `tracing::error!()`. The problem is that if `SIGTERM` (say) is being handled by an orphan, the earlier death of the parent can mean the orphan's stdout/stderr pipes are closed. Normally, writing to a closed pipe would raise `SIGPIPE` and terminate the process, but here a logging call results in an infinite uninterruptible sleep, hanging the process and preventing it from shutting down. This diff adds a call to a newly developed function, `stdio_redirect::handle_broken_pipes()`, which detects this condition and redirects stdio to a file (with a name derived from the process ID, e.g. `monarch-process-exit-3529266.log`). In our testing so far, this overcomes the hangs, allowing processes to terminate and write logs normally as they do so. This check will still race with pipe closure, though, so perhaps we should do something like this (e.g. redirect to `/dev/null` if not this) and avoid doing IO completely during signal handling? Reviewed By: mariusae Differential Revision: D80366985
1 parent 5d0a065 commit 872871d

File tree

4 files changed

+215
-1
lines changed

4 files changed

+215
-1
lines changed

hyperactor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ unicode-ident = "1.0.12"
7070

7171
[dev-dependencies]
7272
maplit = "1.0"
73+
tempfile = "3.15"
7374
timed_test = { version = "0.0.0", path = "../timed_test" }
7475
tracing-subscriber = { version = "0.3.19", features = ["chrono", "env-filter", "json", "local-time", "parking_lot", "registry"] }
7576
tracing-test = { version = "0.2.3", features = ["no-env-filter"] }

hyperactor/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ pub mod proc;
8686
pub mod reference;
8787
mod signal_handler;
8888
pub mod simnet;
89+
mod stdio_redirect;
8990
pub mod supervision;
9091
pub mod sync;
9192
/// Test utilities

hyperactor/src/signal_handler.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
* LICENSE file in the root directory of this source tree.
77
*/
88

9+
#![cfg(unix)]
10+
911
use std::collections::HashMap;
1012
use std::fmt;
1113
use std::future::Future;
@@ -130,6 +132,11 @@ impl GlobalSignalManager {
130132
signal_hook_tokio::Signals::new([signal::SIGINT as i32, signal::SIGTERM as i32])
131133
{
132134
if let Some(signal) = signals.next().await {
135+
// If parent died, stdout/stderr are broken pipes
136+
// that cause uninterruptible sleep on write.
137+
// Detect and redirect to file to prevent hanging.
138+
crate::stdio_redirect::handle_broken_pipes();
139+
133140
tracing::info!("received signal: {}", signal);
134141

135142
get_signal_manager().execute_all_cleanups().await;
@@ -178,7 +185,11 @@ impl GlobalSignalManager {
178185
.lock()
179186
.unwrap_or_else(|e| e.into_inner());
180187
callbacks.insert(id, callback);
181-
tracing::info!("registered signal cleanup callback with ID: {}", id);
188+
tracing::info!(
189+
"process {} registered signal cleanup callback with ID: {}",
190+
std::process::id(),
191+
id
192+
);
182193
id
183194
}
184195

hyperactor/src/stdio_redirect.rs

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
#![cfg(unix)]
10+
11+
use std::fs::OpenOptions;
12+
use std::os::fd::BorrowedFd;
13+
use std::os::unix::io::AsRawFd;
14+
15+
use anyhow::Context;
16+
use nix::libc::STDERR_FILENO;
17+
use nix::libc::STDOUT_FILENO;
18+
use nix::poll::PollFd;
19+
use nix::poll::PollFlags;
20+
use nix::poll::poll;
21+
22+
/// Write-based check for a broken stdout.
///
/// Writes a single NUL byte to stdout and returns `true` when that
/// write fails with `EPIPE` or `EBADF`. Because it emits a byte, this
/// probe is invasive and is only compiled with the
/// `stdio-write-probe` feature; [`fd_looks_broken`] is the default.
#[cfg(feature = "stdio-write-probe")]
pub(crate) fn is_stdout_broken_write_probe() -> bool {
    use std::os::fd::BorrowedFd;

    use nix::errno::Errno;
    use nix::unistd::write;

    // SAFETY: the descriptor is only borrowed for the single write
    // below and is never closed through this borrow.
    let stdout = unsafe { BorrowedFd::borrow_raw(nix::libc::STDOUT_FILENO) };
    let outcome = write(stdout, b"\0");
    matches!(outcome, Err(Errno::EPIPE) | Err(Errno::EBADF))
}
33+
34+
/// Non-invasive: returns true if the fd looks broken (peer gone) or
35+
/// invalid. Based on
36+
/// https://stackoverflow.com/questions/9212243/linux-checking-if-a-socket-pipe-is-broken-without-doing-a-read-write.
37+
pub(crate) fn fd_looks_broken(fd_raw: i32) -> bool {
38+
// SAFETY: we only borrow for the duration of this call; `fd_raw`
39+
// must be a valid fd.
40+
let fd = unsafe { BorrowedFd::borrow_raw(fd_raw) };
41+
42+
// Request writable readiness and error/hangup notifications.
43+
let mut fds = [PollFd::new(
44+
fd,
45+
PollFlags::POLLOUT | PollFlags::POLLERR | PollFlags::POLLHUP,
46+
)];
47+
48+
// NOTE: 0u16 means "don't block" in this nix API.
49+
match poll(&mut fds, 0u16) {
50+
Ok(n) if n > 0 => {
51+
let re = fds[0].revents().unwrap_or(PollFlags::empty());
52+
// On the write end of a pipe/pty, POLLHUP or POLLERR
53+
// means the reader is gone.
54+
re.intersects(PollFlags::POLLERR | PollFlags::POLLHUP)
55+
}
56+
Ok(_) => false, // no events → treat as not broken (regular
57+
// files often look like this)
58+
Err(_) => true, // e.g., EBADF → definitely broken/invalid
59+
}
60+
}
61+
62+
/// Returns true if stdout's endpoint appears gone (pipe/pty peer
63+
/// closed) or invalid.
64+
pub(crate) fn is_stdout_broken() -> bool {
65+
fd_looks_broken(STDOUT_FILENO)
66+
}
67+
68+
/// Returns true if stderr's endpoint appears gone (pipe/pty peer
69+
/// closed) or invalid.
70+
pub(crate) fn is_stderr_broken() -> bool {
71+
fd_looks_broken(STDERR_FILENO)
72+
}
73+
74+
/// Returns true if either stdout or stderr looks broken.
75+
pub(crate) fn any_stdio_broken() -> bool {
76+
is_stdout_broken() || is_stderr_broken()
77+
}
78+
79+
/// Redirects stdout and stderr to the specified file.
80+
///
81+
/// The file is opened in append mode and created if it doesn't exist.
82+
/// This permanently modifies the process's stdio streams.
83+
pub(crate) fn redirect_stdio_to_file(path: &str) -> anyhow::Result<()> {
84+
let file = OpenOptions::new()
85+
.create(true)
86+
.append(true)
87+
.open(path)
88+
.with_context(|| format!("failed to open log file: {}", path))?;
89+
let raw_fd = file.as_raw_fd();
90+
// SAFETY: `raw_fd` is a valid file descriptor obtained from
91+
// `as_raw_fd()` on an open file. `STDOUT_FILENO` (`1`) and
92+
// `STDERR_FILENO` (`2`) are always valid file descriptor numbers.
93+
// `dup2` is safe to call with these valid file descriptors.
94+
unsafe {
95+
if nix::libc::dup2(raw_fd, STDOUT_FILENO) == -1 {
96+
anyhow::bail!(
97+
"failed to redirect stdout: {}",
98+
std::io::Error::last_os_error()
99+
);
100+
}
101+
if nix::libc::dup2(raw_fd, STDERR_FILENO) == -1 {
102+
anyhow::bail!(
103+
"failed to redirect stderr: {}",
104+
std::io::Error::last_os_error()
105+
);
106+
}
107+
}
108+
std::mem::forget(file);
109+
Ok(())
110+
}
111+
112+
/// Redirects stdout and stderr to a user-specific log file in /tmp.
113+
///
114+
/// Creates a log file at `/tmp/{user}/monarch-process-exit-{pid}.log`
115+
/// and redirects stdio to it. The user directory is created if it
116+
/// doesn't exist.
117+
pub(crate) fn redirect_stdio_to_user_pid_file() -> anyhow::Result<()> {
118+
let user = std::env::var("USER").unwrap_or_else(|_| "unknown".to_string());
119+
let pid = std::process::id();
120+
let log_dir = format!("/tmp/{}", user);
121+
std::fs::create_dir_all(&log_dir)?;
122+
let path = format!("{}/monarch-process-exit-{}.log", log_dir, pid);
123+
redirect_stdio_to_file(&path)?;
124+
Ok(())
125+
}
126+
127+
/// Redirects stdio to a log file if stdout is broken.
128+
pub(crate) fn handle_broken_pipes() {
129+
if any_stdio_broken() {
130+
if redirect_stdio_to_user_pid_file().is_ok() {
131+
tracing::info!(
132+
"stdio for {} redirected due to broken pipe",
133+
std::process::id()
134+
);
135+
}
136+
}
137+
}
138+
139+
#[cfg(all(unix, test))]
mod tests {
    use nix::libc::STDERR_FILENO;
    use nix::libc::STDOUT_FILENO;
    use tempfile::TempDir;

    use super::*;

    /// Saves duplicates of the current stdout/stderr descriptors on
    /// construction and puts them back on drop, so a test that
    /// redirects stdio does not affect what runs after it.
    struct StdioGuard {
        saved_stdout: i32,
        saved_stderr: i32,
    }

    impl StdioGuard {
        /// Duplicates fds 1 and 2 and remembers the duplicates.
        fn new() -> Self {
            // SAFETY: `dup()` only takes descriptor numbers; the two
            // stdio numbers are fixed constants and no Rust-managed
            // memory is involved.
            let (saved_stdout, saved_stderr) = unsafe {
                (
                    nix::libc::dup(STDOUT_FILENO),
                    nix::libc::dup(STDERR_FILENO),
                )
            };
            Self {
                saved_stdout,
                saved_stderr,
            }
        }
    }

    impl Drop for StdioGuard {
        /// Restores both streams first, then closes the saved copies.
        fn drop(&mut self) {
            let restore = [
                (self.saved_stdout, STDOUT_FILENO),
                (self.saved_stderr, STDERR_FILENO),
            ];
            // SAFETY: the saved descriptors are the values returned by
            // `dup()` in `new()`, and the targets are the fixed stdio
            // numbers. `dup2()` and `close()` only take descriptor
            // numbers.
            unsafe {
                for (saved, target) in restore {
                    nix::libc::dup2(saved, target);
                }
                for (saved, _) in restore {
                    nix::libc::close(saved);
                }
            }
        }
    }

    #[test]
    fn test_is_stdout_broken_with_working_stdout() {
        let broken = is_stdout_broken();
        assert!(!broken);
    }

    #[test]
    fn test_redirect_stdio_to_file_creates_file() {
        // Restores the real stdio when the test finishes.
        let _guard = StdioGuard::new();

        let scratch = TempDir::new().unwrap();
        let log_path = scratch.path().join("test.log");
        let outcome = redirect_stdio_to_file(log_path.to_str().unwrap());

        assert!(outcome.is_ok());
        assert!(log_path.exists());
    }
}

0 commit comments

Comments
 (0)