Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Changelog

- **Added** `--concurrency-limit` flag to limit the number of tasks running at the same time (defaults to 4) ([#288](https://github.com/voidzero-dev/vite-task/pull/288), [#309](https://github.com/voidzero-dev/vite-task/pull/309))
- **Added** `--parallel` flag to ignore task dependencies and run all tasks at once with unlimited concurrency (unless `--concurrency-limit` is also specified) ([#309](https://github.com/voidzero-dev/vite-task/pull/309))
- **Added** object form for `input` entries: `{ "pattern": "...", "base": "workspace" | "package" }` to resolve glob patterns relative to the workspace root instead of the package directory ([#295](https://github.com/voidzero-dev/vite-task/pull/295))
- **Fixed** arguments after the task name being consumed by `vp` instead of passed through to the task ([#286](https://github.com/voidzero-dev/vite-task/pull/286), [#290](https://github.com/voidzero-dev/vite-task/pull/290))
- **Changed** default untracked env patterns to align with Turborepo, covering more CI and platform-specific variables ([#262](https://github.com/voidzero-dev/vite-task/pull/262))
Expand Down
13 changes: 13 additions & 0 deletions crates/vite_task/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ pub struct RunFlags {
/// How task output is displayed.
#[clap(long, default_value = "interleaved")]
pub log: LogMode,

/// Maximum number of tasks to run concurrently. Defaults to 4.
#[clap(long)]
pub concurrency_limit: Option<usize>,

/// Run tasks without dependency ordering. Sets concurrency to unlimited
/// unless `--concurrency-limit` is also specified.
#[clap(long, default_value = "false")]
pub parallel: bool,
}

impl RunFlags {
Expand Down Expand Up @@ -206,6 +215,8 @@ impl ResolvedRunCommand {

let cache_override = self.flags.cache_override();
let include_explicit_deps = !self.flags.ignore_depends_on;
let concurrency_limit = self.flags.concurrency_limit.map(|n| n.max(1));
let parallel = self.flags.parallel;

let (package_query, is_cwd_only) =
self.flags.package_query.into_package_query(task_specifier.package_name, cwd)?;
Expand All @@ -220,6 +231,8 @@ impl ResolvedRunCommand {
plan_options: PlanOptions {
extra_args: self.additional_args.into(),
cache_override,
concurrency_limit,
parallel,
},
},
is_cwd_only,
Expand Down
19 changes: 8 additions & 11 deletions crates/vite_task/src/session/execute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ pub enum SpawnOutcome {
Failed,
}

/// Maximum number of tasks that can execute concurrently within a single
/// execution graph level.
const CONCURRENCY_LIMIT: usize = 10;

/// Holds shared references needed during graph execution.
///
/// The `reporter` field is wrapped in `RefCell` because concurrent futures
Expand All @@ -80,25 +76,26 @@ impl ExecutionContext<'_> {
///
/// Uses a DAG scheduler: tasks whose dependencies have all completed are scheduled
/// onto a `FuturesUnordered`, bounded by a per-graph `Semaphore` with
/// [`CONCURRENCY_LIMIT`] permits. Each recursive `Expanded` graph creates its own
/// `concurrency_limit` permits. Each recursive `Expanded` graph creates its own
/// semaphore, so nested graphs have independent concurrency limits.
///
/// Fast-fail: if any task fails, `execute_leaf` cancels the `CancellationToken`
/// (killing in-flight child processes). This method detects the cancellation,
/// closes the semaphore, drains remaining futures, and returns.
#[tracing::instrument(level = "debug", skip_all)]
async fn execute_expanded_graph(&self, graph: &ExecutionGraph) {
if graph.node_count() == 0 {
if graph.graph.node_count() == 0 {
return;
}

let semaphore = Arc::new(Semaphore::new(CONCURRENCY_LIMIT));
let semaphore =
Arc::new(Semaphore::new(graph.concurrency_limit.min(Semaphore::MAX_PERMITS)));

// Compute dependency count for each node.
// Edge A→B means "A depends on B", so A's dependency count = outgoing edge count.
let mut dep_count: FxHashMap<ExecutionNodeIndex, usize> = FxHashMap::default();
for node_ix in graph.node_indices() {
dep_count.insert(node_ix, graph.neighbors(node_ix).count());
for node_ix in graph.graph.node_indices() {
dep_count.insert(node_ix, graph.graph.neighbors(node_ix).count());
}

let mut futures = FuturesUnordered::new();
Expand All @@ -123,7 +120,7 @@ impl ExecutionContext<'_> {
// Find dependents of the completed node (nodes that depend on it).
// Edge X→completed means "X depends on completed", so X is a predecessor
// in graph direction = neighbor in Incoming direction.
for dependent in graph.neighbors_directed(completed_ix, Direction::Incoming) {
for dependent in graph.graph.neighbors_directed(completed_ix, Direction::Incoming) {
let count = dep_count.get_mut(&dependent).expect("all nodes are in dep_count");
*count -= 1;
if *count == 0 {
Expand Down Expand Up @@ -162,7 +159,7 @@ impl ExecutionContext<'_> {
/// in order; if any item fails, `execute_leaf` cancels the `CancellationToken`
/// and remaining items are skipped (preserving `&&` semantics).
async fn execute_node(&self, graph: &ExecutionGraph, node_ix: ExecutionNodeIndex) {
let task_execution = &graph[node_ix];
let task_execution = &graph.graph[node_ix];

for item in &task_execution.items {
if self.cancellation_token.is_cancelled() {
Expand Down
19 changes: 15 additions & 4 deletions crates/vite_task/src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,14 @@ impl<'a> Session<'a> {
let (graph, is_cwd_only) =
self.plan_from_cli_run_resolved(cwd, run_command.clone()).await?;

if graph.node_count() == 0 {
// No tasks matched. With is_cwd_only (no scope flags) the
// task name is a typo — show the selector. Otherwise error.
if is_cwd_only {
if graph.graph.node_count() == 0 {
// No tasks matched. Show the interactive selector only when
// the command has no scope flags and no execution flags
// (concurrency-limit, parallel) — otherwise the user intended
// a specific execution mode and a typo should be an error.
let has_execution_flags = run_command.flags.concurrency_limit.is_some()
|| run_command.flags.parallel;
if is_cwd_only && !has_execution_flags {
let qpr = self.handle_no_task(is_interactive, &run_command).await?;
self.plan_from_query(qpr).await?
} else {
Expand Down Expand Up @@ -508,6 +512,8 @@ impl<'a> Session<'a> {
plan_options: PlanOptions {
extra_args: run_command.additional_args.clone().into(),
cache_override: run_command.flags.cache_override(),
concurrency_limit: None,
parallel: false,
},
})
}
Expand Down Expand Up @@ -589,6 +595,11 @@ impl<'a> Session<'a> {
&self.envs
}

/// Mutably access the environment map.
///
/// The map is stored behind an `Arc`; `Arc::make_mut` clones the inner map
/// only when the `Arc` is currently shared, so an unshared session mutates
/// its map in place without copying.
pub fn envs_mut(&mut self) -> &mut FxHashMap<Arc<OsStr>, Arc<OsStr>> {
    Arc::make_mut(&mut self.envs)
}

/// The session's current working directory, as a shared absolute path.
pub const fn cwd(&self) -> &Arc<AbsolutePath> {
    &self.cwd
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"name": "parallel-execution-test",
"private": true
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"name": "@parallel/a",
"scripts": {
"build": "vtt barrier ../../.barrier sync 2"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"name": "@parallel/b",
"scripts": {
"build": "vtt barrier ../../.barrier sync 2"
},
"dependencies": {
"@parallel/a": "workspace:*"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
packages:
- packages/*
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Package b depends on a, so without --parallel they run sequentially.
# Both use a barrier requiring 2 participants — if run sequentially the
# first would wait forever and the test would timeout.
# --parallel discards dependency edges, allowing both to run at once.

[[e2e]]
name = "parallel flag runs dependent tasks concurrently"
steps = [["vt", "run", "-r", "--parallel", "build"]]
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
source: crates/vite_task_bin/tests/e2e_snapshots/main.rs
expression: e2e_outputs
---
> vt run -r --parallel build
~/packages/a$ vtt barrier ../../.barrier sync 2cache disabled
~/packages/b$ vtt barrier ../../.barrier sync 2cache disabled


---
vt run: 0/2 cache hit (0%). (Run `vt run --last-details` for full details)
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"cache": false
}
3 changes: 3 additions & 0 deletions crates/vite_task_plan/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ pub enum Error {
#[error("Task \"{0}\" not found")]
NoTasksMatched(Str),

#[error("Invalid value for VP_RUN_CONCURRENCY_LIMIT: {0:?}")]
InvalidConcurrencyLimitEnv(Arc<OsStr>),

/// A cycle was detected in the task dependency graph during planning.
///
/// This is caught by `AcyclicGraph::try_from_graph`, which validates that the
Expand Down
35 changes: 32 additions & 3 deletions crates/vite_task_plan/src/execution_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,15 +155,44 @@ impl<N, Ix: IndexType> Index<NodeIndex<Ix>> for AcyclicGraph<N, Ix> {
}
}

/// The execution graph type alias, specialized for task execution.
pub type ExecutionGraph = AcyclicGraph<TaskExecution, ExecutionIx>;

// Serialization delegates to `vite_graph_ser::serialize_by_key`, which keys
// each node by `GetKey` and expects the raw inner `DiGraph` (`&self.graph`).
impl<N: vite_graph_ser::GetKey + Serialize, Ix: IndexType> Serialize for AcyclicGraph<N, Ix> {
    fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        vite_graph_ser::serialize_by_key(&self.graph, serializer)
    }
}

/// The default concurrency limit for task execution within a single graph level.
///
/// Used when neither the `--concurrency-limit` flag nor the
/// `VP_RUN_CONCURRENCY_LIMIT` environment variable supplies a value.
/// NOTE(review): the CLI help text also states "Defaults to 4" — keep the two
/// in sync if this constant changes.
pub const DEFAULT_CONCURRENCY_LIMIT: usize = 4;

/// An execution graph with a per-level concurrency limit.
///
/// Wraps an [`AcyclicGraph`] of task executions together with the maximum number
/// of tasks that may run concurrently within this graph level. Nested `Expanded`
/// graphs carry their own concurrency limit, enabling per-level control.
#[derive(Debug, Serialize)]
pub struct ExecutionGraph {
    /// The underlying acyclic task execution graph.
    pub graph: AcyclicGraph<TaskExecution, ExecutionIx>,

    /// Maximum number of tasks that can execute concurrently within this graph level.
    ///
    /// `usize::MAX` means effectively unlimited (used by `--parallel`); the
    /// executor caps it at `Semaphore::MAX_PERMITS` when building the semaphore.
    /// Callers are expected to supply a value of at least 1 — a limit of 0
    /// would create a zero-permit semaphore and allow no tasks to run.
    pub concurrency_limit: usize,
}

impl ExecutionGraph {
    /// Validate that `graph` is acyclic, then pair it with `concurrency_limit`
    /// to form an [`ExecutionGraph`].
    ///
    /// # Errors
    ///
    /// Returns [`CycleError`] if the graph contains a cycle.
    pub fn try_from_graph(
        graph: InnerExecutionGraph,
        concurrency_limit: usize,
    ) -> Result<Self, CycleError<ExecutionIx>> {
        // Cycle detection happens in `AcyclicGraph::try_from_graph`; on success
        // the validated graph is wrapped together with the level's limit.
        let graph = AcyclicGraph::try_from_graph(graph)?;
        Ok(Self { graph, concurrency_limit })
    }
}

/// Find a cycle in the directed graph, returning the cycle path if one exists.
///
/// Uses a DFS with predecessor tracking. When a back edge `u → v` is detected
Expand Down
6 changes: 1 addition & 5 deletions crates/vite_task_plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{collections::BTreeMap, ffi::OsStr, fmt::Debug, sync::Arc};

use context::PlanContext;
pub use error::Error;
pub use execution_graph::ExecutionGraph;
pub use execution_graph::{DEFAULT_CONCURRENCY_LIMIT, ExecutionGraph};
pub use in_process::InProcessExecution;
pub use path_env::{get_path_env, prepend_path_env};
use plan::{ParentCacheConfig, plan_query_request, plan_synthetic_request};
Expand Down Expand Up @@ -138,10 +138,6 @@ pub enum LeafExecutionKind {
InProcess(InProcessExecution),
}

/// Serialize an `ExecutionGraph` using `serialize_by_key`.
///
/// `vite_graph_ser::serialize_by_key` expects `&DiGraph<N, E, Ix>`, so we call `.inner()`
/// to get the underlying `DiGraph` reference.
/// An execution item, from a split subcommand in a task's command (`item1 && item2 && ...`).
#[derive(Debug, Serialize)]
#[expect(
Expand Down
46 changes: 44 additions & 2 deletions crates/vite_task_plan/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ async fn plan_task_as_execution_node(
// An empty execution graph means no tasks matched the query.
// At the top level the session shows the task selector UI,
// but in a nested context there is no UI — propagate as an error.
if execution_graph.node_count() == 0 {
if execution_graph.graph.node_count() == 0 {
return Err(Error::NestPlan {
task_display: task_node.task_display.clone(),
command: Str::from(&command_str[add_item_span]),
Expand Down Expand Up @@ -675,6 +675,27 @@ pub async fn plan_query_request(
);
context.set_resolved_global_cache(final_cache);
}
// Resolve effective concurrency for this level.
//
// Priority (highest to lowest):
// 1. `--concurrency-limit N` CLI flag
// 2. `--parallel` (without the above) → unlimited
// 3. `VP_RUN_CONCURRENCY_LIMIT` env var
// 4. `DEFAULT_CONCURRENCY_LIMIT` (4)
let effective_concurrency = match plan_options.concurrency_limit {
Some(n) => n,
None => {
if plan_options.parallel {
usize::MAX
} else {
concurrency_limit_from_env(context.envs())?
.unwrap_or(crate::DEFAULT_CONCURRENCY_LIMIT)
}
}
};

let parallel = plan_options.parallel;

context.set_extra_args(plan_options.extra_args);
context.set_parent_query(Arc::clone(&query));

Expand Down Expand Up @@ -751,10 +772,15 @@ pub async fn plan_query_request(
}
}

// If --parallel, discard all edges so tasks run independently.
if parallel {
inner_graph.clear_edges();
}

// Validate the graph is acyclic.
// `try_from_graph` performs a DFS; if a cycle is found, it returns
// `CycleError` containing the full cycle path as node indices.
ExecutionGraph::try_from_graph(inner_graph).map_err(|cycle| {
ExecutionGraph::try_from_graph(inner_graph, effective_concurrency).map_err(|cycle| {
// Map each execution node index in the cycle path to its human-readable TaskDisplay.
// Every node in the cycle was added via `inner_graph.add_node()` above,
// with a corresponding entry in `execution_node_indices_by_task_index`.
Expand All @@ -778,6 +804,22 @@ pub async fn plan_query_request(
})
}

/// Parse `VP_RUN_CONCURRENCY_LIMIT` from the environment variables.
///
/// Returns `Ok(None)` if the variable is not set.
/// Returns `Err` if the variable is set but is not valid UTF-8 or cannot be
/// parsed as a `usize`.
///
/// A value of `0` parses successfully and is clamped to `1` rather than
/// rejected, so the executor's semaphore always has at least one permit.
#[expect(clippy::result_large_err, reason = "Error type is shared across all plan functions")]
fn concurrency_limit_from_env(
    envs: &FxHashMap<Arc<OsStr>, Arc<OsStr>>,
) -> Result<Option<usize>, Error> {
    // Absent variable: the caller falls back to DEFAULT_CONCURRENCY_LIMIT.
    let Some(value) = envs.get(OsStr::new("VP_RUN_CONCURRENCY_LIMIT")) else {
        return Ok(None);
    };
    // Non-UTF-8 bytes cannot be parsed; report the raw value in the error.
    let s = value.to_str().ok_or_else(|| Error::InvalidConcurrencyLimitEnv(Arc::clone(value)))?;
    let n: usize = s.parse().map_err(|_| Error::InvalidConcurrencyLimitEnv(Arc::clone(value)))?;
    // Clamp 0 → 1: a zero-permit semaphore would deadlock execution.
    Ok(Some(n.max(1)))
}

#[cfg(test)]
mod tests {
use std::collections::BTreeSet;
Expand Down
7 changes: 7 additions & 0 deletions crates/vite_task_plan/src/plan_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ pub enum CacheOverride {
pub struct PlanOptions {
    /// Extra CLI arguments passed through after the task name.
    pub extra_args: Arc<[Str]>,
    /// Cache behavior override resolved from the CLI flags.
    pub cache_override: CacheOverride,
    /// Per-level concurrency limit. `None` means resolve from the
    /// `VP_RUN_CONCURRENCY_LIMIT` environment variable, falling back to
    /// [`crate::DEFAULT_CONCURRENCY_LIMIT`] — unless `parallel` is set, in
    /// which case `None` means unlimited.
    pub concurrency_limit: Option<usize>,
    /// When `true`, discard dependency edges between tasks at this level,
    /// running all tasks as independent. If `concurrency_limit` is also `None`,
    /// this sets the effective concurrency to `usize::MAX`.
    pub parallel: bool,
}

#[derive(Debug)]
Expand Down
Loading
Loading