Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions hyperactor_mesh/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ declare_attrs! {
env_name: Some("HYPERACTOR_MESH_ENABLE_LOG_FORWARDING".to_string()),
py_name: None,
})
pub attr MESH_ENABLE_LOG_FORWARDING: bool = true;
pub attr MESH_ENABLE_LOG_FORWARDING: bool = false;

/// When `true`: if stdio is piped, each child's `StreamFwder`
/// also forwards lines to a host-scoped `FileAppender` managed by
Expand All @@ -124,7 +124,7 @@ declare_attrs! {
env_name: Some("HYPERACTOR_MESH_ENABLE_FILE_CAPTURE".to_string()),
py_name: None,
})
pub attr MESH_ENABLE_FILE_CAPTURE: bool = true;
pub attr MESH_ENABLE_FILE_CAPTURE: bool = false;

/// Maximum number of log lines retained in a proc's stderr/stdout
/// tail buffer. Used by [`StreamFwder`] when wiring child
Expand All @@ -133,7 +133,7 @@ declare_attrs! {
env_name: Some("HYPERACTOR_MESH_TAIL_LOG_LINES".to_string()),
py_name: None,
})
pub attr MESH_TAIL_LOG_LINES: usize = 100;
pub attr MESH_TAIL_LOG_LINES: usize = 0;

