Skip to content

Commit 1c94eeb

Browse files
committed
Remove use of bitflags
1 parent 07538f4 commit 1c94eeb

35 files changed

+503
-227
lines changed

crates/common/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ test-utilities = ["history_builders"]
2323
anyhow = "1.0"
2424
async-trait = "0.1"
2525
base64 = "0.22"
26-
bitflags = "2.6"
2726
dirs = { version = "6.0", optional = true }
2827
derive_builder = { workspace = true }
2928
derive_more = { workspace = true }

crates/common/src/errors.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,14 @@ pub enum CompleteWfError {
4040
/// The run associated with the completion
4141
run_id: String,
4242
},
43+
/// Workflows have not been enabled on this worker.
44+
#[error("Workflows are not enabled on this worker")]
45+
WorkflowNotEnabled,
4346
}
4447

4548
/// Errors thrown by [crate::Worker::complete_activity_task]
4649
#[derive(thiserror::Error, Debug)]
50+
#[allow(clippy::large_enum_variant)]
4751
pub enum CompleteActivityError {
4852
/// Lang SDK sent us a malformed activity completion. This likely means a bug in the lang sdk.
4953
#[error("Lang SDK sent us a malformed activity completion ({reason}): {completion:?}")]
@@ -53,6 +57,9 @@ pub enum CompleteActivityError {
5357
/// The completion, which may not be included to avoid unnecessary copies.
5458
completion: Option<ActivityExecutionResult>,
5559
},
60+
/// Activities have not been enabled on this worker.
61+
#[error("Activities are not enabled on this worker")]
62+
ActivityNotEnabled,
5663
}
5764

5865
/// Errors thrown by [crate::Worker::complete_nexus_task]

