Skip to content

Commit bb63757

Browse files
: set mesh logging defaults for notebooks (#1893)
Summary: D86994420 changed defaults: - `HYPERACTOR_MESH_ENABLE_LOG_FORWARDING=false` - `HYPERACTOR_MESH_ENABLE_FILE_CAPTURE=false` - `HYPERACTOR_MESH_TAIL_LOG_LINES=0` these defaults do not play well with interactive notebooks so this diff selectively overrides them when the execution environment is interactive (ipython, jupyter, bento). incidentally this diff adds a new test module `test_actor_logging.py`. today it has a new smoke test, in time i mean to break out the logging tests from `test_python_actor.py`, fix those that need it, and move them here. Reviewed By: mariusae Differential Revision: D87098535
1 parent b5c6dde commit bb63757

File tree

5 files changed

+188
-34
lines changed

5 files changed

+188
-34
lines changed

hyperactor/src/config/global.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,16 @@ pub fn get<T: AttrValue + Copy>(key: Key<T>) -> T {
413413
*key.default().expect("key must have a default")
414414
}
415415

416+
/// Return the override value for `key` if it is explicitly present in
417+
/// `overrides`, otherwise fall back to the global value for that key.
418+
pub fn override_or_global<T: AttrValue + Copy>(overrides: &Attrs, key: Key<T>) -> T {
419+
if overrides.contains_key(key) {
420+
*overrides.get(key).unwrap()
421+
} else {
422+
get(key)
423+
}
424+
}
425+
416426
/// Get a key by cloning the value.
417427
///
418428
/// Resolution order: TestOverride -> Runtime -> Env -> File ->

hyperactor_mesh/src/alloc/process.rs

Lines changed: 14 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ use super::ProcStopReason;
4949
use crate::assign::Ranks;
5050
use crate::bootstrap;
5151
use crate::bootstrap::Allocator2Process;
52-
use crate::bootstrap::MESH_ENABLE_FILE_CAPTURE;
5352
use crate::bootstrap::MESH_ENABLE_LOG_FORWARDING;
5453
use crate::bootstrap::MESH_TAIL_LOG_LINES;
5554
use crate::bootstrap::Process2Allocator;
@@ -447,43 +446,30 @@ impl ProcessAlloc {
447446
}
448447
let mut cmd = self.cmd.lock().await;
449448

450-
// Read config (defaults are in 'bootstrap.rs').
449+
// In the case `MESH_ENABLE_LOG_FORWARDING` is set it's
450+
// probable the client execution context is a notebook. In
451+
// that case, for output from this process's children to
452+
// reach the client, we **must** use pipes and copy output
453+
// from child to parent (**`Stdio::inherit`** does not work!).
454+
// So, this variable is being used as a proxy for "use pipes"
455+
// here.
451456
let enable_forwarding = hyperactor::config::global::get(MESH_ENABLE_LOG_FORWARDING);
452-
let enable_file_capture = hyperactor::config::global::get(MESH_ENABLE_FILE_CAPTURE);
453457
let tail_size = hyperactor::config::global::get(MESH_TAIL_LOG_LINES);
454-
455-
// We don't support FileAppender in this v0 allocator path; warn if asked.
456-
if enable_file_capture {
457-
tracing::info!(
458-
"MESH_ENABLE_FILE_CAPTURE=true, but ProcessAllocator (v0) has no FileAppender; \
459-
files will NOT be written in this path"
460-
);
461-
}
462-
463-
let need_stdio = enable_forwarding || tail_size > 0;
464-
465-
if need_stdio {
458+
if enable_forwarding || tail_size > 0 {
466459
cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
467460
} else {
468461
cmd.stdout(Stdio::inherit()).stderr(Stdio::inherit());
469462
tracing::info!(
470-
enable_forwarding,
471-
enable_file_capture,
472-
tail_size,
473463
"child stdio NOT captured (forwarding/file_capture/tail all disabled); \
474464
inheriting parent console"
475465
);
476466
}
477-
478-
// Only allocate & export a log channel when forwarding is
479-
// enabled.
480-
let log_channel: Option<ChannelAddr> = if enable_forwarding {
481-
let addr = ChannelAddr::any(ChannelTransport::Unix);
482-
cmd.env(bootstrap::BOOTSTRAP_LOG_CHANNEL, addr.to_string());
483-
Some(addr)
484-
} else {
485-
None
486-
};
467+
// Regardless of the value of `MESH_ENABLE_LOG_FORWARDING`
468+
// (c.f. `enable_forwarding`), we do not do log forwarding on
469+
// these procs. This is because, now that we are on the v1
470+
// path, the only procs we spawn via this code path are those
471+
// to support `HostMeshAgent`s.
472+
let log_channel: Option<ChannelAddr> = None;
487473

488474
let index = self.created.len();
489475
self.created.push(ShortUuid::generate());

hyperactor_mesh/src/bootstrap.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ use hyperactor::clock::RealClock;
4545
use hyperactor::config::CONFIG;
4646
use hyperactor::config::ConfigAttr;
4747
use hyperactor::config::global as config;
48+
use hyperactor::config::global::override_or_global;
4849
use hyperactor::context;
4950
use hyperactor::declare_attrs;
5051
use hyperactor::host::Host;
@@ -1848,6 +1849,13 @@ impl ProcManager for BootstrapProcManager {
18481849
let (callback_addr, mut callback_rx) =
18491850
channel::serve(ChannelAddr::any(ChannelTransport::Unix))?;
18501851

1852+
// Decide whether we need to capture stdio.
1853+
let overrides = &config.client_config_override;
1854+
let enable_forwarding = override_or_global(overrides, MESH_ENABLE_LOG_FORWARDING);
1855+
let enable_file_capture = override_or_global(overrides, MESH_ENABLE_FILE_CAPTURE);
1856+
let tail_size = override_or_global(overrides, MESH_TAIL_LOG_LINES);
1857+
let need_stdio = enable_forwarding || enable_file_capture || tail_size > 0;
1858+
18511859
let mode = Bootstrap::Proc {
18521860
proc_id: proc_id.clone(),
18531861
backend_addr,
@@ -1862,12 +1870,6 @@ impl ProcManager for BootstrapProcManager {
18621870
.map_err(|e| HostError::ProcessConfigurationFailure(proc_id.clone(), e.into()))?,
18631871
);
18641872

1865-
// Decide whether we need to capture stdio.
1866-
let enable_forwarding = hyperactor::config::global::get(MESH_ENABLE_LOG_FORWARDING);
1867-
let enable_file_capture = hyperactor::config::global::get(MESH_ENABLE_FILE_CAPTURE);
1868-
let tail_size = hyperactor::config::global::get(MESH_TAIL_LOG_LINES);
1869-
let need_stdio = enable_forwarding || enable_file_capture || tail_size > 0;
1870-
18711873
if need_stdio {
18721874
cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
18731875
} else {

python/monarch/_src/actor/__init__.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,26 @@
99
"""
1010
Monarch Actor API
1111
"""
12+
13+
from monarch._rust_bindings.monarch_hyperactor.config import configure
14+
15+
# Detect if we're running in IPython/Jupyter
16+
_in_ipython = False
17+
try:
18+
# pyre-ignore[21]
19+
from IPython import get_ipython
20+
21+
_in_ipython = get_ipython() is not None
22+
except ImportError:
23+
pass
24+
25+
# Set notebook-friendly defaults for stdio piping when spawning procs.
26+
# These config is read by:
27+
# 1. Rust BootstrapProcManager::spawn() to decide whether to pipe
28+
# child stdio
29+
# 2. Rust LoggingMeshClient::spawn() to decide whether to spawn
30+
# LogForwardActors
31+
# Only apply these defaults overrides in notebook/IPython environments
32+
# where stdout **needs** to be captured.
33+
if _in_ipython:
34+
configure(enable_log_forwarding=True)

python/tests/test_actor_logging.py

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
# Copyright (c) Meta Platforms, Inc. and affiliates.
2+
# All rights reserved.
3+
#
4+
# This source code is licensed under the BSD-style license found in the
5+
# LICENSE file in the root directory of this source tree.
6+
7+
# pyre-unsafe
8+
9+
import asyncio
10+
import logging
11+
import os
12+
import re
13+
import sys
14+
import tempfile
15+
16+
import pytest
17+
from monarch._src.actor.host_mesh import this_host
18+
from monarch.actor import Actor, endpoint
19+
20+
21+
class Logger(Actor):
22+
def __init__(
23+
self, stdout_path: str | None = None, stderr_path: str | None = None
24+
) -> None:
25+
self._logger: logging.Logger = logging.getLogger()
26+
27+
# If file paths are provided, remove existing handlers to log
28+
# only to files.
29+
if stdout_path or stderr_path:
30+
self._logger.handlers.clear()
31+
32+
stdout_handler = (
33+
logging.FileHandler(stdout_path, mode="a")
34+
if stdout_path
35+
else logging.StreamHandler(sys.stdout)
36+
)
37+
stdout_handler.setLevel(logging.INFO)
38+
stdout_handler.addFilter(lambda record: record.levelno < logging.ERROR)
39+
40+
stderr_handler = (
41+
logging.FileHandler(stderr_path, mode="a")
42+
if stderr_path
43+
else logging.StreamHandler(sys.stderr)
44+
)
45+
stderr_handler.setLevel(logging.ERROR)
46+
47+
self._logger.addHandler(stdout_handler)
48+
self._logger.addHandler(stderr_handler)
49+
50+
self._stdout_handler = stdout_handler
51+
self._stderr_handler = stderr_handler
52+
53+
@endpoint
54+
async def log_warn(self, content: str) -> None:
55+
self._logger.warning(f"{content}")
56+
self._stdout_handler.flush()
57+
self._stderr_handler.flush()
58+
59+
@endpoint
60+
async def log_info(self, content: str) -> None:
61+
self._logger.info(f"{content}")
62+
self._stdout_handler.flush()
63+
self._stderr_handler.flush()
64+
65+
@endpoint
66+
async def log_error(self, content: str) -> None:
67+
self._logger.error(f"{content}")
68+
self._stdout_handler.flush()
69+
self._stderr_handler.flush()
70+
71+
72+
@pytest.mark.timeout(60)
73+
async def test_actor_logging_smoke() -> None:
74+
# Create temporary files to capture output.
75+
with tempfile.NamedTemporaryFile(
76+
mode="w+", delete=False, suffix="_stdout.log"
77+
) as stdout_file, tempfile.NamedTemporaryFile(
78+
mode="w+", delete=False, suffix="_stderr.log"
79+
) as stderr_file:
80+
stdout_path = stdout_file.name
81+
stderr_path = stderr_file.name
82+
83+
try:
84+
pm = this_host().spawn_procs(per_host={"gpus": 2})
85+
await pm.logging_option(level=logging.INFO)
86+
87+
# Log to the terminal.
88+
am_1 = pm.spawn("logger_1", Logger)
89+
await am_1.log_warn.call("hello 1")
90+
await am_1.log_info.call("hello 2")
91+
await am_1.log_error.call("hello 3")
92+
93+
# Log to files.
94+
am_2 = pm.spawn("logger_2", Logger, stdout_path, stderr_path)
95+
await am_2.log_warn.call("hello 1")
96+
await am_2.log_info.call("hello 2")
97+
await am_2.log_error.call("hello 3")
98+
99+
# Wait for output to be written.
100+
await asyncio.sleep(1)
101+
102+
# Read the captured output.
103+
with open(stdout_path, "r") as f:
104+
stdout_content = f.read()
105+
with open(stderr_path, "r") as f:
106+
stderr_content = f.read()
107+
108+
# Assertions on the captured output.
109+
assert re.search(
110+
r"hello 1", stdout_content
111+
), f"Expected 'hello 1' in stdout: {stdout_content}"
112+
assert re.search(
113+
r"hello 2", stdout_content
114+
), f"Expected 'hello 2' in stdout: {stdout_content}"
115+
assert re.search(
116+
r"hello 3", stderr_content
117+
), f"Expected 'hello 3' in stderr: {stderr_content}"
118+
assert re.search(
119+
r"\[actor=.*Logger.*\]", stdout_content
120+
), f"Expected actor prefix in stdout: {stdout_content}"
121+
122+
await pm.stop()
123+
124+
finally:
125+
# Clean up temp files.
126+
try:
127+
os.unlink(stdout_path)
128+
except OSError:
129+
pass
130+
try:
131+
os.unlink(stderr_path)
132+
except OSError:
133+
pass

0 commit comments

Comments
 (0)