Skip to content

Commit 32cdd90

Browse files
authored
💥 Add explicit Worker configuration for type (workflow/activity/nexus) (#1059)
1 parent 850db67 commit 32cdd90

36 files changed

+1019
-277
lines changed

‎crates/common/src/errors.rs‎

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,14 @@ pub enum CompleteWfError {
4040
/// The run associated with the completion
4141
run_id: String,
4242
},
43+
/// Workflows have not been enabled on this worker.
44+
#[error("Workflows are not enabled on this worker")]
45+
WorkflowNotEnabled,
4346
}
4447

4548
/// Errors thrown by [crate::Worker::complete_activity_task]
4649
#[derive(thiserror::Error, Debug)]
50+
#[allow(clippy::large_enum_variant)]
4751
pub enum CompleteActivityError {
4852
/// Lang SDK sent us a malformed activity completion. This likely means a bug in the lang sdk.
4953
#[error("Lang SDK sent us a malformed activity completion ({reason}): {completion:?}")]
@@ -53,6 +57,9 @@ pub enum CompleteActivityError {
5357
/// The completion, which may not be included to avoid unnecessary copies.
5458
completion: Option<ActivityExecutionResult>,
5559
},
60+
/// Activities have not been enabled on this worker.
61+
#[error("Activities are not enabled on this worker")]
62+
ActivityNotEnabled,
5663
}
5764

5865
/// Errors thrown by [crate::Worker::complete_nexus_task]

