Skip to content

Commit a1ae076

Browse files
dulinrileymeta-codesync[bot]
authored andcommitted
Change proc mesh stop to exit the process instead of SIGTERM (#1900)
Summary: Pull Request resolved: #1900 Instead of relying on sending SIGTERM, give the process a chance to clean itself up gracefully. Have the StopAll message on ProcMeshAgent call exit(0) if it is able to clean up all actors successfully, or exit(1) if there's an issue. Then, instead of awaiting a reply from the actor, use the `wait()` functionality of ProcHandle to wait for it to exit. This way we don't get the SIGTERM stack dump, and gives the user process a chance to run atexit handlers such as static C++ object destructors (SIGTERM bypasses atexit handlers). This also means nothing uses GetAllRankStatus anymore, and we can delete it! We may also want to rename StopAll to StopSelf or something that implies it'll stop itself as well. Reviewed By: shayne-fletcher Differential Revision: D87108568 fbshipit-source-id: 9e263b67d05a0bda1f7f5fb09d2b799bb330e757
1 parent 27c98c9 commit a1ae076

File tree

3 files changed

+44
-129
lines changed

3 files changed

+44
-129
lines changed

hyperactor_mesh/src/bootstrap.rs

Lines changed: 22 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ use crate::logging::OutputTarget;
7272
use crate::logging::StreamFwder;
7373
use crate::proc_mesh::mesh_agent::ProcMeshAgent;
7474
use crate::resource;
75-
use crate::resource::Status;
7675
use crate::v1;
7776
use crate::v1::host_mesh::mesh_agent::HostAgentMode;
7877
use crate::v1::host_mesh::mesh_agent::HostMeshAgent;
@@ -1202,12 +1201,15 @@ impl BootstrapProcHandle {
12021201
(out, err)
12031202
}
12041203

1204+
/// Sends a StopAll message to the ProcMeshAgent, which should exit the process.
1205+
/// Waits for the successful state change of the process. If the process
1206+
/// doesn't reach a terminal state, returns Err.
12051207
async fn send_stop_all(
12061208
&self,
12071209
cx: &impl context::Actor,
12081210
agent: ActorRef<ProcMeshAgent>,
12091211
timeout: Duration,
1210-
) -> anyhow::Result<()> {
1212+
) -> anyhow::Result<ProcStatus> {
12111213
// For all of the messages and replies in this function:
12121214
// if the proc is already dead, then the message will be undeliverable,
12131215
// which should be ignored.
@@ -1217,25 +1219,12 @@ impl BootstrapProcHandle {
12171219
let mut agent_port = agent.port();
12181220
agent_port.return_undeliverable(false);
12191221
agent_port.send(cx, resource::StopAll {})?;
1220-
let (reply_port, mut rx) = cx.mailbox().open_port::<Vec<(usize, Status)>>();
1221-
let mut reply_port = reply_port.bind();
1222-
reply_port.return_undeliverable(false);
1223-
// Similar to above, if we cannot query for the stopped actors, just
1224-
// proceed with SIGTERM.
1225-
let mut agent_port = agent.port();
1226-
agent_port.return_undeliverable(false);
1227-
agent_port.send(cx, resource::GetAllRankStatus { reply: reply_port })?;
1228-
// If there's a timeout waiting for a reply, continue with SIGTERM.
1229-
let statuses = RealClock.timeout(timeout, rx.recv()).await??;
1230-
let has_failure = statuses.iter().any(|(_rank, status)| status.is_failure());
1231-
1232-
if has_failure {
1233-
Err(anyhow::anyhow!(
1234-
"StopAll had some actors that failed: {:?}",
1235-
statuses,
1236-
))
1237-
} else {
1238-
Ok(())
1222+
// The agent handling Stop should exit the process, if it doesn't within
1223+
// the time window, we escalate to SIGTERM.
1224+
match RealClock.timeout(timeout, self.wait()).await {
1225+
Ok(Ok(st)) => Ok(st),
1226+
Ok(Err(e)) => Err(anyhow::anyhow!("agent did not exit the process: {:?}", e)),
1227+
Err(_) => Err(anyhow::anyhow!("agent did not exit the process in time")),
12391228
}
12401229
}
12411230
}
@@ -1361,20 +1350,19 @@ impl hyperactor::host::ProcHandle for BootstrapProcHandle {
13611350
// they are in the Ready state and have an Agent we can message.
13621351
let agent = self.agent_ref();
13631352
if let Some(agent) = agent {
1364-
if let Err(e) = self.send_stop_all(cx, agent.clone(), timeout).await {
1365-
// Variety of possible errors, proceed with SIGTERM.
1366-
tracing::warn!(
1367-
"ProcMeshAgent {} could not successfully stop all actors: {}",
1368-
agent.actor_id(),
1369-
e,
1370-
);
1353+
match self.send_stop_all(cx, agent.clone(), timeout).await {
1354+
Ok(st) => return Ok(st),
1355+
Err(e) => {
1356+
// Variety of possible errors, proceed with SIGTERM.
1357+
tracing::warn!(
1358+
"ProcMeshAgent {} could not successfully stop all actors: {}",
1359+
agent.actor_id(),
1360+
e,
1361+
);
1362+
}
13711363
}
1372-
// Even if the StopAll message and response is fully effective, we
1373-
// still want to send SIGTERM to actually exit the process and free
1374-
// any leftover resources. No actor should be running at this
1375-
// point.
13761364
}
1377-
// After the stop all actors message may be successful, we still need
1365+
// If the stop all actors message was unsuccessful, we need
13781366
// to actually stop the process.
13791367
let _ = self.mark_stopping();
13801368

@@ -1690,7 +1678,7 @@ impl BootstrapProcManager {
16901678
/// Return the current [`ProcStatus`] for the given [`ProcId`], if
16911679
/// the proc is known to this manager.
16921680
///
1693-
/// This querprocies the live [`BootstrapProcHandle`] stored in the
1681+
/// This queries the live [`BootstrapProcHandle`] stored in the
16941682
/// manager's internal map. It provides an immediate snapshot of
16951683
/// lifecycle state (`Starting`, `Running`, `Stopping`, `Stopped`,
16961684
/// etc.).

hyperactor_mesh/src/proc_mesh/mesh_agent.rs

Lines changed: 22 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,6 @@ pub(crate) fn update_event_actor_id(mut event: ActorSupervisionEvent) -> ActorSu
222222
resource::StopAll { cast = true },
223223
resource::GetState<ActorState> { cast = true },
224224
resource::GetRankStatus { cast = true },
225-
resource::GetAllRankStatus { cast = true },
226225
]
227226
)]
228227
pub struct ProcMeshAgent {
@@ -599,6 +598,10 @@ impl Handler<resource::Stop> for ProcMeshAgent {
599598
}
600599
}
601600

601+
/// Handles `StopAll` by coordinating an orderly stop of child actors and then
602+
/// exiting the process. This handler never returns to the caller: it calls
603+
/// `std::process::exit(0/1)` after shutdown. Any sender must *not* expect a
604+
/// reply or send any further message, and should watch `ProcStatus` instead.
602605
#[async_trait]
603606
impl Handler<resource::StopAll> for ProcMeshAgent {
604607
async fn handle(
@@ -610,19 +613,26 @@ impl Handler<resource::StopAll> for ProcMeshAgent {
610613
// By passing in the self context, destroy_and_wait will stop this agent
611614
// last, after all others are stopped.
612615
let stop_result = self.destroy_and_wait_except_current(cx, timeout).await;
616+
// Exit here to cleanup all remaining resources held by the process.
617+
// This means ProcMeshAgent will never run cleanup or any other code
618+
// from exiting its root actor. Senders of this message should never
619+
// send any further messages or expect a reply.
613620
match stop_result {
614-
Ok(_) => {
615-
for (_, actor_state) in self.actor_states.iter_mut() {
616-
// Mark all actors as stopped.
617-
actor_state.stopped = true;
618-
}
619-
Ok(())
621+
Ok((stopped_actors, aborted_actors)) => {
622+
// No need to clean up any state, the process is exiting.
623+
tracing::info!(
624+
actor = %cx.self_id(),
625+
"exiting process after receiving StopAll message on ProcMeshAgent. \
626+
stopped actors = {:?}, aborted actors = {:?}",
627+
stopped_actors.into_iter().map(|a| a.to_string()).collect::<Vec<_>>(),
628+
aborted_actors.into_iter().map(|a| a.to_string()).collect::<Vec<_>>(),
629+
);
630+
std::process::exit(0);
631+
}
632+
Err(e) => {
633+
tracing::error!(actor = %cx.self_id(), "failed to stop all actors on ProcMeshAgent: {:?}", e);
634+
std::process::exit(1);
620635
}
621-
Err(e) => Err(anyhow::anyhow!(
622-
"failed to StopAll on {}: {:?}",
623-
cx.self_id(),
624-
e
625-
)),
626636
}
627637
}
628638
}
@@ -696,69 +706,6 @@ impl Handler<resource::GetRankStatus> for ProcMeshAgent {
696706
}
697707
}
698708

699-
#[async_trait]
700-
impl Handler<resource::GetAllRankStatus> for ProcMeshAgent {
701-
async fn handle(
702-
&mut self,
703-
cx: &Context<Self>,
704-
get_rank_status: resource::GetAllRankStatus,
705-
) -> anyhow::Result<()> {
706-
use crate::resource::Status;
707-
708-
let mut ranks = Vec::new();
709-
for (_name, state) in self.actor_states.iter() {
710-
match state {
711-
ActorInstanceState {
712-
spawn: Ok(actor_id),
713-
create_rank,
714-
stopped,
715-
} => {
716-
if *stopped {
717-
ranks.push((*create_rank, resource::Status::Stopped));
718-
} else {
719-
let supervision_events = self
720-
.supervision_events
721-
.get(actor_id)
722-
.map_or_else(Vec::new, |a| a.clone());
723-
ranks.push((
724-
*create_rank,
725-
if supervision_events.is_empty() {
726-
resource::Status::Running
727-
} else {
728-
resource::Status::Failed(format!(
729-
"because of supervision events: {:?}",
730-
supervision_events
731-
))
732-
},
733-
));
734-
}
735-
}
736-
ActorInstanceState {
737-
spawn: Err(e),
738-
create_rank,
739-
..
740-
} => {
741-
ranks.push((*create_rank, Status::Failed(e.to_string())));
742-
}
743-
}
744-
}
745-
746-
let result = get_rank_status.reply.send(cx, ranks);
747-
// Ignore errors, because returning Err from here would cause the ProcMeshAgent
748-
// to be stopped, which would prevent querying and spawning other actors.
749-
// This only means some actor that requested the state of an actor failed to receive it.
750-
if let Err(e) = result {
751-
tracing::warn!(
752-
actor = %cx.self_id(),
753-
"failed to send GetRankStatus reply to {} due to error: {}",
754-
get_rank_status.reply.port_id().actor_id(),
755-
e
756-
);
757-
}
758-
Ok(())
759-
}
760-
}
761-
762709
#[async_trait]
763710
impl Handler<resource::GetState<ActorState>> for ProcMeshAgent {
764711
async fn handle(

hyperactor_mesh/src/resource.rs

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -217,26 +217,6 @@ impl GetRankStatus {
217217
}
218218
}
219219

220-
/// Get the status of all resources across the mesh.
221-
#[derive(
222-
Clone,
223-
Debug,
224-
Serialize,
225-
Deserialize,
226-
Named,
227-
Handler,
228-
HandleClient,
229-
RefClient,
230-
Bind,
231-
Unbind
232-
)]
233-
pub struct GetAllRankStatus {
234-
/// Returns the status and rank of all resources.
235-
/// TODO: migrate to a ValueOverlay.
236-
#[binding(include)]
237-
pub reply: PortRef<Vec<(usize, Status)>>,
238-
}
239-
240220
/// The state of a resource.
241221
#[derive(Clone, Debug, Serialize, Deserialize, Named, PartialEq, Eq)]
242222
pub struct State<S> {

0 commit comments

Comments
 (0)