Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions hyperactor/src/attrs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,15 @@ impl Attrs {
/// Look up the type-erased value stored under `name`.
///
/// Returns `None` when no value is stored under that name.
pub(crate) fn get_value_by_name(&self, name: &'static str) -> Option<&dyn SerializableValue> {
    let boxed = self.values.get(name)?;
    Some(boxed.as_ref())
}

/// Absorb every attribute of `other` into this set.
///
/// `other` is consumed. Each of its values is moved into `self`; when
/// a key exists in both sets, the value coming from `other` replaces
/// the one previously held by `self`.
pub(crate) fn merge(&mut self, other: Attrs) {
    let incoming = other.values;
    self.values.extend(incoming);
}
}

impl Clone for Attrs {
Expand Down
99 changes: 89 additions & 10 deletions hyperactor/src/config/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,16 @@ pub fn get<T: AttrValue + Copy>(key: Key<T>) -> T {
*key.default().expect("key must have a default")
}

/// Resolve `key` against `overrides` first, then against the global
/// configuration.
///
/// The value stored in `overrides` is returned only when `key` is
/// explicitly present there; in every other case the result is
/// whatever [`get`] yields for `key`.
pub fn override_or_global<T: AttrValue + Copy>(overrides: &Attrs, key: Key<T>) -> T {
    if !overrides.contains_key(key) {
        return get(key);
    }
    // NOTE(review): the presence check is deliberately separate from
    // the lookup. Presumably `Attrs::get` can yield a value for keys
    // that were never set (e.g. a key default) -- confirm before
    // collapsing these into a single `get` call.
    *overrides.get(key).unwrap()
}

/// Get a key by cloning the value.
///
/// Resolution order: TestOverride -> Runtime -> Env -> File ->
Expand Down Expand Up @@ -440,6 +450,24 @@ pub fn try_get_cloned<T: AttrValue>(key: Key<T>) -> Option<T> {
key.default().cloned()
}

/// Wrap `attrs` in the [`Layer`] variant that corresponds to `source`.
///
/// Shared by [`set`] and [`create_or_merge`] for the case where no
/// layer exists yet for `source` and a fresh one has to be installed.
/// A `TestOverride` layer starts out with an empty `stacks` map.
fn make_layer(source: Source, attrs: Attrs) -> Layer {
    match source {
        // The only struct-like variant: it carries extra bookkeeping
        // next to the attributes.
        Source::TestOverride => Layer::TestOverride {
            attrs,
            stacks: HashMap::new(),
        },
        // Every remaining source wraps the attributes directly.
        Source::Runtime => Layer::Runtime(attrs),
        Source::Env => Layer::Env(attrs),
        Source::File => Layer::File(attrs),
        Source::ClientOverride => Layer::ClientOverride(attrs),
    }
}

/// Insert or replace a configuration layer for the given source.
///
/// If a layer with the same [`Source`] already exists, its
Expand All @@ -457,16 +485,34 @@ pub fn set(source: Source, attrs: Attrs) {
if let Some(l) = g.ordered.iter_mut().find(|l| layer_source(l) == source) {
*layer_attrs_mut(l) = attrs;
} else {
g.ordered.push(match source {
Source::File => Layer::File(attrs),
Source::Env => Layer::Env(attrs),
Source::Runtime => Layer::Runtime(attrs),
Source::TestOverride => Layer::TestOverride {
attrs,
stacks: HashMap::new(),
},
Source::ClientOverride => Layer::ClientOverride(attrs),
});
g.ordered.push(make_layer(source, attrs));
}
g.ordered.sort_by_key(|l| priority(layer_source(l))); // TestOverride < Runtime < Env < File < ClientOverride
}

/// Merge `attrs` into the configuration layer for `source`, creating
/// that layer when it does not exist yet.
///
/// When a layer for `source` is already installed, it is **updated in
/// place**: every key present in `attrs` is moved into the existing
/// layer and replaces any previous value for that key, while keys
/// that are absent from `attrs` keep their current values.
///
/// When there is no layer for `source`, the outcome is the same as
/// calling [`set`]: a new layer holding exactly `attrs` is installed.
///
/// Prefer this over [`set`] for incremental / additive updates (for
/// example, runtime configuration driven by a Python API), where only
/// a subset of keys changes and the rest of the layer must survive.
/// [`set`], in contrast, swaps out the whole layer and drops whatever
/// values it held before.
pub fn create_or_merge(source: Source, attrs: Attrs) {
    let mut layers = LAYERS.write().unwrap();
    let existing = layers
        .ordered
        .iter_mut()
        .find(|candidate| layer_source(candidate) == source);
    match existing {
        Some(layer) => layer_attrs_mut(layer).merge(attrs),
        None => layers.ordered.push(make_layer(source, attrs)),
    }
    // Keep layers in priority order:
    // TestOverride < Runtime < Env < File < ClientOverride
    layers
        .ordered
        .sort_by_key(|layer| priority(layer_source(layer)));
}
Expand Down Expand Up @@ -1206,4 +1252,37 @@ mod tests {
assert!(priority(Env) < priority(File));
assert!(priority(File) < priority(ClientOverride));
}

#[test]
fn test_create_or_merge_runtime_merges_keys() {
    let _guard = lock();
    reset_to_defaults();

    // Install a Runtime layer that holds a single key.
    let mut seed = Attrs::new();
    seed[MESSAGE_TTL_DEFAULT] = 10;
    set(Source::Runtime, seed);

    // Merge a second, unrelated key into that same layer through
    // `create_or_merge`.
    let mut patch = Attrs::new();
    patch[MESSAGE_ACK_EVERY_N_MESSAGES] = 123;
    create_or_merge(Source::Runtime, patch);

    // The seeded key must survive the merge, and the merged key must
    // be readable as well.
    assert_eq!(get(MESSAGE_TTL_DEFAULT), 10);
    assert_eq!(get(MESSAGE_ACK_EVERY_N_MESSAGES), 123);
}

#[test]
fn test_create_or_merge_runtime_creates_layer_if_missing() {
    let _guard = lock();
    reset_to_defaults();

    // After the reset, `create_or_merge` is the only call that
    // installs a Runtime value for this key.
    let mut fresh = Attrs::new();
    fresh[MESSAGE_TTL_DEFAULT] = 42;
    create_or_merge(Source::Runtime, fresh);

    assert_eq!(get(MESSAGE_TTL_DEFAULT), 42);
}
}
42 changes: 14 additions & 28 deletions hyperactor_mesh/src/alloc/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ use super::ProcStopReason;
use crate::assign::Ranks;
use crate::bootstrap;
use crate::bootstrap::Allocator2Process;
use crate::bootstrap::MESH_ENABLE_FILE_CAPTURE;
use crate::bootstrap::MESH_ENABLE_LOG_FORWARDING;
use crate::bootstrap::MESH_TAIL_LOG_LINES;
use crate::bootstrap::Process2Allocator;
Expand Down Expand Up @@ -454,43 +453,30 @@ impl ProcessAlloc {
}
let mut cmd = self.cmd.lock().await;