crates/common/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ pub trait Worker: Send + Sync {
130130
/// workflows & activities until they are done. At that point, the lang SDK can end the process,
131131
/// or drop the [Worker] instance via [Worker::finalize_shutdown], which will close the
132132
/// connection and free resources. If you have set [WorkerConfig::task_types] to exclude
133-
/// [WorkerTaskTypes::ACTIVITIES], you may skip calling [Worker::poll_activity_task].
133+
/// [worker_task_types::activities()], you may skip calling [Worker::poll_activity_task].
134134
///
135135
/// Lang implementations should use [Worker::initiate_shutdown] followed by
136136
/// [Worker::finalize_shutdown].

crates/common/src/worker.rs

Lines changed: 66 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ use crate::{
88
},
99
telemetry::metrics::TemporalMeter,
1010
};
11-
use bitflags::bitflags;
1211
use std::{
1312
any::Any,
1413
collections::{HashMap, HashSet},
@@ -17,36 +16,70 @@ use std::{
1716
time::Duration,
1817
};
1918

20-
bitflags! {
21-
/// Specifies which task types a worker will poll for.
22-
///
23-
/// This allows fine-grained control over what kinds of work this worker handles.
24-
/// Workers can be configured to handle any combination of workflows, activities, and nexus operations.
25-
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
26-
pub struct WorkerTaskTypes: u8 {
27-
/// Poll for workflow tasks
28-
const WORKFLOWS = 0b001;
29-
/// Poll for activity tasks (remote activities)
30-
const ACTIVITIES = 0b010;
31-
/// Poll for nexus tasks
32-
const NEXUS = 0b100;
33-
}
19+
/// Represents a single task type that a worker can poll for
20+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
21+
pub enum WorkerTaskType {
22+
/// Poll for workflow tasks
23+
Workflows,
24+
/// Poll for activity tasks (remote activities)
25+
Activities,
26+
/// Poll for nexus tasks
27+
Nexus,
3428
}
3529

36-
impl WorkerTaskTypes {
37-
/// Returns true if this worker should poll for workflow tasks
38-
pub fn polls_workflows(&self) -> bool {
39-
self.contains(Self::WORKFLOWS)
30+
/// Specifies which task types a worker will poll for.
31+
///
32+
/// This is a set of [WorkerTaskType] values. Workers can be configured to handle
33+
/// any combination of workflows, activities, and nexus operations.
34+
pub type WorkerTaskTypes = HashSet<WorkerTaskType>;
35+
36+
/// Helper functions for working with WorkerTaskTypes
37+
pub mod worker_task_types {
38+
use super::{WorkerTaskType, WorkerTaskTypes};
39+
40+
/// Create a set with all task types enabled
41+
pub fn all() -> WorkerTaskTypes {
42+
[
43+
WorkerTaskType::Workflows,
44+
WorkerTaskType::Activities,
45+
WorkerTaskType::Nexus,
46+
]
47+
.into_iter()
48+
.collect()
49+
}
50+
51+
/// Create a set with only workflow tasks enabled
52+
pub fn workflows() -> WorkerTaskTypes {
53+
[WorkerTaskType::Workflows].into_iter().collect()
54+
}
55+
56+
/// Create a set with only activity tasks enabled
57+
pub fn activities() -> WorkerTaskTypes {
58+
[WorkerTaskType::Activities].into_iter().collect()
4059
}
4160

42-
/// Returns true if this worker should poll for activity tasks
43-
pub fn polls_activities(&self) -> bool {
44-
self.contains(Self::ACTIVITIES)
61+
/// Create a set with only nexus tasks enabled
62+
pub fn nexus() -> WorkerTaskTypes {
63+
[WorkerTaskType::Nexus].into_iter().collect()
4564
}
4665

47-
/// Returns true if this worker should poll for nexus tasks
48-
pub fn polls_nexus(&self) -> bool {
49-
self.contains(Self::NEXUS)
66+
/// Create a set from a bitmask (for C FFI compatibility)
67+
/// 0x01 = Workflows, 0x02 = Activities, 0x04 = Nexus
68+
pub fn from_bits(bits: u8) -> WorkerTaskTypes {
69+
if bits == 0 {
70+
return all();
71+
}
72+
let mut set = WorkerTaskTypes::new();
73+
if bits & 0x01 != 0 {
74+
set.insert(WorkerTaskType::Workflows);
75+
}
76+
if bits & 0x02 != 0 {
77+
set.insert(WorkerTaskType::Activities);
78+
}
79+
if bits & 0x04 != 0 {
80+
set.insert(WorkerTaskType::Nexus);
81+
}
82+
set
5083
}
5184
}
5285

@@ -101,12 +134,10 @@ pub struct WorkerConfig {
101134
/// Specifies which task types this worker will poll for.
102135
///
103136
/// By default, workers poll for all task types (workflows, activities, and nexus).
104-
/// You can restrict this to any combination. For example, a workflow-only worker would use
105-
/// `WorkerTaskTypes::WORKFLOWS`, while a worker handling activities and nexus but not workflows
106-
/// would use `WorkerTaskTypes::ACTIVITIES | WorkerTaskTypes::NEXUS`.
137+
/// You can restrict this to any combination.
107138
///
108139
/// Note: At least one task type must be specified or the worker will fail validation.
109-
#[builder(default = "WorkerTaskTypes::all()")]
140+
#[builder(default = "worker_task_types::all()")]
110141
pub task_types: WorkerTaskTypes,
111142
/// How long a workflow task is allowed to sit on the sticky queue before it is timed out
112143
/// and moved to the non-sticky queue where it may be picked up by any worker.
@@ -258,7 +289,11 @@ impl WorkerConfigBuilder {
258289
}
259290

260291
fn validate(&self) -> Result<(), String> {
261-
let task_types = self.task_types.unwrap_or_else(WorkerTaskTypes::all);
292+
let task_types = self
293+
.task_types
294+
.as_ref()
295+
.cloned()
296+
.unwrap_or_else(worker_task_types::all);
262297
if task_types.is_empty() {
263298
return Err("At least one task type must be enabled in `task_types`".to_owned());
264299
}
@@ -295,7 +330,7 @@ impl WorkerConfigBuilder {
295330
}
296331

297332
// Validate workflow cache is consistent with task_types
298-
if !task_types.contains(WorkerTaskTypes::WORKFLOWS)
333+
if !task_types.contains(&WorkerTaskType::Workflows)
299334
&& let Some(cache) = self.max_cached_workflows.as_ref()
300335
&& *cache > 0
301336
{

crates/common/tests/worker_task_types_test.rs

Lines changed: 59 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
use temporalio_common::worker::{WorkerConfigBuilder, WorkerTaskTypes, WorkerVersioningStrategy};
1+
use std::collections::HashSet;
2+
use temporalio_common::worker::{
3+
WorkerConfigBuilder, WorkerTaskType, WorkerTaskTypes, WorkerVersioningStrategy,
4+
worker_task_types,
5+
};
26

37
fn default_versioning_strategy() -> WorkerVersioningStrategy {
48
WorkerVersioningStrategy::None {
@@ -15,16 +19,19 @@ fn test_default_configuration_polls_all_types() {
1519
.build()
1620
.expect("Failed to build default config");
1721

18-
let effective = config.task_types;
22+
let effective = &config.task_types;
1923
assert!(
20-
effective.polls_workflows(),
24+
effective.contains(&WorkerTaskType::Workflows),
2125
"Should poll workflows by default"
2226
);
2327
assert!(
24-
effective.polls_activities(),
28+
effective.contains(&WorkerTaskType::Activities),
2529
"Should poll activities by default"
2630
);
27-
assert!(effective.polls_nexus(), "Should poll nexus by default");
31+
assert!(
32+
effective.contains(&WorkerTaskType::Nexus),
33+
"Should poll nexus by default"
34+
);
2835
}
2936

3037
#[test]
@@ -33,32 +40,53 @@ fn test_workflow_only_worker() {
3340
.namespace("default")
3441
.task_queue("test-queue")
3542
.versioning_strategy(default_versioning_strategy())
36-
.task_types(WorkerTaskTypes::WORKFLOWS)
43+
.task_types(HashSet::from([WorkerTaskType::Workflows]))
3744
.max_cached_workflows(0usize)
3845
.build()
3946
.expect("Failed to build workflow-only config");
4047

41-
let effective = config.task_types;
42-
assert!(effective.polls_workflows(), "Should poll workflows");
43-
assert!(!effective.polls_activities(), "Should NOT poll activities");
44-
assert!(!effective.polls_nexus(), "Should NOT poll nexus");
48+
let effective = &config.task_types;
49+
assert!(
50+
effective.contains(&WorkerTaskType::Workflows),
51+
"Should poll workflows"
52+
);
53+
assert!(
54+
!effective.contains(&WorkerTaskType::Activities),
55+
"Should NOT poll activities"
56+
);
57+
assert!(
58+
!effective.contains(&WorkerTaskType::Nexus),
59+
"Should NOT poll nexus"
60+
);
4561
}
4662

4763
#[test]
4864
fn test_activity_and_nexus_worker() {
65+
let types: WorkerTaskTypes = [WorkerTaskType::Activities, WorkerTaskType::Nexus]
66+
.into_iter()
67+
.collect();
4968
let config = WorkerConfigBuilder::default()
5069
.namespace("default")
5170
.task_queue("test-queue")
5271
.versioning_strategy(default_versioning_strategy())
53-
.task_types(WorkerTaskTypes::ACTIVITIES | WorkerTaskTypes::NEXUS)
72+
.task_types(types)
5473
.max_cached_workflows(0usize)
5574
.build()
5675
.expect("Failed to build activity+nexus config");
5776

58-
let effective = config.task_types;
59-
assert!(!effective.polls_workflows(), "Should NOT poll workflows");
60-
assert!(effective.polls_activities(), "Should poll activities");
61-
assert!(effective.polls_nexus(), "Should poll nexus");
77+
let effective = &config.task_types;
78+
assert!(
79+
!effective.contains(&WorkerTaskType::Workflows),
80+
"Should NOT poll workflows"
81+
);
82+
assert!(
83+
effective.contains(&WorkerTaskType::Activities),
84+
"Should poll activities"
85+
);
86+
assert!(
87+
effective.contains(&WorkerTaskType::Nexus),
88+
"Should poll nexus"
89+
);
6290
}
6391

6492
#[test]
@@ -67,7 +95,7 @@ fn test_empty_task_types_fails_validation() {
6795
.namespace("default")
6896
.task_queue("test-queue")
6997
.versioning_strategy(default_versioning_strategy())
70-
.task_types(WorkerTaskTypes::empty())
98+
.task_types(WorkerTaskTypes::new())
7199
.build();
72100

73101
assert!(result.is_err(), "Empty task_types should fail validation");
@@ -84,7 +112,7 @@ fn test_workflow_cache_without_workflows_fails() {
84112
.namespace("default")
85113
.task_queue("test-queue")
86114
.versioning_strategy(default_versioning_strategy())
87-
.task_types(WorkerTaskTypes::ACTIVITIES)
115+
.task_types(worker_task_types::activities())
88116
.max_cached_workflows(10usize)
89117
.build();
90118

@@ -102,30 +130,36 @@ fn test_workflow_cache_without_workflows_fails() {
102130
#[test]
103131
fn test_all_combinations() {
104132
let combinations = [
105-
(WorkerTaskTypes::WORKFLOWS, "workflows only"),
106-
(WorkerTaskTypes::ACTIVITIES, "activities only"),
107-
(WorkerTaskTypes::NEXUS, "nexus only"),
133+
(worker_task_types::workflows(), "workflows only"),
134+
(worker_task_types::activities(), "activities only"),
135+
(worker_task_types::nexus(), "nexus only"),
108136
(
109-
WorkerTaskTypes::WORKFLOWS | WorkerTaskTypes::ACTIVITIES,
137+
[WorkerTaskType::Workflows, WorkerTaskType::Activities]
138+
.into_iter()
139+
.collect(),
110140
"workflows + activities",
111141
),
112142
(
113-
WorkerTaskTypes::WORKFLOWS | WorkerTaskTypes::NEXUS,
143+
[WorkerTaskType::Workflows, WorkerTaskType::Nexus]
144+
.into_iter()
145+
.collect(),
114146
"workflows + nexus",
115147
),
116148
(
117-
WorkerTaskTypes::ACTIVITIES | WorkerTaskTypes::NEXUS,
149+
[WorkerTaskType::Activities, WorkerTaskType::Nexus]
150+
.into_iter()
151+
.collect(),
118152
"activities + nexus",
119153
),
120-
(WorkerTaskTypes::all(), "all types"),
154+
(worker_task_types::all(), "all types"),
121155
];
122156

123157
for (task_types, description) in combinations {
124158
let config = WorkerConfigBuilder::default()
125159
.namespace("default")
126160
.task_queue("test-queue")
127161
.versioning_strategy(default_versioning_strategy())
128-
.task_types(task_types)
162+
.task_types(task_types.clone())
129163
.build()
130164
.unwrap_or_else(|e| panic!("Failed to build config for {description}: {e:?}"));
131165

crates/sdk-core-c-bridge/src/worker.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1183,13 +1183,9 @@ impl TryFrom<&WorkerOptions> for temporalio_sdk_core::WorkerConfig {
11831183
.client_identity_override(opt.identity_override.to_option_string())
11841184
.max_cached_workflows(opt.max_cached_workflows as usize)
11851185
.tuner(Arc::new(converted_tuner))
1186-
.task_types({
1187-
if opt.task_types == 0 {
1188-
temporalio_common::worker::WorkerTaskTypes::all()
1189-
} else {
1190-
temporalio_common::worker::WorkerTaskTypes::from_bits_truncate(opt.task_types)
1191-
}
1192-
})
1186+
.task_types(temporalio_common::worker::worker_task_types::from_bits(
1187+
opt.task_types,
1188+
))
11931189
.sticky_queue_schedule_to_start_timeout(Duration::from_millis(
11941190
opt.sticky_queue_schedule_to_start_timeout_millis,
11951191
))

crates/sdk-core/src/core_tests/activity_tasks.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ use temporalio_common::{
5353
},
5454
test_utils::start_timer_cmd,
5555
},
56-
worker::{PollerBehavior, WorkerTaskTypes},
56+
worker::{PollerBehavior, worker_task_types},
5757
};
5858
use tokio::{join, time::sleep};
5959
use tokio_util::sync::CancellationToken;
@@ -725,7 +725,7 @@ async fn no_eager_activities_requested_when_worker_options_disable_it(
725725
mock.worker_cfg(|wc| {
726726
wc.max_cached_workflows = 2;
727727
if reason == "no_remote" {
728-
wc.task_types = WorkerTaskTypes::WORKFLOWS;
728+
wc.task_types = worker_task_types::workflows();
729729
} else {
730730
wc.max_task_queue_activities_per_second = Some(1.0);
731731
}

0 commit comments

Comments
 (0)