Skip to content

Commit d65d8db

Browse files
committed
fix(pb): stop actors from destroying on goingaway
1 parent 815c918 commit d65d8db

File tree

1 file changed

+7
-8
lines changed
  • engine/packages/pegboard/src/workflows/actor

1 file changed

+7
-8
lines changed

engine/packages/pegboard/src/workflows/actor/mod.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -569,10 +569,10 @@ async fn handle_stopped(
569569
tracing::debug!(?variant, "actor stopped");
570570

571571
let force_reschedule = match &variant {
572-
// Reset retry count on successful exit
573572
StoppedVariant::Normal {
574573
code: protocol::StopCode::Ok,
575574
} => {
575+
// Reset retry count on successful exit
576576
state.reschedule_state = Default::default();
577577

578578
false
@@ -583,7 +583,6 @@ async fn handle_stopped(
583583

584584
// Clear stop gc timeout to prevent being marked as lost in the lifecycle loop
585585
state.gc_timeout_ts = None;
586-
state.going_away = false;
587586
state.stopping = false;
588587
state.runner_id = None;
589588
let old_runner_workflow_id = state.runner_workflow_id.take();
@@ -658,16 +657,15 @@ async fn handle_stopped(
658657
}
659658
// Handle rescheduling if not marked as sleeping
660659
else if !state.sleeping {
661-
// Anything besides a StopCode::Ok is considered a failure
662-
let failed = !matches!(
660+
let graceful_exit = !state.going_away && matches!(
663661
variant,
664662
StoppedVariant::Normal {
665663
code: protocol::StopCode::Ok
666664
}
667-
);
665+
));
668666

669-
match (input.crash_policy, failed) {
670-
(CrashPolicy::Restart, true) => {
667+
match (input.crash_policy, graceful_exit) {
668+
(CrashPolicy::Restart, false) => {
671669
match runtime::reschedule_actor(ctx, &input, state, false).await? {
672670
runtime::SpawnActorOutput::Allocated { .. } => {}
673671
// NOTE: Its not possible for `SpawnActorOutput::Sleep` to be returned here, the crash
@@ -678,7 +676,7 @@ async fn handle_stopped(
678676
}
679677
}
680678
}
681-
(CrashPolicy::Sleep, true) => {
679+
(CrashPolicy::Sleep, false) => {
682680
tracing::debug!(actor_id=?input.actor_id, "actor sleeping due to crash");
683681

684682
state.sleeping = true;
@@ -711,6 +709,7 @@ async fn handle_stopped(
711709

712710
state.wake_for_alarm = false;
713711
state.will_wake = false;
712+
state.going_away = false;
714713

715714
ctx.msg(Stopped {})
716715
.tag("actor_id", input.actor_id)

0 commit comments

Comments
 (0)