Skip to content

Commit 850db67

Browse files
authored
Rename WorkerShutdown to BumpStream, also handle shutdown() case (#1056)
* Rename WorkerShutdown to BumpStream, also handle shutdown() case * Update comment
1 parent 0e81599 commit 850db67

File tree

3 files changed

+12
-12
lines changed

3 files changed

+12
-12
lines changed

crates/sdk-core/src/worker/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -266,9 +266,9 @@ impl WorkerTrait for Worker {
266266
.unregister_worker(self.worker_instance_key);
267267
}
268268

269-
// Push a Worker Shutdown message to the workflow activation queue. This ensures that
269+
// Push a BumpStream message to the workflow activation queue. This ensures that
270270
// any pending workflow activation polls will resolve, even if there are no other inputs.
271-
self.workflows.send_worker_shutdown();
271+
self.workflows.bump_stream();
272272

273273
// Second, we want to stop polling of both activity and workflow tasks
274274
if let Some(atm) = self.at_task_mgr.as_ref() {

crates/sdk-core/src/worker/workflow/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -626,10 +626,10 @@ impl Workflows {
626626
rx
627627
}
628628

629-
/// Send a `WorkerShutdown` message to the workflow stream. This ensures that any pending
630-
/// workflow activation polls will resolve during shutdown, even if there are no other inputs.
631-
pub(super) fn send_worker_shutdown(&self) {
632-
self.send_local(LocalInputs::WorkerShutdown);
629+
/// Send a `BumpStream` message to the workflow stream. This ensures that any pending
630+
/// workflow activation polls will resolve, like during shutdown, even if there are no other inputs.
631+
pub(super) fn bump_stream(&self) {
632+
self.send_local(LocalInputs::BumpStream);
633633
}
634634

635635
/// Query the state of workflow management. Can return `None` if workflow state is shut down.
@@ -654,7 +654,7 @@ impl Workflows {
654654
let mut interval = tokio::time::interval(Duration::from_millis(10));
655655
loop {
656656
interval.tick().await;
657-
let _ = self.get_state_info().await;
657+
self.bump_stream();
658658
}
659659
});
660660
let (_, jh_res) = tokio::join!(

crates/sdk-core/src/worker/workflow/workflow_stream.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -147,9 +147,9 @@ impl WFStream {
147147
});
148148
None
149149
}
150-
LocalInputs::WorkerShutdown => {
151-
// Worker shutdown message is used to bump the stream to ensure
152-
// pending workflow activation polls resolve during shutdown
150+
LocalInputs::BumpStream => {
151+
// Ensures pending workflow activation polls resolve for
152+
// scenarios like during shutdown
153153
None
154154
}
155155
}
@@ -638,7 +638,7 @@ pub(super) enum LocalInputs {
638638
RequestEviction(RequestEvictMsg),
639639
HeartbeatTimeout(String),
640640
GetStateInfo(GetStateInfoMsg),
641-
WorkerShutdown,
641+
BumpStream,
642642
}
643643
impl LocalInputs {
644644
fn run_id(&self) -> Option<&str> {
@@ -649,7 +649,7 @@ impl LocalInputs {
649649
LocalInputs::PostActivation(pa) => &pa.run_id,
650650
LocalInputs::RequestEviction(re) => &re.run_id,
651651
LocalInputs::HeartbeatTimeout(hb) => hb,
652-
LocalInputs::GetStateInfo(_) | LocalInputs::WorkerShutdown => return None,
652+
LocalInputs::GetStateInfo(_) | LocalInputs::BumpStream => return None,
653653
})
654654
}
655655
}

0 commit comments

Comments
 (0)