Skip to content

Commit 8f679e9

Browse files
: bootstrap: restore default SIGTERM disposition (#1885)
Summary: `fbinit()` installs a glog signal handler that prints a stack trace when it handles `SIGTERM`. As a result, calls to `pm.stop()` cause these traces to be written to stderr. This diff restores the default signal disposition after the call to `fbinit()` in `bootstrap_main`, which eliminates that behavior. Differential Revision: D87037324
1 parent 47d8cb1 commit 8f679e9

File tree

2 files changed

+254
-0
lines changed

2 files changed

+254
-0
lines changed

monarch_hyperactor/src/bootstrap.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,20 @@ pub fn bootstrap_main(py: Python) -> PyResult<Bound<PyAny>> {
3636
fbinit::perform_init();
3737
};
3838

39+
// SAFETY: This is an FFI call to libc::signal, which is unsafe by
40+
// signature. We pass a valid signal number (SIGTERM) and a
41+
// well-defined handler constant (SIG_DFL). This only installs the
42+
// default disposition for SIGTERM; it does not call back into
43+
// Rust. We do this during bootstrap (before spawning threads or
44+
// installing other handlers) to avoid glog's SIGTERM backtraces,
45+
// and we accept the process-wide effect. We are not invoking it
46+
// from a signal handler, so async-signal-safety constraints on
47+
// the caller don't apply here. If we ever need finer control
48+
// (flags, SA_RESTART), we should switch to sigaction(2).
49+
unsafe {
50+
libc::signal(libc::SIGTERM, libc::SIG_DFL);
51+
}
52+
3953
hyperactor::tracing::debug!("entering async bootstrap");
4054
crate::runtime::future_into_py::<_, ()>(py, async move {
4155
// SAFETY:
Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
# Copyright (c) Meta Platforms, Inc. and affiliates.
2+
# All rights reserved.
3+
#
4+
# This source code is licensed under the BSD-style license found in the
5+
# LICENSE file in the root directory of this source tree.
6+
7+
# pyre-unsafe
8+
9+
"""Actor-based logging smoke test.
10+
11+
12+
Defines a `Logger` actor that routes INFO/WARNING to stdout and ERROR+
13+
to stderr using two `logging.StreamHandler`s. The test captures
14+
**process-level** stdout/stderr by temporarily redirecting file
15+
descriptors (FD 1/2), so both Python and any Rust / native output
16+
would be captured. It then spins up a small mesh, invokes the actor's
17+
endpoints, and asserts the messages landed on the expected streams
18+
(and include the expected actor prefix).
19+
20+
"""
21+
22+
import asyncio
23+
import logging
24+
import os
25+
import re
26+
import sys
27+
import tempfile
28+
29+
import pytest
30+
from monarch._src.actor.host_mesh import this_host
31+
from monarch.actor import Actor, endpoint
32+
33+
34+
class Logger(Actor):
    """Actor whose endpoints each log one message at a fixed severity.

    Construction attaches two `logging.StreamHandler`s to the root
    logger (the one returned by `logging.getLogger()`):

    - a stdout handler with level INFO plus a filter that only admits
      records below `logging.ERROR`, so INFO and WARNING land there;

    - a stderr handler with level ERROR, which takes ERROR and above.

    After logging, every endpoint flushes the logger's handlers and
    then `sys.stdout` / `sys.stderr` to minimize buffering effects.

    """

    def __init__(self) -> None:
        self._logger: logging.Logger = logging.getLogger()

        def below_error(record: logging.LogRecord) -> bool:
            # Keep ERROR+ records off stdout; the stderr handler owns them.
            return record.levelno < logging.ERROR

        to_stdout = logging.StreamHandler(sys.stdout)
        to_stdout.setLevel(logging.INFO)
        to_stdout.addFilter(below_error)

        to_stderr = logging.StreamHandler(sys.stderr)
        to_stderr.setLevel(logging.ERROR)

        # Attach in the same order as before: stdout first, then stderr.
        for stream_handler in (to_stdout, to_stderr):
            self._logger.addHandler(stream_handler)

    def _flush_all(self) -> None:
        """Flush every handler on the logger, then both std streams."""
        for attached in self._logger.handlers:
            attached.flush()
        sys.stdout.flush()
        sys.stderr.flush()

    @endpoint
    async def log_warn(self, content: str) -> None:
        """Log `content` at WARNING level, then flush.

        Args:
            content: The message body to log.

        """
        self._logger.warning(f"{content}")
        self._flush_all()

    @endpoint
    async def log_info(self, content: str) -> None:
        """Log `content` at INFO level, then flush.

        Args:
            content: The message body to log.

        """
        self._logger.info(f"{content}")
        self._flush_all()

    @endpoint
    async def log_error(self, content: str) -> None:
        """Log `content` at ERROR level, then flush.

        Args:
            content: The message body to log.

        """
        self._logger.error(f"{content}")
        self._flush_all()
110+
111+
112+
@pytest.mark.timeout(60)
async def test_actor_logging_smoke() -> None:
    """End-to-end smoke test of stdio routing for the Logger actor.

    Flow:

      1. Duplicate the process's stdout/stderr file descriptors and
         redirect FD 1/2 (and `sys.stdout` / `sys.stderr`) to
         temporary files, so both Python and native output are
         captured.

      2. Spawn a small per-host proc mesh, set its logging level to
         INFO, and spawn the `Logger` actor.

      3. Invoke `log_warn`, `log_info`, and `log_error`.

      4. Restore the streams, read back the captured output, and
         assert:
         - WARNING/INFO appear on stdout,
         - ERROR appears on stderr,
         - an actor prefix like `[actor=...Logger...]` is present on
           stdout.

    Cleanup (restoring and closing the saved descriptors, deleting the
    temporary files) runs in the outer `finally`, so it happens whether
    the test passes, fails an assertion, or raises part-way through.

    """
    original_stdout_fd = None
    original_stderr_fd = None
    # Tracked out here so the outer `finally` can delete the files even
    # when the test body raises before reaching the assertions.
    stdout_path = None
    stderr_path = None

    try:
        # Save original file descriptors.
        original_stdout_fd = os.dup(1)  # stdout
        original_stderr_fd = os.dup(2)  # stderr

        # Create temporary files to capture output. `delete=False`
        # keeps them on disk after the `with` closes them so they can
        # be re-opened for reading below.
        with tempfile.NamedTemporaryFile(
            mode="w+", delete=False
        ) as stdout_file, tempfile.NamedTemporaryFile(
            mode="w+", delete=False
        ) as stderr_file:
            stdout_path = stdout_file.name
            stderr_path = stderr_file.name

            # Redirect file descriptors to our temp files. This will
            # capture both Python and Rust output.
            os.dup2(stdout_file.fileno(), 1)
            os.dup2(stderr_file.fileno(), 2)

            # Also redirect Python's sys.stdout/stderr for
            # completeness.
            original_sys_stdout = sys.stdout
            original_sys_stderr = sys.stderr
            sys.stdout = stdout_file
            sys.stderr = stderr_file

            try:
                # Make a logger mesh.
                pm = this_host().spawn_procs(per_host={"gpus": 2})
                await pm.logging_option(level=logging.INFO)
                am = pm.spawn("logger", Logger)

                # Do some logging actions.
                await am.log_warn.call("hello 1")
                await am.log_info.call("hello 2")
                await am.log_error.call("hello 3")

                # Wait a bit for output to be written.
                await asyncio.sleep(1)

                # Push Python-side buffered output to the temp files
                # before stopping the procs.
                stdout_file.flush()
                stderr_file.flush()
                os.fsync(stdout_file.fileno())
                os.fsync(stderr_file.fileno())

                await pm.stop()

            finally:
                # Restore Python's sys.stdout/stderr
                sys.stdout = original_sys_stdout
                sys.stderr = original_sys_stderr

        # Restore original file descriptors so the prints below reach
        # the real stdout rather than the capture files.
        os.dup2(original_stdout_fd, 1)
        os.dup2(original_stderr_fd, 2)

        # Read the captured output.
        with open(stdout_path, "r") as f:
            stdout_content = f.read()
        with open(stderr_path, "r") as f:
            stderr_content = f.read()

        # Print the captured output.
        print("")
        print("=== Captured stdout ===")
        print(stdout_content)
        print("=== Captured stderr ===")
        print(stderr_content)

        # Assertions on the captured output.
        assert re.search(
            r"hello 1", stdout_content
        ), f"Expected 'hello 1' in stdout: {stdout_content}"
        assert re.search(
            r"hello 2", stdout_content
        ), f"Expected 'hello 2' in stdout: {stdout_content}"
        assert re.search(
            r"hello 3", stderr_content
        ), f"Expected 'hello 3' in stderr: {stderr_content}"
        assert re.search(
            r"\[actor=.*Logger.*\]", stdout_content
        ), f"Expected actor prefix in stdout: {stdout_content}"

    finally:
        # Ensure file descriptors are restored even if something goes
        # wrong. Each descriptor is handled on its own so a failure on
        # one neither leaves the other redirected nor leaks its saved
        # duplicate. Errors are deliberately ignored: this is
        # best-effort teardown and must not mask the test's own result.
        for saved_fd, target_fd in (
            (original_stdout_fd, 1),
            (original_stderr_fd, 2),
        ):
            if saved_fd is None:
                continue
            try:
                os.dup2(saved_fd, target_fd)
            except OSError:
                pass
            try:
                os.close(saved_fd)
            except OSError:
                pass

        # Clean up temp files on every exit path, not just on success.
        for captured_path in (stdout_path, stderr_path):
            if captured_path is None:
                continue
            try:
                os.unlink(captured_path)
            except OSError:
                pass

0 commit comments

Comments
 (0)