diff --git a/crates/common/src/errors.rs b/crates/common/src/errors.rs index d2fb69739..b505c9345 100644 --- a/crates/common/src/errors.rs +++ b/crates/common/src/errors.rs @@ -40,10 +40,14 @@ pub enum CompleteWfError { /// The run associated with the completion run_id: String, }, + /// Workflows have not been enabled on this worker. + #[error("Workflows are not enabled on this worker")] + WorkflowNotEnabled, } /// Errors thrown by [crate::Worker::complete_activity_task] #[derive(thiserror::Error, Debug)] +#[allow(clippy::large_enum_variant)] pub enum CompleteActivityError { /// Lang SDK sent us a malformed activity completion. This likely means a bug in the lang sdk. #[error("Lang SDK sent us a malformed activity completion ({reason}): {completion:?}")] @@ -53,6 +57,9 @@ pub enum CompleteActivityError { /// The completion, which may not be included to avoid unnecessary copies. completion: Option, }, + /// Activities have not been enabled on this worker. + #[error("Activities are not enabled on this worker")] + ActivityNotEnabled, } /// Errors thrown by [crate::Worker::complete_nexus_task] diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 5ec5c2068..d44da2924 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -129,8 +129,8 @@ pub trait Worker: Send + Sync { /// [Worker::complete_workflow_activation] and [Worker::complete_activity_task] for those /// workflows & activities until they are done. At that point, the lang SDK can end the process, /// or drop the [Worker] instance via [Worker::finalize_shutdown], which will close the - /// connection and free resources. If you have set [WorkerConfig::no_remote_activities], you may - /// skip calling [Worker::poll_activity_task]. + /// connection and free resources. If you have set [WorkerConfig::task_types] to exclude + /// [WorkerTaskTypes::activity_only()], you may skip calling [Worker::poll_activity_task]. /// /// Lang implementations should use [Worker::initiate_shutdown] followed by /// [Worker::finalize_shutdown]. diff --git a/crates/common/src/worker.rs b/crates/common/src/worker.rs index 8af1c7c99..df81f6a2a 100644 --- a/crates/common/src/worker.rs +++ b/crates/common/src/worker.rs @@ -16,6 +16,59 @@ use std::{ time::Duration, }; +/// Specifies which task types a worker will poll for. +/// +/// Workers can be configured to handle any combination of workflows, activities, and nexus operations. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub struct WorkerTaskTypes { + pub enable_workflows: bool, + pub enable_activities: bool, + pub enable_nexus: bool, +} + +impl WorkerTaskTypes { + /// Check if no task types are enabled + pub fn is_empty(&self) -> bool { + !self.enable_workflows && !self.enable_activities && !self.enable_nexus + } + + /// Create a config with all task types enabled + pub fn all() -> WorkerTaskTypes { + WorkerTaskTypes { + enable_workflows: true, + enable_activities: true, + enable_nexus: true, + } + } + + /// Create a config with only workflow tasks enabled + pub fn workflow_only() -> WorkerTaskTypes { + WorkerTaskTypes { + enable_workflows: true, + enable_activities: false, + enable_nexus: false, + } + } + + /// Create a config with only activity tasks enabled + pub fn activity_only() -> WorkerTaskTypes { + WorkerTaskTypes { + enable_workflows: false, + enable_activities: true, + enable_nexus: false, + } + } + + /// Create a config with only nexus tasks enabled + pub fn nexus_only() -> WorkerTaskTypes { + WorkerTaskTypes { + enable_workflows: false, + enable_activities: false, + enable_nexus: true, + } + } +} + /// Defines per-worker configuration options #[derive(Clone, derive_builder::Builder)] #[builder(setter(into), build_fn(validate = "Self::validate"))] @@ -64,10 +117,10 @@ pub struct WorkerConfig { /// worker's task queue #[builder(default = "PollerBehavior::SimpleMaximum(5)")] pub nexus_task_poller_behavior: PollerBehavior, - /// If set to true this worker will only handle workflow tasks and local activities, it will not - /// poll for activity tasks. - #[builder(default = "false")] - pub no_remote_activities: bool, + /// Specifies which task types this worker will poll for. + /// + /// Note: At least one task type must be specified or the worker will fail validation. + pub task_types: WorkerTaskTypes, /// How long a workflow task is allowed to sit on the sticky queue before it is timed out /// and moved to the non-sticky queue where it may be picked up by any worker. #[builder(default = "Duration::from_secs(10)")] @@ -218,6 +271,15 @@ impl WorkerConfigBuilder { } fn validate(&self) -> Result<(), String> { + let task_types = self + .task_types + .as_ref() + .cloned() + .unwrap_or_else(WorkerTaskTypes::all); + if task_types.is_empty() { + return Err("At least one task type must be enabled in `task_types`".to_owned()); + } + if let Some(b) = self.workflow_task_poller_behavior.as_ref() { b.validate()? } @@ -249,6 +311,17 @@ impl WorkerConfigBuilder { return Err("`max_outstanding_nexus_tasks` must be > 0".to_owned()); } + // Validate workflow cache is consistent with task_types + if !task_types.enable_workflows + && let Some(cache) = self.max_cached_workflows.as_ref() + && *cache > 0 + { + return Err( + "Cannot have `max_cached_workflows` > 0 when workflows are not enabled in `task_types`" + .to_owned(), + ); + } + if let Some(cache) = self.max_cached_workflows.as_ref() && *cache > 0 { diff --git a/crates/common/tests/worker_task_types_test.rs b/crates/common/tests/worker_task_types_test.rs new file mode 100644 index 000000000..8495a52f0 --- /dev/null +++ b/crates/common/tests/worker_task_types_test.rs @@ -0,0 +1,121 @@ +use temporalio_common::worker::{WorkerConfigBuilder, WorkerTaskTypes, WorkerVersioningStrategy}; + +fn default_versioning_strategy() -> WorkerVersioningStrategy { + WorkerVersioningStrategy::None { + build_id: String::new(), + } +} + +#[test] +fn test_default_configuration_polls_all_types() { + let config = WorkerConfigBuilder::default() + .namespace("default") + .task_queue("test-queue") + .versioning_strategy(default_versioning_strategy()) + .task_types(WorkerTaskTypes::all()) + .build() + .expect("Failed to build default config"); + + let effective = &config.task_types; + assert!( + effective.enable_workflows, + "Should poll workflows by default" + ); + assert!( + effective.enable_activities, + "Should poll activities by default" + ); + assert!(effective.enable_nexus, "Should poll nexus by default"); +} + +#[test] +fn test_empty_task_types_fails_validation() { + let result = WorkerConfigBuilder::default() + .namespace("default") + .task_queue("test-queue") + .versioning_strategy(default_versioning_strategy()) + .task_types(WorkerTaskTypes { + enable_workflows: false, + enable_activities: false, + enable_nexus: false, + }) + .build(); + + assert!(result.is_err(), "Empty task_types should fail validation"); + let err = result.err().unwrap().to_string(); + assert!( + err.contains("At least one task type"), + "Error should mention task types: {err}", + ); +} + +#[test] +fn test_workflow_cache_without_workflows_fails() { + let result = WorkerConfigBuilder::default() + .namespace("default") + .task_queue("test-queue") + .versioning_strategy(default_versioning_strategy()) + .task_types(WorkerTaskTypes::activity_only()) + .max_cached_workflows(10usize) + .build(); + + assert!( + result.is_err(), + "Workflow cache > 0 without workflows should fail" + ); + let err = result.err().unwrap().to_string(); + assert!( + err.contains("max_cached_workflows"), + "Error should mention max_cached_workflows: {err}", + ); +} + +#[test] +fn test_all_combinations() { + let combinations = [ + (WorkerTaskTypes::workflow_only(), "workflows only"), + (WorkerTaskTypes::activity_only(), "activities only"), + (WorkerTaskTypes::nexus_only(), "nexus only"), + ( + WorkerTaskTypes { + enable_workflows: true, + enable_activities: true, + enable_nexus: false, + }, + "workflows + activities", + ), + ( + WorkerTaskTypes { + enable_workflows: true, + enable_activities: false, + enable_nexus: true, + }, + "workflows + nexus", + ), + ( + WorkerTaskTypes { + enable_workflows: false, + enable_activities: true, + enable_nexus: true, + }, + "activities + nexus", + ), + (WorkerTaskTypes::all(), "all types"), + ]; + + for (task_types, description) in combinations { + let config = WorkerConfigBuilder::default() + .namespace("default") + .task_queue("test-queue") + .versioning_strategy(default_versioning_strategy()) + .task_types(task_types) + .build() + .unwrap_or_else(|e| panic!("Failed to build config for {description}: {e:?}")); + + let effective = config.task_types; + assert_eq!( + effective, task_types, + "Effective types should match for {description}", + ); + } +} diff --git a/crates/sdk-core-c-bridge/include/temporal-sdk-core-c-bridge.h b/crates/sdk-core-c-bridge/include/temporal-sdk-core-c-bridge.h index d17de11a7..050119ee0 100644 --- a/crates/sdk-core-c-bridge/include/temporal-sdk-core-c-bridge.h +++ b/crates/sdk-core-c-bridge/include/temporal-sdk-core-c-bridge.h @@ -738,6 +738,12 @@ typedef struct TemporalCoreTunerHolder { struct TemporalCoreSlotSupplier nexus_task_slot_supplier; } TemporalCoreTunerHolder; +typedef struct TemporalCoreWorkerTaskTypes { + bool enable_workflows; + bool enable_activities; + bool enable_nexus; +} TemporalCoreWorkerTaskTypes; + typedef struct TemporalCorePollerBehaviorSimpleMaximum { uintptr_t simple_maximum; } TemporalCorePollerBehaviorSimpleMaximum; @@ -765,7 +771,7 @@ typedef struct TemporalCoreWorkerOptions { struct TemporalCoreByteArrayRef identity_override; uint32_t max_cached_workflows; struct TemporalCoreTunerHolder tuner; - bool no_remote_activities; + struct TemporalCoreWorkerTaskTypes task_types; uint64_t sticky_queue_schedule_to_start_timeout_millis; uint64_t max_heartbeat_throttle_interval_millis; uint64_t default_heartbeat_throttle_interval_millis; diff --git a/crates/sdk-core-c-bridge/src/worker.rs b/crates/sdk-core-c-bridge/src/worker.rs index 542d06d99..022761ca1 100644 --- a/crates/sdk-core-c-bridge/src/worker.rs +++ b/crates/sdk-core-c-bridge/src/worker.rs @@ -43,7 +43,7 @@ pub struct WorkerOptions { pub identity_override: ByteArrayRef, pub max_cached_workflows: u32, pub tuner: TunerHolder, - pub no_remote_activities: bool, + pub task_types: WorkerTaskTypes, pub sticky_queue_schedule_to_start_timeout_millis: u64, pub max_heartbeat_throttle_interval_millis: u64, pub default_heartbeat_throttle_interval_millis: u64, @@ -58,6 +58,23 @@ pub struct WorkerOptions { pub nondeterminism_as_workflow_fail_for_types: ByteArrayRefArray, } +#[repr(C)] +pub struct WorkerTaskTypes { + pub enable_workflows: bool, + pub enable_activities: bool, + pub enable_nexus: bool, +} + +impl From<&WorkerTaskTypes> for temporalio_common::worker::WorkerTaskTypes { + fn from(t: &WorkerTaskTypes) -> Self { + Self { + enable_workflows: t.enable_workflows, + enable_activities: t.enable_activities, + enable_nexus: t.enable_nexus, + } + } +} + #[repr(C)] pub struct PollerBehaviorSimpleMaximum { pub simple_maximum: usize, @@ -1183,7 +1200,9 @@ impl TryFrom<&WorkerOptions> for temporalio_sdk_core::WorkerConfig { .client_identity_override(opt.identity_override.to_option_string()) .max_cached_workflows(opt.max_cached_workflows as usize) .tuner(Arc::new(converted_tuner)) - .no_remote_activities(opt.no_remote_activities) + .task_types(temporalio_common::worker::WorkerTaskTypes::from( + &opt.task_types, + )) .sticky_queue_schedule_to_start_timeout(Duration::from_millis( opt.sticky_queue_schedule_to_start_timeout_millis, )) diff --git a/crates/sdk-core/src/core_tests/activity_tasks.rs b/crates/sdk-core/src/core_tests/activity_tasks.rs index 56bdd57a0..3e0ad2240 100644 --- a/crates/sdk-core/src/core_tests/activity_tasks.rs +++ b/crates/sdk-core/src/core_tests/activity_tasks.rs @@ -53,7 +53,7 @@ use temporalio_common::{ }, test_utils::start_timer_cmd, }, - worker::PollerBehavior, + worker::{PollerBehavior, WorkerTaskTypes}, }; use tokio::{join, time::sleep}; use tokio_util::sync::CancellationToken; @@ -725,7 +725,7 @@ async fn no_eager_activities_requested_when_worker_options_disable_it( mock.worker_cfg(|wc| { wc.max_cached_workflows = 2; if reason == "no_remote" { - wc.no_remote_activities = true; + wc.task_types = WorkerTaskTypes::workflow_only(); } else { wc.max_task_queue_activities_per_second = Some(1.0); } diff --git a/crates/sdk-core/src/core_tests/workers.rs b/crates/sdk-core/src/core_tests/workers.rs index 7171b963d..b84b6b14f 100644 --- a/crates/sdk-core/src/core_tests/workers.rs +++ b/crates/sdk-core/src/core_tests/workers.rs @@ -1,3 +1,4 @@ +use crate::test_help::QueueResponse; use crate::{ PollError, prost_dur, test_help::{ @@ -14,6 +15,17 @@ use crate::{ }; use futures_util::{stream, stream::StreamExt}; use std::{cell::RefCell, time::Duration}; +use temporalio_common::protos::coresdk::ActivityTaskCompletion; +use temporalio_common::protos::coresdk::activity_result::ActivityExecutionResult; +use temporalio_common::protos::coresdk::nexus::NexusTaskCompletion; +use temporalio_common::protos::temporal::api::nexus::v1::{ + Request as NexusRequest, Response as NexusResponse, StartOperationRequest, + StartOperationResponse, start_operation_response, +}; +use temporalio_common::protos::temporal::api::workflowservice::v1::RespondNexusTaskCompletedResponse; +use temporalio_common::protos::temporal::api::workflowservice::v1::{ + PollActivityTaskQueueResponse, RespondActivityTaskCompletedResponse, +}; use temporalio_common::{ Worker, protos::{ @@ -23,13 +35,14 @@ use temporalio_common::{ workflow_commands::{CompleteWorkflowExecution, StartTimer, workflow_command}, workflow_completion::WorkflowActivationCompletion, }, + temporal::api::common::v1::{ActivityType, WorkflowExecution}, temporal::api::workflowservice::v1::{ - PollWorkflowTaskQueueResponse, RespondWorkflowTaskCompletedResponse, - ShutdownWorkerResponse, + PollNexusTaskQueueResponse, PollWorkflowTaskQueueResponse, + RespondWorkflowTaskCompletedResponse, ShutdownWorkerResponse, }, test_utils::start_timer_cmd, }, - worker::PollerBehavior, + worker::{PollerBehavior, WorkerTaskTypes}, }; use tokio::sync::{Barrier, watch}; @@ -135,7 +148,7 @@ async fn can_shutdown_local_act_only_worker_when_act_polling() { let mh = MockPollCfg::from_resp_batches("fakeid", t, [1], mock); let mut mock = build_mock_pollers(mh); mock.worker_cfg(|w| { - w.no_remote_activities = true; + w.task_types = WorkerTaskTypes::workflow_only(); w.max_cached_workflows = 1; }); let worker = mock_worker(mock); @@ -368,3 +381,194 @@ async fn worker_shutdown_api(#[case] use_cache: bool, #[case] api_success: bool) ); }); } + +fn create_test_activity_task() -> PollActivityTaskQueueResponse { + PollActivityTaskQueueResponse { + task_token: b"act-task".to_vec(), + workflow_execution: Some(WorkflowExecution { + workflow_id: "test".to_string(), + run_id: "run-id".to_string(), + }), + activity_id: "activity".to_string(), + activity_type: Some(ActivityType { + name: "activity".to_string(), + }), + ..Default::default() + } +} + +fn create_test_nexus_task() -> PollNexusTaskQueueResponse { + PollNexusTaskQueueResponse { + task_token: b"nex-task".to_vec(), + request: Some(NexusRequest { + header: Default::default(), + scheduled_time: None, + variant: Some(temporalio_common::protos::temporal::api::nexus::v1::request::Variant::StartOperation( + StartOperationRequest { + service: "test-service".to_string(), + operation: "test-operation".to_string(), + request_id: "test-request-id".to_string(), + callback: "".to_string(), + payload: None, + callback_header: Default::default(), + links: vec![], + } + )), + }), + poller_scaling_decision: None, + } +} + +fn create_test_nexus_completion(task_token: &[u8]) -> NexusTaskCompletion { + NexusTaskCompletion { + task_token: task_token.to_vec(), + status: Some(temporalio_common::protos::coresdk::nexus::nexus_task_completion::Status::Completed( + NexusResponse { + variant: Some(temporalio_common::protos::temporal::api::nexus::v1::response::Variant::StartOperation( + StartOperationResponse { + variant: Some(start_operation_response::Variant::SyncSuccess( + start_operation_response::Sync { + payload: None, + links: vec![], + } + )), + } + )), + } + )), + } +} + +#[rstest::rstest] +// With tasks +#[case::activity_only_with_task(false, true, false, true, "activity-only")] +#[case::nexus_only_with_task(false, false, true, true, "nexus-only")] +#[case::workflow_only_with_task(true, false, false, true, "workflow-only")] +#[case::workflow_and_activity_with_task(true, true, false, true, "workflow-activity")] +#[case::workflow_and_nexus_with_task(true, false, true, true, "workflow-nexus")] +#[case::activity_and_nexus_with_task(false, true, true, true, "activity-nexus")] +// Without tasks (idle worker shutdown) +#[case::activity_only_idle(false, true, false, false, "activity-only-idle")] +#[case::nexus_only_idle(false, false, true, false, "nexus-only-idle")] +#[case::workflow_only_idle(true, false, false, false, "workflow-only-idle")] +#[case::workflow_and_activity_idle(true, true, false, false, "workflow-activity-idle")] +#[case::workflow_and_nexus_idle(true, false, true, false, "workflow-nexus-idle")] +#[case::activity_and_nexus_idle(false, true, true, false, "activity-nexus-idle")] +#[tokio::test] +async fn test_task_type_combinations_unified( + #[case] enable_workflows: bool, + #[case] enable_activities: bool, + #[case] enable_nexus: bool, + #[case] with_task: bool, + #[case] queue_name: &str, +) { + let mut client = mock_worker_client(); + + if with_task { + if enable_activities { + client + .expect_complete_activity_task() + .returning(|_, _| Ok(RespondActivityTaskCompletedResponse::default())); + } + if enable_nexus { + client + .expect_complete_nexus_task() + .returning(|_, _| Ok(RespondNexusTaskCompletedResponse::default())); + } + } + + let mut mocks = if enable_workflows && with_task { + let t = canned_histories::single_timer(queue_name); + let wf_cfg = MockPollCfg::from_resp_batches(queue_name, t, [1], client); + let mut mocks = build_mock_pollers(wf_cfg); + if enable_activities { + mocks.set_act_poller_from_resps(vec![QueueResponse::from(create_test_activity_task())]); + } + if enable_nexus { + mocks.set_nexus_poller_from_resps(vec![QueueResponse::from(create_test_nexus_task())]); + } + mocks + } else { + let wft_stream = if enable_workflows { + Some(stream::empty()) + } else { + None + }; + let activity_tasks = if enable_activities && with_task { + Some(vec![QueueResponse::from(create_test_activity_task())]) + } else { + None + }; + let nexus_tasks = if enable_nexus && with_task { + Some(vec![QueueResponse::from(create_test_nexus_task())]) + } else { + None + }; + MocksHolder::from_client_with_custom(client, wft_stream, activity_tasks, nexus_tasks) + }; + + mocks.worker_cfg(|w| { + w.task_queue = queue_name.to_string(); + w.task_types = WorkerTaskTypes { + enable_workflows, + enable_activities, + enable_nexus, + }; + w.skip_client_worker_set_check = true; + }); + let worker = mock_worker(mocks); + + if with_task { + if enable_workflows { + let activation = worker.poll_workflow_activation().await.unwrap(); + worker + .complete_workflow_activation(WorkflowActivationCompletion::from_cmds( + activation.run_id.clone(), + vec![CompleteWorkflowExecution::default().into()], + )) + .await + .unwrap(); + } + + if enable_activities { + let activity_task = worker.poll_activity_task().await.unwrap(); + worker + .complete_activity_task(ActivityTaskCompletion { + task_token: activity_task.task_token, + result: Some(ActivityExecutionResult::ok(vec![1].into())), + }) + .await + .unwrap(); + } + + if enable_nexus { + let nexus_task = worker.poll_nexus_task().await.unwrap(); + worker + .complete_nexus_task(create_test_nexus_completion(nexus_task.task_token())) + .await + .unwrap(); + } + } + + worker.initiate_shutdown(); + if enable_workflows { + assert_matches!( + worker.poll_workflow_activation().await.unwrap_err(), + PollError::ShutDown + ); + } + if enable_activities { + assert_matches!( + worker.poll_activity_task().await.unwrap_err(), + PollError::ShutDown + ); + } + if enable_nexus { + assert_matches!( + worker.poll_nexus_task().await.unwrap_err(), + PollError::ShutDown + ); + } + worker.shutdown().await; + worker.finalize_shutdown().await; +} diff --git a/crates/sdk-core/src/core_tests/workflow_tasks.rs b/crates/sdk-core/src/core_tests/workflow_tasks.rs index 90b84e8c5..ffa7444f6 100644 --- a/crates/sdk-core/src/core_tests/workflow_tasks.rs +++ b/crates/sdk-core/src/core_tests/workflow_tasks.rs @@ -66,7 +66,7 @@ use temporalio_common::{ }, worker::{ PollerBehavior, SlotMarkUsedContext, SlotReleaseContext, SlotReservationContext, - SlotSupplier, SlotSupplierPermit, WorkflowSlotKind, + SlotSupplier, SlotSupplierPermit, WorkerTaskTypes, WorkflowSlotKind, }, }; use tokio::{ @@ -2671,7 +2671,7 @@ async fn poller_wont_run_ahead_of_task_slots() { .max_cached_workflows(10_usize) .max_outstanding_workflow_tasks(10_usize) .workflow_task_poller_behavior(PollerBehavior::SimpleMaximum(10_usize)) - .no_remote_activities(true) + .task_types(WorkerTaskTypes::workflow_only()) .build() .unwrap(), mock_client, @@ -2731,7 +2731,7 @@ async fn poller_wont_poll_until_lang_polls() { let worker = Worker::new_test( test_worker_cfg() - .no_remote_activities(true) + .task_types(WorkerTaskTypes::workflow_only()) .build() .unwrap(), mock_client, @@ -2876,7 +2876,7 @@ async fn slot_provider_cant_hand_out_more_permits_than_cache_size() { .build(), )) .workflow_task_poller_behavior(PollerBehavior::SimpleMaximum(10_usize)) - .no_remote_activities(true) + .task_types(WorkerTaskTypes::workflow_only()) .build() .unwrap(), mock_client, @@ -3024,7 +3024,7 @@ async fn both_normal_and_sticky_pollers_poll_concurrently() { .max_outstanding_workflow_tasks(2_usize) .workflow_task_poller_behavior(PollerBehavior::SimpleMaximum(2_usize)) .nonsticky_to_sticky_poll_ratio(0.2) - .no_remote_activities(true) + .task_types(WorkerTaskTypes::workflow_only()) .build() .unwrap(), Some("stickytq".to_string()), diff --git a/crates/sdk-core/src/replay/mod.rs b/crates/sdk-core/src/replay/mod.rs index cd088f0e7..eb870a4c0 100644 --- a/crates/sdk-core/src/replay/mod.rs +++ b/crates/sdk-core/src/replay/mod.rs @@ -30,7 +30,7 @@ use temporalio_common::{ }, }, }, - worker::{PollerBehavior, WorkerConfig}, + worker::{PollerBehavior, WorkerConfig, WorkerTaskTypes}, }; use tokio::sync::{Mutex as TokioMutex, mpsc, mpsc::UnboundedSender}; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -62,7 +62,7 @@ where pub(crate) fn into_core_worker(mut self) -> Result { self.config.max_cached_workflows = 1; self.config.workflow_task_poller_behavior = PollerBehavior::SimpleMaximum(1); - self.config.no_remote_activities = true; + self.config.task_types = WorkerTaskTypes::workflow_only(); self.config.skip_client_worker_set_check = true; let historator = Historator::new(self.history_stream); let post_activate = historator.get_post_activate_hook(); diff --git a/crates/sdk-core/src/test_help/integ_helpers.rs b/crates/sdk-core/src/test_help/integ_helpers.rs index e27a92c9f..c0c47d75d 100644 --- a/crates/sdk-core/src/test_help/integ_helpers.rs +++ b/crates/sdk-core/src/test_help/integ_helpers.rs @@ -36,6 +36,7 @@ use std::{ task::{Context, Poll}, time::Duration, }; +use temporalio_common::worker::WorkerTaskTypes; use temporalio_common::{ Worker as WorkerTrait, errors::PollError, @@ -105,6 +106,7 @@ pub fn test_worker_cfg() -> WorkerConfigBuilder { build_id: "test_bin_id".to_string(), }) .ignore_evicts_on_shutdown(true) + .task_types(WorkerTaskTypes::all()) // Serial polling since it makes mocking much easier. .workflow_task_poller_behavior(PollerBehavior::SimpleMaximum(1_usize)); wcb @@ -184,10 +186,15 @@ pub fn build_fake_worker( pub fn mock_worker(mocks: MocksHolder) -> Worker { let sticky_q = sticky_q_name_for_worker("unit-test", mocks.inputs.config.max_cached_workflows); - let act_poller = if mocks.inputs.config.no_remote_activities { + let act_poller = if mocks.inputs.config.task_types.enable_activities { + mocks.inputs.act_poller + } else { None + }; + let nexus_poller = if mocks.inputs.config.task_types.enable_nexus { + mocks.inputs.nexus_poller } else { - mocks.inputs.act_poller + None }; Worker::new_with_pollers( mocks.inputs.config, @@ -196,10 +203,7 @@ pub fn mock_worker(mocks: MocksHolder) -> Worker { TaskPollers::Mocked { wft_stream: mocks.inputs.wft_stream, act_poller, - nexus_poller: mocks - .inputs - .nexus_poller - .unwrap_or_else(|| mock_poller_from_resps([])), + nexus_poller, }, None, None, @@ -230,15 +234,38 @@ impl MocksHolder { self.inputs.act_poller = Some(poller); } + /// Helper to create and set an activity poller from responses + #[cfg(test)] + pub(crate) fn set_act_poller_from_resps(&mut self, act_tasks: ACT) + where + ACT: IntoIterator>, + ::IntoIter: Send + 'static, + { + let act_poller = mock_poller_from_resps(act_tasks); + self.set_act_poller(act_poller); + } + + /// Helper to create and set a nexus poller from responses + #[cfg(test)] + pub(crate) fn set_nexus_poller_from_resps(&mut self, nexus_tasks: NEX) + where + NEX: IntoIterator>, + ::IntoIter: Send + 'static, + { + let nexus_poller = mock_poller_from_resps(nexus_tasks); + self.inputs.nexus_poller = Some(nexus_poller); + } + /// Can be used for tests that need to avoid auto-shutdown due to running out of mock responses pub fn make_wft_stream_interminable(&mut self) { - let old_stream = std::mem::replace(&mut self.inputs.wft_stream, stream::pending().boxed()); - self.inputs.wft_stream = old_stream.chain(stream::pending()).boxed(); + if let Some(old_stream) = self.inputs.wft_stream.take() { + self.inputs.wft_stream = Some(old_stream.chain(stream::pending()).boxed()); + } } } pub struct MockWorkerInputs { - pub(crate) wft_stream: BoxStream<'static, Result>, + pub(crate) wft_stream: Option>>, pub(crate) act_poller: Option>, pub(crate) nexus_poller: Option>, pub(crate) config: WorkerConfig, @@ -255,7 +282,7 @@ impl MockWorkerInputs { wft_stream: BoxStream<'static, Result>, ) -> Self { Self { - wft_stream, + wft_stream: Some(wft_stream), act_poller: None, nexus_poller: None, config: test_worker_cfg().build().unwrap(), @@ -284,10 +311,9 @@ impl MocksHolder { ACT: IntoIterator>, ::IntoIter: Send + 'static, { - let wft_stream = stream::pending().boxed(); let mock_act_poller = mock_poller_from_resps(act_tasks); let mock_worker = MockWorkerInputs { - wft_stream, + wft_stream: None, act_poller: Some(mock_act_poller), nexus_poller: None, config: test_worker_cfg().build().unwrap(), @@ -299,6 +325,66 @@ impl MocksHolder { } } + /// Uses the provided list of tasks to create a mock poller with a randomly generated task queue + pub fn from_client_with_nexus( + client: impl WorkerClient + 'static, + nexus_tasks: NEX, + ) -> Self + where + NEX: IntoIterator>, + ::IntoIter: Send + 'static, + { + let mock_nexus_poller = mock_poller_from_resps(nexus_tasks); + let mock_worker = MockWorkerInputs { + wft_stream: None, + act_poller: None, + nexus_poller: Some(mock_nexus_poller), + config: test_worker_cfg().build().unwrap(), + }; + Self { + client: Arc::new(client), + inputs: mock_worker, + outstanding_task_map: None, + } + } + + /// Create a MocksHolder with custom combination of task pollers. + /// Allows any combination of workflow, activity, and nexus tasks. + pub fn from_client_with_custom( + client: impl WorkerClient + 'static, + wft_stream: Option, + activity_tasks: Option, + nexus_tasks: Option, + ) -> Self + where + WFT: Stream + Send + 'static, + ACT: IntoIterator>, + ::IntoIter: Send + 'static, + NEX: IntoIterator>, + ::IntoIter: Send + 'static, + { + let wft_stream = wft_stream.map(|s| { + s.map(|r| Ok(r.try_into().expect("Mock responses must be valid work"))) + .boxed() + }); + + let act_poller = activity_tasks.map(|tasks| mock_poller_from_resps(tasks)); + let nexus_poller = nexus_tasks.map(|tasks| mock_poller_from_resps(tasks)); + + let mock_worker = MockWorkerInputs { + wft_stream, + act_poller, + nexus_poller, + config: test_worker_cfg().build().unwrap(), + }; + + Self { + client: Arc::new(client), + inputs: mock_worker, + outstanding_task_map: None, + } + } + /// Uses the provided task responses and delivers them as quickly as possible when polled. /// This is only useful to test buffering, as typically you do not want to pretend that /// the server is delivering WFTs super fast for the same run. @@ -306,9 +392,11 @@ impl MocksHolder { client: impl WorkerClient + 'static, stream: impl Stream + Send + 'static, ) -> Self { - let wft_stream = stream - .map(|r| Ok(r.try_into().expect("Mock responses must be valid work"))) - .boxed(); + let wft_stream = Some( + stream + .map(|r| Ok(r.try_into().expect("Mock responses must be valid work"))) + .boxed(), + ); let mock_worker = MockWorkerInputs { wft_stream, act_poller: None, diff --git a/crates/sdk-core/src/worker/heartbeat.rs b/crates/sdk-core/src/worker/heartbeat.rs index 400c202c3..8cbd6c42d 100644 --- a/crates/sdk-core/src/worker/heartbeat.rs +++ b/crates/sdk-core/src/worker/heartbeat.rs @@ -7,7 +7,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use temporalio_client::SharedNamespaceWorkerTrait; use temporalio_common::{ protos::temporal::api::worker::v1::WorkerHeartbeat, - worker::{PollerBehavior, WorkerConfigBuilder, WorkerVersioningStrategy}, + worker::{PollerBehavior, WorkerConfigBuilder, WorkerTaskTypes, WorkerVersioningStrategy}, }; use tokio::sync::Notify; use tokio_util::sync::CancellationToken; @@ -38,7 +38,7 @@ impl SharedNamespaceWorker { "temporal-sys/worker-commands/{namespace}/{}", client.worker_grouping_key(), )) - .no_remote_activities(true) + .task_types(WorkerTaskTypes::nexus_only()) .max_outstanding_nexus_tasks(5_usize) .versioning_strategy(WorkerVersioningStrategy::None { build_id: "1.0".to_owned(), diff --git a/crates/sdk-core/src/worker/mod.rs b/crates/sdk-core/src/worker/mod.rs index 9825db89f..d6c0ceb79 100644 --- a/crates/sdk-core/src/worker/mod.rs +++ b/crates/sdk-core/src/worker/mod.rs @@ -115,14 +115,14 @@ pub struct Worker { client: Arc, /// Worker instance key, unique identifier for this worker worker_instance_key: Uuid, - /// Manages all workflows and WFT processing - workflows: Workflows, + /// Manages all workflows and WFT processing. None if workflow polling is disabled + workflows: Option, /// Manages activity tasks for this worker/task queue at_task_mgr: Option, - /// Manages local activities - local_act_mgr: Arc, + /// Manages local activities. None if workflow polling is disabled (local activities require workflows) + local_act_mgr: Option>, /// Manages Nexus tasks - nexus_mgr: NexusManager, + nexus_mgr: Option, /// Has shutdown been called? shutdown_token: CancellationToken, /// Will be called at the end of each activation completion @@ -186,7 +186,10 @@ impl WorkerTrait for Worker { #[instrument(skip(self))] async fn poll_nexus_task(&self) -> Result { - self.nexus_mgr.next_nexus_task().await + match &self.nexus_mgr { + Some(mgr) => mgr.next_nexus_task().await, + None => Err(PollError::ShutDown), + } } async fn complete_workflow_activation( @@ -268,7 +271,9 @@ impl WorkerTrait for Worker { // Push a BumpStream message to the workflow activation queue. This ensures that // any pending workflow activation polls will resolve, even if there are no other inputs. - self.workflows.bump_stream(); + if let Some(workflows) = &self.workflows { + workflows.bump_stream(); + } // Second, we want to stop polling of both activity and workflow tasks if let Some(atm) = self.at_task_mgr.as_ref() { @@ -276,13 +281,15 @@ impl WorkerTrait for Worker { } // Let the manager know that shutdown has been initiated to try to unblock the local // activity poll in case this worker is an activity-only worker. - self.local_act_mgr.shutdown_initiated(); - - // If workflows have never been polled, immediately tell the local activity manager - // that workflows have shut down, so it can proceed with shutdown without waiting. - // This is particularly important for activity-only workers. - if !self.workflows.ever_polled() { - self.local_act_mgr.workflows_have_shutdown(); + if let Some(la_mgr) = &self.local_act_mgr { + la_mgr.shutdown_initiated(); + + // If workflows have never been polled, immediately tell the local activity manager + // that workflows have shut down, so it can proceed with shutdown without waiting. + // This is particularly important for activity-only workers. + if self.workflows.as_ref().is_none_or(|w| !w.ever_polled()) { + la_mgr.workflows_have_shutdown(); + } } } @@ -446,28 +453,32 @@ impl Worker { ); let (wft_stream, act_poller, nexus_poller) = match task_pollers { TaskPollers::Real => { - let wft_stream = make_wft_poller( - &config, - &sticky_queue_name, - &client, - &metrics, - &shutdown_token, - &wft_slots, - wf_last_suc_poll_time.clone(), - wf_sticky_last_suc_poll_time.clone(), - ); - let wft_stream = if !client.is_mock() { - // Some replay tests combine a mock client with real pollers, - // and they don't need to use the external stream - stream::select(wft_stream, UnboundedReceiverStream::new(external_wft_rx)) - .left_stream() + let wft_stream = if config.task_types.enable_workflows { + let stream = make_wft_poller( + &config, + &sticky_queue_name, + &client, + &metrics, + &shutdown_token, + &wft_slots, + wf_last_suc_poll_time.clone(), + wf_sticky_last_suc_poll_time.clone(), + ) + .boxed(); + let stream = if !client.is_mock() { + // Some replay tests combine a mock client with real pollers, + // and they don't need to use the external stream + stream::select(stream, UnboundedReceiverStream::new(external_wft_rx)) + .left_stream() + } else { + stream.right_stream() + }; + Some(stream) } else { - wft_stream.right_stream() + None }; - let act_poll_buffer = if config.no_remote_activities { - None - } else { + let act_poll_buffer = if config.task_types.enable_activities { let act_metrics = metrics.with_new_attrs([activity_poller()]); let ap = LongPollBuffer::new_activity_task( client.clone(), @@ -483,23 +494,28 @@ impl Worker { act_last_suc_poll_time.clone(), ); Some(Box::from(ap) as BoxedActPoller) + } else { + None }; - let np_metrics = metrics.with_new_attrs([nexus_poller()]); - - let nexus_poll_buffer = Box::new(LongPollBuffer::new_nexus_task( - client.clone(), - config.task_queue.clone(), - config.nexus_task_poller_behavior, - nexus_slots.clone(), - shutdown_token.child_token(), - Some(move |np| np_metrics.record_num_pollers(np)), - nexus_last_suc_poll_time.clone(), - shared_namespace_worker, - )) as BoxedNexusPoller; + let nexus_poll_buffer = if config.task_types.enable_nexus { + let np_metrics = metrics.with_new_attrs([nexus_poller()]); + Some(Box::new(LongPollBuffer::new_nexus_task( + client.clone(), + config.task_queue.clone(), + config.nexus_task_poller_behavior, + nexus_slots.clone(), + shutdown_token.child_token(), + Some(move |np| np_metrics.record_num_pollers(np)), + nexus_last_suc_poll_time.clone(), + shared_namespace_worker, + )) as BoxedNexusPoller) + } else { + None + }; #[cfg(any(feature = "test-utilities", test))] - let wft_stream = wft_stream.left_stream(); + let wft_stream = wft_stream.map(|s| s.left_stream()); (wft_stream, act_poll_buffer, nexus_poll_buffer) } #[cfg(any(feature = "test-utilities", test))] @@ -508,27 +524,45 @@ impl Worker { act_poller, nexus_poller, } => { + let wft_stream = config + .task_types + .enable_workflows + .then_some(wft_stream) + .flatten(); + let act_poller = config + .task_types + .enable_activities + .then_some(act_poller) + .flatten(); + let nexus_poller = config + .task_types + .enable_nexus + .then_some(nexus_poller) + .flatten(); + let ap = act_poller .map(|ap| MockPermittedPollBuffer::new(Arc::new(act_slots.clone()), ap)); - let np = MockPermittedPollBuffer::new(Arc::new(nexus_slots.clone()), nexus_poller); - let wft_semaphore = wft_slots.clone(); - let wfs = wft_stream.then(move |s| { - let wft_semaphore = wft_semaphore.clone(); - async move { - let permit = wft_semaphore.acquire_owned().await; - s.map(|s| (s, permit)) - } + let np = nexus_poller + .map(|np| MockPermittedPollBuffer::new(Arc::new(nexus_slots.clone()), np)); + let wfs = wft_stream.map(|stream| { + let wft_semaphore = wft_slots.clone(); + let wfs = stream.then(move |s| { + let wft_semaphore = wft_semaphore.clone(); + async move { + let permit = wft_semaphore.acquire_owned().await; + s.map(|s| (s, permit)) + } + }); + wfs.right_stream() }); - let wfs = wfs.right_stream(); ( wfs, ap.map(|ap| Box::new(ap) as BoxedActPoller), - Box::new(np) as BoxedNexusPoller, + np.map(|np| Box::new(np) as BoxedNexusPoller), ) } }; - let (hb_tx, hb_rx) = unbounded_channel(); let la_permit_dealer = MeteredPermitDealer::new( tuner.local_activity_slot_supplier(), metrics.with_new_attrs([local_activity_worker_type()]), @@ -537,12 +571,21 @@ impl Worker { meter.clone(), ); let la_permits = la_permit_dealer.get_extant_count_rcv(); - let local_act_mgr = Arc::new(LocalActivityManager::new( - config.namespace.clone(), - la_permit_dealer.clone(), - hb_tx, - metrics.clone(), - )); + + let (local_act_mgr, la_sink, hb_rx) = if config.task_types.enable_workflows { + let (hb_tx, hb_rx) = unbounded_channel(); + let local_act_mgr = Arc::new(LocalActivityManager::new( + config.namespace.clone(), + la_permit_dealer.clone(), + hb_tx, + metrics.clone(), + )); + let la_sink = LAReqSink::new(local_act_mgr.clone()); + (Some(local_act_mgr), Some(la_sink), Some(hb_rx)) + } else { + (None, None, None) + }; + let at_task_mgr = act_poller.map(|ap| { WorkerActivityTasks::new( act_slots.clone(), @@ -559,14 +602,15 @@ impl Worker { if !poll_on_non_local_activities && !shared_namespace_worker { info!("Activity polling is disabled for this worker"); }; - let la_sink = LAReqSink::new(local_act_mgr.clone()); - let nexus_mgr = NexusManager::new( - nexus_poller, - metrics.clone(), - config.graceful_shutdown_period, - shutdown_token.child_token(), - ); + let nexus_mgr = nexus_poller.map(|poller| { + NexusManager::new( + poller, + metrics.clone(), + config.graceful_shutdown_period, + shutdown_token.child_token(), + ) + }); let deployment_options = match &config.versioning_strategy { temporalio_common::worker::WorkerVersioningStrategy::WorkerDeploymentBased(opts) => { @@ -626,47 +670,51 @@ impl Worker { Ok(Self { worker_instance_key, client: client.clone(), - workflows: Workflows::new( - WorkflowBasics { - worker_config: Arc::new(config.clone()), - shutdown_token: shutdown_token.child_token(), - metrics, - server_capabilities: client.capabilities().unwrap_or_default(), - sdk_name: sdk_name_and_ver.0, - sdk_version: sdk_name_and_ver.1, - default_versioning_behavior: config - .versioning_strategy - .default_versioning_behavior(), - }, - sticky_queue_name.map(|sq| StickyExecutionAttributes { - worker_task_queue: Some(TaskQueue { - name: sq, - kind: TaskQueueKind::Sticky as i32, - normal_name: config.task_queue.clone(), + workflows: wft_stream.map(|stream| { + Workflows::new( + WorkflowBasics { + worker_config: Arc::new(config.clone()), + shutdown_token: shutdown_token.child_token(), + metrics, + server_capabilities: client.capabilities().unwrap_or_default(), + sdk_name: sdk_name_and_ver.0, + sdk_version: sdk_name_and_ver.1, + default_versioning_behavior: config + .versioning_strategy + .default_versioning_behavior(), + }, + sticky_queue_name.map(|sq| StickyExecutionAttributes { + worker_task_queue: Some(TaskQueue { + name: sq, + kind: TaskQueueKind::Sticky as i32, + normal_name: config.task_queue.clone(), + }), + schedule_to_start_timeout: Some( + config + .sticky_queue_schedule_to_start_timeout + .try_into() + .expect("timeout fits into proto"), + ), }), - schedule_to_start_timeout: Some( - config - .sticky_queue_schedule_to_start_timeout - .try_into() - .expect("timeout fits into proto"), - ), - }), - client, - wft_slots, - wft_stream, - la_sink, - local_act_mgr.clone(), - hb_rx, - at_task_mgr.as_ref().and_then(|mgr| { - match config.max_task_queue_activities_per_second { - Some(persec) if persec > 0.0 => None, - _ => Some(mgr.get_handle_for_workflows()), - } - }), - worker_telemetry - .as_ref() - .and_then(|telem| telem.trace_subscriber.clone()), - ), + client, + wft_slots, + stream, + la_sink.expect("LA sink must exist when workflows enabled"), + local_act_mgr + .clone() + .expect("LA manager must exist when workflows enabled"), + hb_rx.expect("Heartbeat channel must exist when workflows enabled"), + at_task_mgr.as_ref().and_then(|mgr| { + match config.max_task_queue_activities_per_second { + Some(persec) if persec > 0.0 => None, + _ => Some(mgr.get_handle_for_workflows()), + } + }), + worker_telemetry + .as_ref() + .and_then(|telem| telem.trace_subscriber.clone()), + ) + }), at_task_mgr, local_act_mgr, config, @@ -690,7 +738,9 @@ impl Worker { /// completed async fn shutdown(&self) { self.initiate_shutdown(); - if let Some(name) = self.workflows.get_sticky_queue_name() { + if let Some(workflows) = &self.workflows + && let Some(name) = workflows.get_sticky_queue_name() + { let heartbeat = self .client_worker_registrator .heartbeat_manager @@ -715,20 +765,24 @@ impl Worker { } // We need to wait for all local activities to finish so no more workflow task heartbeats // will be generated - self.local_act_mgr - .wait_all_outstanding_tasks_finished() - .await; + if let Some(la_mgr) = &self.local_act_mgr { + la_mgr.wait_all_outstanding_tasks_finished().await; + } // Wait for workflows to finish - self.workflows - .shutdown() - .await - .expect("Workflow processing terminates cleanly"); + if let Some(workflows) = &self.workflows { + workflows + .shutdown() + .await + .expect("Workflow processing terminates cleanly"); + } // Wait for activities to finish if let Some(acts) = self.at_task_mgr.as_ref() { acts.shutdown().await; } // Wait for nexus tasks to finish - self.nexus_mgr.shutdown().await; + if let Some(nexus) = &self.nexus_mgr { + nexus.shutdown().await; + } // Wait for all permits to be released, but don't totally hang real-world shutdown. tokio::select! { _ = async { self.all_permits_tracker.lock().await.all_done().await } => {}, @@ -752,30 +806,38 @@ impl Worker { /// Returns number of currently cached workflows pub async fn cached_workflows(&self) -> usize { - self.workflows - .get_state_info() - .await - .map(|r| r.cached_workflows) - .unwrap_or_default() + match &self.workflows { + Some(workflows) => workflows + .get_state_info() + .await + .map(|r| r.cached_workflows) + .unwrap_or_default(), + None => 0, + } } /// Returns number of currently outstanding workflow tasks #[cfg(test)] pub(crate) async fn outstanding_workflow_tasks(&self) -> usize { - self.workflows - .get_state_info() - .await - .map(|r| r.outstanding_wft) - .unwrap_or_default() + match &self.workflows { + Some(workflows) => workflows + .get_state_info() + .await + .map(|r| r.outstanding_wft) + .unwrap_or_default(), + None => 0, + } } #[allow(unused)] pub(crate) fn available_wft_permits(&self) -> Option { - self.workflows.available_wft_permits() + self.workflows + .as_ref() + .and_then(|w| w.available_wft_permits()) } #[cfg(test)] pub(crate) fn unused_wft_permits(&self) -> Option { - self.workflows.unused_wft_permits() + self.workflows.as_ref().and_then(|w| w.unused_wft_permits()) } /// Get new activity tasks (may be local or nonlocal). Local activities are returned first @@ -817,16 +879,23 @@ impl Worker { future::pending::<()>().await; unreachable!() } - match self.local_act_mgr.next_pending().await { - Some(NextPendingLAAction::Dispatch(r)) => Ok(Some(r)), - Some(NextPendingLAAction::Autocomplete(action)) => { - Ok(self.handle_la_complete_action(action)) - } - None => { - if self.shutdown_token.is_cancelled() { - self.local_activities_complete - .store(true, Ordering::Relaxed); + match &self.local_act_mgr { + Some(la_mgr) => match la_mgr.next_pending().await { + Some(NextPendingLAAction::Dispatch(r)) => Ok(Some(r)), + Some(NextPendingLAAction::Autocomplete(action)) => { + Ok(self.handle_la_complete_action(action)) + } + None => { + if self.shutdown_token.is_cancelled() { + self.local_activities_complete + .store(true, Ordering::Relaxed); + } + Ok(None) } + }, + None => { + self.local_activities_complete + .store(true, Ordering::Relaxed); Ok(None) } } @@ -873,29 +942,35 @@ impl Worker { if let Some(atm) = &self.at_task_mgr { atm.complete(task_token, status, &*self.client).await; + Ok(()) } else { - error!( - "Tried to complete activity {} on a worker that does not have an activity manager", - task_token - ); + Err(CompleteActivityError::ActivityNotEnabled) } - Ok(()) } #[instrument(skip(self), fields(run_id, workflow_id, task_queue=%self.config.task_queue))] pub(crate) async fn next_workflow_activation(&self) -> Result { - let r = self.workflows.next_workflow_activation().await; - // In the event workflows are shutdown or erroring, begin shutdown of everything else. Once - // they are shut down, tell the local activity manager that, so that it can know to cancel - // any remaining outstanding LAs and shutdown. - if let Err(ref e) = r { - // This is covering the situation where WFT pollers dying is the reason for shutdown - self.initiate_shutdown(); - if matches!(e, PollError::ShutDown) { - self.local_act_mgr.workflows_have_shutdown(); + match &self.workflows { + Some(workflows) => { + let r = workflows.next_workflow_activation().await; + // In the event workflows are shutdown or erroring, begin shutdown of everything else. Once + // they are shut down, tell the local activity manager that, so that it can know to cancel + // any remaining outstanding LAs and shutdown. + if let Err(ref e) = r { + // This is covering the situation where WFT pollers dying is the reason for shutdown + self.initiate_shutdown(); + if matches!(e, PollError::ShutDown) { + if let Some(la_mgr) = &self.local_act_mgr { + la_mgr.workflows_have_shutdown(); + } else { + dbg_panic!("la_mgr should be set if worker supports workflows"); + } + } + } + r } + None => Err(PollError::ShutDown), } - r } #[instrument(skip(self, completion), @@ -905,16 +980,21 @@ impl Worker { &self, completion: WorkflowActivationCompletion, ) -> Result<(), CompleteWfError> { - self.workflows - .activation_completed( - completion, - false, - self.post_activate_hook - .as_ref() - .map(|h| |data: PostActivateHookData| h(self, data)), - ) - .await?; - Ok(()) + match &self.workflows { + Some(workflows) => { + workflows + .activation_completed( + completion, + false, + self.post_activate_hook + .as_ref() + .map(|h| |data: PostActivateHookData| h(self, data)), + ) + .await?; + Ok(()) + } + None => Err(CompleteWfError::WorkflowNotEnabled), + } } #[instrument( @@ -926,9 +1006,10 @@ impl Worker { tt: TaskToken, status: nexus_task_completion::Status, ) -> Result<(), CompleteNexusError> { - self.nexus_mgr - .complete_task(tt, status, &*self.client) - .await + match &self.nexus_mgr { + Some(mgr) => mgr.complete_task(tt, status, &*self.client).await, + None => Err(CompleteNexusError::NexusNotEnabled), + } } /// Request a workflow eviction @@ -938,7 +1019,11 @@ impl Worker { message: impl Into, reason: EvictionReason, ) { - self.workflows.request_eviction(run_id, message, reason); + if let Some(workflows) = &self.workflows { + workflows.request_eviction(run_id, message, reason); + } else { + dbg_panic!("trying to request wf eviction when workflows not enabled for this worker"); + } } /// Sets a function to be called at the end of each activation completion @@ -950,9 +1035,10 @@ impl Worker { } fn complete_local_act(&self, task_token: TaskToken, la_res: LocalActivityExecutionResult) { - if self - .handle_la_complete_action(self.local_act_mgr.complete(&task_token, la_res)) - .is_some() + if let Some(la_mgr) = &self.local_act_mgr + && self + .handle_la_complete_action(la_mgr.complete(&task_token, la_res)) + .is_some() { dbg_panic!("Should never be a task from direct completion"); } @@ -974,7 +1060,11 @@ impl Worker { } fn notify_local_result(&self, run_id: &str, res: LocalResolution) { - self.workflows.notify_of_local_result(run_id, res); + if let Some(workflows) = &self.workflows { + workflows.notify_of_local_result(run_id, res); + } else { + dbg_panic!("trying to notify local result when workflows not enabled for this worker"); + } } async fn verify_namespace_exists(&self) -> Result<(), WorkerValidationError> { @@ -1220,9 +1310,9 @@ pub(crate) enum TaskPollers { Real, #[cfg(any(feature = "test-utilities", test))] Mocked { - wft_stream: BoxStream<'static, Result>, + wft_stream: Option>>, act_poller: Option>, - nexus_poller: BoxedPoller, + nexus_poller: Option>, }, } diff --git a/crates/sdk-core/tests/common/mod.rs b/crates/sdk-core/tests/common/mod.rs index 5a1c400da..dbba4baf8 100644 --- a/crates/sdk-core/tests/common/mod.rs +++ b/crates/sdk-core/tests/common/mod.rs @@ -32,6 +32,7 @@ use temporalio_client::{ WfClientExt, WorkflowClientTrait, WorkflowExecutionInfo, WorkflowExecutionResult, WorkflowHandle, WorkflowOptions, }; +use temporalio_common::worker::WorkerTaskTypes; use temporalio_common::{ Worker as CoreWorker, protos::{ @@ -59,6 +60,8 @@ use temporalio_sdk::{ WorkerInterceptor, }, }; +#[cfg(any(feature = "test-utilities", test))] +pub(crate) use temporalio_sdk_core::test_help::NAMESPACE; use temporalio_sdk_core::{ ClientOptions, ClientOptionsBuilder, CoreRuntime, RuntimeOptions, RuntimeOptionsBuilder, WorkerConfig, WorkerConfigBuilder, init_replay_worker, init_worker, @@ -71,9 +74,6 @@ use tonic::IntoRequest; use tracing::{debug, warn}; use url::Url; use uuid::Uuid; - -#[cfg(any(feature = "test-utilities", test))] -pub(crate) use temporalio_sdk_core::test_help::NAMESPACE; /// The env var used to specify where the integ tests should point pub(crate) const INTEG_SERVER_TARGET_ENV_VAR: &str = "TEMPORAL_SERVICE_ADDRESS"; pub(crate) const INTEG_NAMESPACE_ENV_VAR: &str = "TEMPORAL_NAMESPACE"; @@ -112,6 +112,7 @@ pub(crate) fn integ_worker_config(tq: &str) -> WorkerConfigBuilder { .versioning_strategy(WorkerVersioningStrategy::None { build_id: "test_build_id".to_owned(), }) + .task_types(WorkerTaskTypes::all()) .skip_client_worker_set_check(true); b } diff --git a/crates/sdk-core/tests/heavy_tests.rs b/crates/sdk-core/tests/heavy_tests.rs index 561e99340..6d7bd1d6c 100644 --- a/crates/sdk-core/tests/heavy_tests.rs +++ b/crates/sdk-core/tests/heavy_tests.rs @@ -22,6 +22,8 @@ use std::{ time::{Duration, Instant}, }; use temporalio_client::{GetWorkflowResultOpts, WfClientExt, WorkflowClientTrait, WorkflowOptions}; + +use temporalio_common::worker::WorkerTaskTypes; use temporalio_common::{ protos::{ coresdk::{AsJsonPayloadExt, workflow_commands::ActivityCancellationType}, @@ -349,7 +351,7 @@ async fn can_paginate_long_history() { let mut starter = CoreWfStarter::new(wf_name); starter .worker_config - .no_remote_activities(true) + .task_types(WorkerTaskTypes::workflow_only()) // Do not use sticky queues so we are forced to paginate once history gets long .max_cached_workflows(0_usize); diff --git a/crates/sdk-core/tests/integ_tests/metrics_tests.rs b/crates/sdk-core/tests/integ_tests/metrics_tests.rs index abdfdca7f..e938954fd 100644 --- a/crates/sdk-core/tests/integ_tests/metrics_tests.rs +++ b/crates/sdk-core/tests/integ_tests/metrics_tests.rs @@ -17,6 +17,7 @@ use std::{ use temporalio_client::{ REQUEST_LATENCY_HISTOGRAM_NAME, WorkflowClientTrait, WorkflowOptions, WorkflowService, }; +use temporalio_common::worker::WorkerTaskTypes; use temporalio_common::{ Worker, errors::PollError, @@ -164,6 +165,7 @@ async fn one_slot_worker_reports_available_slot() { .max_outstanding_workflow_tasks(2_usize) .max_outstanding_nexus_tasks(1_usize) .workflow_task_poller_behavior(PollerBehavior::SimpleMaximum(2_usize)) + .task_types(WorkerTaskTypes::all()) .build() .unwrap(); @@ -919,7 +921,11 @@ async fn nexus_metrics() { let rt = CoreRuntime::new_assume_tokio(get_integ_runtime_options(telemopts)).unwrap(); let wf_name = "nexus_metrics"; let mut starter = CoreWfStarter::new_with_runtime(wf_name, rt); - starter.worker_config.no_remote_activities(true); + starter.worker_config.task_types(WorkerTaskTypes { + enable_workflows: true, + enable_activities: false, + enable_nexus: true, + }); let task_queue = starter.get_task_queue().to_owned(); let mut worker = starter.worker().await; let core_worker = starter.get_worker().await; @@ -1096,7 +1102,9 @@ async fn evict_on_complete_does_not_count_as_forced_eviction() { let rt = CoreRuntime::new_assume_tokio(get_integ_runtime_options(telemopts)).unwrap(); let wf_name = "evict_on_complete_does_not_count_as_forced_eviction"; let mut starter = CoreWfStarter::new_with_runtime(wf_name, rt); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut worker = starter.worker().await; worker.register_wf( @@ -1179,7 +1187,9 @@ async fn metrics_available_from_custom_slot_supplier() { let rt = CoreRuntime::new_assume_tokio(get_integ_runtime_options(telemopts)).unwrap(); let mut starter = CoreWfStarter::new_with_runtime("metrics_available_from_custom_slot_supplier", rt); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); starter.worker_config.clear_max_outstanding_opts(); let mut tb = TunerBuilder::default(); tb.workflow_slot_supplier(Arc::new(MetricRecordingSlotSupplier:: { @@ -1348,7 +1358,9 @@ async fn sticky_queue_label_strategy( let mut starter = CoreWfStarter::new_with_runtime(&wf_name, rt); // Enable sticky queues by setting a reasonable cache size starter.worker_config.max_cached_workflows(10_usize); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let task_queue = starter.get_task_queue().to_owned(); let mut worker = starter.worker().await; @@ -1424,7 +1436,9 @@ async fn resource_based_tuner_metrics() { let rt = CoreRuntime::new_assume_tokio(get_integ_runtime_options(telemopts)).unwrap(); let wf_name = "resource_based_tuner_metrics"; let mut starter = CoreWfStarter::new_with_runtime(wf_name, rt); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); starter.worker_config.clear_max_outstanding_opts(); // Create a resource-based tuner with reasonable thresholds diff --git a/crates/sdk-core/tests/integ_tests/update_tests.rs b/crates/sdk-core/tests/integ_tests/update_tests.rs index 7265ff39a..872a79303 100644 --- a/crates/sdk-core/tests/integ_tests/update_tests.rs +++ b/crates/sdk-core/tests/integ_tests/update_tests.rs @@ -14,6 +14,7 @@ use std::{ use temporalio_client::{ Client, NamespacedClient, RetryClient, WorkflowClientTrait, WorkflowService, }; +use temporalio_common::worker::WorkerTaskTypes; use temporalio_common::{ Worker, prost_dur, protos::{ @@ -723,7 +724,9 @@ async fn update_with_local_acts() { async fn update_rejection_sdk() { let wf_name = "update_rejection_sdk"; let mut starter = CoreWfStarter::new(wf_name); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut worker = starter.worker().await; let client = starter.get_client().await; worker.register_wf(wf_name.to_owned(), |ctx: WfContext| async move { @@ -767,7 +770,9 @@ async fn update_rejection_sdk() { async fn update_fail_sdk() { let wf_name = "update_fail_sdk"; let mut starter = CoreWfStarter::new(wf_name); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut worker = starter.worker().await; let client = starter.get_client().await; worker.register_wf(wf_name.to_owned(), |ctx: WfContext| async move { @@ -811,7 +816,9 @@ async fn update_fail_sdk() { async fn update_timer_sequence() { let wf_name = "update_timer_sequence"; let mut starter = CoreWfStarter::new(wf_name); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut worker = starter.worker().await; let client = starter.get_client().await; worker.register_wf(wf_name.to_owned(), |ctx: WfContext| async move { @@ -859,7 +866,9 @@ async fn update_timer_sequence() { async fn task_failure_during_validation() { let wf_name = "task_failure_during_validation"; let mut starter = CoreWfStarter::new(wf_name); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); starter.workflow_options.task_timeout = Some(Duration::from_secs(1)); let mut worker = starter.worker().await; let client = starter.get_client().await; @@ -920,7 +929,9 @@ async fn task_failure_during_validation() { async fn task_failure_after_update() { let wf_name = "task_failure_after_update"; let mut starter = CoreWfStarter::new(wf_name); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); starter.workflow_options.task_timeout = Some(Duration::from_secs(1)); let mut worker = starter.worker().await; let client = starter.get_client().await; diff --git a/crates/sdk-core/tests/integ_tests/worker_tests.rs b/crates/sdk-core/tests/integ_tests/worker_tests.rs index 097b4e088..9b77be17f 100644 --- a/crates/sdk-core/tests/integ_tests/worker_tests.rs +++ b/crates/sdk-core/tests/integ_tests/worker_tests.rs @@ -19,6 +19,7 @@ use std::{ time::Duration, }; use temporalio_client::WorkflowOptions; +use temporalio_common::worker::WorkerTaskTypes; use temporalio_common::{ Worker, errors::WorkerValidationError, @@ -89,6 +90,7 @@ async fn worker_validation_fails_on_nonexistent_namespace() { .versioning_strategy(WorkerVersioningStrategy::None { build_id: "blah".to_owned(), }) + .task_types(WorkerTaskTypes::all()) .build() .unwrap(), retrying_client, @@ -177,7 +179,7 @@ async fn resource_based_few_pollers_guarantees_non_sticky_poll() { starter .worker_config .clear_max_outstanding_opts() - .no_remote_activities(true) + .task_types(WorkerTaskTypes::workflow_only()) // 3 pollers so the minimum slots of 2 can both be handed out to a sticky poller .workflow_task_poller_behavior(PollerBehavior::SimpleMaximum(3_usize)); // Set the limits to zero so it's essentially unwilling to hand out slots @@ -210,7 +212,9 @@ async fn resource_based_few_pollers_guarantees_non_sticky_poll() { async fn oversize_grpc_message() { let wf_name = "oversize_grpc_message"; let mut starter = CoreWfStarter::new(wf_name); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut core = starter.worker().await; static OVERSIZE_GRPC_MESSAGE_RUN: AtomicBool = AtomicBool::new(false); diff --git a/crates/sdk-core/tests/integ_tests/worker_versioning_tests.rs b/crates/sdk-core/tests/integ_tests/worker_versioning_tests.rs index 6154cca6e..e7ac54f09 100644 --- a/crates/sdk-core/tests/integ_tests/worker_versioning_tests.rs +++ b/crates/sdk-core/tests/integ_tests/worker_versioning_tests.rs @@ -18,7 +18,9 @@ use temporalio_common::{ }, }, }, - worker::{WorkerDeploymentOptions, WorkerDeploymentVersion, WorkerVersioningStrategy}, + worker::{ + WorkerDeploymentOptions, WorkerDeploymentVersion, WorkerTaskTypes, WorkerVersioningStrategy, + }, }; use temporalio_sdk::{ActivityOptions, WfContext}; use temporalio_sdk_core::test_help::WorkerTestHelpers; @@ -44,7 +46,7 @@ async fn sets_deployment_info_on_task_responses(#[values(true, false)] use_defau default_versioning_behavior: VersioningBehavior::AutoUpgrade.into(), }, )) - .no_remote_activities(true); + .task_types(WorkerTaskTypes::workflow_only()); let core = starter.get_worker().await; let client = starter.get_client().await; diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests.rs b/crates/sdk-core/tests/integ_tests/workflow_tests.rs index 81bdb1fed..aa783aa09 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests.rs @@ -33,6 +33,7 @@ use std::{ use temporalio_client::{ WfClientExt, WorkflowClientTrait, WorkflowExecutionResult, WorkflowOptions, }; +use temporalio_common::worker::WorkerTaskTypes; use temporalio_common::{ errors::{PollError, WorkflowErrorType}, prost_dur, @@ -77,7 +78,9 @@ use tokio::{join, sync::Notify, time::sleep}; async fn parallel_workflows_same_queue() { let wf_name = "parallel_workflows_same_queue"; let mut starter = CoreWfStarter::new(wf_name); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut core = starter.worker().await; core.register_wf(wf_name.to_owned(), |ctx: WfContext| async move { @@ -544,7 +547,7 @@ async fn deployment_version_correct_in_wf_info(#[values(true, false)] use_only_b starter .worker_config .versioning_strategy(version_strat) - .no_remote_activities(true); + .task_types(WorkerTaskTypes::workflow_only()); let core = starter.get_worker().await; starter.start_wf().await; let client = starter.get_client().await; @@ -767,7 +770,9 @@ async fn nondeterminism_errors_fail_workflow_when_configured_to( let rt = CoreRuntime::new_assume_tokio(get_integ_runtime_options(telemopts)).unwrap(); let wf_name = "nondeterminism_errors_fail_workflow_when_configured_to"; let mut starter = CoreWfStarter::new_with_runtime(wf_name, rt); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let typeset = HashSet::from([WorkflowErrorType::Nondeterminism]); if whole_worker { starter.worker_config.workflow_failure_errors(typeset); diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/cancel_external.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/cancel_external.rs index 6bf9ff3d1..942a5a75c 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/cancel_external.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/cancel_external.rs @@ -5,6 +5,7 @@ use temporalio_common::protos::{ coresdk::{FromJsonPayloadExt, common::NamespacedWorkflowExecution}, temporal::api::enums::v1::{CommandType, EventType}, }; +use temporalio_common::worker::WorkerTaskTypes; use temporalio_sdk::{WfContext, WorkflowResult}; use temporalio_sdk_core::test_help::MockPollCfg; @@ -39,7 +40,9 @@ async fn cancel_receiver(ctx: WfContext) -> WorkflowResult { #[tokio::test] async fn sends_cancel_to_other_wf() { let mut starter = CoreWfStarter::new("sends_cancel_to_other_wf"); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut worker = starter.worker().await; worker.register_wf("sender", cancel_sender); worker.register_wf("receiver", cancel_receiver); diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/cancel_wf.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/cancel_wf.rs index 348f9c2df..747ad7bb2 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/cancel_wf.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/cancel_wf.rs @@ -6,6 +6,7 @@ use temporalio_common::protos::{ coresdk::workflow_activation::{WorkflowActivationJob, workflow_activation_job}, temporal::api::enums::v1::{CommandType, WorkflowExecutionStatus}, }; +use temporalio_common::worker::WorkerTaskTypes; use temporalio_sdk::{WfContext, WfExitValue, WorkflowResult}; use temporalio_sdk_core::test_help::MockPollCfg; @@ -32,7 +33,9 @@ async fn cancelled_wf(ctx: WfContext) -> WorkflowResult<()> { async fn cancel_during_timer() { let wf_name = "cancel_during_timer"; let mut starter = CoreWfStarter::new(wf_name); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut worker = starter.worker().await; let client = starter.get_client().await; worker.register_wf(wf_name.to_string(), cancelled_wf); diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/child_workflows.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/child_workflows.rs index 3a2fdace3..e9029b7c1 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/child_workflows.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/child_workflows.rs @@ -3,6 +3,7 @@ use anyhow::anyhow; use assert_matches::assert_matches; use std::time::Duration; use temporalio_client::{WorkflowClientTrait, WorkflowOptions}; +use temporalio_common::worker::WorkerTaskTypes; use temporalio_common::{ Worker, protos::{ @@ -85,7 +86,9 @@ async fn happy_parent(ctx: WfContext) -> WorkflowResult<()> { #[tokio::test] async fn child_workflow_happy_path() { let mut starter = CoreWfStarter::new("child-workflows"); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut worker = starter.worker().await; worker.register_wf(PARENT_WF_TYPE.to_string(), happy_parent); @@ -106,7 +109,9 @@ async fn child_workflow_happy_path() { #[tokio::test] async fn abandoned_child_bug_repro() { let mut starter = CoreWfStarter::new("child-workflow-abandon-bug"); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut worker = starter.worker().await; let barr: &'static Barrier = Box::leak(Box::new(Barrier::new(2))); @@ -177,7 +182,9 @@ async fn abandoned_child_bug_repro() { #[tokio::test] async fn abandoned_child_resolves_post_cancel() { let mut starter = CoreWfStarter::new("child-workflow-resolves-post-cancel"); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut worker = starter.worker().await; let barr: &'static Barrier = Box::leak(Box::new(Barrier::new(2))); @@ -244,7 +251,9 @@ async fn abandoned_child_resolves_post_cancel() { async fn cancelled_child_gets_reason() { let wf_name = "cancelled-child-gets-reason"; let mut starter = CoreWfStarter::new(wf_name); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut worker = starter.worker().await; worker.register_wf(wf_name.to_string(), move |ctx: WfContext| async move { diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/continue_as_new.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/continue_as_new.rs index daa142330..1785adb47 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/continue_as_new.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/continue_as_new.rs @@ -6,6 +6,7 @@ use temporalio_common::protos::{ coresdk::workflow_commands::ContinueAsNewWorkflowExecution, temporal::api::enums::v1::CommandType, }; +use temporalio_common::worker::WorkerTaskTypes; use temporalio_sdk::{WfContext, WfExitValue, WorkflowResult}; use temporalio_sdk_core::test_help::MockPollCfg; @@ -26,7 +27,9 @@ async fn continue_as_new_wf(ctx: WfContext) -> WorkflowResult<()> { async fn continue_as_new_happy_path() { let wf_name = "continue_as_new_happy_path"; let mut starter = CoreWfStarter::new(wf_name); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut worker = starter.worker().await; worker.register_wf(wf_name.to_string(), continue_as_new_wf); @@ -48,7 +51,7 @@ async fn continue_as_new_multiple_concurrent() { let mut starter = CoreWfStarter::new(wf_name); starter .worker_config - .no_remote_activities(true) + .task_types(WorkerTaskTypes::workflow_only()) .max_cached_workflows(5_usize) .max_outstanding_workflow_tasks(5_usize); let mut worker = starter.worker().await; diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/determinism.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/determinism.rs index 6abc1d3a2..4e435992b 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/determinism.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/determinism.rs @@ -12,6 +12,7 @@ use temporalio_common::protos::{ failure::v1::Failure, }, }; +use temporalio_common::worker::WorkerTaskTypes; use temporalio_sdk::{ ActContext, ActivityOptions, ChildWorkflowOptions, LocalActivityOptions, WfContext, WorkflowResult, @@ -52,7 +53,9 @@ pub(crate) async fn timer_wf_nondeterministic(ctx: WfContext) -> WorkflowResult< async fn test_determinism_error_then_recovers() { let wf_name = "test_determinism_error_then_recovers"; let mut starter = CoreWfStarter::new(wf_name); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut worker = starter.worker().await; worker.register_wf(wf_name.to_owned(), timer_wf_nondeterministic); diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/eager.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/eager.rs index 1a3ca56c2..6d3c3c1cb 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/eager.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/eager.rs @@ -1,6 +1,7 @@ use crate::common::{CoreWfStarter, NAMESPACE, get_integ_server_options}; use std::time::Duration; use temporalio_client::WorkflowClientTrait; +use temporalio_common::worker::WorkerTaskTypes; use temporalio_sdk::{WfContext, WorkflowResult}; pub(crate) async fn eager_wf(_context: WfContext) -> WorkflowResult<()> { @@ -14,7 +15,9 @@ async fn eager_wf_start() { starter.workflow_options.enable_eager_workflow_start = true; // hang the test if eager task dispatch failed starter.workflow_options.task_timeout = Some(Duration::from_secs(1500)); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut worker = starter.worker().await; worker.register_wf(wf_name.to_owned(), eager_wf); starter.eager_start_with_worker(wf_name, &mut worker).await; @@ -28,7 +31,9 @@ async fn eager_wf_start_different_clients() { starter.workflow_options.enable_eager_workflow_start = true; // hang the test if wf task needs retry starter.workflow_options.task_timeout = Some(Duration::from_secs(1500)); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut worker = starter.worker().await; worker.register_wf(wf_name.to_owned(), eager_wf); diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/modify_wf_properties.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/modify_wf_properties.rs index e1ee71c2e..01d29f6b1 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/modify_wf_properties.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/modify_wf_properties.rs @@ -9,6 +9,7 @@ use temporalio_common::protos::{ enums::v1::EventType, }, }; +use temporalio_common::worker::WorkerTaskTypes; use temporalio_sdk::{WfContext, WorkflowResult}; use temporalio_sdk_core::test_help::MockPollCfg; use uuid::Uuid; @@ -29,7 +30,9 @@ async fn sends_modify_wf_props() { let wf_name = "can_upsert_memo"; let wf_id = Uuid::new_v4(); let mut starter = CoreWfStarter::new(wf_name); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut worker = starter.worker().await; worker.register_wf(wf_name, memo_upserter); diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/nexus.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/nexus.rs index a9b6a0b58..42ec1216c 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/nexus.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/nexus.rs @@ -12,6 +12,7 @@ use std::{ time::Duration, }; use temporalio_client::{WfClientExt, WorkflowClientTrait, WorkflowOptions}; +use temporalio_common::worker::WorkerTaskTypes; use temporalio_common::{ errors::PollError, protos::{ @@ -57,7 +58,11 @@ async fn nexus_basic( ) { let wf_name = "nexus_basic"; let mut starter = CoreWfStarter::new(wf_name); - starter.worker_config.no_remote_activities(true); + starter.worker_config.task_types(WorkerTaskTypes { + enable_workflows: true, + enable_activities: false, + enable_nexus: true, + }); let mut worker = starter.worker().await; let core_worker = starter.get_worker().await; @@ -202,7 +207,11 @@ async fn nexus_async( ) { let wf_name = "nexus_async"; let mut starter = CoreWfStarter::new(wf_name); - starter.worker_config.no_remote_activities(true); + starter.worker_config.task_types(WorkerTaskTypes { + enable_workflows: true, + enable_activities: false, + enable_nexus: true, + }); let mut worker = starter.worker().await; let core_worker = starter.get_worker().await; @@ -429,7 +438,11 @@ async fn nexus_async( async fn nexus_cancel_before_start() { let wf_name = "nexus_cancel_before_start"; let mut starter = CoreWfStarter::new(wf_name); - starter.worker_config.no_remote_activities(true); + starter.worker_config.task_types(WorkerTaskTypes { + enable_workflows: true, + enable_activities: false, + enable_nexus: true, + }); let mut worker = starter.worker().await; let endpoint = mk_nexus_endpoint(&mut starter).await; @@ -471,7 +484,11 @@ async fn nexus_cancel_before_start() { async fn nexus_must_complete_task_to_shutdown(#[values(true, false)] use_grace_period: bool) { let wf_name = "nexus_must_complete_task_to_shutdown"; let mut starter = CoreWfStarter::new(wf_name); - starter.worker_config.no_remote_activities(true); + starter.worker_config.task_types(WorkerTaskTypes { + enable_workflows: true, + enable_activities: false, + enable_nexus: true, + }); if use_grace_period { starter .worker_config @@ -571,7 +588,11 @@ async fn nexus_cancellation_types( ) { let wf_name = "nexus_cancellation_types"; let mut starter = CoreWfStarter::new(wf_name); - starter.worker_config.no_remote_activities(true); + starter.worker_config.task_types(WorkerTaskTypes { + enable_workflows: true, + enable_activities: false, + enable_nexus: true, + }); let mut worker = starter.worker().await; let core_worker = starter.get_worker().await; diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/patches.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/patches.rs index 96371f014..4b700c2e1 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/patches.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/patches.rs @@ -29,6 +29,8 @@ use temporalio_common::protos::{ }, }, }; + +use temporalio_common::worker::WorkerTaskTypes; use temporalio_sdk::{ActivityOptions, WfContext, WorkflowResult}; use temporalio_sdk_core::test_help::{CoreInternalFlags, MockPollCfg, ResponseType}; use tokio::{join, sync::Notify}; @@ -55,7 +57,9 @@ pub(crate) async fn changes_wf(ctx: WfContext) -> WorkflowResult<()> { async fn writes_change_markers() { let wf_name = "writes_change_markers"; let mut starter = CoreWfStarter::new(wf_name); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut worker = starter.worker().await; worker.register_wf(wf_name.to_owned(), changes_wf); @@ -89,7 +93,9 @@ pub(crate) async fn no_change_then_change_wf(ctx: WfContext) -> WorkflowResult<( async fn can_add_change_markers() { let wf_name = "can_add_change_markers"; let mut starter = CoreWfStarter::new(wf_name); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut worker = starter.worker().await; worker.register_wf(wf_name.to_owned(), no_change_then_change_wf); @@ -113,7 +119,9 @@ pub(crate) async fn replay_with_change_marker_wf(ctx: WfContext) -> WorkflowResu async fn replaying_with_patch_marker() { let wf_name = "replaying_with_patch_marker"; let mut starter = CoreWfStarter::new(wf_name); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut worker = starter.worker().await; worker.register_wf(wf_name.to_owned(), replay_with_change_marker_wf); @@ -132,7 +140,7 @@ async fn patched_on_second_workflow_task_is_deterministic() { starter .worker_config .max_cached_workflows(0_usize) - .no_remote_activities(true); + .task_types(WorkerTaskTypes::workflow_only()); let mut worker = starter.worker().await; // Include a task failure as well to make sure that works static FAIL_ONCE: AtomicBool = AtomicBool::new(true); @@ -155,7 +163,9 @@ async fn patched_on_second_workflow_task_is_deterministic() { async fn can_remove_deprecated_patch_near_other_patch() { let wf_name = "can_add_change_markers"; let mut starter = CoreWfStarter::new(wf_name); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut worker = starter.worker().await; let did_die = Arc::new(AtomicBool::new(false)); worker.register_wf(wf_name.to_owned(), move |ctx: WfContext| { @@ -186,7 +196,9 @@ async fn can_remove_deprecated_patch_near_other_patch() { async fn deprecated_patch_removal() { let wf_name = "deprecated_patch_removal"; let mut starter = CoreWfStarter::new(wf_name); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut worker = starter.worker().await; let client = starter.get_client().await; let wf_id = starter.get_task_queue().to_string(); diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/resets.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/resets.rs index b6ad44a74..2b31d9fd0 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/resets.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/resets.rs @@ -17,6 +17,8 @@ use temporalio_common::protos::{ common::v1::WorkflowExecution, workflowservice::v1::ResetWorkflowExecutionRequest, }, }; + +use temporalio_common::worker::WorkerTaskTypes; use temporalio_sdk::{LocalActivityOptions, WfContext}; use tokio::sync::Notify; use tonic::IntoRequest; @@ -27,7 +29,9 @@ const POST_RESET_SIG: &str = "post-reset"; async fn reset_workflow() { let wf_name = "reset_me_wf"; let mut starter = CoreWfStarter::new(wf_name); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut worker = starter.worker().await; worker.fetch_results = false; let notify = Arc::new(Notify::new()); @@ -113,7 +117,9 @@ async fn reset_workflow() { async fn reset_randomseed() { let wf_name = "reset_randomseed"; let mut starter = CoreWfStarter::new(wf_name); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut worker = starter.worker().await; worker.fetch_results = false; let notify = Arc::new(Notify::new()); diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/signals.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/signals.rs index df49fa4ce..fe6ee4c10 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/signals.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/signals.rs @@ -16,6 +16,8 @@ use temporalio_common::protos::{ enums::v1::{CommandType, EventType}, }, }; + +use temporalio_common::worker::WorkerTaskTypes; use temporalio_sdk::{ CancellableFuture, ChildWorkflowOptions, Signal, SignalWorkflowOptions, WfContext, WorkflowResult, @@ -46,7 +48,9 @@ async fn signal_sender(ctx: WfContext) -> WorkflowResult<()> { async fn sends_signal_to_missing_wf() { let wf_name = "sends_signal_to_missing_wf"; let mut starter = CoreWfStarter::new(wf_name); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut worker = starter.worker().await; worker.register_wf(wf_name.to_owned(), signal_sender); @@ -85,7 +89,9 @@ async fn signal_with_create_wf_receiver(ctx: WfContext) -> WorkflowResult<()> { #[tokio::test] async fn sends_signal_to_other_wf() { let mut starter = CoreWfStarter::new("sends_signal_to_other_wf"); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut worker = starter.worker().await; worker.register_wf("sender", signal_sender); worker.register_wf("receiver", signal_receiver); @@ -114,7 +120,9 @@ async fn sends_signal_to_other_wf() { #[tokio::test] async fn sends_signal_with_create_wf() { let mut starter = CoreWfStarter::new("sends_signal_with_create_wf"); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut worker = starter.worker().await; worker.register_wf("receiver_signal", signal_with_create_wf_receiver); @@ -160,7 +168,9 @@ async fn signals_child(ctx: WfContext) -> WorkflowResult<()> { #[tokio::test] async fn sends_signal_to_child() { let mut starter = CoreWfStarter::new("sends_signal_to_child"); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut worker = starter.worker().await; worker.register_wf("child_signaler", signals_child); worker.register_wf("child_receiver", signal_receiver); diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/stickyness.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/stickyness.rs index ee65b07d3..e00c976c2 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/stickyness.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/stickyness.rs @@ -4,7 +4,7 @@ use std::{ time::Duration, }; use temporalio_client::WorkflowOptions; -use temporalio_common::worker::PollerBehavior; +use temporalio_common::worker::{PollerBehavior, WorkerTaskTypes}; use temporalio_sdk::{WfContext, WorkflowResult}; use tokio::sync::Barrier; @@ -14,7 +14,7 @@ async fn timer_workflow_not_sticky() { let mut starter = CoreWfStarter::new(wf_name); starter .worker_config - .no_remote_activities(true) + .task_types(WorkerTaskTypes::workflow_only()) .max_cached_workflows(0_usize); let mut worker = starter.worker().await; worker.register_wf(wf_name.to_owned(), timer_wf); @@ -42,7 +42,9 @@ async fn timer_workflow_timeout_on_sticky() { // on a not-sticky queue let wf_name = "timer_workflow_timeout_on_sticky"; let mut starter = CoreWfStarter::new(wf_name); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); starter.workflow_options.task_timeout = Some(Duration::from_secs(2)); let mut worker = starter.worker().await; worker.register_wf(wf_name.to_owned(), timer_timeout_wf); @@ -59,7 +61,7 @@ async fn cache_miss_ok() { let mut starter = CoreWfStarter::new(wf_name); starter .worker_config - .no_remote_activities(true) + .task_types(WorkerTaskTypes::workflow_only()) .max_outstanding_workflow_tasks(2_usize) .max_cached_workflows(0_usize) .workflow_task_poller_behavior(PollerBehavior::SimpleMaximum(1_usize)); diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/timers.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/timers.rs index 1a991a23a..e5eacfc7f 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/timers.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/timers.rs @@ -1,6 +1,6 @@ -use std::time::Duration; - use crate::common::{CoreWfStarter, build_fake_sdk, init_core_and_create_wf}; +use std::time::Duration; +use temporalio_common::worker::WorkerTaskTypes; use temporalio_common::{ prost_dur, protos::{ @@ -28,7 +28,9 @@ pub(crate) async fn timer_wf(command_sink: WfContext) -> WorkflowResult<()> { async fn timer_workflow_workflow_driver() { let wf_name = "timer_wf_new"; let mut starter = CoreWfStarter::new(wf_name); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut worker = starter.worker().await; worker.register_wf(wf_name.to_owned(), timer_wf); @@ -40,7 +42,9 @@ async fn timer_workflow_workflow_driver() { async fn timer_workflow_manual() { let mut starter = init_core_and_create_wf("timer_workflow").await; let core = starter.get_worker().await; - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let task = core.poll_workflow_activation().await.unwrap(); core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds( task.run_id, @@ -64,7 +68,9 @@ async fn timer_workflow_manual() { async fn timer_cancel_workflow() { let mut starter = init_core_and_create_wf("timer_cancel_workflow").await; let core = starter.get_worker().await; - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let task = core.poll_workflow_activation().await.unwrap(); core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds( task.run_id, @@ -123,7 +129,9 @@ async fn parallel_timer_wf(command_sink: WfContext) -> WorkflowResult<()> { async fn parallel_timers() { let wf_name = "parallel_timers"; let mut starter = CoreWfStarter::new(wf_name); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut worker = starter.worker().await; worker.register_wf(wf_name.to_owned(), parallel_timer_wf); diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/upsert_search_attrs.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/upsert_search_attrs.rs index e839895e2..af8551525 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/upsert_search_attrs.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/upsert_search_attrs.rs @@ -14,6 +14,7 @@ use temporalio_common::protos::{ enums::v1::EventType, }, }; +use temporalio_common::worker::WorkerTaskTypes; use temporalio_sdk::{WfContext, WfExitValue, WorkflowResult}; use temporalio_sdk_core::test_help::MockPollCfg; use uuid::Uuid; @@ -44,7 +45,9 @@ async fn sends_upsert() { let wf_name = "sends_upsert_search_attrs"; let wf_id = Uuid::new_v4(); let mut starter = CoreWfStarter::new(wf_name); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut worker = starter.worker().await; worker.register_wf(wf_name, search_attr_updater); diff --git a/crates/sdk-core/tests/manual_tests.rs b/crates/sdk-core/tests/manual_tests.rs index 86fb86a53..3dfc6d510 100644 --- a/crates/sdk-core/tests/manual_tests.rs +++ b/crates/sdk-core/tests/manual_tests.rs @@ -20,6 +20,7 @@ use std::{ time::{Duration, Instant}, }; use temporalio_client::{GetWorkflowResultOpts, WfClientExt, WorkflowClientTrait, WorkflowOptions}; +use temporalio_common::worker::WorkerTaskTypes; use temporalio_common::{ protos::coresdk::AsJsonPayloadExt, telemetry::PrometheusExporterOptionsBuilder, worker::PollerBehavior, @@ -214,7 +215,7 @@ async fn poller_load_sustained() { maximum: 200, initial: 5, }) - .no_remote_activities(true); + .task_types(WorkerTaskTypes::workflow_only()); let mut worker = starter.worker().await; worker.register_wf(wf_name.to_owned(), |ctx: WfContext| async move { let sigchan = ctx.make_signal_channel(SIGNAME).map(Ok); diff --git a/crates/sdk-core/tests/shared_tests/mod.rs b/crates/sdk-core/tests/shared_tests/mod.rs index 16485d964..ccb3d2ee1 100644 --- a/crates/sdk-core/tests/shared_tests/mod.rs +++ b/crates/sdk-core/tests/shared_tests/mod.rs @@ -6,6 +6,7 @@ use temporalio_common::protos::temporal::api::{ enums::v1::{EventType, WorkflowTaskFailedCause::GrpcMessageTooLarge}, history::v1::history_event::Attributes::WorkflowTaskFailedEventAttributes, }; +use temporalio_common::worker::WorkerTaskTypes; use temporalio_sdk::WfContext; pub(crate) mod priority; @@ -15,7 +16,9 @@ pub(crate) async fn grpc_message_too_large() { let mut starter = CoreWfStarter::new_cloud_or_local(wf_name, "") .await .unwrap(); - starter.worker_config.no_remote_activities(true); + starter + .worker_config + .task_types(WorkerTaskTypes::workflow_only()); let mut core = starter.worker().await; static OVERSIZE_GRPC_MESSAGE_RUN: AtomicBool = AtomicBool::new(false);