Skip to content

Commit 7a7eb99

Browse files
committed
Add new task_types
1 parent 850db67 commit 7a7eb99

File tree

6 files changed

+550
-199
lines changed

6 files changed

+550
-199
lines changed

crates/common/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ test-utilities = ["history_builders"]
2323
anyhow = "1.0"
2424
async-trait = "0.1"
2525
base64 = "0.22"
26+
bitflags = "2.6"
2627
dirs = { version = "6.0", optional = true }
2728
derive_builder = { workspace = true }
2829
derive_more = { workspace = true }

crates/common/src/worker.rs

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use crate::{
88
},
99
telemetry::metrics::TemporalMeter,
1010
};
11+
use bitflags::bitflags;
1112
use std::{
1213
any::Any,
1314
collections::{HashMap, HashSet},
@@ -16,6 +17,39 @@ use std::{
1617
time::Duration,
1718
};
1819

20+
bitflags! {
21+
/// Specifies which task types a worker will poll for.
22+
///
23+
/// This allows fine-grained control over what kinds of work this worker handles.
24+
/// Workers can be configured to handle any combination of workflows, activities, and nexus operations.
25+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
26+
pub struct WorkerTaskTypes: u8 {
27+
/// Poll for workflow tasks
28+
const WORKFLOWS = 0b001;
29+
/// Poll for activity tasks (remote activities)
30+
const ACTIVITIES = 0b010;
31+
/// Poll for nexus tasks
32+
const NEXUS = 0b100;
33+
}
34+
}
35+
36+
impl WorkerTaskTypes {
37+
/// Returns true if this worker should poll for workflow tasks
38+
pub fn polls_workflows(&self) -> bool {
39+
self.contains(Self::WORKFLOWS)
40+
}
41+
42+
/// Returns true if this worker should poll for activity tasks
43+
pub fn polls_activities(&self) -> bool {
44+
self.contains(Self::ACTIVITIES)
45+
}
46+
47+
/// Returns true if this worker should poll for nexus tasks
48+
pub fn polls_nexus(&self) -> bool {
49+
self.contains(Self::NEXUS)
50+
}
51+
}
52+
1953
/// Defines per-worker configuration options
2054
#[derive(Clone, derive_builder::Builder)]
2155
#[builder(setter(into), build_fn(validate = "Self::validate"))]
@@ -64,8 +98,24 @@ pub struct WorkerConfig {
6498
/// worker's task queue
6599
#[builder(default = "PollerBehavior::SimpleMaximum(5)")]
66100
pub nexus_task_poller_behavior: PollerBehavior,
101+
/// Specifies which task types this worker will poll for.
102+
///
103+
/// By default, workers poll for all task types (workflows, activities, and nexus).
104+
/// You can restrict this to any combination. For example, a workflow-only worker would use
105+
/// `WorkerTaskTypes::WORKFLOWS`, while a worker handling activities and nexus but not workflows
106+
/// would use `WorkerTaskTypes::ACTIVITIES | WorkerTaskTypes::NEXUS`. If using in combination
107+
/// with `no_remote_activites`, it must be ensured that `ACTIVITIES` is not selected, as
108+
/// validation will fail.
109+
///
110+
/// Note: At least one task type must be specified or the worker will fail validation.
111+
#[builder(default = "WorkerTaskTypes::all()")]
112+
pub task_types: WorkerTaskTypes,
67113
/// If set to true this worker will only handle workflow tasks and local activities, it will not
68-
/// poll for activity tasks.
114+
/// poll for activity tasks. If this is used with `task_types`, users must ensure
115+
/// `WorkerTaskTypes::ACTIVITIES` is not set.
116+
///
117+
/// This is equivalent to setting
118+
/// `task_types = WorkerTaskTypes::WORKFLOWS | WorkerTaskTypes::NEXUS`.
69119
#[builder(default = "false")]
70120
pub no_remote_activities: bool,
71121
/// How long a workflow task is allowed to sit on the sticky queue before it is timed out
@@ -175,6 +225,18 @@ pub struct WorkerConfig {
175225
}
176226

177227
impl WorkerConfig {
228+
/// Returns the effective task types this worker should poll for, taking into account
229+
/// `no_remote_activities` field.
230+
///
231+
/// If `no_remote_activities` is true, activities are excluded from the task types.
232+
pub fn effective_task_types(&self) -> WorkerTaskTypes {
233+
if self.no_remote_activities {
234+
self.task_types - WorkerTaskTypes::ACTIVITIES
235+
} else {
236+
self.task_types
237+
}
238+
}
239+
178240
/// Returns true if the configuration specifies we should fail a workflow on a certain error
179241
/// type rather than failing the workflow task.
180242
pub fn should_fail_workflow(
@@ -218,6 +280,24 @@ impl WorkerConfigBuilder {
218280
}
219281

220282
fn validate(&self) -> Result<(), String> {
283+
let task_types = self.task_types.unwrap_or_else(WorkerTaskTypes::all);
284+
if task_types.is_empty() {
285+
return Err("At least one task type must be enabled in `task_types`".to_owned());
286+
}
287+
288+
// Handle backward compatibility with no_remote_activities
289+
if let Some(true) = self.no_remote_activities {
290+
// If no_remote_activities is set to true, warn if task_types was also explicitly set
291+
// and includes activities
292+
if self.task_types.is_some() && task_types.contains(WorkerTaskTypes::ACTIVITIES) {
293+
return Err(
294+
"Conflicting configuration: `no_remote_activities` is true but `task_types` includes ACTIVITIES. \
295+
Please update `task_types` to not allow for ACTIVITIES."
296+
.to_owned(),
297+
);
298+
}
299+
}
300+
221301
if let Some(b) = self.workflow_task_poller_behavior.as_ref() {
222302
b.validate()?
223303
}
@@ -249,6 +329,17 @@ impl WorkerConfigBuilder {
249329
return Err("`max_outstanding_nexus_tasks` must be > 0".to_owned());
250330
}
251331

332+
// Validate workflow cache is consistent with task_types
333+
if !task_types.contains(WorkerTaskTypes::WORKFLOWS)
334+
&& let Some(cache) = self.max_cached_workflows.as_ref()
335+
&& *cache > 0
336+
{
337+
return Err(
338+
"Cannot have `max_cached_workflows` > 0 when workflows are not enabled in `task_types`"
339+
.to_owned(),
340+
);
341+
}
342+
252343
if let Some(cache) = self.max_cached_workflows.as_ref()
253344
&& *cache > 0
254345
{
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
use temporalio_common::worker::{WorkerConfigBuilder, WorkerTaskTypes, WorkerVersioningStrategy};
2+
3+
fn default_versioning_strategy() -> WorkerVersioningStrategy {
4+
WorkerVersioningStrategy::None {
5+
build_id: String::new(),
6+
}
7+
}
8+
9+
#[test]
10+
fn test_default_configuration_polls_all_types() {
11+
let config = WorkerConfigBuilder::default()
12+
.namespace("default")
13+
.task_queue("test-queue")
14+
.versioning_strategy(default_versioning_strategy())
15+
.build()
16+
.expect("Failed to build default config");
17+
18+
let effective = config.effective_task_types();
19+
assert!(
20+
effective.polls_workflows(),
21+
"Should poll workflows by default"
22+
);
23+
assert!(
24+
effective.polls_activities(),
25+
"Should poll activities by default"
26+
);
27+
assert!(effective.polls_nexus(), "Should poll nexus by default");
28+
}
29+
30+
#[test]
31+
fn test_workflow_only_worker() {
32+
let config = WorkerConfigBuilder::default()
33+
.namespace("default")
34+
.task_queue("test-queue")
35+
.versioning_strategy(default_versioning_strategy())
36+
.task_types(WorkerTaskTypes::WORKFLOWS)
37+
.max_cached_workflows(0usize)
38+
.build()
39+
.expect("Failed to build workflow-only config");
40+
41+
let effective = config.effective_task_types();
42+
assert!(effective.polls_workflows(), "Should poll workflows");
43+
assert!(!effective.polls_activities(), "Should NOT poll activities");
44+
assert!(!effective.polls_nexus(), "Should NOT poll nexus");
45+
}
46+
47+
#[test]
48+
fn test_activity_and_nexus_worker() {
49+
let config = WorkerConfigBuilder::default()
50+
.namespace("default")
51+
.task_queue("test-queue")
52+
.versioning_strategy(default_versioning_strategy())
53+
.task_types(WorkerTaskTypes::ACTIVITIES | WorkerTaskTypes::NEXUS)
54+
.max_cached_workflows(0usize)
55+
.build()
56+
.expect("Failed to build activity+nexus config");
57+
58+
let effective = config.effective_task_types();
59+
assert!(!effective.polls_workflows(), "Should NOT poll workflows");
60+
assert!(effective.polls_activities(), "Should poll activities");
61+
assert!(effective.polls_nexus(), "Should poll nexus");
62+
}
63+
64+
#[test]
65+
fn test_backward_compatibility_with_no_remote_activities() {
66+
let config = WorkerConfigBuilder::default()
67+
.namespace("default")
68+
.task_queue("test-queue")
69+
.versioning_strategy(default_versioning_strategy())
70+
.no_remote_activities(true)
71+
.build()
72+
.expect("Failed to build config with no_remote_activities");
73+
74+
let effective = config.effective_task_types();
75+
assert!(effective.polls_workflows(), "Should poll workflows");
76+
assert!(!effective.polls_activities(), "Should NOT poll activities");
77+
assert!(effective.polls_nexus(), "Should poll nexus");
78+
}
79+
80+
#[test]
81+
fn test_empty_task_types_fails_validation() {
82+
let result = WorkerConfigBuilder::default()
83+
.namespace("default")
84+
.task_queue("test-queue")
85+
.versioning_strategy(default_versioning_strategy())
86+
.task_types(WorkerTaskTypes::empty())
87+
.build();
88+
89+
assert!(result.is_err(), "Empty task_types should fail validation");
90+
let err = result.err().unwrap().to_string();
91+
assert!(
92+
err.contains("At least one task type"),
93+
"Error should mention task types: {err}",
94+
);
95+
}
96+
97+
#[test]
98+
fn test_workflow_cache_without_workflows_fails() {
99+
let result = WorkerConfigBuilder::default()
100+
.namespace("default")
101+
.task_queue("test-queue")
102+
.versioning_strategy(default_versioning_strategy())
103+
.task_types(WorkerTaskTypes::ACTIVITIES)
104+
.max_cached_workflows(10usize)
105+
.build();
106+
107+
assert!(
108+
result.is_err(),
109+
"Workflow cache > 0 without workflows should fail"
110+
);
111+
let err = result.err().unwrap().to_string();
112+
assert!(
113+
err.contains("max_cached_workflows"),
114+
"Error should mention max_cached_workflows: {err}",
115+
);
116+
}
117+
118+
#[test]
119+
fn test_conflicting_no_remote_activities_and_task_types_fails() {
120+
#[allow(deprecated)]
121+
let result = WorkerConfigBuilder::default()
122+
.namespace("default")
123+
.task_queue("test-queue")
124+
.versioning_strategy(default_versioning_strategy())
125+
.task_types(WorkerTaskTypes::ACTIVITIES)
126+
.no_remote_activities(true)
127+
.build();
128+
129+
assert!(result.is_err(), "Conflicting settings should fail");
130+
131+
let err = result.err().unwrap().to_string();
132+
assert!(
133+
err.contains("Conflicting configuration"),
134+
"Error should mention conflict: {err}",
135+
);
136+
}
137+
138+
#[test]
139+
fn test_all_combinations() {
140+
let combinations = [
141+
(WorkerTaskTypes::WORKFLOWS, "workflows only"),
142+
(WorkerTaskTypes::ACTIVITIES, "activities only"),
143+
(WorkerTaskTypes::NEXUS, "nexus only"),
144+
(
145+
WorkerTaskTypes::WORKFLOWS | WorkerTaskTypes::ACTIVITIES,
146+
"workflows + activities",
147+
),
148+
(
149+
WorkerTaskTypes::WORKFLOWS | WorkerTaskTypes::NEXUS,
150+
"workflows + nexus",
151+
),
152+
(
153+
WorkerTaskTypes::ACTIVITIES | WorkerTaskTypes::NEXUS,
154+
"activities + nexus",
155+
),
156+
(WorkerTaskTypes::all(), "all types"),
157+
];
158+
159+
for (task_types, description) in combinations {
160+
let config = WorkerConfigBuilder::default()
161+
.namespace("default")
162+
.task_queue("test-queue")
163+
.versioning_strategy(default_versioning_strategy())
164+
.task_types(task_types)
165+
.build()
166+
.unwrap_or_else(|e| panic!("Failed to build config for {description}: {e:?}"));
167+
168+
let effective = config.effective_task_types();
169+
assert_eq!(
170+
effective, task_types,
171+
"Effective types should match for {description}",
172+
);
173+
}
174+
}

crates/sdk-core-c-bridge/src/worker.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ pub struct WorkerOptions {
4343
pub identity_override: ByteArrayRef,
4444
pub max_cached_workflows: u32,
4545
pub tuner: TunerHolder,
46+
pub task_types: u8,
4647
pub no_remote_activities: bool,
4748
pub sticky_queue_schedule_to_start_timeout_millis: u64,
4849
pub max_heartbeat_throttle_interval_millis: u64,
@@ -1183,6 +1184,13 @@ impl TryFrom<&WorkerOptions> for temporalio_sdk_core::WorkerConfig {
11831184
.client_identity_override(opt.identity_override.to_option_string())
11841185
.max_cached_workflows(opt.max_cached_workflows as usize)
11851186
.tuner(Arc::new(converted_tuner))
1187+
.task_types({
1188+
if opt.task_types == 0 {
1189+
temporalio_common::worker::WorkerTaskTypes::all()
1190+
} else {
1191+
temporalio_common::worker::WorkerTaskTypes::from_bits_truncate(opt.task_types)
1192+
}
1193+
})
11861194
.no_remote_activities(opt.no_remote_activities)
11871195
.sticky_queue_schedule_to_start_timeout(Duration::from_millis(
11881196
opt.sticky_queue_schedule_to_start_timeout_millis,

crates/sdk-core/src/test_help/integ_helpers.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -194,12 +194,14 @@ pub fn mock_worker(mocks: MocksHolder) -> Worker {
194194
sticky_q,
195195
mocks.client,
196196
TaskPollers::Mocked {
197-
wft_stream: mocks.inputs.wft_stream,
197+
wft_stream: Some(mocks.inputs.wft_stream),
198198
act_poller,
199-
nexus_poller: mocks
200-
.inputs
201-
.nexus_poller
202-
.unwrap_or_else(|| mock_poller_from_resps([])),
199+
nexus_poller: Some(
200+
mocks
201+
.inputs
202+
.nexus_poller
203+
.unwrap_or_else(|| mock_poller_from_resps([])),
204+
),
203205
},
204206
None,
205207
None,

0 commit comments

Comments
 (0)