// Read config (defaults are in 'bootstrap.rs').
// In the case `MESH_ENABLE_LOG_FORWARDING` is set it's
// probable the client execution context is a notebook. In
// that case, for output from this process's children to
// reach the client, we **must** use pipes and copy output
// from child to parent (**`Stdio::inherit`** does not work!).
// So, this variable is being used as a proxy for "use pipes"
// here.
let enable_forwarding = hyperactor::config::global::get(MESH_ENABLE_LOG_FORWARDING);
let enable_file_capture = hyperactor::config::global::get(MESH_ENABLE_FILE_CAPTURE);
let tail_size = hyperactor::config::global::get(MESH_TAIL_LOG_LINES);

// We don't support FileAppender in this v0 allocator path; warn if asked.
if enable_file_capture {
tracing::info!(
"MESH_ENABLE_FILE_CAPTURE=true, but ProcessAllocator (v0) has no FileAppender; \
files will NOT be written in this path"
);
}

let need_stdio = enable_forwarding || tail_size > 0;

if need_stdio {
if enable_forwarding || tail_size > 0 {
cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
} else {
cmd.stdout(Stdio::inherit()).stderr(Stdio::inherit());
tracing::info!(
enable_forwarding,
enable_file_capture,
tail_size,
"child stdio NOT captured (forwarding/file_capture/tail all disabled); \
inheriting parent console"
);
}

// Only allocate & export a log channel when forwarding is
// enabled.
let log_channel: Option<ChannelAddr> = if enable_forwarding {
let addr = ChannelAddr::any(ChannelTransport::Unix);
cmd.env(bootstrap::BOOTSTRAP_LOG_CHANNEL, addr.to_string());
Some(addr)
} else {
None
};
// Regardless of the value of `MESH_ENABLE_LOG_FORWARDING`
// (c.f. `enable_forwarding`), we do not do log forwarding on
// these procs. This is because, now that we are on the v1
// path, the only procs we spawn via this code path are those
// to support `HostMeshAgent`s.
let log_channel: Option<ChannelAddr> = None;

let index = self.created.len();
self.created.push(ShortUuid::generate());
Expand Down
20 changes: 11 additions & 9 deletions hyperactor_mesh/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use hyperactor::clock::RealClock;
use hyperactor::config::CONFIG;
use hyperactor::config::ConfigAttr;
use hyperactor::config::global as config;
use hyperactor::config::global::override_or_global;
use hyperactor::context;
use hyperactor::declare_attrs;
use hyperactor::host::Host;
Expand Down Expand Up @@ -96,7 +97,7 @@ declare_attrs! {
/// piping) or via [`StreamFwder`] when piping is active.
@meta(CONFIG = ConfigAttr {
env_name: Some("HYPERACTOR_MESH_ENABLE_LOG_FORWARDING".to_string()),
py_name: None,
py_name: Some("enable_log_forwarding".to_string()),
})
pub attr MESH_ENABLE_LOG_FORWARDING: bool = false;

Expand All @@ -121,7 +122,7 @@ declare_attrs! {
/// buffer used for peeking—independent of file capture.
@meta(CONFIG = ConfigAttr {
env_name: Some("HYPERACTOR_MESH_ENABLE_FILE_CAPTURE".to_string()),
py_name: None,
py_name: Some("enable_file_capture".to_string()),
})
pub attr MESH_ENABLE_FILE_CAPTURE: bool = false;

Expand All @@ -130,7 +131,7 @@ declare_attrs! {
/// pipes. Default: 0
@meta(CONFIG = ConfigAttr {
env_name: Some("HYPERACTOR_MESH_TAIL_LOG_LINES".to_string()),
py_name: None,
py_name: Some("tail_log_lines".to_string()),
})
pub attr MESH_TAIL_LOG_LINES: usize = 0;

Expand Down Expand Up @@ -1848,6 +1849,13 @@ impl ProcManager for BootstrapProcManager {
let (callback_addr, mut callback_rx) =
channel::serve(ChannelAddr::any(ChannelTransport::Unix))?;

// Decide whether we need to capture stdio.
let overrides = &config.client_config_override;
let enable_forwarding = override_or_global(overrides, MESH_ENABLE_LOG_FORWARDING);
let enable_file_capture = override_or_global(overrides, MESH_ENABLE_FILE_CAPTURE);
let tail_size = override_or_global(overrides, MESH_TAIL_LOG_LINES);
let need_stdio = enable_forwarding || enable_file_capture || tail_size > 0;

let mode = Bootstrap::Proc {
proc_id: proc_id.clone(),
backend_addr,
Expand All @@ -1862,12 +1870,6 @@ impl ProcManager for BootstrapProcManager {
.map_err(|e| HostError::ProcessConfigurationFailure(proc_id.clone(), e.into()))?,
);

// Decide whether we need to capture stdio.
let enable_forwarding = hyperactor::config::global::get(MESH_ENABLE_LOG_FORWARDING);
let enable_file_capture = hyperactor::config::global::get(MESH_ENABLE_FILE_CAPTURE);
let tail_size = hyperactor::config::global::get(MESH_TAIL_LOG_LINES);
let need_stdio = enable_forwarding || enable_file_capture || tail_size > 0;

if need_stdio {
cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
} else {
Expand Down
2 changes: 1 addition & 1 deletion monarch_hyperactor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ fn set_global_config<T: AttrValue + Debug>(key: &'static dyn ErasedKey, value: T
let key = key.downcast_ref().expect("cannot fail");
let mut attrs = Attrs::new();
attrs.set(key.clone(), value);
hyperactor::config::global::set(Source::Runtime, attrs);
hyperactor::config::global::create_or_merge(Source::Runtime, attrs);
Ok(())
}

Expand Down
3 changes: 3 additions & 0 deletions python/monarch/_rust_bindings/monarch_hyperactor/config.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,8 @@ def reload_config_from_env() -> None:

def configure(
default_transport: ChannelTransport = ChannelTransport.Unix,
enable_log_forwarding: bool = False,
enable_file_capture: bool = False,
tail_log_lines: int = 0,
) -> None: ...
def get_configuration() -> Dict[str, Any]: ...
23 changes: 23 additions & 0 deletions python/monarch/_src/actor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,26 @@
"""
Monarch Actor API
"""

from monarch._rust_bindings.monarch_hyperactor.config import configure

# Detect if we're running in IPython/Jupyter
_in_ipython = False
try:
# pyre-ignore[21]
from IPython import get_ipython

_in_ipython = get_ipython() is not None
except ImportError:
pass

# Set notebook-friendly defaults for stdio piping when spawning procs.
# This config is read by:
# 1. Rust BootstrapProcManager::spawn() to decide whether to pipe
# child stdio
# 2. Rust LoggingMeshClient::spawn() to decide whether to spawn
# LogForwardActors
# Only apply these default overrides in notebook/IPython environments
# where stdout **needs** to be captured.
if _in_ipython:
configure(enable_log_forwarding=True)
Loading