‎crates/common/src/lib.rs‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,8 @@ pub trait Worker: Send + Sync {
129129
/// [Worker::complete_workflow_activation] and [Worker::complete_activity_task] for those
130130
/// workflows & activities until they are done. At that point, the lang SDK can end the process,
131131
/// or drop the [Worker] instance via [Worker::finalize_shutdown], which will close the
132-
/// connection and free resources. If you have set [WorkerConfig::no_remote_activities], you may
133-
/// skip calling [Worker::poll_activity_task].
132+
/// connection and free resources. If you have set [WorkerConfig::task_types] to exclude
133+
/// [WorkerTaskTypes::activity_only()], you may skip calling [Worker::poll_activity_task].
134134
///
135135
/// Lang implementations should use [Worker::initiate_shutdown] followed by
136136
/// [Worker::finalize_shutdown].

‎crates/common/src/worker.rs‎

Lines changed: 77 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,59 @@ use std::{
1616
time::Duration,
1717
};
1818

19+
/// Specifies which task types a worker will poll for.
20+
///
21+
/// Workers can be configured to handle any combination of workflows, activities, and nexus operations.
22+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
23+
pub struct WorkerTaskTypes {
24+
pub enable_workflows: bool,
25+
pub enable_activities: bool,
26+
pub enable_nexus: bool,
27+
}
28+
29+
impl WorkerTaskTypes {
30+
/// Check if no task types are enabled
31+
pub fn is_empty(&self) -> bool {
32+
!self.enable_workflows && !self.enable_activities && !self.enable_nexus
33+
}
34+
35+
/// Create a config with all task types enabled
36+
pub fn all() -> WorkerTaskTypes {
37+
WorkerTaskTypes {
38+
enable_workflows: true,
39+
enable_activities: true,
40+
enable_nexus: true,
41+
}
42+
}
43+
44+
/// Create a config with only workflow tasks enabled
45+
pub fn workflow_only() -> WorkerTaskTypes {
46+
WorkerTaskTypes {
47+
enable_workflows: true,
48+
enable_activities: false,
49+
enable_nexus: false,
50+
}
51+
}
52+
53+
/// Create a config with only activity tasks enabled
54+
pub fn activity_only() -> WorkerTaskTypes {
55+
WorkerTaskTypes {
56+
enable_workflows: false,
57+
enable_activities: true,
58+
enable_nexus: false,
59+
}
60+
}
61+
62+
/// Create a config with only nexus tasks enabled
63+
pub fn nexus_only() -> WorkerTaskTypes {
64+
WorkerTaskTypes {
65+
enable_workflows: false,
66+
enable_activities: false,
67+
enable_nexus: true,
68+
}
69+
}
70+
}
71+
1972
/// Defines per-worker configuration options
2073
#[derive(Clone, derive_builder::Builder)]
2174
#[builder(setter(into), build_fn(validate = "Self::validate"))]
@@ -64,10 +117,10 @@ pub struct WorkerConfig {
64117
/// worker's task queue
65118
#[builder(default = "PollerBehavior::SimpleMaximum(5)")]
66119
pub nexus_task_poller_behavior: PollerBehavior,
67-
/// If set to true this worker will only handle workflow tasks and local activities, it will not
68-
/// poll for activity tasks.
69-
#[builder(default = "false")]
70-
pub no_remote_activities: bool,
120+
/// Specifies which task types this worker will poll for.
121+
///
122+
/// Note: At least one task type must be specified or the worker will fail validation.
123+
pub task_types: WorkerTaskTypes,
71124
/// How long a workflow task is allowed to sit on the sticky queue before it is timed out
72125
/// and moved to the non-sticky queue where it may be picked up by any worker.
73126
#[builder(default = "Duration::from_secs(10)")]
@@ -218,6 +271,15 @@ impl WorkerConfigBuilder {
218271
}
219272

220273
fn validate(&self) -> Result<(), String> {
274+
let task_types = self
275+
.task_types
276+
.as_ref()
277+
.cloned()
278+
.unwrap_or_else(WorkerTaskTypes::all);
279+
if task_types.is_empty() {
280+
return Err("At least one task type must be enabled in `task_types`".to_owned());
281+
}
282+
221283
if let Some(b) = self.workflow_task_poller_behavior.as_ref() {
222284
b.validate()?
223285
}
@@ -249,6 +311,17 @@ impl WorkerConfigBuilder {
249311
return Err("`max_outstanding_nexus_tasks` must be > 0".to_owned());
250312
}
251313

314+
// Validate workflow cache is consistent with task_types
315+
if !task_types.enable_workflows
316+
&& let Some(cache) = self.max_cached_workflows.as_ref()
317+
&& *cache > 0
318+
{
319+
return Err(
320+
"Cannot have `max_cached_workflows` > 0 when workflows are not enabled in `task_types`"
321+
.to_owned(),
322+
);
323+
}
324+
252325
if let Some(cache) = self.max_cached_workflows.as_ref()
253326
&& *cache > 0
254327
{
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
use temporalio_common::worker::{WorkerConfigBuilder, WorkerTaskTypes, WorkerVersioningStrategy};
2+
3+
fn default_versioning_strategy() -> WorkerVersioningStrategy {
4+
WorkerVersioningStrategy::None {
5+
build_id: String::new(),
6+
}
7+
}
8+
9+
#[test]
10+
fn test_default_configuration_polls_all_types() {
11+
let config = WorkerConfigBuilder::default()
12+
.namespace("default")
13+
.task_queue("test-queue")
14+
.versioning_strategy(default_versioning_strategy())
15+
.task_types(WorkerTaskTypes::all())
16+
.build()
17+
.expect("Failed to build default config");
18+
19+
let effective = &config.task_types;
20+
assert!(
21+
effective.enable_workflows,
22+
"Should poll workflows by default"
23+
);
24+
assert!(
25+
effective.enable_activities,
26+
"Should poll activities by default"
27+
);
28+
assert!(effective.enable_nexus, "Should poll nexus by default");
29+
}
30+
31+
#[test]
32+
fn test_empty_task_types_fails_validation() {
33+
let result = WorkerConfigBuilder::default()
34+
.namespace("default")
35+
.task_queue("test-queue")
36+
.versioning_strategy(default_versioning_strategy())
37+
.task_types(WorkerTaskTypes {
38+
enable_workflows: false,
39+
enable_activities: false,
40+
enable_nexus: false,
41+
})
42+
.build();
43+
44+
assert!(result.is_err(), "Empty task_types should fail validation");
45+
let err = result.err().unwrap().to_string();
46+
assert!(
47+
err.contains("At least one task type"),
48+
"Error should mention task types: {err}",
49+
);
50+
}
51+
52+
#[test]
53+
fn test_workflow_cache_without_workflows_fails() {
54+
let result = WorkerConfigBuilder::default()
55+
.namespace("default")
56+
.task_queue("test-queue")
57+
.versioning_strategy(default_versioning_strategy())
58+
.task_types(WorkerTaskTypes::activity_only())
59+
.max_cached_workflows(10usize)
60+
.build();
61+
62+
assert!(
63+
result.is_err(),
64+
"Workflow cache > 0 without workflows should fail"
65+
);
66+
let err = result.err().unwrap().to_string();
67+
assert!(
68+
err.contains("max_cached_workflows"),
69+
"Error should mention max_cached_workflows: {err}",
70+
);
71+
}
72+
73+
#[test]
74+
fn test_all_combinations() {
75+
let combinations = [
76+
(WorkerTaskTypes::workflow_only(), "workflows only"),
77+
(WorkerTaskTypes::activity_only(), "activities only"),
78+
(WorkerTaskTypes::nexus_only(), "nexus only"),
79+
(
80+
WorkerTaskTypes {
81+
enable_workflows: true,
82+
enable_activities: true,
83+
enable_nexus: false,
84+
},
85+
"workflows + activities",
86+
),
87+
(
88+
WorkerTaskTypes {
89+
enable_workflows: true,
90+
enable_activities: false,
91+
enable_nexus: true,
92+
},
93+
"workflows + nexus",
94+
),
95+
(
96+
WorkerTaskTypes {
97+
enable_workflows: false,
98+
enable_activities: true,
99+
enable_nexus: true,
100+
},
101+
"activities + nexus",
102+
),
103+
(WorkerTaskTypes::all(), "all types"),
104+
];
105+
106+
for (task_types, description) in combinations {
107+
let config = WorkerConfigBuilder::default()
108+
.namespace("default")
109+
.task_queue("test-queue")
110+
.versioning_strategy(default_versioning_strategy())
111+
.task_types(task_types)
112+
.build()
113+
.unwrap_or_else(|e| panic!("Failed to build config for {description}: {e:?}"));
114+
115+
let effective = config.task_types;
116+
assert_eq!(
117+
effective, task_types,
118+
"Effective types should match for {description}",
119+
);
120+
}
121+
}

‎crates/sdk-core-c-bridge/include/temporal-sdk-core-c-bridge.h‎

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -738,6 +738,12 @@ typedef struct TemporalCoreTunerHolder {
738738
struct TemporalCoreSlotSupplier nexus_task_slot_supplier;
739739
} TemporalCoreTunerHolder;
740740

741+
typedef struct TemporalCoreWorkerTaskTypes {
742+
bool enable_workflows;
743+
bool enable_activities;
744+
bool enable_nexus;
745+
} TemporalCoreWorkerTaskTypes;
746+
741747
typedef struct TemporalCorePollerBehaviorSimpleMaximum {
742748
uintptr_t simple_maximum;
743749
} TemporalCorePollerBehaviorSimpleMaximum;
@@ -765,7 +771,7 @@ typedef struct TemporalCoreWorkerOptions {
765771
struct TemporalCoreByteArrayRef identity_override;
766772
uint32_t max_cached_workflows;
767773
struct TemporalCoreTunerHolder tuner;
768-
bool no_remote_activities;
774+
struct TemporalCoreWorkerTaskTypes task_types;
769775
uint64_t sticky_queue_schedule_to_start_timeout_millis;
770776
uint64_t max_heartbeat_throttle_interval_millis;
771777
uint64_t default_heartbeat_throttle_interval_millis;

‎crates/sdk-core-c-bridge/src/worker.rs‎

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ pub struct WorkerOptions {
4343
pub identity_override: ByteArrayRef,
4444
pub max_cached_workflows: u32,
4545
pub tuner: TunerHolder,
46-
pub no_remote_activities: bool,
46+
pub task_types: WorkerTaskTypes,
4747
pub sticky_queue_schedule_to_start_timeout_millis: u64,
4848
pub max_heartbeat_throttle_interval_millis: u64,
4949
pub default_heartbeat_throttle_interval_millis: u64,
@@ -58,6 +58,23 @@ pub struct WorkerOptions {
5858
pub nondeterminism_as_workflow_fail_for_types: ByteArrayRefArray,
5959
}
6060

61+
#[repr(C)]
62+
pub struct WorkerTaskTypes {
63+
pub enable_workflows: bool,
64+
pub enable_activities: bool,
65+
pub enable_nexus: bool,
66+
}
67+
68+
impl From<&WorkerTaskTypes> for temporalio_common::worker::WorkerTaskTypes {
69+
fn from(t: &WorkerTaskTypes) -> Self {
70+
Self {
71+
enable_workflows: t.enable_workflows,
72+
enable_activities: t.enable_activities,
73+
enable_nexus: t.enable_nexus,
74+
}
75+
}
76+
}
77+
6178
#[repr(C)]
6279
pub struct PollerBehaviorSimpleMaximum {
6380
pub simple_maximum: usize,
@@ -1183,7 +1200,9 @@ impl TryFrom<&WorkerOptions> for temporalio_sdk_core::WorkerConfig {
11831200
.client_identity_override(opt.identity_override.to_option_string())
11841201
.max_cached_workflows(opt.max_cached_workflows as usize)
11851202
.tuner(Arc::new(converted_tuner))
1186-
.no_remote_activities(opt.no_remote_activities)
1203+
.task_types(temporalio_common::worker::WorkerTaskTypes::from(
1204+
&opt.task_types,
1205+
))
11871206
.sticky_queue_schedule_to_start_timeout(Duration::from_millis(
11881207
opt.sticky_queue_schedule_to_start_timeout_millis,
11891208
))

‎crates/sdk-core/src/core_tests/activity_tasks.rs‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ use temporalio_common::{
5353
},
5454
test_utils::start_timer_cmd,
5555
},
56-
worker::PollerBehavior,
56+
worker::{PollerBehavior, WorkerTaskTypes},
5757
};
5858
use tokio::{join, time::sleep};
5959
use tokio_util::sync::CancellationToken;
@@ -725,7 +725,7 @@ async fn no_eager_activities_requested_when_worker_options_disable_it(
725725
mock.worker_cfg(|wc| {
726726
wc.max_cached_workflows = 2;
727727
if reason == "no_remote" {
728-
wc.no_remote_activities = true;
728+
wc.task_types = WorkerTaskTypes::workflow_only();
729729
} else {
730730
wc.max_task_queue_activities_per_second = Some(1.0);
731731
}

0 commit comments

Comments
 (0)