Skip to content

Commit d5bcca1

Browse files
: set mesh logging defaults for notebooks (#1893)
Summary: D86994420 changed defaults: - `HYPERACTOR_MESH_ENABLE_LOG_FORWARDING=false` - `HYPERACTOR_MESH_ENABLE_FILE_CAPTURE=false` - `HYPERACTOR_MESH_TAIL_LOG_LINES=0` these defaults do not play well with interactive notebooks so this diff selectively overrides them when the execution environment is interactive (ipython, jupyter, bento). incidentally this diff adds a new test module `test_actor_logging.py`. today it has a new smoke test, in time i mean to break out the logging tests from `test_python_actor.py`, fix those that need it, and move them here. Differential Revision: D87098535
1 parent a35e848 commit d5bcca1

File tree

3 files changed

+160
-3
lines changed

3 files changed

+160
-3
lines changed

hyperactor_mesh/src/bootstrap.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ declare_attrs! {
9696
/// piping) or via [`StreamFwder`] when piping is active.
9797
@meta(CONFIG = ConfigAttr {
9898
env_name: Some("HYPERACTOR_MESH_ENABLE_LOG_FORWARDING".to_string()),
99-
py_name: None,
99+
py_name: Some("enable_log_forwarding".to_string()),
100100
})
101101
pub attr MESH_ENABLE_LOG_FORWARDING: bool = false;
102102

@@ -121,7 +121,7 @@ declare_attrs! {
121121
/// buffer used for peeking—independent of file capture.
122122
@meta(CONFIG = ConfigAttr {
123123
env_name: Some("HYPERACTOR_MESH_ENABLE_FILE_CAPTURE".to_string()),
124-
py_name: None,
124+
py_name: Some("enable_file_capture".to_string()),
125125
})
126126
pub attr MESH_ENABLE_FILE_CAPTURE: bool = false;
127127

@@ -130,7 +130,7 @@ declare_attrs! {
130130
/// pipes. Default: 100
131131
@meta(CONFIG = ConfigAttr {
132132
env_name: Some("HYPERACTOR_MESH_TAIL_LOG_LINES".to_string()),
133-
py_name: None,
133+
py_name: Some("tail_log_lines".to_string()),
134134
})
135135
pub attr MESH_TAIL_LOG_LINES: usize = 0;
136136

python/monarch/_src/actor/__init__.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,27 @@
99
"""
1010
Monarch Actor API
1111
"""
12+
13+
import os
14+
from monarch._rust_bindings.monarch_hyperactor.config import configure
15+
16+
# Detect if we're running in IPython/Jupyter
17+
_in_ipython = False
18+
try:
19+
# pyre-ignore[21]
20+
from IPython import get_ipython
21+
22+
_in_ipython = get_ipython() is not None
23+
except ImportError:
24+
pass
25+
26+
# Set notebook-friendly defaults for stdio piping when spawning procs.
27+
# These env vars are read by:
28+
# 1. Rust BootstrapProcManager::spawn() to decide whether to pipe
29+
# child stdio
30+
# 2. Rust LoggingMeshClient::spawn() to decide whether to spawn
31+
# LogForwardActors
32+
# Only apply these defaults overrides in notebook/IPython environments
33+
# where stdout **needs** to be captured.
34+
if _in_ipython:
35+
configure(enable_log_forwarding = True)

python/tests/test_actor_logging.py

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
# Copyright (c) Meta Platforms, Inc. and affiliates.
2+
# All rights reserved.
3+
#
4+
# This source code is licensed under the BSD-style license found in the
5+
# LICENSE file in the root directory of this source tree.
6+
7+
# pyre-unsafe
8+
9+
import asyncio
10+
import logging
11+
import os
12+
import re
13+
import sys
14+
import tempfile
15+
16+
import pytest
17+
from monarch._src.actor.host_mesh import this_host
18+
from monarch.actor import Actor, endpoint
19+
20+
21+
class Logger(Actor):
22+
def __init__(
23+
self, stdout_path: str | None = None, stderr_path: str | None = None
24+
) -> None:
25+
self._logger: logging.Logger = logging.getLogger()
26+
27+
# If file paths are provided, remove existing handlers to log
28+
# only to files.
29+
if stdout_path or stderr_path:
30+
self._logger.handlers.clear()
31+
32+
stdout_handler = (
33+
logging.FileHandler(stdout_path, mode="a")
34+
if stdout_path
35+
else logging.StreamHandler(sys.stdout)
36+
)
37+
stdout_handler.setLevel(logging.INFO)
38+
stdout_handler.addFilter(lambda record: record.levelno < logging.ERROR)
39+
40+
stderr_handler = (
41+
logging.FileHandler(stderr_path, mode="a")
42+
if stderr_path
43+
else logging.StreamHandler(sys.stderr)
44+
)
45+
stderr_handler.setLevel(logging.ERROR)
46+
47+
self._logger.addHandler(stdout_handler)
48+
self._logger.addHandler(stderr_handler)
49+
50+
self._stdout_handler = stdout_handler
51+
self._stderr_handler = stderr_handler
52+
53+
@endpoint
54+
async def log_warn(self, content: str) -> None:
55+
self._logger.warning(f"{content}")
56+
self._stdout_handler.flush()
57+
self._stderr_handler.flush()
58+
59+
@endpoint
60+
async def log_info(self, content: str) -> None:
61+
self._logger.info(f"{content}")
62+
self._stdout_handler.flush()
63+
self._stderr_handler.flush()
64+
65+
@endpoint
66+
async def log_error(self, content: str) -> None:
67+
self._logger.error(f"{content}")
68+
self._stdout_handler.flush()
69+
self._stderr_handler.flush()
70+
71+
72+
@pytest.mark.timeout(60)
73+
async def test_actor_logging_smoke() -> None:
74+
# Create temporary files to capture output.
75+
with tempfile.NamedTemporaryFile(
76+
mode="w+", delete=False, suffix="_stdout.log"
77+
) as stdout_file, tempfile.NamedTemporaryFile(
78+
mode="w+", delete=False, suffix="_stderr.log"
79+
) as stderr_file:
80+
stdout_path = stdout_file.name
81+
stderr_path = stderr_file.name
82+
83+
try:
84+
pm = this_host().spawn_procs(per_host={"gpus": 2})
85+
await pm.logging_option(level=logging.INFO)
86+
87+
# Log to the terminal.
88+
am_1 = pm.spawn("logger_1", Logger)
89+
await am_1.log_warn.call("hello 1")
90+
await am_1.log_info.call("hello 2")
91+
await am_1.log_error.call("hello 3")
92+
93+
# Log to files.
94+
am_2 = pm.spawn("logger_2", Logger, stdout_path, stderr_path)
95+
await am_2.log_warn.call("hello 1")
96+
await am_2.log_info.call("hello 2")
97+
await am_2.log_error.call("hello 3")
98+
99+
# Wait for output to be written.
100+
await asyncio.sleep(1)
101+
102+
# Read the captured output.
103+
with open(stdout_path, "r") as f:
104+
stdout_content = f.read()
105+
with open(stderr_path, "r") as f:
106+
stderr_content = f.read()
107+
108+
# Assertions on the captured output.
109+
assert re.search(
110+
r"hello 1", stdout_content
111+
), f"Expected 'hello 1' in stdout: {stdout_content}"
112+
assert re.search(
113+
r"hello 2", stdout_content
114+
), f"Expected 'hello 2' in stdout: {stdout_content}"
115+
assert re.search(
116+
r"hello 3", stderr_content
117+
), f"Expected 'hello 3' in stderr: {stderr_content}"
118+
assert re.search(
119+
r"\[actor=.*Logger.*\]", stdout_content
120+
), f"Expected actor prefix in stdout: {stdout_content}"
121+
122+
await pm.stop()
123+
124+
finally:
125+
# Clean up temp files.
126+
try:
127+
os.unlink(stdout_path)
128+
except OSError:
129+
pass
130+
try:
131+
os.unlink(stderr_path)
132+
except OSError:
133+
pass

0 commit comments

Comments
 (0)