Skip to content

Commit 0e81599

Browse files
authored
add explicit WorkerShutdown input (#1055)
1 parent 9a87ebf commit 0e81599

File tree

3 files changed

+20
-5
lines changed

3 files changed

+20
-5
lines changed

crates/sdk-core/src/worker/mod.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,10 @@ impl WorkerTrait for Worker {
266266
.unregister_worker(self.worker_instance_key);
267267
}
268268

269+
// Push a Worker Shutdown message to the workflow activation queue. This ensures that
270+
// any pending workflow activation polls will resolve, even if there are no other inputs.
271+
self.workflows.send_worker_shutdown();
272+
269273
// Second, we want to stop polling of both activity and workflow tasks
270274
if let Some(atm) = self.at_task_mgr.as_ref() {
271275
atm.initiate_shutdown();
@@ -274,13 +278,12 @@ impl WorkerTrait for Worker {
274278
// activity poll in case this worker is an activity-only worker.
275279
self.local_act_mgr.shutdown_initiated();
276280

281+
// If workflows have never been polled, immediately tell the local activity manager
282+
// that workflows have shut down, so it can proceed with shutdown without waiting.
283+
// This is particularly important for activity-only workers.
277284
if !self.workflows.ever_polled() {
278285
self.local_act_mgr.workflows_have_shutdown();
279286
}
280-
// Bump the workflow stream with a pointless input, since if a client initiates shutdown
281-
// and then immediately blocks waiting on a workflow activation poll, it's possible that
282-
// there may not be any more inputs ever, and that poll will never resolve.
283-
self.workflows.send_get_state_info_msg();
284287
}
285288

286289
async fn shutdown(&self) {

crates/sdk-core/src/worker/workflow/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -626,6 +626,12 @@ impl Workflows {
626626
rx
627627
}
628628

629+
/// Send a `WorkerShutdown` message to the workflow stream. This ensures that any pending
630+
/// workflow activation polls will resolve during shutdown, even if there are no other inputs.
631+
pub(super) fn send_worker_shutdown(&self) {
632+
self.send_local(LocalInputs::WorkerShutdown);
633+
}
634+
629635
/// Query the state of workflow management. Can return `None` if workflow state is shut down.
630636
pub(super) fn get_state_info(&self) -> impl Future<Output = Option<WorkflowStateInfo>> {
631637
let rx = self.send_get_state_info_msg();

crates/sdk-core/src/worker/workflow/workflow_stream.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,11 @@ impl WFStream {
147147
});
148148
None
149149
}
150+
LocalInputs::WorkerShutdown => {
151+
// Worker shutdown message is used to bump the stream to ensure
152+
// pending workflow activation polls resolve during shutdown
153+
None
154+
}
150155
}
151156
}
152157
WFStreamInput::FailedFetch {
@@ -633,6 +638,7 @@ pub(super) enum LocalInputs {
633638
RequestEviction(RequestEvictMsg),
634639
HeartbeatTimeout(String),
635640
GetStateInfo(GetStateInfoMsg),
641+
WorkerShutdown,
636642
}
637643
impl LocalInputs {
638644
fn run_id(&self) -> Option<&str> {
@@ -643,7 +649,7 @@ impl LocalInputs {
643649
LocalInputs::PostActivation(pa) => &pa.run_id,
644650
LocalInputs::RequestEviction(re) => &re.run_id,
645651
LocalInputs::HeartbeatTimeout(hb) => hb,
646-
LocalInputs::GetStateInfo(_) => return None,
652+
LocalInputs::GetStateInfo(_) | LocalInputs::WorkerShutdown => return None,
647653
})
648654
}
649655
}

0 commit comments

Comments
 (0)