From 9a7da62b43810fa6b0ba32306f014b23df08b57b Mon Sep 17 00:00:00 2001 From: zdevito Date: Thu, 20 Nov 2025 16:58:27 -0800 Subject: [PATCH 1/2] Removing remaining pipe functionality Differential Revision: [D87599488](https://our.internmc.facebook.com/intern/diff/D87599488/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D87599488/)! [ghstack-poisoned] --- monarch_tensor_worker/src/lib.rs | 214 ++------------- monarch_tensor_worker/src/pipe.rs | 385 --------------------------- monarch_tensor_worker/src/py_pipe.rs | 161 ----------- monarch_tensor_worker/src/stream.rs | 164 +----------- monarch_tensor_worker/test_utils.py | 7 - 5 files changed, 25 insertions(+), 906 deletions(-) delete mode 100644 monarch_tensor_worker/src/pipe.rs delete mode 100644 monarch_tensor_worker/src/py_pipe.rs diff --git a/monarch_tensor_worker/src/lib.rs b/monarch_tensor_worker/src/lib.rs index 1b25420f2..6329e24bb 100644 --- a/monarch_tensor_worker/src/lib.rs +++ b/monarch_tensor_worker/src/lib.rs @@ -30,8 +30,6 @@ mod borrow; mod comm; pub mod device_mesh; -pub mod pipe; -pub mod py_pipe; pub mod stream; pub mod test_util; @@ -71,7 +69,6 @@ use monarch_messages::controller::Seq; use monarch_messages::wire_value::WireValue; use monarch_messages::worker::ActorCallParams; use monarch_messages::worker::ActorMethodParams; -use monarch_messages::worker::CallFunctionError; use monarch_messages::worker::CallFunctionParams; use monarch_messages::worker::Factory; use monarch_messages::worker::Reduction; @@ -84,8 +81,6 @@ use monarch_messages::worker::WorkerMessageHandler; use monarch_messages::worker::WorkerParams; use monarch_types::PyTree; use ndslice::Slice; -use pipe::PipeActor; -use pipe::PipeParams; use pyo3::Python; use pyo3::types::PyAnyMethods; use serde::Deserialize; @@ -173,8 +168,6 @@ pub struct WorkerActor { borrows: HashMap, comm: Option>, controller_actor: ActorRef, - /// Pipes created for the worker. - pipes: HashMap>, /// Remember the process groups "created" via `CreateRemoteProcessGroup` for /// subsequent `CallFunction` calls, as this is where the actual allocation /// will happen. @@ -244,7 +237,6 @@ impl Actor for WorkerActor { borrows: HashMap::new(), comm: None, controller_actor, - pipes: HashMap::new(), remote_process_groups: HashMap::new(), send_recv_comms: HashMap::new(), recordings: HashMap::new(), @@ -648,47 +640,18 @@ impl WorkerMessageHandler for WorkerActor { async fn create_pipe( &mut self, - cx: &hyperactor::Context, - result: Ref, + _cx: &hyperactor::Context, + _result: Ref, // TODO(agallagher): This is used in the python impl to name the socket // path to use for comms, but we don't currently use a named socket. _key: String, - function: ResolvableFunction, - max_messages: i64, - device_mesh: Ref, - args: Vec, - kwargs: HashMap, + _function: ResolvableFunction, + _max_messages: i64, + _device_mesh: Ref, + _args: Vec, + _kwargs: HashMap, ) -> Result<()> { - println!("CREATE PIPE1 {}", result); - let args: Vec> = args - .into_iter() - .map(|object| RValue::PyObject(object.into_py_object().unwrap()).into()) - .collect(); - let kwargs: HashMap<_, PyTree> = kwargs - .into_iter() - .map(|(k, object)| (k, RValue::PyObject(object.into_py_object().unwrap()).into())) - .collect(); - let device_mesh = self.device_meshes.get(&device_mesh).ok_or_else(|| { - CallFunctionError::Error(anyhow::anyhow!("ref not found: {}", device_mesh)) - })?; - println!("CREATE PIPE2 {}", result); - // TODO(agallagher): Fix error prop. (When pipe is read from the pipes dict if it had an error it should cause a dependent error in send_value not an actor error as it does now) - let pipe = PipeActor::spawn( - cx, - PipeParams { - function, - max_messages, - ranks: device_mesh.0.ranks(), - sizes: device_mesh.0.sizes(), - args, - kwargs, - }, - ) - .await?; - println!("AFTER CREATE PIPE {}", result); - - self.pipes.insert(result, pipe); - Ok(()) + panic!("create_pipe is no longer implemented") } async fn send_tensor( @@ -818,18 +781,11 @@ impl WorkerMessageHandler for WorkerActor { .collect() }; - let pipe = if let Some(destination) = destination { - let pipe = self - .pipes - .get(&destination) - .ok_or_else(|| anyhow::anyhow!("invalid pipe id: {:#?}", destination))? - .port(); - Some(pipe) - } else { - None - }; - // Resolve the value on the stream, then send the value to the pipe if provided, - // or back to the controller if not. + if destination.is_some() { + panic!("send_value with pipe destination is no longer implemented") + } + + // Resolve the value on the stream, then send the value back to the controller. stream .send_value( cx, @@ -840,7 +796,6 @@ impl WorkerMessageHandler for WorkerActor { args, kwargs, device_meshes, - pipe, ) .await } @@ -971,24 +926,13 @@ impl WorkerMessageHandler for WorkerActor { async fn pipe_recv( &mut self, - cx: &hyperactor::Context, - seq: Seq, - results: Vec>, - pipe: Ref, - stream: StreamRef, + _cx: &hyperactor::Context, + _seq: Seq, + _results: Vec>, + _pipe: Ref, + _stream: StreamRef, ) -> Result<()> { - self.maybe_add_stream_to_recording(cx, stream).await?; - - // Get a port for the pipe - let pipe = self - .pipes - .get(&pipe) - .ok_or_else(|| anyhow::anyhow!("ref not found: {}", pipe))?; - let pipe = pipe.port(); - // Resolve the stream. - let stream = self.try_get_stream(stream)?; - // Push result into the stream. - stream.set_value(cx, seq, results, pipe).await + panic!("pipe_recv is no longer implemented") } async fn set_ref_unit_tests_only( @@ -2186,126 +2130,6 @@ mod tests { Ok(()) } - #[async_timed_test(timeout_secs = 60)] - async fn pipe_send_recv() -> Result<()> { - test_setup()?; - - let proc = Proc::local(); - let (client, controller_ref, mut controller_rx) = proc.attach_actor("controller").unwrap(); - - let handle = proc - .spawn::( - "worker", - WorkerParams { - world_size: 1, - rank: 0, - device_index: None, - controller_actor: controller_ref, - }, - ) - .await - .unwrap(); - let (resolve_value_arg, torch_eq_arg1, torch_eq_arg2): ( - PickledPyObject, - PickledPyObject, - PickledPyObject, - ) = Python::with_gil(|py| { - PyResult::Ok(( - PyList::new(py, [2, 3])?.into_any().try_into()?, - Ref { id: 2 }.into_bound_py_any(py)?.try_into()?, - Ref { id: 4 }.into_bound_py_any(py)?.try_into()?, - )) - })?; - - handle - .command_group( - &client, - vec![ - WorkerMessage::CreateStream { - id: 0.into(), - stream_creation: StreamCreationMode::UseDefaultStream, - }, - WorkerMessage::CreateDeviceMesh { - result: 1.into(), - names: vec!["x".into()], - ranks: Slice::new(0, vec![2], vec![1]).unwrap(), - }, - // Create a tensor value which we'll send through the pipe. - WorkerMessage::CallFunction(CallFunctionParams { - seq: 0.into(), - results: vec![Some(2.into())], - mutates: vec![], - function: "torch.ops.aten.ones.default".into(), - args: vec![WireValue::IntList(vec![2, 3])], - kwargs: HashMap::new(), - stream: 0.into(), - remote_process_groups: vec![], - }), - WorkerMessage::CreatePipe { - result: 3.into(), - key: "unused".into(), - function: "monarch.monarch_tensor_worker.test_utils.handler".into(), - max_messages: 1, - mesh: 1.into(), - args: vec![], - kwargs: HashMap::new(), - }, - WorkerMessage::SendValue { - seq: 1.into(), - destination: Some(3.into()), - mutates: vec![], - function: Some( - "monarch.monarch_tensor_worker.test_utils.resolve_value".into(), - ), - args: vec![resolve_value_arg.into()], - kwargs: HashMap::new(), - stream: 0.into(), - }, - WorkerMessage::PipeRecv { - seq: 2.into(), - results: vec![Some(4.into())], - pipe: 3.into(), - stream: 0.into(), - }, - WorkerMessage::CallFunction(CallFunctionParams { - seq: 0.into(), - results: vec![Some(5.into())], - mutates: vec![], - function: "torch.equal".into(), - args: vec![torch_eq_arg1.into(), torch_eq_arg2.into()], - kwargs: HashMap::new(), - stream: 0.into(), - remote_process_groups: vec![], - }), - ], - ) - .await - .unwrap(); - - let matches: bool = handle - .get_ref_unit_tests_only(&client, 5.into(), 0.into()) - .await - .unwrap() - .unwrap() - .unwrap() - .try_into() - .unwrap(); - assert!(matches); - - handle.drain_and_stop()?; - assert_matches!(handle.await, ActorStatus::Stopped); - - let responses = controller_rx.drain(); - assert_eq!( - responses.len(), - 0, - "Expected one response, got: {:#?}", - responses - ); - - Ok(()) - } - fn get_random_channel_addr() -> ChannelAddr { let random_string = rand::thread_rng() .sample_iter(&Alphanumeric) diff --git a/monarch_tensor_worker/src/pipe.rs b/monarch_tensor_worker/src/pipe.rs deleted file mode 100644 index 0b1e96960..000000000 --- a/monarch_tensor_worker/src/pipe.rs +++ /dev/null @@ -1,385 +0,0 @@ -/* - * Copyright (c) Meta Platforms, Inc. and affiliates. - * All rights reserved. - * - * This source code is licensed under the BSD-style license found in the - * LICENSE file in the root directory of this source tree. - */ - -use std::collections::HashMap; -use std::future::Future; -use std::io::Read; -use std::io::Write; -use std::process::Stdio; -use std::thread; - -use anyhow::Context; -use anyhow::Result; -use anyhow::anyhow; -use anyhow::bail; -use async_trait::async_trait; -use hyperactor::Actor; -use hyperactor::HandleClient; -use hyperactor::Handler; -use hyperactor::forward; -use hyperactor::mailbox::OncePortHandle; -use monarch_messages::controller::WorkerError; -use monarch_types::PyTree; -use nix::sys::wait::WaitStatus; -use nix::unistd::Pid; -use serde::Deserialize; -use serde::Serialize; -use serde::de::DeserializeOwned; -use tokio::io::AsyncRead; -use tokio::io::AsyncReadExt; -use tokio::io::AsyncWrite; -use tokio::io::AsyncWriteExt; -use tokio::process::Child; -use tokio::process::Command; -use tokio::sync::mpsc; -use tokio::task; -use torch_sys::RValue; - -use crate::ResolvableFunction; - -/// Simple communication channel to send/recv objects over an async stream. -pub trait AsyncPipe { - fn send(&mut self, val: T) -> impl Future>; - fn recv(&mut self) -> impl Future>; -} - -/// Simple communication channel to send/recv objects over a synchronous stream. -/// NOTE: This synchronous specialization is mainly useful when wrapped w/ the -/// `PyPipe` struct, which is also synchronous (via Python). -pub trait Pipe { - fn send(&mut self, val: T) -> Result<()>; - fn recv(&mut self) -> Result; -} - -#[derive(Serialize, Deserialize)] -pub struct OutOfProcessSetupParams { - pub sizes: HashMap, - pub ranks: HashMap, - pub function: ResolvableFunction, - pub args: Vec>, - pub kwargs: HashMap>, -} - -impl AsyncPipe - for (mpsc::UnboundedSender, mpsc::UnboundedReceiver) -{ - async fn send(&mut self, val: T) -> Result<()> { - Ok(self.0.send(val)?) - } - - async fn recv(&mut self) -> Result { - Ok(self - .1 - .recv() - .await - .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::UnexpectedEof, ""))?) - } -} - -impl Pipe for (mpsc::UnboundedSender, mpsc::UnboundedReceiver) { - fn send(&mut self, val: T) -> Result<()> { - Ok(self.0.send(val)?) - } - - fn recv(&mut self) -> Result { - Ok(self - .1 - .blocking_recv() - .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::UnexpectedEof, ""))?) - } -} - -/// Return a pair of parent/child pipes connected via unbounded tokio mpsc -/// queues. -pub fn create_local_pipe() -> ( - (mpsc::UnboundedSender, mpsc::UnboundedReceiver), - (mpsc::UnboundedSender, mpsc::UnboundedReceiver), -) { - let (t1, r1) = mpsc::unbounded_channel(); - let (t2, r2) = mpsc::unbounded_channel(); - ((t1, r2), (t2, r1)) -} - -pub trait AsyncWriteDebug: std::fmt::Debug + AsyncWrite + Sync + Send + Unpin {} -impl AsyncWriteDebug for T {} - -#[derive(Debug)] -pub struct AsyncStreamPipe { - writer: Box, - channel_reader: mpsc::Receiver>, -} - -impl AsyncStreamPipe { - /// Create a new `AsyncStreamPipe` from a reader/writer pair. - /// The pipe will run a background task to read-ahead up to max_messages - /// messages to make them immediately available to read. - /// When reader is closed, the background task will exit and further - /// reads will return an error. - pub fn new( - mut reader: impl AsyncRead + Unpin + Send + 'static, - writer: impl AsyncWriteDebug + 'static, - max_messages: usize, - ) -> Self { - let (channel_writer, channel_reader) = mpsc::channel::>(max_messages); - - task::spawn(async move { - loop { - let mut buf = vec![0; 8]; - match reader.read_exact(&mut buf).await { - Ok(_) => (), - Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break, - // Other errors should not be expected. We should perhaps log and break - // instead of panicking. - Err(e) => panic!("preamble read failed: {}", e), - } - let len = u64::from_be_bytes(buf.try_into().unwrap()); - buf = vec![0; len as usize]; - match reader.read_exact(&mut buf).await { - Ok(_) => (), - Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break, - // Other errors should not be expected. We should perhaps log and break - // instead of panicking. - Err(e) => panic!("read failed: {}", e), - } - if channel_writer.send(buf).await.is_err() { - // receiver closed AsyncStreamPipe dropped, so we can break out of the loop - break; - } - } - }); - - AsyncStreamPipe { - writer: Box::new(writer), - channel_reader, - } - } -} - -impl AsyncPipe for AsyncStreamPipe { - async fn send(&mut self, val: T) -> Result<()> { - let bytes = bincode::serialize(&val)?; - let len = bytes.len(); - self.writer.write_all(&len.to_be_bytes()).await?; - self.writer.write_all(&bytes).await?; - Ok(()) - } - - async fn recv(&mut self) -> Result { - let buf = self.channel_reader.recv().await.expect("recv failed"); - Ok(bincode::deserialize(&buf)?) - } -} - -pub trait WriteDebug: std::fmt::Debug + Write + Sync + Send {} -impl WriteDebug for T {} - -pub struct StreamPipe { - writer: Box, - channel_reader: std::sync::Arc>>>, -} - -impl StreamPipe { - /// Create a new `AsyncStreamPipe` from a reader/writer pair. - /// The pipe will run a background thread to read-ahead up to max_messages - /// messages to make them immediately available to read. - /// When reader is closed, the background thread will exit and further - /// reads will return an error. - pub fn new( - mut reader: impl Read + Send + 'static, - writer: impl WriteDebug + 'static, - max_messages: usize, - ) -> Self { - let (channel_writer, channel_reader) = - ::std::sync::mpsc::sync_channel::>(max_messages); - - thread::spawn(move || { - loop { - let mut buf = vec![0; 8]; - match reader.read_exact(&mut buf) { - Ok(_) => (), - Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break, - // Other errors should not be expected. We should perhaps log and break - // instead of panicking. - Err(e) => panic!("preamble read failed: {}", e), - } - let len = u64::from_be_bytes(buf.try_into().unwrap()); - buf = vec![0; len as usize]; - match reader.read_exact(&mut buf) { - Ok(_) => (), - Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break, - Err(e) => panic!("preamble read failed: {}", e), - } - if channel_writer.send(buf).is_err() { - // receiver closed StreamPipe dropped, so we can break out of the loop - break; - } - } - }); - - StreamPipe { - writer: Box::new(writer), - channel_reader: std::sync::Arc::new(std::sync::Mutex::new(channel_reader)), - } - } -} - -impl Pipe for StreamPipe { - fn send(&mut self, val: T) -> Result<()> { - let bytes = bincode::serialize(&val)?; - let len = bytes.len(); - self.writer.write_all(&len.to_be_bytes())?; - self.writer.write_all(&bytes)?; - self.writer.flush()?; - Ok(()) - } - - fn recv(&mut self) -> Result { - let buf = self - .channel_reader - .lock() - .unwrap() - .recv() - .expect("recv failed"); - Ok(bincode::deserialize(&buf)?) - } -} - -#[allow(dead_code)] -#[derive(Handler, HandleClient, Debug)] -pub enum PipeMessage { - SendValue(Result, WorkerError>), - - RecvValue(#[reply] OncePortHandle>), -} - -#[derive(Debug)] -pub struct PipeActor { - // NOTE: Use `Option` wrappers to allow moving in `Drop` impl below. - pipe: Option, - handle: Child, -} - -/// Initialization parameters for `PipeActor`. -#[derive(Debug, Clone)] -pub struct PipeParams { - pub function: ResolvableFunction, - pub max_messages: i64, - pub ranks: HashMap, - pub sizes: HashMap, - pub args: Vec>, - pub kwargs: HashMap>, -} - -#[async_trait] -impl Actor for PipeActor { - type Params = PipeParams; - - async fn new(params: Self::Params) -> Result { - let mut command = Command::new( - std::env::var("MONARCH_TENSOR_WORKER_EXE") - .map_err(|e| anyhow!("could not get var MONARCH_TENSOR_WORKER_EXE: {}", e))?, - ); - if let Ok(main) = std::env::var("MONARCH_TENSOR_WORKER_MAIN") { - if std::env::var("FB_XAR_INVOKED_NAME").is_ok() { - command.env("PAR_MAIN_OVERRIDE", main); - } else { - command.arg("-m").arg(main); - } - } - - // Spawn server process. - let mut handle = command - .arg("pipe") - .stdout(Stdio::piped()) - .stdin(Stdio::piped()) - .kill_on_drop(true) - .spawn()?; - - // Send init args. - let mut pipe = AsyncStreamPipe::new( - handle.stdout.take().unwrap(), - handle.stdin.take().unwrap(), - params.max_messages as usize, - ); - let params = OutOfProcessSetupParams { - ranks: params.ranks, - sizes: params.sizes, - function: params.function, - args: params.args, - kwargs: params.kwargs, - }; - tokio::select! { - res = handle.wait() => bail!("pipe server exited: {:?}", res), - res = pipe.send(params) => res?, - } - - Ok(Self { - pipe: Some(pipe), - handle, - }) - } -} - -impl PipeActor { - /// Forcibly kill and cleanup the pipe server. Avoids `await` to be usable - /// in `Drop`. - fn kill_pipe_server(&mut self) -> Result<()> { - self.handle.start_kill()?; - - // NOT(agallagher): Since this is called from `drop()`, we can't - // use the async `wait()` method (is there a way to convert to - // `std::process::Child`?). - let pid = Pid::from_raw(self.handle.id().context("cannot get pid")? as i32); - match nix::sys::wait::waitpid(pid, None)? { - WaitStatus::Exited(_, 0) => (), - status => bail!("exited abnormally: {:?}", status), - } - Ok(()) - } -} - -// TODO(agallager): It'd be nice if the `Actor` API had a `shutdown` mechanism -// which could allow for preserving error propagation in cases like this. -impl Drop for PipeActor { - fn drop(&mut self) { - // Close the pipe first, which should make the server end get an EPIPE - // and die. - self.pipe.take(); - - // Kill/cleanup the server. - if let Err(err) = self.kill_pipe_server() { - tracing::warn!("error cleaning up pipe server: {}", err); - } - } -} - -#[async_trait] -#[forward(PipeMessage)] -impl PipeMessageHandler for PipeActor { - async fn send_value( - &mut self, - _cx: &hyperactor::Context, - val: Result, WorkerError>, - ) -> Result<()> { - // TODO(agallagher): Propagate failures and use a timeout and handle worker errors? - let val = val.map_err(|err| anyhow::anyhow!(err.backtrace).context("worker error"))?; - tokio::select! { - res = self.handle.wait() => bail!("pipe server exited: {:?}", res), - res = self.pipe.as_mut().unwrap().send(val) => res?, - }; - Ok(()) - } - - async fn recv_value(&mut self, _cx: &hyperactor::Context) -> Result> { - // TODO(agallagher): Propagate failures and use a timeout? - tokio::select! { - res = self.handle.wait() => bail!("pipe server exited: {:?}", res), - res = self.pipe.as_mut().unwrap().recv() => res - } - } -} diff --git a/monarch_tensor_worker/src/py_pipe.rs b/monarch_tensor_worker/src/py_pipe.rs deleted file mode 100644 index fda5163da..000000000 --- a/monarch_tensor_worker/src/py_pipe.rs +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Copyright (c) Meta Platforms, Inc. and affiliates. - * All rights reserved. - * - * This source code is licensed under the BSD-style license found in the - * LICENSE file in the root directory of this source tree. - */ - -use std::collections::HashMap; - -use monarch_messages::worker::ResolvableFunction; -use monarch_types::PyTree; -use monarch_types::TryIntoPyObjectUnsafe; -use pyo3::prelude::*; -use pyo3::types::PyTuple; -use torch_sys::RValue; - -use crate::pipe::Pipe; - -/// Wrapper around `Pipe` to make it usable in Python. -#[pyclass] -pub struct PyPipe { - pipe: Box> + Send + Sync>, - #[pyo3(get)] - ranks: HashMap, - #[pyo3(get)] - sizes: HashMap, - allow_unsafe_obj_conversion: bool, -} - -impl PyPipe { - pub fn new( - pipe: Box> + Send + Sync>, - ranks: HashMap, - sizes: HashMap, - allow_unsafe_obj_conversion: bool, - ) -> Self { - Self { - pipe, - ranks, - sizes, - allow_unsafe_obj_conversion, - } - } -} - -#[pymethods] -impl PyPipe { - fn send(&mut self, py: Python<'_>, value: &Bound<'_, PyAny>) -> PyResult<()> { - let val = value.extract::>()?; - py.allow_threads(move || self.pipe.send(val))?; - Ok(()) - } - - fn recv<'a>(&mut self, py: Python<'a>) -> PyResult> { - let val = py.allow_threads(|| self.pipe.recv())?; - if self.allow_unsafe_obj_conversion { - // SAFETY: A caller who initialized this PyPipe with allow_unsafe_obj_conversion=True - // asserts that it is safe to use this unsafe method. - unsafe { val.try_to_object_unsafe(py) } - } else { - val.into_pyobject(py) - } - } -} - -/// Run a Python pipe server, which loads a remote function sent over the pipe -/// then delegates to it. -pub fn run_py_pipe( - pipe: PyPipe, - func: ResolvableFunction, - args: Vec>, - kwargs: HashMap>, -) -> PyResult<()> { - Python::with_gil(|py| { - let pipe_obj: Py = Py::new(py, pipe)?; - let func = func.resolve(py)?; - let mut py_args = vec![pipe_obj.into_bound(py).into_any()]; - py_args.extend( - args.into_iter() - .map(|a| a.into_pyobject(py)) - .collect::, _>>()?, - ); - func.call(PyTuple::new(py, py_args)?, Some(&kwargs.into_pyobject(py)?))?; - Ok(()) - }) -} - -#[cfg(test)] -mod tests { - use std::assert_matches::assert_matches; - use std::collections::HashMap; - - use anyhow::Result; - use futures::try_join; - use indoc::indoc; - use pyo3::Python; - use pyo3::ffi::c_str; - use pyo3::types::PyModule; - use timed_test::async_timed_test; - use torch_sys::RValue; - - use super::PyPipe; - use super::run_py_pipe; - use crate::pipe::AsyncPipe; - use crate::pipe::create_local_pipe; - - #[async_timed_test(timeout_secs = 60)] - async fn test_py_pipe() -> Result<()> { - pyo3::prepare_freethreaded_python(); - // We need to load torch to initialize some internal structures used by - // the FFI funcs we use to convert ivalues to/from py objects. - Python::with_gil(|py| py.run(c_str!("import torch"), None, None))?; - - // Create the Python function that runs as the pipe handler. - Python::with_gil(|py| { - let _mod = PyModule::from_code( - py, - c_str!(indoc! {r#" - def func(pipe): - val = pipe.recv() - pipe.send(val) - "#}), - c_str!("test_helpers.py"), - c_str!("test_helpers"), - )?; - anyhow::Ok(()) - })?; - - let (mut client, server) = create_local_pipe(); - let ((), ()) = try_join!( - // Startup the pipe server side. - async move { - tokio::task::spawn_blocking(move || { - run_py_pipe( - PyPipe::new( - Box::new(server), - HashMap::new(), - HashMap::new(), - false, // allow_unsafe_obj_conversion - ), - "test_helpers.func".into(), - vec![], - HashMap::new(), - ) - }) - .await??; - anyhow::Ok(()) - }, - // Run the pipe client side. - async move { - client.send(RValue::Int(3).into()).await?; - let val = client.recv().await?; - assert_matches!(val.into_leaf().unwrap(), RValue::Int(3)); - anyhow::Ok(()) - }, - )?; - - Ok(()) - } -} diff --git a/monarch_tensor_worker/src/stream.rs b/monarch_tensor_worker/src/stream.rs index 3f4915509..16917b27c 100644 --- a/monarch_tensor_worker/src/stream.rs +++ b/monarch_tensor_worker/src/stream.rs @@ -82,7 +82,6 @@ use crate::comm::CommBackend; use crate::comm::CommMessage; use crate::comm::CommMessageClient; use crate::comm::NcclCommActor; -use crate::pipe::PipeMessage; pub type TensorCellResult = Result>; @@ -221,13 +220,6 @@ pub enum StreamMessage { args: Vec, kwargs: HashMap, device_meshes: HashMap, - pipe: Option>, - }, - - SetValue { - seq: Seq, - results: Vec>, - pipe: PortHandle, }, DefineRecording { @@ -365,11 +357,6 @@ impl StreamMessage { factory: factory.clone(), comm: comm.clone(), }, - StreamMessage::SetValue { seq, results, pipe } => StreamMessage::SetValue { - seq: seq.clone(), - results: results.clone(), - pipe: pipe.clone(), - }, other => panic!( "StreamMessage variant not supported in recording: {:?}", other @@ -395,9 +382,6 @@ impl StreamMessage { HashSet::new() } } - StreamMessage::SetValue { results, .. } => { - results.iter().filter_map(|&ref_| ref_).collect() - } // TODO(slurye): Add SendValue eventually. _ => HashSet::new(), } @@ -1570,9 +1554,8 @@ impl StreamMessageHandler for StreamActor { args: Vec, kwargs: HashMap, device_meshes: HashMap, - pipe: Option>, ) -> Result<()> { - if self.respond_with_python_message && pipe.is_none() { + if self.respond_with_python_message { return self .send_value_python_message(cx, seq, mutates, function, args, kwargs, device_meshes) .await; @@ -1682,15 +1665,11 @@ impl StreamMessageHandler for StreamActor { }; // Actually send the value. - if let Some(pipe) = pipe { - pipe.send(PipeMessage::SendValue(value))?; - } else { - let result = match value { - Ok(value) => Ok(Serialized::serialize(&value).map_err(anyhow::Error::from)?), - Err(e) => Err(e), - }; - self.controller_actor.fetch_result(cx, seq, result).await?; - } + let result = match value { + Ok(value) => Ok(Serialized::serialize(&value).map_err(anyhow::Error::from)?), + Err(e) => Err(e), + }; + self.controller_actor.fetch_result(cx, seq, result).await?; Ok(()) } @@ -1733,31 +1712,6 @@ impl StreamMessageHandler for StreamActor { .await } - async fn set_value( - &mut self, - cx: &Context, - seq: Seq, - results: Vec>, - pipe: PortHandle, - ) -> Result<()> { - if let Some((recording, _)) = self.get_defining_recording() { - recording - .messages - .push(StreamMessage::SetValue { seq, results, pipe }); - return Ok(()); - } - - self.try_define(cx, seq, results, &vec![], async |self| { - let (tx, rx) = cx.open_once_port(); - pipe.send(PipeMessage::RecvValue(tx)) - .map_err(anyhow::Error::from) - .map_err(CallFunctionError::from)?; - let value = rx.recv().await.map_err(anyhow::Error::from)?; - Ok(value.into_leaves()) - }) - .await - } - async fn define_recording(&mut self, _cx: &Context, recording: Ref) -> Result<()> { if self.active_recording.is_some() { bail!("different recording already active"); @@ -3935,110 +3889,4 @@ mod tests { Ok(()) } - - #[async_timed_test(timeout_secs = 60)] - async fn test_set_value_in_recording_valid_pipe() -> Result<()> { - let mut test_setup = TestSetup::new().await?; - - let (pipe_tx, mut pipe_rx) = test_setup.client.open_port(); - - let recording_ref = test_setup.next_ref(); - test_setup - .stream_actor - .define_recording(&test_setup.client, recording_ref) - .await?; - - let result_ref_0 = test_setup.next_ref(); - - test_setup - .stream_actor - .set_value( - &test_setup.client, - 0.into(), - vec![Some(result_ref_0)], - pipe_tx, - ) - .await?; - - test_setup - .stream_actor - .recording_result(&test_setup.client, result_ref_0, 0) - .await?; - - test_setup - .stream_actor - .finalize_recording(&test_setup.client, recording_ref) - .await?; - - let real_result_ref = test_setup.next_ref(); - let recording_fut = test_setup.stream_actor.call_recording( - &test_setup.client, - 0.into(), - recording_ref, - vec![real_result_ref], - vec![], - ); - - let pipe_fut = async { - let msg = pipe_rx.recv().await.unwrap(); - match msg { - PipeMessage::RecvValue(tx) => { - tx.send(PyTree::from(RValue::Tensor(TensorCell::new( - factory_float_tensor(&[1.0, 2.0, 3.0], "cuda".try_into().unwrap()), - )))) - .unwrap(); - } - _ => panic!("Unexpected message"), - } - Ok(()) - }; - - tokio::try_join!(recording_fut, pipe_fut)?; - - assert!(test_setup.allclose(real_result_ref, &[1.0, 2.0, 3.0]).await); - - // This will cause the next call to set_value to fail. - drop(pipe_rx); - - let real_result_ref = test_setup.next_ref(); - test_setup - .stream_actor - .call_recording( - &test_setup.client, - 1.into(), - recording_ref, - vec![real_result_ref], - vec![], - ) - .await?; - - let real_result_err = test_setup - .stream_actor - .get_tensor_ref_unit_tests_only(&test_setup.client, real_result_ref) - .await? - .unwrap() - .unwrap_err(); - // Check that the error contains the expected string - let error_str = real_result_err.to_string(); - assert!( - error_str.contains("send error"), - "Error should contain 'send error': {}", - error_str - ); - - let controller_msg = test_setup.controller_rx.recv().await.unwrap(); - match controller_msg { - ControllerMessage::RemoteFunctionFailed { seq, error } => { - assert_eq!(seq, 1.into()); - assert!( - error.backtrace.contains("send error"), - "Unexpected WorkerError: {:?}", - error - ); - } - _ => panic!("Unexpected controller message: {:?}", controller_msg), - }; - - Ok(()) - } } diff --git a/monarch_tensor_worker/test_utils.py b/monarch_tensor_worker/test_utils.py index 57775023a..9c40ee2dd 100644 --- a/monarch_tensor_worker/test_utils.py +++ b/monarch_tensor_worker/test_utils.py @@ -13,7 +13,6 @@ import torch if TYPE_CHECKING: - from monarch.common.pipe import Pipe # @manual //monarch/python/monarch/common:pipe from monarch.worker.worker import ( # @manual //monarch/python/monarch/worker:worker DeviceMesh, ) @@ -23,12 +22,6 @@ def resolve_value(val: object) -> object: return val -def handler(pipe: Pipe) -> None: - arg = pipe.recv() - tensor = torch.ones(arg, dtype=torch.int32) - pipe.send(tensor) - - def mesh_rank(mesh: DeviceMesh, dim: str) -> int: return mesh.dims[dim].rank From d5d7f6d0bf8a5aaf9d8e33907372087cdd1e6481 Mon Sep 17 00:00:00 2001 From: zdevito Date: Fri, 21 Nov 2025 10:03:31 -0800 Subject: [PATCH 2/2] Update on "Removing remaining pipe functionality" Differential Revision: [D87599488](https://our.internmc.facebook.com/intern/diff/D87599488/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D87599488/)! [ghstack-poisoned] --- monarch_tensor_worker/src/stream.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/monarch_tensor_worker/src/stream.rs b/monarch_tensor_worker/src/stream.rs index 16917b27c..605cda893 100644 --- a/monarch_tensor_worker/src/stream.rs +++ b/monarch_tensor_worker/src/stream.rs @@ -2217,7 +2217,6 @@ mod tests { vec![WireValue::PyObject(ref_to_send)], HashMap::new(), HashMap::new(), - None, ) .await .unwrap()