Skip to content

Commit 9073454

Browse files
moonlifacebook-github-bot
authored andcommitted
include user mesh name and endpoint name in supervision error
Summary: The current error message logs the actor name and stacktrace when an exception happens (ActorError/SupervisionError). The endpoint name is included in the stacktrace. But for cases like proc crash, the supervision error doesn't have a stacktrace, so it couldn't show the endpoint name. This diff includes the user actor name and endpoint name to the exceptions ActorError and SupervisionError. github issue: #1899 Differential Revision: D87353113
1 parent 4d2e334 commit 9073454

File tree

5 files changed

+144
-28
lines changed

5 files changed

+144
-28
lines changed

monarch_hyperactor/src/supervision.rs

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,52 @@ use hyperactor::Bind;
1010
use hyperactor::Named;
1111
use hyperactor::Unbind;
1212
use hyperactor::supervision::ActorSupervisionEvent;
13-
use pyo3::create_exception;
1413
use pyo3::exceptions::PyRuntimeError;
1514
use pyo3::prelude::*;
1615
use serde::Deserialize;
1716
use serde::Serialize;
1817

19-
create_exception!(
20-
monarch._rust_bindings.monarch_hyperactor.supervision,
21-
SupervisionError,
22-
PyRuntimeError
23-
);
18+
#[pyclass(
19+
name = "SupervisionError",
20+
module = "monarch._rust_bindings.monarch_hyperactor.supervision",
21+
extends = PyRuntimeError
22+
)]
23+
#[derive(Clone, Debug)]
24+
pub struct SupervisionError {
25+
#[pyo3(set)]
26+
pub endpoint: Option<String>,
27+
pub message: String,
28+
}
29+
30+
#[pymethods]
31+
impl SupervisionError {
32+
#[new]
33+
#[pyo3(signature = (message, endpoint=None))]
34+
fn new(message: String, endpoint: Option<String>) -> Self {
35+
SupervisionError { endpoint, message }
36+
}
37+
38+
#[staticmethod]
39+
pub fn new_err(message: String) -> PyErr {
40+
PyRuntimeError::new_err(message)
41+
}
42+
43+
fn __str__(&self) -> String {
44+
if let Some(ep) = &self.endpoint {
45+
format!("Endpoint call {} failed, {}", ep, self.message)
46+
} else {
47+
self.message.clone()
48+
}
49+
}
50+
51+
fn __repr__(&self) -> String {
52+
if let Some(ep) = &self.endpoint {
53+
format!("SupervisionError(endpoint='{}', '{}')", ep, self.message)
54+
} else {
55+
format!("SupervisionError('{}')", self.message)
56+
}
57+
}
58+
}
2459