/// If enabled (default), bootstrap child processes install
/// `PR_SET_PDEATHSIG(SIGKILL)` so the kernel reaps them if the
Expand Down Expand Up @@ -3692,6 +3692,10 @@ mod tests {
#[tokio::test]
async fn exit_tail_is_attached_and_logged() {
hyperactor_telemetry::initialize_logging_for_test();

let lock = hyperactor::config::global::lock();
let _guard = lock.override_key(MESH_TAIL_LOG_LINES, 100);

// Spawn a child that writes to stderr then exits 7.
let mut cmd = Command::new("sh");
cmd.arg("-c")
Expand Down
4 changes: 4 additions & 0 deletions hyperactor_mesh/src/v1/host_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1125,6 +1125,7 @@ mod tests {

use super::*;
use crate::Bootstrap;
use crate::bootstrap::MESH_TAIL_LOG_LINES;
use crate::resource::Status;
use crate::v1::ActorMesh;
use crate::v1::testactor;
Expand Down Expand Up @@ -1321,6 +1322,9 @@ mod tests {
#[tokio::test]
#[cfg(fbcode_build)]
async fn test_failing_proc_allocation() {
let lock = hyperactor::config::global::lock();
let _guard = lock.override_key(MESH_TAIL_LOG_LINES, 100);

let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");

let hosts = vec![free_localhost_addr(), free_localhost_addr()];
Expand Down
14 changes: 14 additions & 0 deletions monarch_hyperactor/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,20 @@ pub fn bootstrap_main(py: Python) -> PyResult<Bound<PyAny>> {
fbinit::perform_init();
};

// SAFETY: This is an FFI call to libc::signal, which is unsafe by
// signature. We pass a valid signal number (SIGTERM) and a
// well-defined handler constant (SIG_DFL). This only installs the
// default disposition for SIGTERM; it does not call back into
// Rust. We do this during bootstrap (before spawning threads or
// installing other handlers) to avoid glog's SIGTERM backtraces,
// and we accept the process-wide effect. We are not invoking it
// from a signal handler, so async-signal-safety constraints on
// the caller don't apply here. If we ever need finer control
// (flags, SA_RESTART), we should switch to sigaction(2).
unsafe {
libc::signal(libc::SIGTERM, libc::SIG_DFL);
}

hyperactor::tracing::debug!("entering async bootstrap");
crate::runtime::future_into_py::<_, ()>(py, async move {
// SAFETY:
Expand Down
10 changes: 7 additions & 3 deletions monarch_hyperactor/src/v1/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,9 +310,9 @@ impl LoggingMeshClient {
// re-spawning infra, which we deliberately don't do at
// runtime.
(None, true) => {
return Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
"log forwarding disabled by config at startup; cannot enable streaming_to_client",
));
// return Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
// "log forwarding disabled by config at startup; cannot enable streaming_to_client",
// ));
}
}

Expand Down Expand Up @@ -592,6 +592,9 @@ mod tests {
);
}

/*
// Update (SF: 2025, 11, 13): We now ignore stream to client requests if
// log forwarding is enabled.
// (c) stream_to_client = true when forwarding was
// never spawned -> Err
let res = client_ref.set_mode(&py_instance, true, None, 10);
Expand All @@ -606,6 +609,7 @@ mod tests {
"unexpected err when enabling streaming with no forwarders: {msg}"
);
}
*/
});

drop(client_py); // See note "NOTE ON LIFECYCLE / CLEANUP"
Expand Down
4 changes: 2 additions & 2 deletions python/monarch/_src/actor/v1/proc_mesh.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,8 @@ def rank_tensors(self) -> Dict[str, "Tensor"]:

async def logging_option(
self,
stream_to_client: bool = True,
aggregate_window_sec: int | None = 3,
stream_to_client: bool = False,
aggregate_window_sec: int | None = None,
level: int = logging.INFO,
) -> None:
"""
Expand Down
191 changes: 191 additions & 0 deletions python/tests/test_actor_logging_smoke.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

# pyre-unsafe

"""Actor-based logging smoke test.


Defines a `Logger` actor that routes INFO/WARNING to stdout and ERROR+
to stderr using two `logging.StreamHandler`s. The test captures
**process-level** stdout/stderr by temporarily redirecting file
descriptors (FD 1/2), so both Python and any Rust / native output
would be captured. It then spins up a small mesh, invokes the actor's
endpoints, and asserts the messages landed on the expected streams
(and include the expected actor prefix).

"""

import asyncio
import logging
import os
import re
import sys
import tempfile

import pytest
from monarch._src.actor.host_mesh import this_host
from monarch.actor import Actor, endpoint


class Logger(Actor):
"""Actor that emits log lines at different severities and routes them
to separate files.

Setup:

- Adds a file handler for INFO/WARNING and another for ERROR+.

- Flushes handlers after each endpoint call to minimize
buffering effects.

Notes:
- We attach handlers to the *root* logger returned by
`logging.getLogger()`.

- The INFO/WARNING routing is enforced via a simple level
filter: records with `levelno < logging.ERROR` go to one file;
others go to another file.

"""

def __init__(self, stdout_path: str, stderr_path: str) -> None:
self._logger: logging.Logger = logging.getLogger()

stdout_handler = logging.FileHandler(stdout_path, mode="a")
stdout_handler.setLevel(logging.INFO)
stdout_handler.addFilter(lambda record: record.levelno < logging.ERROR)

stderr_handler = logging.FileHandler(stderr_path, mode="a")
stderr_handler.setLevel(logging.ERROR)

self._logger.addHandler(stdout_handler)
self._logger.addHandler(stderr_handler)

self._stdout_handler = stdout_handler
self._stderr_handler = stderr_handler

@endpoint
async def log_warn(self, content: str) -> None:
"""Emit a WARNING-level message and flush all handlers.

Args:
content: The message body to log.

"""
self._logger.warning(f"{content}")
self._stdout_handler.flush()
self._stderr_handler.flush()

@endpoint
async def log_info(self, content: str) -> None:
"""
Emit an INFO-level message and flush all handlers.

Args:
content: The message body to log.
"""
self._logger.info(f"{content}")
self._stdout_handler.flush()
self._stderr_handler.flush()

@endpoint
async def log_error(self, content: str) -> None:
"""
Emit an ERROR-level message and flush all handlers.

Args:
content: The message body to log.
"""
self._logger.error(f"{content}")
self._stdout_handler.flush()
self._stderr_handler.flush()


@pytest.mark.timeout(60)
async def test_actor_logging_smoke() -> None:
"""End-to-end smoke test of file-based logging for the Logger actor.

Flow:

1. Create temporary files for stdout/stderr output.

2. Start a small per-host mesh, enable logging, and spawn the
`Logger` actor with paths to the temp files.

3. Invoke `log_warn`, `log_info`, and `log_error`.

4. Read back the files and assert:
- WARNING/INFO appear in the stdout file,
- ERROR appears in the stderr file,
- an actor prefix like `[actor=...Logger...]` is present.

This test validates logging without relying on FD-level redirection,
which may not work reliably in all CI environments.

"""
# Create temporary files to capture output.
with tempfile.NamedTemporaryFile(
mode="w+", delete=False, suffix="_stdout.log"
) as stdout_file, tempfile.NamedTemporaryFile(
mode="w+", delete=False, suffix="_stderr.log"
) as stderr_file:
stdout_path = stdout_file.name
stderr_path = stderr_file.name

try:
# Make a logger mesh.
pm = this_host().spawn_procs(per_host={"gpus": 2})
await pm.logging_option(level=logging.INFO)
am = pm.spawn("logger", Logger, stdout_path, stderr_path)

# Do some logging actions.
await am.log_warn.call("hello 1")
await am.log_info.call("hello 2")
await am.log_error.call("hello 3")

# Wait a bit for output to be written.
await asyncio.sleep(1)

await pm.stop()

# Read the captured output.
with open(stdout_path, "r") as f:
stdout_content = f.read()
with open(stderr_path, "r") as f:
stderr_content = f.read()

# Print the captured output.
print("")
print("=== Captured stdout ===")
print(stdout_content)
print("=== Captured stderr ===")
print(stderr_content)

# Assertions on the captured output.
assert re.search(
r"hello 1", stdout_content
), f"Expected 'hello 1' in stdout: {stdout_content}"
assert re.search(
r"hello 2", stdout_content
), f"Expected 'hello 2' in stdout: {stdout_content}"
assert re.search(
r"hello 3", stderr_content
), f"Expected 'hello 3' in stderr: {stderr_content}"
assert re.search(
r"\[actor=.*Logger.*\]", stdout_content
), f"Expected actor prefix in stdout: {stdout_content}"

finally:
# Clean up temp files.
try:
os.unlink(stdout_path)
except OSError:
pass
try:
os.unlink(stderr_path)
except OSError:
pass
14 changes: 12 additions & 2 deletions python/tests/test_allocator.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,19 @@ async def test_allocate_failure_message(self) -> None:
r"exited with code 1: Traceback \(most recent call last\).*",
):
with remote_process_allocator(
envs={"MONARCH_ERROR_DURING_BOOTSTRAP_FOR_TESTING": "1"}
envs={
"MONARCH_ERROR_DURING_BOOTSTRAP_FOR_TESTING": "1",
"HYPERACTOR_MESH_ENABLE_LOG_FORWARDING": "true",
"HYPERACTOR_MESH_ENABLE_FILE_CAPTURE": "true",
"HYPERACTOR_MESH_TAIL_LOG_LINES": "100",
}
) as host1, remote_process_allocator(
envs={"MONARCH_ERROR_DURING_BOOTSTRAP_FOR_TESTING": "1"}
envs={
"MONARCH_ERROR_DURING_BOOTSTRAP_FOR_TESTING": "1",
"HYPERACTOR_MESH_ENABLE_LOG_FORWARDING": "true",
"HYPERACTOR_MESH_ENABLE_FILE_CAPTURE": "true",
"HYPERACTOR_MESH_TAIL_LOG_LINES": "100",
}
) as host2:
allocator = RemoteAllocator(
world_id="test_remote_allocator",
Expand Down
Loading