2560
#[derive(Clone, Debug, Serialize, Deserialize, Named, PartialEq, Bind, Unbind)]
2661
pub struct SupervisionFailureMessage {

monarch_hyperactor/src/v1/actor_mesh.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -711,12 +711,12 @@ impl ActorMeshProtocol for PythonActorMeshImpl {
711711
.unwrap_or_else(|e| e.into_inner())
712712
{
713713
Unhealthy::StreamClosed => {
714-
return Err(SupervisionError::new_err(
714+
return Err(PyErr::new::<SupervisionError, _>(
715715
"actor mesh is stopped due to proc mesh shutdown".to_string(),
716716
));
717717
}
718718
Unhealthy::Crashed(event) => {
719-
return Err(SupervisionError::new_err(format!(
719+
return Err(PyErr::new::<SupervisionError, _>(format!(
720720
"Actor {} is unhealthy with reason: {}",
721721
event.actor_id, event.actor_status
722722
)));
@@ -730,7 +730,7 @@ impl ActorMeshProtocol for PythonActorMeshImpl {
730730
.get(&rank)
731731
.map(|entry| entry.value().clone())
732732
}) {
733-
return Err(SupervisionError::new_err(format!(
733+
return Err(PyErr::new::<SupervisionError, _>(format!(
734734
"Actor {} is unhealthy with reason: {}",
735735
event.actor_id, event.actor_status
736736
)));

python/monarch/_rust_bindings/monarch_hyperactor/supervision.pyi

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,15 @@
66

77
# pyre-unsafe
88

9-
from typing import final
9+
from typing import final, Optional
1010

1111
@final
1212
class SupervisionError(RuntimeError):
1313
"""
1414
Custom exception for supervision-related errors in monarch_hyperactor.
1515
"""
1616

17-
...
17+
endpoint: str | None # Settable attribute
1818

1919
# TODO: Make this an exception subclass
2020
@final

python/monarch/_src/actor/actor_mesh.py

Lines changed: 88 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
Region,
7373
Shape,
7474
)
75+
from monarch._rust_bindings.monarch_hyperactor.supervision import SupervisionError
7576
from monarch._rust_bindings.monarch_hyperactor.v1.logging import log_endpoint_exception
7677
from monarch._rust_bindings.monarch_hyperactor.value_mesh import (
7778
ValueMesh as HyValueMesh,
@@ -488,6 +489,7 @@ class ActorEndpoint(Endpoint[P, R]):
488489
def __init__(
489490
self,
490491
actor_mesh: "ActorMeshProtocol",
492+
mesh_name: str,
491493
shape: Shape,
492494
proc_mesh: "Optional[ProcMesh]",
493495
name: MethodSpecifier,
@@ -497,6 +499,7 @@ def __init__(
497499
) -> None:
498500
super().__init__(propagator)
499501
self._actor_mesh = actor_mesh
502+
self._mesh_name = mesh_name
500503
self._name = name
501504
self._shape = shape
502505
self._proc_mesh = proc_mesh
@@ -541,13 +544,25 @@ def _send(
541544
shape = self._shape
542545
return Extent(shape.labels, shape.ndslice.sizes)
543546

547+
def _full_name(self) -> str:
548+
method_name = "unknown"
549+
match self._name:
550+
case MethodSpecifier.Init():
551+
method_name = "__init__"
552+
case MethodSpecifier.ReturnsResponse(name=method_name):
553+
pass
554+
case MethodSpecifier.ExplicitPort(name=method_name):
555+
pass
556+
return f"{self._mesh_name}.{method_name}()"
557+
544558
def _port(self, once: bool = False) -> "Tuple[Port[R], PortReceiver[R]]":
545559
p, r = super()._port(once=once)
546560
instance = context().actor_instance._as_rust()
547561
monitor: Optional[Shared[Exception]] = self._actor_mesh.supervision_event(
548562
instance
549563
)
550-
r._set_monitor(monitor)
564+
565+
r._attach_supervision(monitor, self._full_name())
551566
return (p, r)
552567

553568
def _rref(self, args: Tuple[Any, ...], kwargs: Dict[str, Any]) -> R:
@@ -882,19 +897,33 @@ def __init__(
882897
mailbox: Mailbox,
883898
receiver: "PortReceiverBase",
884899
monitor: "Optional[Shared[Exception]]" = None,
900+
endpoint: Optional[str] = None,
885901
) -> None:
886902
self._mailbox: Mailbox = mailbox
887903
self._monitor = monitor
888904
self._receiver = receiver
905+
self._endpoint = endpoint
906+
907+
def _tag_supervision_error(self, error: Exception) -> None:
908+
"""Tag supervision error with endpoint name if available."""
909+
if self._endpoint is not None and isinstance(error, SupervisionError):
910+
error.endpoint = self._endpoint
889911

890912
async def _recv(self) -> R:
891913
awaitable = self._receiver.recv_task()
892914
if self._monitor is None:
893915
result = await awaitable
894916
else:
895-
# type: ignore
896-
result, i = await PythonTask.select_one([self._monitor.task(), awaitable])
917+
try:
918+
result, i = await PythonTask.select_one(
919+
# type: ignore
920+
[self._monitor.task(), awaitable]
921+
)
922+
except Exception as e:
923+
self._tag_supervision_error(e)
924+
raise e
897925
if i == 0:
926+
self._tag_supervision_error(result)
898927
raise result
899928
return self._process(result)
900929

@@ -905,18 +934,35 @@ def _process(self, msg: PythonMessage) -> R:
905934
case PythonMessageKind.Result():
906935
return payload
907936
case PythonMessageKind.Exception():
908-
raise cast(Exception, payload)
937+
e = cast(Exception, payload)
938+
self._tag_supervision_error(e)
939+
raise e
909940
case _:
910941
raise ValueError(f"Unexpected message kind: {msg.kind}")
911942

912943
def recv(self) -> "Future[R]":
913944
return Future(coro=self._recv())
914945

915946
def ranked(self) -> "RankedPortReceiver[R]":
916-
return RankedPortReceiver[R](self._mailbox, self._receiver, self._monitor)
947+
return RankedPortReceiver[R](
948+
self._mailbox, self._receiver, self._monitor, self._endpoint
949+
)
950+
951+
def _attach_supervision(
952+
self, monitor: "Optional[Shared[Exception]]", endpoint: str
953+
) -> None:
954+
"""
955+
Attach supervision monitoring to this port receiver.
956+
957+
Enables the receiver to detect and report errors on any supervision events.
917958
918-
def _set_monitor(self, monitor: "Optional[Shared[Exception]]") -> None:
959+
Args:
960+
monitor: Shared exception monitor that signals supervision errors
961+
from the actor mesh. None if supervision is not enabled.
962+
endpoint: Full endpoint name
963+
"""
919964
self._monitor = monitor
965+
self._endpoint = endpoint
920966

921967

922968
class RankedPortReceiver(PortReceiver[Tuple[int, R]]):
@@ -973,8 +1019,13 @@ async def handle(
9731019
) -> None:
9741020
method_name = None
9751021
MESSAGES_HANDLED.add(1)
1022+
1023+
# Initialize method_name before try block so it's always defined
1024+
method_name = "unknown"
1025+
9761026
# response_port can be None. If so, then sending to port will drop the response,
9771027
# and raise any exceptions to the caller.
1028+
9781029
try:
9791030
_set_context(ctx)
9801031

@@ -984,6 +1035,7 @@ async def handle(
9841035

9851036
match method:
9861037
case MethodSpecifier.Init():
1038+
method_name = "__init__"
9871039
ins = ctx.actor_instance
9881040
(
9891041
Class,
@@ -1000,7 +1052,7 @@ async def handle(
10001052
self._maybe_exit_debugger()
10011053
except Exception as e:
10021054
self._saved_error = ActorError(
1003-
e, f"Remote actor {Class}.__init__ call failed."
1055+
e, f"Actor call {ins.name}.{method_name} failed."
10041056
)
10051057
raise
10061058
response_port.send(None)
@@ -1024,7 +1076,7 @@ async def handle(
10241076
# message delivery mechanism, or the framework accidentally
10251077
# mixed the usage of cast and direct send.
10261078

1027-
error_message = f"Actor object is missing when executing method {method_name} on actor {ctx.actor_instance.actor_id}."
1079+
error_message = f'Actor object is missing when executing method "{method_name}" on actor {ctx.actor_instance.actor_id}.'
10281080
if self._saved_error is not None:
10291081
error_message += (
10301082
f" This is likely due to an earlier error: {self._saved_error}"
@@ -1064,14 +1116,24 @@ async def handle(
10641116
except Exception as e:
10651117
log_endpoint_exception(e, method_name)
10661118
self._post_mortem_debug(e.__traceback__)
1067-
response_port.exception(ActorError(e))
1119+
response_port.exception(
1120+
ActorError(
1121+
e,
1122+
f"Actor call {ctx.actor_instance.name}.{method_name} failed.",
1123+
)
1124+
)
10681125
except BaseException as e:
10691126
self._post_mortem_debug(e.__traceback__)
10701127
# A BaseException can be thrown in the case of a Rust panic.
10711128
# In this case, we need a way to signal the panic to the Rust side.
10721129
# See [Panics in async endpoints]
10731130
try:
1074-
panic_flag.signal_panic(e)
1131+
panic_flag.signal_panic(
1132+
ActorError(
1133+
e,
1134+
f"Actor call {ctx.actor_instance.name}.{method_name} failed with BaseException.",
1135+
)
1136+
)
10751137
except Exception:
10761138
# The channel might be closed if the Rust side has already detected the error
10771139
pass
@@ -1245,11 +1307,15 @@ class ActorMesh(MeshTrait, Generic[T]):
12451307
def __init__(
12461308
self,
12471309
Class: Type[T],
1310+
name: str,
12481311
inner: "ActorMeshProtocol",
12491312
shape: Shape,
12501313
proc_mesh: "Optional[ProcMesh]",
12511314
) -> None:
1315+
# Class name of the actor.
12521316
self.__name__: str = Class.__name__
1317+
# The name user gives when spawning the mesh
1318+
self._mesh_name = name
12531319
self._class: Type[T] = Class
12541320
self._inner: "ActorMeshProtocol" = inner
12551321
self._shape = shape
@@ -1318,6 +1384,7 @@ def _endpoint(
13181384
) -> Any:
13191385
return ActorEndpoint(
13201386
self._inner,
1387+
self._mesh_name,
13211388
self._shape,
13221389
self._proc_mesh,
13231390
name,
@@ -1340,7 +1407,7 @@ def _create(
13401407
*args: Any,
13411408
**kwargs: Any,
13421409
) -> "ActorMesh[T]":
1343-
mesh = cls(Class, actor_mesh, shape, proc_mesh)
1410+
mesh = cls(Class, name, actor_mesh, shape, proc_mesh)
13441411

13451412
# We don't start the supervision polling loop until the first call to
13461413
# supervision_event, which needs an Instance. Initialize here so events
@@ -1383,12 +1450,18 @@ def from_actor_id(
13831450
Class: Type[T],
13841451
actor_id: ActorId,
13851452
) -> "ActorMesh[T]":
1386-
return cls(Class, _SingletonActorAdapator(actor_id), singleton_shape, None)
1453+
return cls(Class, "", _SingletonActorAdapator(actor_id), singleton_shape, None)
13871454

13881455
def __reduce_ex__(
13891456
self, protocol: Any
13901457
) -> "Tuple[Type[ActorMesh[T]], Tuple[Any, ...]]":
1391-
return ActorMesh, (self._class, self._inner, self._shape, self._proc_mesh)
1458+
return ActorMesh, (
1459+
self._class,
1460+
self._mesh_name,
1461+
self._inner,
1462+
self._shape,
1463+
self._proc_mesh,
1464+
)
13921465

13931466
@property
13941467
def _ndslice(self) -> NDSlice:
@@ -1400,7 +1473,7 @@ def _labels(self) -> Iterable[str]:
14001473

14011474
def _new_with_shape(self, shape: Shape) -> "ActorMesh[T]":
14021475
sliced = self._inner.new_with_region(shape.region)
1403-
return ActorMesh(self._class, sliced, shape, self._proc_mesh)
1476+
return ActorMesh(self._class, self._mesh_name, sliced, shape, self._proc_mesh)
14041477

14051478
def __repr__(self) -> str:
14061479
return f"ActorMesh(class={self._class}, shape={self._shape}), inner={type(self._inner)})"
@@ -1423,7 +1496,7 @@ class ActorError(Exception):
14231496

14241497
def __init__(
14251498
self,
1426-
exception: Exception,
1499+
exception: BaseException,
14271500
message: str = "A remote actor call has failed.",
14281501
) -> None:
14291502
self.exception = exception

python/tests/test_actor_error.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -725,18 +725,26 @@ async def test_supervision_with_sending_error() -> None:
725725

726726
# The host mesh agent sends or the proc mesh agent sends might break.
727727
# Either case is an error that tells us that the send failed.
728-
error_msg = (
729-
".*Actor .* (is unhealthy with reason|exited because of the following reason)|"
728+
error_msg_regx = (
729+
"Actor .* (is unhealthy with reason|exited because of the following reason)|"
730730
"actor mesh is stopped due to proc mesh shutdown"
731731
)
732732

733733
# send a large payload to trigger send timeout error
734+
error_msg = (
735+
r"Endpoint call healthy\.check_with_payload\(\) failed, " + error_msg_regx
736+
)
734737
with pytest.raises(SupervisionError, match=error_msg):
735738
await actor_mesh.check_with_payload.call(payload="a" * 55000000)
736739

737740
# new call should fail with check of health state of actor mesh
741+
error_msg = r"Endpoint call healthy\.check\(\) failed, " + error_msg_regx
738742
with pytest.raises(SupervisionError, match=error_msg):
739743
await actor_mesh.check.call()
744+
745+
error_msg = (
746+
r"Endpoint call healthy\.check_with_payload\(\) failed, " + error_msg_regx
747+
)
740748
with pytest.raises(SupervisionError, match=error_msg):
741749
await actor_mesh.check_with_payload.call(payload="a")
742750

0 commit comments

Comments
 (0)