diff --git a/Cargo.lock b/Cargo.lock index 4eefa2c3..2f0f4470 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1362,10 +1362,11 @@ dependencies = [ "derive_more", "futures", "rusqlite", - "tar", "tempfile", "tokio", + "tokio-util", "tracing", + "uuid", ] [[package]] diff --git a/crates/hm-dsl-engine/harmont-py/harmont/__init__.py b/crates/hm-dsl-engine/harmont-py/harmont/__init__.py index 9cfc586c..59e17bbc 100644 --- a/crates/hm-dsl-engine/harmont-py/harmont/__init__.py +++ b/crates/hm-dsl-engine/harmont-py/harmont/__init__.py @@ -51,6 +51,7 @@ CacheNone, CacheOnChange, CachePolicy, + CachePredicate, CacheTTL, ) from .triggers import pull_request, push @@ -58,6 +59,7 @@ from .types import Pipeline if TYPE_CHECKING: + from collections.abc import Callable from datetime import timedelta @@ -158,6 +160,12 @@ def forever(env_keys: tuple[str, ...] = ()) -> CacheForever: itself encodes the version (e.g. downloading a pinned binary). Do not use for steps that fetch mutable remote resources. + On a cache hit only SYSTEM state (the snapshot) is restored — files the + step wrote into the workspace are not replayed across runs; downstream + steps see the current run's source tree instead. Write build outputs to + system paths (e.g. ``/usr/local``) or use ``on_change`` if the step's + workspace outputs must survive cache hits. + Args: env_keys: Environment variable names whose values are folded into the cache key. Use this when the command's behavior varies by @@ -196,6 +204,28 @@ def compose(*policies: CachePolicy) -> CacheCompose: return CacheCompose(policies=tuple(policies)) +def predicate(fn: Callable[[], str]) -> CachePredicate: + """Cache policy that invalidates when ``fn()`` returns a new value. + + ``fn`` is called once at plan time. Its string return value is + folded into the cache key. 
+ + Example:: + + import json + + def package_manager(): + with open("package.json") as f: + return json.load(f).get("packageManager", "") + + step.run("install", cache=hm.predicate(package_manager)) + """ + if not callable(fn): + msg = f"hm.predicate() requires a callable, got {type(fn).__name__}" + raise TypeError(msg) + return CachePredicate(fn=fn) + + def timeout(duration: str | int | timedelta, step: Step) -> Step: """Apply a wall-clock timeout to a single step. @@ -305,6 +335,7 @@ def group(steps: list[Step] | tuple[Step, ...]) -> tuple[Step, ...]: "CacheNone", "CacheOnChange", "CachePolicy", + "CachePredicate", "CacheTTL", "JsProject", "Pipeline", @@ -324,6 +355,7 @@ def group(steps: list[Step] | tuple[Step, ...]) -> tuple[Step, ...]: "pipeline", "pipeline_to_json", "pr", + "predicate", "pull_request", "push", "py", diff --git a/crates/hm-dsl-engine/harmont-py/harmont/_pipeline.py b/crates/hm-dsl-engine/harmont-py/harmont/_pipeline.py index cf601d86..62cc15f1 100644 --- a/crates/hm-dsl-engine/harmont-py/harmont/_pipeline.py +++ b/crates/hm-dsl-engine/harmont-py/harmont/_pipeline.py @@ -21,6 +21,7 @@ CacheNone, CacheOnChange, CachePolicy, + CachePredicate, CacheTTL, ) @@ -245,6 +246,16 @@ def _cache_to_dict(policy: CachePolicy) -> dict[str, Any]: "policy": "compose", "sub_policies": [_cache_to_dict(p) for p in policy.policies], } + if isinstance(policy, CachePredicate): + try: + value = policy.fn() # type: ignore[operator] + except Exception as exc: + msg = f"hm.predicate() callback raised {type(exc).__name__}: {exc}" + raise ValueError(msg) from exc + if value is None: + msg = "hm.predicate() callback returned None; it must return a string" + raise ValueError(msg) + return {"policy": "predicate", "value": str(value)} msg = f"unknown CachePolicy: {type(policy).__name__}" raise TypeError(msg) diff --git a/crates/hm-dsl-engine/harmont-py/harmont/cache.py b/crates/hm-dsl-engine/harmont-py/harmont/cache.py index 1b9da422..878acfc0 100644 --- 
a/crates/hm-dsl-engine/harmont-py/harmont/cache.py +++ b/crates/hm-dsl-engine/harmont-py/harmont/cache.py @@ -33,6 +33,12 @@ class CacheForever(CachePolicy): Use for pure computations whose only inputs are visible to the planner. DO NOT use for installs that fetch the public internet — package repos drift; manual cache busts will be needed. + + On a cache hit only SYSTEM state (the snapshot) is restored — files the + step wrote into the workspace are not replayed across runs; downstream + steps see the current run's source tree instead. Write build outputs to + system paths (e.g. ``/usr/local``) or use ``on_change`` if the step's + workspace outputs must survive cache hits. """ env_keys: tuple[str, ...] = () @@ -78,3 +84,14 @@ class CacheCompose(CachePolicy): """ policies: tuple[CachePolicy, ...] + + +@dataclass(frozen=True) +class CachePredicate(CachePolicy): + """Cache keyed on the return value of a user-supplied function. + + The function is called at plan time. If it returns a different value + than the previous run, the cache key changes and the step re-runs. 
+ """ + + fn: object # Callable[[], str] — called at plan time diff --git a/crates/hm-dsl-engine/harmont-py/harmont/keygen.py b/crates/hm-dsl-engine/harmont-py/harmont/keygen.py index f7285c33..14096ea7 100644 --- a/crates/hm-dsl-engine/harmont-py/harmont/keygen.py +++ b/crates/hm-dsl-engine/harmont-py/harmont/keygen.py @@ -130,6 +130,8 @@ def _resolve_policy( for sub in subs ] return "compose-" + _sha256_hex("".join(parts)) + if kind == "predicate": + return "predicate-" + _sha256_hex(policy["value"]) msg = f"resolve-policy-key: unknown policy {kind!r}" raise ValueError(msg) diff --git a/crates/hm-dsl-engine/harmont-py/tests/test_cache_predicate.py b/crates/hm-dsl-engine/harmont-py/tests/test_cache_predicate.py new file mode 100644 index 00000000..9bfc4e49 --- /dev/null +++ b/crates/hm-dsl-engine/harmont-py/tests/test_cache_predicate.py @@ -0,0 +1,129 @@ +"""Tests for the ``predicate`` cache policy.""" + +from __future__ import annotations + +import hashlib +from pathlib import Path + +import harmont as hm +from harmont._pipeline import _cache_to_dict +from harmont.cache import CachePredicate +from harmont.keygen import resolve_pipeline_keys + +NUL = "\x00" + + +def _sha256_hex(s: str) -> str: + return hashlib.sha256(s.encode("utf-8")).hexdigest() + + +def _make_graph(nodes, edges=None): + return { + "nodes": nodes, + "node_holes": [], + "edge_property": "directed", + "edges": edges or [], + } + + +# -- factory ----------------------------------------------------------------- + + +def test_predicate_factory_returns_cache_predicate(): + policy = hm.predicate(lambda: "v1") + assert isinstance(policy, CachePredicate) + + +# -- lowering ---------------------------------------------------------------- + + +def test_predicate_lowering_calls_fn_and_produces_dict(): + policy = hm.predicate(lambda: "v1") + d = _cache_to_dict(policy) + assert d == {"policy": "predicate", "value": "v1"} + + +def test_predicate_lowering_stringifies_return_value(): + policy = 
hm.predicate(lambda: 42) + d = _cache_to_dict(policy) + assert d == {"policy": "predicate", "value": "42"} + + +def test_predicate_fn_is_called_at_lowering_time(): + calls: list[int] = [] + + def counter(): + calls.append(1) + return "x" + + policy = hm.predicate(counter) + assert len(calls) == 0 + _cache_to_dict(policy) + assert len(calls) == 1 + + +# -- keygen ------------------------------------------------------------------ + + +def test_predicate_keygen_produces_deterministic_key(): + graph = _make_graph( + [ + { + "step": { + "key": "a", + "cmd": "echo", + "cache": {"policy": "predicate", "value": "v1"}, + }, + "env": {}, + }, + ] + ) + out = resolve_pipeline_keys( + graph, + pipeline_org="default", + pipeline_slug="default", + now=0, + base_path=Path("/tmp"), # noqa: S108 + env={}, + ) + policy_res = "predicate-" + _sha256_hex("v1") + expected = _sha256_hex( + "default" + NUL + "default" + NUL + "a" + NUL + "scratch" + NUL + policy_res + ) + assert out["nodes"][0]["step"]["cache"]["key"] == expected + + +def test_different_predicate_values_produce_different_keys(): + def make_graph(value): + return _make_graph( + [ + { + "step": { + "key": "a", + "cmd": "echo", + "cache": {"policy": "predicate", "value": value}, + }, + "env": {}, + }, + ] + ) + + g1 = make_graph("v1") + g2 = make_graph("v2") + resolve_pipeline_keys( + g1, + pipeline_org="o", + pipeline_slug="s", + now=0, + base_path=Path("/tmp"), # noqa: S108 + env={}, + ) + resolve_pipeline_keys( + g2, + pipeline_org="o", + pipeline_slug="s", + now=0, + base_path=Path("/tmp"), # noqa: S108 + env={}, + ) + assert g1["nodes"][0]["step"]["cache"]["key"] != g2["nodes"][0]["step"]["cache"]["key"] diff --git a/crates/hm-dsl-engine/harmont-ts/src/cache.ts b/crates/hm-dsl-engine/harmont-ts/src/cache.ts index fc423c6d..40da0ce1 100644 --- a/crates/hm-dsl-engine/harmont-ts/src/cache.ts +++ b/crates/hm-dsl-engine/harmont-ts/src/cache.ts @@ -19,8 +19,22 @@ export interface CacheCompose { readonly policies: readonly 
CachePolicy[]; } -export type CachePolicy = CacheForever | CacheTTL | CacheOnChange | CacheCompose; - +export interface CachePredicate { + readonly kind: "predicate"; + readonly value: string; +} + +export type CachePolicy = CacheForever | CacheTTL | CacheOnChange | CacheCompose | CachePredicate; + +/** + * Create a permanent cache policy, keyed on (command, parent, envKeys). + * + * On a cache hit only SYSTEM state (the snapshot) is restored — files the + * step wrote into the workspace are not replayed across runs; downstream + * steps see the current run's source tree instead. Write build outputs to + * system paths (e.g. `/usr/local`) or use `onChange` if the step's + * workspace outputs must survive cache hits. + */ export function forever(opts?: { envKeys?: string[] }): CacheForever { return { kind: "forever", envKeys: opts?.envKeys ?? [] }; } @@ -39,3 +53,7 @@ export function onChange(...paths: string[]): CacheOnChange { export function compose(...policies: CachePolicy[]): CacheCompose { return { kind: "compose", policies }; } + +export function predicate(value: string): CachePredicate { + return { kind: "predicate", value }; +} diff --git a/crates/hm-dsl-engine/harmont-ts/src/index.ts b/crates/hm-dsl-engine/harmont-ts/src/index.ts index c950c850..e6306bfa 100644 --- a/crates/hm-dsl-engine/harmont-ts/src/index.ts +++ b/crates/hm-dsl-engine/harmont-ts/src/index.ts @@ -5,10 +5,12 @@ export { type CacheTTL, type CacheOnChange, type CacheCompose, + type CachePredicate, forever, ttl, onChange, compose, + predicate, } from "./cache.js"; export { type Trigger, diff --git a/crates/hm-dsl-engine/harmont-ts/src/keygen.ts b/crates/hm-dsl-engine/harmont-ts/src/keygen.ts index 2867b3c5..19bfd34d 100644 --- a/crates/hm-dsl-engine/harmont-ts/src/keygen.ts +++ b/crates/hm-dsl-engine/harmont-ts/src/keygen.ts @@ -130,6 +130,10 @@ function resolvePolicy( return "compose-" + sha256hex(parts.join("")); } + if (policy === "predicate") { + return "predicate-" + 
sha256hex(cache.value as string); + } + throw new Error(`resolve-policy-key: unknown policy "${policy}"`); } diff --git a/crates/hm-dsl-engine/harmont-ts/src/pipeline.ts b/crates/hm-dsl-engine/harmont-ts/src/pipeline.ts index a531fb92..cf6d6500 100644 --- a/crates/hm-dsl-engine/harmont-ts/src/pipeline.ts +++ b/crates/hm-dsl-engine/harmont-ts/src/pipeline.ts @@ -204,5 +204,7 @@ function cachePolicyToDict(policy: CachePolicy): Record { policy: "compose", sub_policies: policy.policies.map(cachePolicyToDict), }; + case "predicate": + return { policy: "predicate", value: policy.value }; } } diff --git a/crates/hm-dsl-engine/harmont-ts/tests/cache.test.ts b/crates/hm-dsl-engine/harmont-ts/tests/cache.test.ts index cee168de..bb0310e4 100644 --- a/crates/hm-dsl-engine/harmont-ts/tests/cache.test.ts +++ b/crates/hm-dsl-engine/harmont-ts/tests/cache.test.ts @@ -1,5 +1,5 @@ import { describe, expect, it } from "vitest"; -import { forever, ttl, onChange, compose, type CachePolicy } from "../src/cache.js"; +import { forever, ttl, onChange, compose, predicate, type CachePolicy } from "../src/cache.js"; describe("forever", () => { it("creates a forever policy with no env keys", () => { @@ -42,6 +42,19 @@ describe("compose", () => { }); }); +describe("predicate", () => { + it("creates a predicate policy with the given value", () => { + const p = predicate("v1"); + expect(p).toEqual({ kind: "predicate", value: "v1" }); + }); + + it("different values produce distinct objects", () => { + const p1 = predicate("v1"); + const p2 = predicate("v2"); + expect(p1.value).not.toBe(p2.value); + }); +}); + describe("type discrimination", () => { it("kind field enables type narrowing", () => { const p: CachePolicy = forever(); diff --git a/crates/hm-dsl-engine/harmont-ts/tests/keygen.test.ts b/crates/hm-dsl-engine/harmont-ts/tests/keygen.test.ts index 9804a8a0..9d4204e4 100644 --- a/crates/hm-dsl-engine/harmont-ts/tests/keygen.test.ts +++ b/crates/hm-dsl-engine/harmont-ts/tests/keygen.test.ts @@ 
-6,7 +6,7 @@ import { describe, expect, it, beforeEach } from "vitest"; import { resolvePipelineCacheKeys, type CacheKeyOptions } from "../src/keygen.js"; import { pipeline, type PipelineIR } from "../src/pipeline.js"; import { sh } from "../src/step.js"; -import { forever, ttl, onChange } from "../src/cache.js"; +import { forever, ttl, onChange, predicate } from "../src/cache.js"; function sha256(s: string): string { return createHash("sha256").update(s, "utf8").digest("hex"); @@ -153,6 +153,35 @@ describe("resolvePipelineCacheKeys", () => { expect(cache.key).toBe(expected); }); + it("predicate policy produces correct key", () => { + const ir = pipeline([sh("echo test", { label: "test", cache: predicate("v1") })]); + const opts = makeOpts({ pipelineOrg: "myorg", pipelineSlug: "myslug" }); + + resolvePipelineCacheKeys(ir.graph, opts); + + const stepKey = ir.graph.nodes[0].step.key as string; + const policyRes = "predicate-" + sha256("v1"); + const expected = sha256( + "myorg" + NUL + "myslug" + NUL + stepKey + NUL + "scratch" + NUL + policyRes, + ); + + const cache = ir.graph.nodes[0].step.cache as Record; + expect(cache.key).toBe(expected); + }); + + it("different predicate values produce different keys", () => { + const ir1 = pipeline([sh("echo a", { label: "a", cache: predicate("v1") })]); + const ir2 = pipeline([sh("echo a", { label: "a", cache: predicate("v2") })]); + const opts = makeOpts(); + + resolvePipelineCacheKeys(ir1.graph, opts); + resolvePipelineCacheKeys(ir2.graph, opts); + + const k1 = (ir1.graph.nodes[0].step.cache as Record).key; + const k2 = (ir2.graph.nodes[0].step.cache as Record).key; + expect(k1).not.toBe(k2); + }); + it("child step uses parent resolved key", () => { const base = sh("apt-get install", { label: "apt", cache: forever() }); const child = base.sh("make", { label: "build", cache: forever() }); diff --git a/crates/hm-exec/src/local/backend.rs b/crates/hm-exec/src/local/backend.rs index 0713100c..8fa06127 100644 --- 
a/crates/hm-exec/src/local/backend.rs +++ b/crates/hm-exec/src/local/backend.rs @@ -48,8 +48,10 @@ impl LocalBackend { /// Build the runner registry, constructing the [`HmVm`] orchestrator /// (VM backend + snapshot registry) and registering the [`VmRunner`] as - /// the default runner. - fn build_registry(&self) -> Result { + /// the default runner. The orchestrator handle is also returned so the + /// run loop can drain its deferred eviction queue once the DAG has + /// finished. + fn build_registry(&self) -> Result<(RunnerRegistry, Arc)> { let cache_dir = hm_util::dirs::hm_cache_dir().ok_or_else(|| { BackendError::Local("cannot resolve the Harmont cache directory".into()) })?; @@ -62,11 +64,11 @@ impl LocalBackend { ..Default::default() }; - let hmvm = HmVm::new(Arc::clone(&self.vm_backend), registry, config); + let hmvm = Arc::new(HmVm::new(Arc::clone(&self.vm_backend), registry, config)); let mut runners = RunnerRegistry::new(); - runners.register(Arc::new(VmRunner::new(Arc::new(hmvm))), true); - Ok(runners) + runners.register(Arc::new(VmRunner::new(Arc::clone(&hmvm))), true); + Ok((runners, hmvm)) } } @@ -81,14 +83,58 @@ impl ExecutionBackend for LocalBackend { } async fn start(&self, req: RunRequest) -> Result { - let registry = Arc::new(self.build_registry()?); + let (registry, hmvm) = self.build_registry()?; + + // Best-effort, detached GC of aged snapshot images in the + // *run-scoped* namespaces only: + // - `harmont-ephemeral`: uncached snapshots whose run died before + // its end-of-run cleanup. + // - `ephemeral`: the EXACT legacy repo pre-fix versions used for + // uncached snapshots (`ephemeral:latest`, `ephemeral:<64hex>`). + // Deliberately not a glob — a wildcard like `ephemeral-*` would + // force-remove unrelated user images. 
+ // These tags are created and removed within a single run, so any + // image older than the 24h floor is residue of a dead run; the + // floor also keeps a starting run from deleting a concurrent run's + // freshly committed, still-in-use ephemeral images. + // + // `harmont-cache/*` is deliberately NOT swept here. The registry DB + // is per-user (`hm_util::dirs::harmont_cache_dir`) while the Docker + // daemon can be shared across users and CI jobs, so absence from + // THIS registry does not prove a cache image is orphaned — sweeping + // would delete other registries' live caches and silently force + // their cached steps to re-run. It would also race deferred LRU + // eviction: an evicted row vanishes from SQLite the moment `put` + // commits, while in-flight steps (ours or a concurrent process's) + // may still restore from the evicted tag until their DAG drains — + // a window no registry-based liveness check can observe. Crash + // residue under `harmont-cache/*` is bounded by the registry + // capacity and reclaimable via `hm cache clean` plus `docker rmi`. + // Failures are logged and never block or fail the run. 
+ let gc_vm = Arc::clone(&hmvm); + tokio::spawn(async move { + #[allow( + clippy::duration_suboptimal_units, + reason = "from_hours is nightly-only" + )] + const GC_AGE: std::time::Duration = std::time::Duration::from_secs(24 * 3600); + for reference in ["harmont-ephemeral", "ephemeral"] { + match gc_vm.gc_orphaned_snapshots(reference, GC_AGE).await { + Ok(0) => {} + Ok(n) => tracing::debug!(reference, removed = n, "snapshot GC removed images"), + Err(e) => tracing::warn!(reference, error = %e, "snapshot GC failed"), + } + } + }); + let registry = Arc::new(registry); let (tx, rx) = mpsc::channel(1024); let cancel = CancellationToken::new(); let parallelism = self.parallelism; let keep_going = req.options.keep_going; let token = cancel.clone(); + let run_vm = Arc::clone(&hmvm); let join = tokio::spawn(async move { - crate::local::run( + let result = crate::local::run( req.plan.graph, req.repo_root, req.pipeline_slug, @@ -96,9 +142,16 @@ impl ExecutionBackend for LocalBackend { registry, tx, token, + Some(run_vm), keep_going, ) - .await + .await; + // Snapshot images evicted from the registry during the run are + // only removed now, strictly after every step has finished: an + // in-flight step may restore from an evicted tag long after the + // eviction (cache-hit outcomes propagate only the tag). + hmvm.cleanup_deferred_evictions().await; + result }); Ok(BackendHandle::spawn(rx, cancel, join)) } diff --git a/crates/hm-exec/src/local/runner/mod.rs b/crates/hm-exec/src/local/runner/mod.rs index 1974e519..b316c06d 100644 --- a/crates/hm-exec/src/local/runner/mod.rs +++ b/crates/hm-exec/src/local/runner/mod.rs @@ -29,6 +29,16 @@ pub struct StepContext { pub event_bus: Arc, pub archives: Arc, pub cancel: CancellationToken, + /// When set, the step should COW-copy this directory as its + /// workspace base instead of copying from the shared source base. 
+ /// Populated by the scheduler for child steps in a `BuildsIn` chain; + /// always a live, run-owned temp directory of a step that executed + /// this run (cache hits propagate `None`). + pub parent_workspace_dir: Option, + /// Lazily-extracted, once-per-run copy of the current source archive. + /// Steps without a `parent_workspace_dir` COW their workspace from + /// this directory, guaranteeing they see the current run's sources. + pub source_base: Arc>, } /// Async trait implemented by step executors (e.g. the VM runner). @@ -167,6 +177,8 @@ mod tests { exit_code: 0, committed_snapshot: None, artifacts: vec![], + workspace_dir: None, + ephemeral_snapshot: false, }) }) } diff --git a/crates/hm-exec/src/local/runner/vm.rs b/crates/hm-exec/src/local/runner/vm.rs index 9c2d2b70..032e2567 100644 --- a/crates/hm-exec/src/local/runner/vm.rs +++ b/crates/hm-exec/src/local/runner/vm.rs @@ -2,7 +2,7 @@ //! //! Each step runs inside a lightweight VM managed by [`HmVm`]. The //! source archive is extracted to a host-side temp directory and -//! injected into the VM before the step command runs. System-level +//! bind-mounted into the VM before the step command runs. System-level //! state propagates via VM snapshots. 
use std::future::Future; @@ -14,7 +14,7 @@ use hm_plugin_protocol::{ BuildEvent, CacheDecision, ExecutorInput, SnapshotRef, StdStream, StepResult, }; use hm_vm::types::OutputSink; -use hm_vm::{Action, CachingPolicy, HmVm, ImageSource, SnapshotId}; +use hm_vm::{Action, CachingPolicy, HmVm, ImageSource, SnapshotId, WorkspaceMount}; use uuid::Uuid; use super::{StepContext, StepRunner}; @@ -66,6 +66,7 @@ impl StepRunner for VmRunner { } } +#[allow(clippy::too_many_lines)] #[tracing::instrument(skip(vm, ctx), fields(step_key = %input.step.key))] async fn run_step_vm(vm: &HmVm, ctx: &StepContext, input: ExecutorInput) -> Result { let policy = match &input.cache_lookup { @@ -75,6 +76,36 @@ async fn run_step_vm(vm: &HmVm, ctx: &StepContext, input: ExecutorInput) -> Resu CacheDecision::MissNoCommit => CachingPolicy::None, }; + // Fast path: check cache before doing any workspace prep. COW copies + // are expensive and entirely wasted on cache hits. + if let CachingPolicy::Cache { ref key } = policy + && let Some(result) = vm.peek_cache(key).await? + { + ctx.event_bus.emit(BuildEvent::StepCacheHit { + step_id: input.step_id, + key: input + .step + .cache + .as_ref() + .and_then(|c| c.key.clone()) + .unwrap_or_default(), + tag: result + .snapshot + .as_ref() + .map_or_else(String::new, ToString::to_string), + }); + // Cache hits carry no workspace: workspace state is strictly + // run-scoped, so children rebase onto the current source instead + // of inheriting a stale tree from the original run. 
+ return Ok(StepResult { + exit_code: 0, + committed_snapshot: result.snapshot.map(|s| SnapshotRef(s.to_string())), + artifacts: vec![], + workspace_dir: None, + ephemeral_snapshot: false, + }); + } + let source = if let Some(ref snap) = input.parent_snapshot { ImageSource::Snapshot(SnapshotId::new(snap.0.clone())) } else { @@ -87,27 +118,48 @@ async fn run_step_vm(vm: &HmVm, ctx: &StepContext, input: ExecutorInput) -> Resu ) }; - // Inject the current workspace on every executing step, overlaying it - // onto the system state inherited from the parent snapshot (apt packages, - // installed runtimes, `node_modules`, …). Injecting only at the chain root - // is wrong: root steps such as `apt_base` are `CacheForever`, so their - // snapshots freeze the source tree captured at first build and every COW - // descendant inherits that stale tree — source edits never reach leaf - // steps. A true cache hit short-circuits inside `HmVm::execute` before - // inject runs, so this overlay only happens when a step actually executes; - // the overlay (Docker PUT-archive) adds/overwrites files without deleting - // the inherited system state. - let (inject, _temp_guard) = { - let archive_bytes = ctx - .archives - .get_bytes(input.workspace_archive_id) - .ok_or_else(|| anyhow::anyhow!("source archive not found"))?; - let dir = - extract_archive_to_tempdir(&archive_bytes).context("extracting workspace archive")?; - let path = dir.path().to_path_buf(); - (Some(path), Some(dir)) + // Prepare the workspace: COW-copy from the parent step's live workspace + // (child of a step that executed this run) or from the shared + // once-per-run source base (root step, or child of a cache-hit parent). + // Either way, bind-mount the result into the VM. This overlays the + // current source onto the system state inherited from the parent + // snapshot on every executing step, so source edits always reach leaf + // steps even when ancestors are `CacheForever` and froze an older tree. 
+ let step_ws = tempfile::tempdir().context("creating step workspace")?; + + let cow_src: std::path::PathBuf = if let Some(ref parent_ws) = ctx.parent_workspace_dir { + std::path::PathBuf::from(parent_ws) + } else { + let base = ctx + .source_base + .get_or_try_init(|| async { + let archive_bytes = ctx + .archives + .get_bytes(input.workspace_archive_id) + .ok_or_else(|| anyhow::anyhow!("source archive not found"))?; + tokio::task::spawn_blocking(move || extract_archive_to_tempdir(&archive_bytes)) + .await + .context("archive extraction task panicked")? + .context("extracting workspace archive") + }) + .await?; + base.path().to_path_buf() }; + { + let dst = step_ws.path().to_path_buf(); + let src = cow_src.clone(); + tokio::task::spawn_blocking(move || hm_vm::workspace::cow_copy(&src, &dst)) + .await + .context("workspace COW task panicked")? + .with_context(|| format!("COW copy {} into step workspace", cow_src.display()))?; + } + + let workspace = Some(WorkspaceMount { + host_path: step_ws.path().to_path_buf(), + guest_path: input.workdir.clone(), + }); + // Baseline env for shell operation inside VMs. let mut env: Vec<(String, String)> = vec![ ("HOME".into(), "/root".into()), @@ -124,7 +176,7 @@ async fn run_step_vm(vm: &HmVm, ctx: &StepContext, input: ExecutorInput) -> Resu env, working_dir: input.workdir.clone(), timeout: None, - inject, + workspace, }; let sink = EventBusSink { @@ -132,13 +184,17 @@ async fn run_step_vm(vm: &HmVm, ctx: &StepContext, input: ExecutorInput) -> Resu bus: Arc::clone(&ctx.event_bus), }; - let result = tokio::select! 
{ - r = vm.execute(action, policy, &sink) => r, - () = ctx.cancel.cancelled() => { - anyhow::bail!("step cancelled (build timeout or sibling failure)") - } - } - .context("vm execute failed")?; + // Cancellation is cooperative INSIDE `HmVm::execute` (the token is + // threaded down): on Ctrl-C / sibling failure / step timeout it bails + // with exit 130 only after destroying the container, which reclaims + // bind-mount ownership of root-written files. Never `select!`-drop + // this future: doing so would tear down `step_ws` concurrently with a + // still-running container and leak a (root-owned, on native Linux) + // workspace directory. + let result = vm + .execute(action, policy, &sink, &ctx.cancel) + .await + .context("vm execute failed")?; if result.cached { ctx.event_bus.emit(BuildEvent::StepCacheHit { @@ -156,10 +212,20 @@ async fn run_step_vm(vm: &HmVm, ctx: &StepContext, input: ExecutorInput) -> Resu }); } + // Steps that executed successfully (cached-miss and uncached alike) + // keep their live tempdir alive so same-run children can COW-copy + // from it; the scheduler removes every kept dir after the DAG drains. + // Cache hits (rare race: a concurrent fill between peek and execute) + // and failures propagate no workspace -- the TempDir self-cleans. 
+ let workspace_dir = + (result.exit_code == 0 && !result.cached).then(|| step_ws.keep().display().to_string()); + Ok(StepResult { exit_code: result.exit_code, committed_snapshot: result.snapshot.map(|s| SnapshotRef(s.to_string())), artifacts: vec![], + workspace_dir, + ephemeral_snapshot: result.ephemeral_snapshot, }) } diff --git a/crates/hm-exec/src/local/scheduler.rs b/crates/hm-exec/src/local/scheduler.rs index 5d49f8d9..3721204d 100644 --- a/crates/hm-exec/src/local/scheduler.rs +++ b/crates/hm-exec/src/local/scheduler.rs @@ -26,6 +26,7 @@ use std::collections::HashMap; use std::num::NonZeroUsize; use std::path::PathBuf; use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::{Duration, Instant}; use daggy::petgraph::algo::toposort; @@ -60,6 +61,17 @@ struct StepOutcome { /// `None` only for steps short-circuited because a predecessor failed /// or the build was cancelled before they could run. summary: Option, + /// Host-side workspace path produced by this step's runner, if any. + /// The scheduler propagates this to downstream `BuildsIn` children + /// so they can COW-copy instead of re-extracting. By construction it + /// is always a run-owned kept tempdir of a step that executed this + /// run with exit 0; cache hits, skips and failures carry `None`. + /// The dir is deleted as soon as the step's last `BuildsIn` child + /// finishes (refcounted), with an end-of-run sweep as backstop. + workspace_dir: Option, + /// True when the snapshot is ephemeral and must be cleaned up after + /// all downstream steps finish. + ephemeral_snapshot: bool, /// Set when this step did not complete successfully — it failed, timed /// out, was cancelled, or was itself skipped. 
Descendants gate on this /// (not on `exit_code`) so a skip propagates transitively: a skipped @@ -93,6 +105,7 @@ pub(crate) async fn run( runner_registry: Arc, tx: tokio::sync::mpsc::Sender, cancel: CancellationToken, + vm: Option>, keep_going: bool, ) -> crate::Result { // Set up per-run state. @@ -138,6 +151,8 @@ pub(crate) async fn run( event_bus: bus.clone(), archives: archives.clone(), cancel: cancel.clone(), + parent_workspace_dir: None, + source_base: Arc::new(tokio::sync::OnceCell::new()), }; let semaphore = Arc::new(tokio::sync::Semaphore::new(parallelism.get())); @@ -150,6 +165,27 @@ pub(crate) async fn run( crate::BackendError::Local(format!("pipeline graph has a cycle at {:?}", c.node_id())) })?; + // Per-node refcount of `BuildsIn` children: the only consumers of a + // step's kept workspace dir. Each child decrements its parent's count + // when it finishes (whether it ran, was skipped, or failed); the child + // that drops the count to zero deletes the parent's dir. Steps with no + // `BuildsIn` children free their own dir immediately. This caps the + // run's temp-space footprint at the live DAG frontier instead of + // accumulating one workspace copy per executed step until the end of + // the run (full byte copies on non-reflink filesystems, RAM on + // tmpfs-mounted /tmp). 
+ let ws_consumers: HashMap> = order + .iter() + .map(|&n| { + let count = dag + .children(n) + .iter(dag) + .filter(|(e, _)| dag.edge_weight(*e).copied() == Some(EdgeKind::BuildsIn)) + .count(); + (n, Arc::new(AtomicUsize::new(count))) + }) + .collect(); + let started_at = chrono::Utc::now(); bus.emit(BuildEvent::BuildStart { run_id, @@ -166,11 +202,19 @@ pub(crate) async fn run( let mut done: HashMap = HashMap::new(); for &n in &order { - let preds: Vec<(EdgeKind, StepFuture)> = dag + // (edge kind, parent future, parent's workspace-consumer refcount) + let preds: Vec<(EdgeKind, StepFuture, Arc)> = dag .parents(n) .iter(dag) - .map(|(e, p)| (*dag.edge_weight(e).expect("edge in DAG"), done[&p].clone())) + .map(|(e, p)| { + ( + *dag.edge_weight(e).expect("edge in DAG"), + done[&p].clone(), + Arc::clone(&ws_consumers[&p]), + ) + }) .collect(); + let own_ws_consumers = Arc::clone(&ws_consumers[&n]); let transition = dag[n].clone(); let node_key = transition.step.key.clone(); @@ -190,80 +234,123 @@ pub(crate) async fn run( let fut: StepFuture = async move { // Await all predecessors. let pred_outcomes: Vec = - join_all(preds.iter().map(|(_, f)| f.clone())).await; - - // Early exit if any predecessor failed/was skipped, or the build - // was cancelled. Gating on `failed_or_skipped` (not `exit_code`) - // is what makes the skip propagate transitively: a skipped - // predecessor reports `exit_code == 0`, so an exit-code-only gate - // would let a skipped step's descendants run anyway. 
- if cancel.is_cancelled() || pred_outcomes.iter().any(|o| o.failed_or_skipped) { - let status = if cancel.is_cancelled() { - StepStatus::Canceled - } else { - StepStatus::Skipped - }; - return StepOutcome { - exit_code: 0, - snapshot: None, - summary: Some(StepResultSummary { - step_id: Uuid::new_v4(), - key: node_key, - status, - exit_code: None, - duration_ms: 0, - }), - failed_or_skipped: true, - }; - } + join_all(preds.iter().map(|(_, f, _)| f.clone())).await; - // Acquire parallelism permit. - let _permit = sem - .acquire_owned() - .await - .expect("semaphore closed unexpectedly"); - - // Find the BuildsIn parent's snapshot for container lineage. - let parent_snapshot = preds - .iter() - .zip(&pred_outcomes) - .find(|((ek, _), _)| *ek == EdgeKind::BuildsIn) - .and_then(|(_, outcome)| outcome.snapshot.clone()); - - match execute_step( - n, - transition, - parent_snapshot, - chain_id, - chain_pos, - parent_key, - archive_id, - run_id, - run_ctx, - reg, - bus, - cancel, - keep_going, - ) - .await - { - Ok(outcome) => outcome, - Err(e) => { - tracing::error!(%e, "step execution failed"); - StepOutcome { - exit_code: 1, + // Run the step (or short-circuit). All exit paths of this inner + // block flow into the workspace refcount release below. + let outcome = async { + // Early exit if any predecessor failed/was skipped, or the build + // was cancelled. Gating on `failed_or_skipped` (not `exit_code`) + // is what makes the skip propagate transitively: a skipped + // predecessor reports `exit_code == 0`, so an exit-code-only gate + // would let a skipped step's descendants run anyway. 
+ if cancel.is_cancelled() || pred_outcomes.iter().any(|o| o.failed_or_skipped) { + let status = if cancel.is_cancelled() { + StepStatus::Canceled + } else { + StepStatus::Skipped + }; + return StepOutcome { + exit_code: 0, snapshot: None, summary: Some(StepResultSummary { step_id: Uuid::new_v4(), - key: node_key, - status: StepStatus::Failed, - exit_code: Some(1), + key: node_key.clone(), + status, + exit_code: None, duration_ms: 0, }), + ephemeral_snapshot: false, + workspace_dir: None, failed_or_skipped: true, + }; + } + + // Acquire parallelism permit. + let _permit = sem + .acquire_owned() + .await + .expect("semaphore closed unexpectedly"); + + // Find the BuildsIn parent's snapshot and workspace dir for + // container lineage and COW workspace propagation. + let (parent_snapshot, parent_workspace_dir) = preds + .iter() + .zip(&pred_outcomes) + .find(|((ek, _, _), _)| *ek == EdgeKind::BuildsIn) + .map_or((None, None), |(_, outcome)| { + (outcome.snapshot.clone(), outcome.workspace_dir.clone()) + }); + + let mut step_ctx = run_ctx.clone(); + step_ctx.parent_workspace_dir = parent_workspace_dir; + + match execute_step( + n, + transition, + parent_snapshot, + chain_id, + chain_pos, + parent_key, + archive_id, + run_id, + step_ctx, + reg, + bus, + cancel, + keep_going, + ) + .await + { + Ok(outcome) => outcome, + Err(e) => { + tracing::error!(%e, "step execution failed"); + StepOutcome { + exit_code: 1, + snapshot: None, + summary: Some(StepResultSummary { + step_id: Uuid::new_v4(), + key: node_key.clone(), + status: StepStatus::Failed, + exit_code: Some(1), + duration_ms: 0, + }), + workspace_dir: None, + ephemeral_snapshot: false, + failed_or_skipped: true, + } } } } + .await; + + // This step is done with its parents' workspaces (the COW copy, + // if any, happened inside the runner). Decrement each BuildsIn + // parent's consumer count; the last child to finish deletes the + // parent's kept dir so temp space tracks the live DAG frontier. 
+ for ((kind, _, counter), pred_outcome) in preds.iter().zip(&pred_outcomes) { + if *kind == EdgeKind::BuildsIn + && counter.fetch_sub(1, Ordering::AcqRel) == 1 + && let Some(ws) = pred_outcome.workspace_dir.clone() + { + tokio::task::spawn_blocking(move || std::fs::remove_dir_all(ws).ok()) + .await + .ok(); + } + } + // No BuildsIn children will ever read this step's workspace: + // free it now. (Children, if any, observe this outcome only + // after this future resolves, so the load cannot race a + // decrement.) + if own_ws_consumers.load(Ordering::Acquire) == 0 + && let Some(ws) = outcome.workspace_dir.clone() + { + tokio::task::spawn_blocking(move || std::fs::remove_dir_all(ws).ok()) + .await + .ok(); + } + + outcome } .boxed() .shared(); @@ -334,6 +421,32 @@ pub(crate) async fn run( let steps: Vec = outcomes.iter().filter_map(|o| o.summary.clone()).collect(); + // Clean up ephemeral Docker snapshots and kept temp workspace dirs. + // Workspace state is strictly run-scoped: every `Some(workspace_dir)` + // names a tempdir kept alive (TempDir::keep) by a step that executed + // this run so children could COW-copy from it. Most dirs were already + // freed incrementally by the last-consumer refcount above; this pass is + // a backstop (`remove_dir_all` on an already-deleted path is a no-op) + // now that all steps have drained. + for outcome in &outcomes { + if outcome.ephemeral_snapshot + && let (Some(vm), Some(snap)) = (vm.as_ref(), outcome.snapshot.as_ref()) + { + // Guarded removal: a demoted-to-ephemeral `harmont-cache/*` tag + // may have been re-registered by a concurrent run since this + // step marked it ephemeral; destroying it would kill that run's + // live cache entry. 
+ vm.remove_snapshot_unless_registered(&hm_vm::SnapshotId::new(snap.0.clone())) + .await; + } + if let Some(ref ws) = outcome.workspace_dir { + let ws = ws.clone(); + tokio::task::spawn_blocking(move || std::fs::remove_dir_all(ws).ok()) + .await + .ok(); + } + } + let dur = started_total.elapsed().as_millis() as u64; bus.emit(BuildEvent::BuildEnd { @@ -470,51 +583,88 @@ async fn execute_step( ) })?; + // Give the runner a step-scoped cancellation token (a child of the + // build token, so build-level cancellation still propagates). The + // per-step timeout FIRES this token instead of dropping the runner + // future: the runner tears down cooperatively — the VM layer destroys + // the container and reclaims bind-mount ownership of root-written + // files before the workspace tempdir is touched — so a timed-out step + // can never leak a workspace dir or race the host-side cleanup. + let mut run_ctx = run_ctx; + let step_cancel = run_ctx.cancel.child_token(); + run_ctx.cancel = step_cancel.clone(); + let exec = runner.execute(&run_ctx, input); - let result: anyhow::Result = match step_timeout_secs { + let (result, step_timed_out): (anyhow::Result, bool) = match step_timeout_secs { Some(secs) => { - match tokio::time::timeout(Duration::from_secs(u64::from(secs.get())), exec).await { - Ok(r) => r, - Err(_elapsed) => { - // Per-step wall-clock budget exceeded. Emit a step-end with the - // conventional timeout exit code (124), fail the chain, and - // cancel siblings — same shape as a non-zero exit below. 
- let dur_ms = started.elapsed().as_millis() as u64; - bus.emit(BuildEvent::StepEnd { - step_id, - exit_code: 124, - duration_ms: dur_ms, - snapshot: None, - }); - bus.emit(BuildEvent::ChainFailed { - chain_idx: chain_id, - failed_step_id: step_id, - failed_step_key: step_key.clone(), - exit_code: 124, - message: format!("step '{step_key}' timed out after {secs}s"), - ts: chrono::Utc::now(), - }); - if !keep_going { - cancel.cancel(); - } - return Ok(StepOutcome { - exit_code: 124, - snapshot: None, - summary: Some(StepResultSummary { - step_id, - key: step_key.clone(), - status: StepStatus::TimedOut, - exit_code: Some(124), - duration_ms: dur_ms, - }), - failed_or_skipped: true, - }); + tokio::pin!(exec); + tokio::select! { + r = &mut exec => (r, false), + () = tokio::time::sleep(Duration::from_secs(u64::from(secs.get()))) => { + step_cancel.cancel(); + // Await the cooperative teardown to completion; never + // drop the in-flight future. + (exec.await, true) } } } - _ => exec.await, + _ => (exec.await, false), }; + if step_timed_out { + // Per-step wall-clock budget exceeded. Emit a step-end with the + // conventional timeout exit code (124), fail the chain, and cancel + // siblings — same shape as a non-zero exit below. Whatever the + // post-cancel teardown returned is superseded by the timeout + // verdict, but any resources it reports (a kept workspace dir or + // an ephemeral snapshot, if the step happened to finish in the + // cancellation race window) are carried into the outcome so the + // scheduler's cleanup passes reclaim them. 
+ let dur_ms = started.elapsed().as_millis() as u64; + let (snapshot, workspace_dir, ephemeral_snapshot) = match result { + Ok(sr) => ( + sr.committed_snapshot, + sr.workspace_dir, + sr.ephemeral_snapshot, + ), + Err(_) => (None, None, false), + }; + bus.emit(BuildEvent::StepEnd { + step_id, + exit_code: 124, + duration_ms: dur_ms, + snapshot: None, + }); + bus.emit(BuildEvent::ChainFailed { + chain_idx: chain_id, + failed_step_id: step_id, + failed_step_key: step_key.clone(), + exit_code: 124, + message: format!( + "step '{step_key}' timed out after {}s", + step_timeout_secs.map_or(0, std::num::NonZeroU32::get) + ), + ts: chrono::Utc::now(), + }); + if !keep_going { + cancel.cancel(); + } + return Ok(StepOutcome { + exit_code: 124, + snapshot, + summary: Some(StepResultSummary { + step_id, + key: step_key.clone(), + status: StepStatus::TimedOut, + exit_code: Some(124), + duration_ms: dur_ms, + }), + workspace_dir, + ephemeral_snapshot, + failed_or_skipped: true, + }); + } + let dur_ms = started.elapsed().as_millis() as u64; match result { Ok(sr) => { @@ -554,6 +704,8 @@ async fn execute_step( exit_code: Some(sr.exit_code), duration_ms: dur_ms, }), + workspace_dir: sr.workspace_dir, + ephemeral_snapshot: sr.ephemeral_snapshot, failed_or_skipped: sr.exit_code != 0, }) } @@ -651,3 +803,269 @@ fn compute_chain_info(dag: &Dag) -> ChainInfo { node_chain_pos, } } + +#[cfg(test)] +#[allow(clippy::unwrap_used)] +mod tests { + use super::*; + use crate::local::runner::{StepContext, StepRunner}; + use hm_plugin_protocol::{ExecutorInput, StepResult}; + use std::future::Future; + use std::pin::Pin; + use std::sync::Mutex; + + /// Runner stub that materializes a kept tempdir per step (mirroring the + /// VM runner's workspace contract) and records, at the start of each + /// step, which previously produced workspace dirs still exist on disk. + /// This makes the scheduler's incremental workspace reclamation + /// observable mid-run. 
+ #[derive(Debug, Default)] + struct WorkspaceProbeRunner { + /// `(step key, kept workspace dir)` in execution order. + dirs: Mutex>, + /// step key -> keys of earlier steps whose dirs were still on disk + /// when this step started. + observed: Mutex>>, + } + + impl StepRunner for WorkspaceProbeRunner { + fn name(&self) -> &'static str { + "probe" + } + + fn execute( + &self, + _ctx: &StepContext, + input: ExecutorInput, + ) -> Pin> + Send + '_>> { + Box::pin(async move { + let key = input.step.key.clone(); + let alive: Vec = self + .dirs + .lock() + .unwrap() + .iter() + .filter(|(_, p)| p.exists()) + .map(|(k, _)| k.clone()) + .collect(); + self.observed.lock().unwrap().insert(key.clone(), alive); + + let ws = tempfile::tempdir().unwrap().keep(); + self.dirs.lock().unwrap().push((key, ws.clone())); + Ok(StepResult { + exit_code: 0, + committed_snapshot: None, + artifacts: vec![], + workspace_dir: Some(ws.display().to_string()), + ephemeral_snapshot: false, + }) + }) + } + } + + /// Build a [`PipelineGraph`] from step keys plus `(from, to)` `BuildsIn` + /// edges (indices into `keys`). 
+ fn graph_with_edges(keys: &[&str], edges: &[(usize, usize)]) -> PipelineGraph { + let nodes: Vec = keys + .iter() + .map(|k| serde_json::json!({ "step": { "key": k, "cmd": "true" }, "env": {} })) + .collect(); + let edges: Vec = edges + .iter() + .map(|(a, b)| serde_json::json!([a, b, "builds_in"])) + .collect(); + serde_json::from_value(serde_json::json!({ + "version": "0", + "graph": { + "nodes": nodes, + "node_holes": [], + "edge_property": "directed", + "edges": edges, + } + })) + .unwrap() + } + + async fn run_probe( + graph: PipelineGraph, + runner: Arc, + ) -> crate::BuildOutcome { + let repo = tempfile::tempdir().unwrap(); + std::fs::write(repo.path().join("marker.txt"), "v1").unwrap(); + let mut registry = RunnerRegistry::new(); + registry.register(runner, true); + // Hold `_rx` so the event forwarder keeps a live receiver; the + // handful of events a tiny pipeline emits fits the channel. + let (tx, _rx) = tokio::sync::mpsc::channel(1024); + run( + graph, + repo.path().to_path_buf(), + "test-pipeline".into(), + NonZeroUsize::new(1).unwrap(), + Arc::new(registry), + tx, + CancellationToken::new(), + None, + false, + ) + .await + .unwrap() + } + + /// Chain a -> b -> c: a's kept workspace must be deleted as soon as its + /// only `BuildsIn` child (b) finishes — i.e. before c starts — not at the + /// end of the run. This caps temp-space at the live DAG frontier. + #[tokio::test] + async fn chain_frees_parent_workspace_when_last_child_finishes() { + let runner = Arc::new(WorkspaceProbeRunner::default()); + let graph = graph_with_edges(&["a", "b", "c"], &[(0, 1), (1, 2)]); + + let outcome = run_probe(graph, Arc::clone(&runner)).await; + assert_eq!(outcome.status, crate::BuildStatus::Passed); + + let observed = runner.observed.lock().unwrap().clone(); + // b starts while a's dir is alive (it COWs from it)... 
+ assert_eq!(observed["b"], vec!["a".to_owned()]); + // ...but by the time c starts, b (a's last consumer) has finished + // and a's dir is already gone. Only b's dir is alive. + assert_eq!(observed["c"], vec!["b".to_owned()]); + + // Backstop: nothing survives the run. + for (_, dir) in runner.dirs.lock().unwrap().iter() { + assert!(!dir.exists(), "workspace dir leaked: {}", dir.display()); + } + } + + /// Fork a -> {b, c}: the first child to finish must NOT free a's dir + /// (its sibling still needs it); the last one does. A leaf's own dir + /// (b has no `BuildsIn` children) is freed as soon as the leaf finishes. + #[tokio::test] + async fn fork_frees_parent_workspace_only_after_last_sibling() { + let runner = Arc::new(WorkspaceProbeRunner::default()); + let graph = graph_with_edges(&["a", "b", "c"], &[(0, 1), (0, 2)]); + + let outcome = run_probe(graph, Arc::clone(&runner)).await; + assert_eq!(outcome.status, crate::BuildStatus::Passed); + + let observed = runner.observed.lock().unwrap().clone(); + let exec_order: Vec = runner + .dirs + .lock() + .unwrap() + .iter() + .map(|(k, _)| k.clone()) + .collect(); + assert_eq!(exec_order[0], "a"); + // The first sibling sees a alive. The second sibling must STILL see + // a alive: it is a's remaining consumer, so the first sibling's + // completion must not have freed a's dir. (The first sibling's own + // leaf dir is reclaimed concurrently with the second sibling's + // start — the permit is released before the cleanup runs — so no + // assertion is made about it mid-run; the leak check below covers + // it.) 
+ assert_eq!(observed[&exec_order[1]], vec!["a".to_owned()]); + assert!(observed[&exec_order[2]].contains(&"a".to_owned())); + + for (_, dir) in runner.dirs.lock().unwrap().iter() { + assert!(!dir.exists(), "workspace dir leaked: {}", dir.display()); + } + } + + /// Runner that hangs until its step-scoped cancellation token fires, + /// then performs (simulated) teardown work before returning — mirroring + /// the VM runner, whose post-cancel path destroys the container and + /// reclaims workspace ownership before resolving. + #[derive(Debug, Default)] + struct CooperativeHangRunner { + /// Keys of steps whose futures ran to completion (were awaited + /// through teardown rather than dropped). + torn_down: Mutex>, + } + + impl StepRunner for CooperativeHangRunner { + fn name(&self) -> &'static str { + "hang" + } + + fn execute( + &self, + ctx: &StepContext, + input: ExecutorInput, + ) -> Pin> + Send + '_>> { + let cancel = ctx.cancel.clone(); + Box::pin(async move { + #[allow( + clippy::duration_suboptimal_units, + reason = "from_hours is nightly-only" + )] + const HANG: Duration = Duration::from_secs(3600); + tokio::select! { + () = cancel.cancelled() => {} + () = tokio::time::sleep(HANG) => {} + } + // Teardown must be awaited by the scheduler, never cut + // short by a dropped future. + tokio::time::sleep(Duration::from_millis(100)).await; + self.torn_down.lock().unwrap().push(input.step.key.clone()); + Ok(StepResult { + exit_code: 130, + committed_snapshot: None, + artifacts: vec![], + workspace_dir: None, + ephemeral_snapshot: false, + }) + }) + } + } + + /// A per-step timeout must cancel the runner COOPERATIVELY and await + /// its teardown to completion (dropping the in-flight future would + /// race the container's bind-mount ownership reclaim and leak the + /// workspace dir), while still reporting the step as timed out. 
+ #[tokio::test] + async fn step_timeout_awaits_cooperative_teardown() { + let runner = Arc::new(CooperativeHangRunner::default()); + let graph: PipelineGraph = serde_json::from_value(serde_json::json!({ + "version": "0", + "graph": { + "nodes": [{ + "step": { "key": "slow", "cmd": "true", "timeout_seconds": 1 }, + "env": {} + }], + "node_holes": [], + "edge_property": "directed", + "edges": [], + } + })) + .unwrap(); + + let repo = tempfile::tempdir().unwrap(); + std::fs::write(repo.path().join("marker.txt"), "v1").unwrap(); + let mut registry = RunnerRegistry::new(); + registry.register(Arc::clone(&runner) as Arc, true); + let (tx, _rx) = tokio::sync::mpsc::channel(1024); + let outcome = run( + graph, + repo.path().to_path_buf(), + "test-pipeline".into(), + NonZeroUsize::new(1).unwrap(), + Arc::new(registry), + tx, + CancellationToken::new(), + None, + false, + ) + .await + .unwrap(); + + // The runner's future was awaited through its teardown... + assert_eq!( + runner.torn_down.lock().unwrap().clone(), + vec!["slow".to_owned()] + ); + // ...and the step is still reported as timed out. 
+ assert_eq!(outcome.steps.len(), 1); + assert_eq!(outcome.steps[0].status, StepStatus::TimedOut); + assert_eq!(outcome.steps[0].exit_code, Some(124)); + } +} diff --git a/crates/hm-exec/tests/backend_contract.rs b/crates/hm-exec/tests/backend_contract.rs index d920da93..0108ccfe 100644 --- a/crates/hm-exec/tests/backend_contract.rs +++ b/crates/hm-exec/tests/backend_contract.rs @@ -74,6 +74,7 @@ impl hm_vm::VmBackend for NoopVmBackend { &self, _image: &str, _config: &hm_vm::VmConfig, + _workspace: Option<&hm_vm::WorkspaceMount>, ) -> anyhow::Result> { anyhow::bail!("noop backend") } @@ -81,6 +82,7 @@ impl hm_vm::VmBackend for NoopVmBackend { &self, _snapshot: &hm_vm::SnapshotId, _config: &hm_vm::VmConfig, + _workspace: Option<&hm_vm::WorkspaceMount>, ) -> anyhow::Result> { anyhow::bail!("noop backend") } diff --git a/crates/hm-plugin-protocol/src/executor.rs b/crates/hm-plugin-protocol/src/executor.rs index 75323192..8a5d3292 100644 --- a/crates/hm-plugin-protocol/src/executor.rs +++ b/crates/hm-plugin-protocol/src/executor.rs @@ -98,4 +98,15 @@ pub struct StepResult { /// (typically only on `CacheDecision::MissBuildAs`). pub committed_snapshot: Option, pub artifacts: Vec, + /// Host-side path to the workspace directory that survived this step. + /// Used by the scheduler to propagate workspace state to downstream + /// steps via COW copy instead of re-extracting the source archive. + #[serde(skip)] + #[schemars(skip)] + pub workspace_dir: Option, + /// True when the snapshot is ephemeral (not registered in the cache) + /// and must be cleaned up by the caller after downstream steps finish. 
+ #[serde(skip)] + #[schemars(skip)] + pub ephemeral_snapshot: bool, } diff --git a/crates/hm-vm/Cargo.toml b/crates/hm-vm/Cargo.toml index 97032d7c..5554334e 100644 --- a/crates/hm-vm/Cargo.toml +++ b/crates/hm-vm/Cargo.toml @@ -10,18 +10,19 @@ description = "Local VM/container backends (Docker) that run hm pipeline steps o anyhow = { workspace = true } async-trait = { workspace = true } tokio = { workspace = true } +tokio-util = { workspace = true } tracing = { workspace = true } derive_more = { workspace = true } rusqlite = { version = "0.37", features = ["bundled"] } +uuid = { workspace = true } bollard = { version = "0.18", optional = true } futures = { version = "0.3", optional = true } -tar = { version = "0.4", optional = true } [features] default = ["docker-backend"] -docker-backend = ["dep:bollard", "dep:futures", "dep:tar"] +docker-backend = ["dep:bollard", "dep:futures"] [dev-dependencies] tempfile = "3" diff --git a/crates/hm-vm/src/backend.rs b/crates/hm-vm/src/backend.rs index 948c45ca..abf16c6b 100644 --- a/crates/hm-vm/src/backend.rs +++ b/crates/hm-vm/src/backend.rs @@ -1,35 +1,67 @@ //! Backend trait for pluggable VM implementations. use std::fmt; -use std::path::Path; use anyhow::Result; use async_trait::async_trait; -use crate::types::{OutputSink, SnapshotId, SnapshotLabel, VmConfig}; +use crate::types::{OutputSink, SnapshotId, SnapshotLabel, VmConfig, WorkspaceMount}; /// Factory that creates and manages virtual machines. #[async_trait] pub trait VmBackend: Send + Sync + fmt::Debug { /// Boot a new VM from the given OCI image reference. - async fn create(&self, image: &str, config: &VmConfig) -> Result>; + async fn create( + &self, + image: &str, + config: &VmConfig, + workspace: Option<&WorkspaceMount>, + ) -> Result>; /// Restore a VM from a previously taken snapshot. 
- async fn restore(&self, snapshot: &SnapshotId, config: &VmConfig) -> Result>; + async fn restore( + &self, + snapshot: &SnapshotId, + config: &VmConfig, + workspace: Option<&WorkspaceMount>, + ) -> Result>; /// Check whether a snapshot exists in the backend store. async fn snapshot_exists(&self, snapshot: &SnapshotId) -> Result; /// Delete a snapshot from the backend store. async fn remove_snapshot(&self, snapshot: &SnapshotId) -> Result<()>; + + /// Best-effort garbage collection of snapshots matching `reference` + /// (a backend-specific reference filter pattern) whose creation time is + /// older than `older_than`. Returns the number of snapshots removed. + /// + /// `keep` is consulted once per matching snapshot tag; tags for which + /// it returns `true` are retained. Callers use it to protect snapshots + /// that are still referenced (e.g. by the snapshot registry) so that GC + /// only ever removes orphans. + /// + /// Backends without a notion of aged snapshot storage may keep the + /// default no-op implementation. + /// + /// # Errors + /// + /// Returns an error only when the backend cannot be queried at all; + /// per-snapshot removal failures are logged and skipped. + async fn gc_snapshots( + &self, + reference: &str, + older_than: std::time::Duration, + keep: &(dyn for<'a> Fn(&'a str) -> bool + Send + Sync), + ) -> Result { + let _ = (reference, older_than, keep); + Ok(0) + } } /// Handle to a running virtual machine. #[async_trait] pub trait Vm: Send { - /// Copy a host path into the guest filesystem. - async fn inject(&self, host_path: &Path, guest_path: &str) -> Result<()>; - /// Run a command inside the VM and stream output to `sink`. async fn exec( &self, diff --git a/crates/hm-vm/src/docker.rs b/crates/hm-vm/src/docker.rs index 87f776b2..13a2b371 100644 --- a/crates/hm-vm/src/docker.rs +++ b/crates/hm-vm/src/docker.rs @@ -5,14 +5,14 @@ //! image commits. 
use std::collections::HashMap; -use std::path::Path; +use std::sync::atomic::{AtomicBool, Ordering}; use anyhow::{Context, Result}; use async_trait::async_trait; use bollard::Docker; use bollard::container::{ Config, CreateContainerOptions, RemoveContainerOptions, StartContainerOptions, - StopContainerOptions, UploadToContainerOptions, + StopContainerOptions, }; use bollard::exec::{CreateExecOptions, StartExecResults}; use bollard::image::{ @@ -22,7 +22,7 @@ use futures::StreamExt; use tracing::instrument; use crate::backend::{Vm, VmBackend}; -use crate::types::{OutputSink, SnapshotId, SnapshotLabel, VmConfig}; +use crate::types::{OutputSink, SnapshotId, SnapshotLabel, VmConfig, WorkspaceMount}; /// Docker-based VM backend. /// @@ -78,11 +78,51 @@ impl DockerBackend { Ok(!images.is_empty()) } + /// Compute the host-ownership reclaim spec for a bind-mounted workspace. + /// + /// Steps run as the image's default user (typically root) and write + /// straight into the bind-mounted host directory, so on native Linux + /// Docker the files land on the host owned by uid 0. The unprivileged + /// `hm` user then can neither COW-copy 0600/0700 entries into child + /// workspaces nor delete them during cleanup. The reclaim spec records + /// the owner of the host tempdir itself (always the `hm` user that + /// created it) so [`reclaim_workspace_ownership`] can chown everything + /// back before the host touches the directory again. 
+ #[cfg(unix)] + fn reclaim_spec(workspace: Option<&WorkspaceMount>) -> Option { + use std::os::unix::fs::MetadataExt; + let ws = workspace?; + let meta = std::fs::metadata(&ws.host_path).ok()?; + Some(WorkspaceReclaim { + guest_path: ws.guest_path.clone(), + uid: meta.uid(), + gid: meta.gid(), + }) + } + + #[cfg(not(unix))] + fn reclaim_spec(_workspace: Option<&WorkspaceMount>) -> Option { + None + } + #[instrument(skip(self))] - async fn start_container(&self, image: &str) -> Result { + async fn start_container( + &self, + image: &str, + workspace: Option<&WorkspaceMount>, + ) -> Result { + let host_config = workspace.map(|ws| bollard::service::HostConfig { + binds: Some(vec![format!( + "{}:{}:rw", + ws.host_path.display(), + ws.guest_path + )]), + ..Default::default() + }); let cfg = Config { image: Some(image.to_string()), cmd: Some(vec!["sh".into(), "-c".into(), "sleep infinity".into()]), + host_config, ..Default::default() }; let create = self @@ -90,32 +130,74 @@ impl DockerBackend { .create_container(None::>, cfg) .await .context("create container")?; - self.client + if let Err(e) = self + .client .start_container(&create.id, None::>) .await - .context("start container")?; + { + // The daemon validates bind-mount sources at START time (e.g. + // Docker Desktop's file-sharing allowlist), so this path is + // reachable on every step. No `Vm` handle exists yet — neither + // `HmVm::execute`'s `destroy()` nor `DockerVm`'s `Drop` backstop + // will ever see this container, and image GC cannot reclaim an + // image pinned by a Created-state container — so remove it here, + // best-effort, before propagating the error. 
+ if let Err(rm_err) = self + .client + .remove_container( + &create.id, + Some(RemoveContainerOptions { + force: true, + v: true, + ..Default::default() + }), + ) + .await + { + tracing::warn!( + container = %create.id, + error = %rm_err, + "failed to remove container that never started" + ); + } + return Err(anyhow::Error::new(e).context("start container")); + } Ok(create.id) } } #[async_trait] impl VmBackend for DockerBackend { - #[instrument(skip(self, _config))] - async fn create(&self, image: &str, _config: &VmConfig) -> Result> { + #[instrument(skip(self, _config, workspace))] + async fn create( + &self, + image: &str, + _config: &VmConfig, + workspace: Option<&WorkspaceMount>, + ) -> Result> { self.ensure_image(image).await?; - let container_id = self.start_container(image).await?; + let container_id = self.start_container(image, workspace).await?; Ok(Box::new(DockerVm { client: self.client.clone(), container_id: Some(container_id), + workspace_reclaim: Self::reclaim_spec(workspace), + exec_in_flight: AtomicBool::new(false), })) } - #[instrument(skip(self, _config))] - async fn restore(&self, snapshot: &SnapshotId, _config: &VmConfig) -> Result> { - let container_id = self.start_container(snapshot.as_ref()).await?; + #[instrument(skip(self, _config, workspace))] + async fn restore( + &self, + snapshot: &SnapshotId, + _config: &VmConfig, + workspace: Option<&WorkspaceMount>, + ) -> Result> { + let container_id = self.start_container(snapshot.as_ref(), workspace).await?; Ok(Box::new(DockerVm { client: self.client.clone(), container_id: Some(container_id), + workspace_reclaim: Self::reclaim_spec(workspace), + exec_in_flight: AtomicBool::new(false), })) } @@ -139,6 +221,170 @@ impl VmBackend for DockerBackend { .with_context(|| format!("removing image '{snapshot}'"))?; Ok(()) } + + #[instrument(skip(self, keep))] + async fn gc_snapshots( + &self, + reference: &str, + older_than: std::time::Duration, + keep: &(dyn for<'a> Fn(&'a str) -> bool + Send + Sync), + ) 
-> Result { + let mut filters = HashMap::new(); + filters.insert("reference".to_string(), vec![reference.to_string()]); + let images = self + .client + .list_images(Some(ListImagesOptions { + filters, + ..Default::default() + })) + .await + .with_context(|| format!("listing images for GC reference '{reference}'"))?; + + let cutoff: i64 = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .saturating_sub(older_than) + .as_secs() + .try_into() + .unwrap_or(i64::MAX); + + let mut removed: u64 = 0; + for image in images { + if image.created >= cutoff { + continue; + } + // Remove by tag, never by image id: the daemon's `reference` + // filter returns whole images, and a multi-tag image may carry + // tags outside the GC pattern (or tags the keeper protects). + // Untagging only the matched, unprotected references leaves + // every other tag pointing at the image intact; the image data + // itself is reclaimed when its last tag goes. + for tag in &image.repo_tags { + if !reference_matches(reference, tag) { + continue; + } + if keep(tag) { + tracing::debug!(image = %tag, "GC keeping referenced snapshot"); + continue; + } + match self + .client + .remove_image( + tag, + Some(RemoveImageOptions { + force: true, + noprune: false, + }), + None, + ) + .await + { + Ok(_) => removed += 1, + Err(e) => { + tracing::warn!(image = %tag, error = %e, "failed to GC snapshot image"); + } + } + } + } + Ok(removed) + } +} + +/// Client-side check that a `repo:tag` reference belongs to a GC pattern. +/// +/// The daemon's `reference` filter pre-selects *images*, but a multi-tag +/// image's `repo_tags` can include tags outside the pattern; only matching +/// tags may be untagged. Supports the two pattern shapes the GC uses: +/// +/// - an exact repository name (matches every tag of exactly that repo); +/// - `/*` (matches repositories exactly one path component below +/// ``, mirroring Docker's reference-filter glob semantics). 
+fn reference_matches(pattern: &str, repo_tag: &str) -> bool { + let repo = repo_tag.rsplit_once(':').map_or(repo_tag, |(r, _)| r); + pattern.strip_suffix("/*").map_or_else( + || repo == pattern, + |prefix| { + repo.strip_prefix(prefix).is_some_and(|rest| { + rest.strip_prefix('/') + .is_some_and(|name| !name.is_empty() && !name.contains('/')) + }) + }, + ) +} + +/// Ownership-reclaim parameters for a bind-mounted workspace: chown the +/// guest path back to the host user that owns the workspace tempdir. +#[derive(Debug, Clone)] +struct WorkspaceReclaim { + guest_path: String, + uid: u32, + gid: u32, +} + +/// Best-effort: chown the bind-mounted workspace back to the host user. +/// +/// Runs `chown -R : ` inside the container as root +/// (regardless of the image's default user). Because a bind mount shares +/// inodes with the host, this restores host-side ownership of every file +/// the step wrote as root, so the unprivileged host user can COW-copy the +/// workspace into children and delete it during cleanup. On Docker Desktop +/// (macOS) ownership is already remapped by the file sharing layer and +/// this is a harmless no-op. +/// +/// Failures are logged, never propagated: a missing `chown` binary or a +/// dead container must not fail the build — host-side cleanup will then +/// warn about anything it cannot remove. 
+async fn reclaim_workspace_ownership(client: &Docker, container_id: &str, spec: &WorkspaceReclaim) { + let owner = format!("{}:{}", spec.uid, spec.gid); + let exec = match client + .create_exec( + container_id, + CreateExecOptions { + cmd: Some(vec!["chown", "-R", &owner, &spec.guest_path]), + user: Some("0:0"), + attach_stdout: Some(true), + attach_stderr: Some(true), + ..Default::default() + }, + ) + .await + { + Ok(exec) => exec, + Err(e) => { + tracing::warn!(container = %container_id, error = %e, + "failed to create workspace ownership-reclaim exec"); + return; + } + }; + + match client.start_exec(&exec.id, None).await { + Ok(StartExecResults::Attached { mut output, .. }) => { + // Drain so the exec runs to completion. + while let Some(item) = output.next().await { + if item.is_err() { + break; + } + } + } + Ok(StartExecResults::Detached) => {} + Err(e) => { + tracing::warn!(container = %container_id, error = %e, + "failed to start workspace ownership-reclaim exec"); + return; + } + } + + match client.inspect_exec(&exec.id).await { + Ok(inspect) if inspect.exit_code.unwrap_or(0) == 0 => {} + Ok(inspect) => { + tracing::warn!(container = %container_id, exit_code = ?inspect.exit_code, + "workspace ownership reclaim (chown) exited non-zero; host-side cleanup may be incomplete"); + } + Err(e) => { + tracing::warn!(container = %container_id, error = %e, + "failed to inspect workspace ownership-reclaim exec"); + } + } } /// Handle to a running Docker container acting as a VM. @@ -147,89 +393,93 @@ struct DockerVm { #[debug(skip)] client: Docker, container_id: Option, + /// When a workspace is bind-mounted, the chown target used to hand + /// root-written files back to the host user before teardown. + workspace_reclaim: Option, + /// True while a command may still be running inside the container. 
+ /// + /// Set when `exec` starts and cleared only when it completes cleanly, + /// so a cancelled/timed-out (dropped) exec future — or a stream error + /// of unknown outcome — leaves it set. Teardown uses it to quiesce the + /// container (SIGKILL every process, then restart the idle `sleep`) + /// before the ownership-reclaim chown, so no in-container writer can + /// race the chown or dirty the bind mount afterwards. + exec_in_flight: AtomicBool, +} + +/// Shared container teardown used by both [`DockerVm::destroy`] (awaited) +/// and [`DockerVm::drop`] (detached backstop). +/// +/// Order matters: when `exec_in_flight` is set, a cancelled or timed-out +/// command may still be writing into the bind mount as root, so the +/// container is stopped first (SIGKILL, quiescing every process) and then +/// restarted (its command is `sleep infinity`) so the reclaim chown runs +/// with no concurrent writers. Only after the chown does the final +/// stop/remove happen — nothing can re-dirty the workspace between the +/// chown and the host-side removal of the directory. 
+async fn teardown_container( + client: &Docker, + id: &str, + reclaim: Option, + exec_in_flight: bool, +) -> Result<()> { + if let Some(spec) = reclaim { + let mut can_reclaim = true; + if exec_in_flight { + let _ = client + .stop_container(id, Some(StopContainerOptions { t: 0 })) + .await; + if let Err(e) = client + .start_container(id, None::>) + .await + { + tracing::warn!(container = %id, error = %e, + "failed to restart container for workspace ownership reclaim; \ + host-side workspace cleanup may be incomplete"); + can_reclaim = false; + } + } + if can_reclaim { + reclaim_workspace_ownership(client, id, &spec).await; + } + } + let _ = client + .stop_container(id, Some(StopContainerOptions { t: 0 })) + .await; + client + .remove_container( + id, + Some(RemoveContainerOptions { + force: true, + v: true, + ..Default::default() + }), + ) + .await + .with_context(|| format!("removing container '{id}'"))?; + Ok(()) } impl Drop for DockerVm { fn drop(&mut self) { if let Some(id) = self.container_id.take() { let client = self.client.clone(); + let reclaim = self.workspace_reclaim.take(); + let exec_in_flight = self.exec_in_flight.load(Ordering::Acquire); tokio::spawn(async move { - let opts = StopContainerOptions { t: 0 }; - let _ = client.stop_container(&id, Some(opts)).await; - let rm = RemoveContainerOptions { - force: true, - v: true, - ..Default::default() - }; - let _ = client.remove_container(&id, Some(rm)).await; - tracing::debug!(container = %id, "dropped container cleaned up"); + match teardown_container(&client, &id, reclaim, exec_in_flight).await { + Ok(()) => tracing::debug!(container = %id, "dropped container cleaned up"), + Err(e) => { + tracing::warn!(container = %id, error = %e, "dropped container cleanup failed"); + } + } }); } } } -/// Build a tar archive from a host directory. -/// -/// The archive contains all files under `host_path` with paths relative -/// to `host_path` itself (i.e. the directory contents, not the directory). 
-fn tar_directory(host_path: &Path) -> Result> { - let mut archive = tar::Builder::new(Vec::new()); - archive - .append_dir_all(".", host_path) - .with_context(|| format!("archiving '{}'", host_path.display()))?; - archive.finish().context("finalizing tar archive")?; - archive.into_inner().context("extracting tar bytes") -} - #[async_trait] impl Vm for DockerVm { - #[instrument(skip(self), fields(host = %host_path.display()))] - async fn inject(&self, host_path: &Path, guest_path: &str) -> Result<()> { - // Ensure the destination directory exists inside the container. - let cid = self - .container_id - .as_deref() - .context("container already destroyed")?; - let mkdir = self - .client - .create_exec( - cid, - CreateExecOptions { - cmd: Some(vec!["mkdir", "-p", guest_path]), - attach_stdout: Some(true), - attach_stderr: Some(true), - ..Default::default() - }, - ) - .await - .context("create mkdir exec")?; - if let StartExecResults::Attached { mut output, .. } = self - .client - .start_exec(&mkdir.id, None) - .await - .context("start mkdir exec")? - { - while output.next().await.is_some() {} - } - - let tar_bytes = tar_directory(host_path)?; - let options = UploadToContainerOptions { - path: guest_path, - ..Default::default() - }; - self.client - .upload_to_container(cid, Some(options), tar_bytes.into()) - .await - .with_context(|| { - format!( - "uploading '{}' to container '{}:{guest_path}'", - host_path.display(), - cid.get(..12).unwrap_or(cid), - ) - })?; - Ok(()) - } - #[instrument(skip(self, env, sink))] async fn exec( &self, @@ -242,6 +492,12 @@ impl Vm for DockerVm { .container_id .as_deref() .context("container already destroyed")?; + // Mark the container as possibly-running-a-command until we have + // proof of completion. Cleared only on the clean-return path below; + // a dropped (cancelled/timed-out) future or a stream error leaves + // it set so teardown quiesces the container before the workspace + // ownership reclaim. 
+ self.exec_in_flight.store(true, Ordering::Release); let env_strings: Vec = env.iter().map(|(k, v)| format!("{k}={v}")).collect(); let exec = self .client @@ -304,6 +560,9 @@ impl Vm for DockerVm { reason = "docker exit codes fit in i32" )] let exit_code = inspect.exit_code.unwrap_or(0) as i32; + // The command finished and its exit code was observed: no process + // of ours is writing to the bind mount any more. + self.exec_in_flight.store(false, Ordering::Release); Ok(exit_code) } @@ -354,21 +613,63 @@ impl Vm for DockerVm { let Some(id) = self.container_id.take() else { return Ok(()); }; - let _ = self - .client - .stop_container(&id, Some(StopContainerOptions { t: 0 })) - .await; - self.client - .remove_container( - &id, - Some(RemoveContainerOptions { - force: true, - v: true, - ..Default::default() - }), - ) - .await - .with_context(|| format!("removing container '{id}'"))?; - Ok(()) + // Hand root-written workspace files back to the host user before + // teardown: `HmVm::execute` always destroys the VM (awaited, even + // on cancellation) before the runner keeps or drops the tempdir, + // so the reclaim happens-before every host-side read or removal of + // the workspace. When the command was cut short (`exec_in_flight` + // still set) the container is quiesced first so no in-container + // writer can race or follow the chown. 
+ let reclaim = self.workspace_reclaim.take(); + let exec_in_flight = self.exec_in_flight.load(Ordering::Acquire); + teardown_container(&self.client, &id, reclaim, exec_in_flight).await + } +} + +#[cfg(test)] +mod tests { + use super::reference_matches; + + #[test] + fn exact_repo_pattern_matches_every_tag_of_that_repo() { + assert!(reference_matches("ephemeral", "ephemeral:latest")); + assert!(reference_matches( + "ephemeral", + "ephemeral:0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcd" + )); + assert!(reference_matches( + "harmont-ephemeral", + "harmont-ephemeral:3c5e0fda-1111-2222-3333-444444444444" + )); + } + + #[test] + fn exact_repo_pattern_rejects_other_repos() { + // A user's own image whose repo merely starts with the pattern + // must never be swept. + assert!(!reference_matches("ephemeral", "ephemeral-test:latest")); + assert!(!reference_matches("ephemeral", "my-ephemeral:latest")); + assert!(!reference_matches("ephemeral", "ephemeral/sub:latest")); + } + + #[test] + fn wildcard_pattern_matches_one_path_component() { + assert!(reference_matches( + "harmont-cache/*", + "harmont-cache/build:0123456789abcdef" + )); + assert!(!reference_matches( + "harmont-cache/*", + "harmont-cache:latest" + )); + assert!(!reference_matches( + "harmont-cache/*", + "harmont-cachex/build:latest" + )); + assert!(!reference_matches( + "harmont-cache/*", + "harmont-cache/a/b:latest" + )); + assert!(!reference_matches("harmont-cache/*", "other/build:latest")); } } diff --git a/crates/hm-vm/src/lib.rs b/crates/hm-vm/src/lib.rs index fc39a968..6db761c6 100644 --- a/crates/hm-vm/src/lib.rs +++ b/crates/hm-vm/src/lib.rs @@ -5,6 +5,7 @@ pub mod backend; pub mod registry; pub mod types; pub mod vm; +pub mod workspace; #[cfg(feature = "docker-backend")] pub mod docker; @@ -13,6 +14,6 @@ pub use backend::{Vm, VmBackend}; pub use registry::ImageRegistry; pub use types::{ Action, CachingPolicy, ExecutionResult, ImageSource, NullSink, OutputSink, SnapshotId, - 
SnapshotLabel, VmConfig, + SnapshotLabel, VmConfig, WorkspaceMount, }; pub use vm::HmVm; diff --git a/crates/hm-vm/src/registry.rs b/crates/hm-vm/src/registry.rs index 5602fc58..83aed04c 100644 --- a/crates/hm-vm/src/registry.rs +++ b/crates/hm-vm/src/registry.rs @@ -62,7 +62,8 @@ impl ImageRegistry { conn.execute_batch( "PRAGMA journal_mode = WAL; - PRAGMA synchronous = NORMAL;", + PRAGMA synchronous = NORMAL; + PRAGMA busy_timeout = 5000;", )?; conn.execute_batch( @@ -73,6 +74,19 @@ impl ImageRegistry { );", )?; + // Idempotent migration: add workspace_dir column if missing. + let has_ws_col: bool = conn + .query_row( + "SELECT COUNT(*) FROM pragma_table_info('snapshots') WHERE name='workspace_dir'", + [], + |row| row.get::<_, i64>(0), + ) + .unwrap_or(0) + > 0; + if !has_ws_col { + conn.execute_batch("ALTER TABLE snapshots ADD COLUMN workspace_dir TEXT;")?; + } + Ok(Self { conn: Mutex::new(conn), capacity, @@ -84,18 +98,27 @@ impl ImageRegistry { /// Returns `None` if no entry exists for `key`. #[must_use] pub fn get(&self, key: &str) -> Option { + self.get_with_workspace(key).map(|(snap, _)| snap) + } + + /// Look up a cached snapshot and its workspace directory, updating the + /// access time. + /// + /// Returns `None` if no entry exists for `key`. 
+ #[must_use] + pub fn get_with_workspace(&self, key: &str) -> Option<(SnapshotId, Option)> { let now = epoch_secs(); let conn = self.conn.lock().ok()?; - let snapshot: Option = conn + let result: Option<(String, Option)> = conn .query_row( - "SELECT snapshot_id FROM snapshots WHERE key = ?1", + "SELECT snapshot_id, workspace_dir FROM snapshots WHERE key = ?1", [key], - |row| row.get(0), + |row| Ok((row.get::<_, String>(0)?, row.get::<_, Option>(1)?)), ) .ok(); - if snapshot.is_some() { + if result.is_some() { let _ = conn.execute( "UPDATE snapshots SET accessed_at = ?1 WHERE key = ?2", rusqlite::params![now, key], @@ -103,55 +126,136 @@ impl ImageRegistry { } drop(conn); - snapshot.map(SnapshotId::new) + result.map(|(snap, ws)| (SnapshotId::new(snap), ws)) } /// Insert or update a cache entry. /// - /// Returns the [`SnapshotId`]s of any entries evicted to keep the registry - /// within its configured capacity. The caller is responsible for cleaning - /// up the backend resources associated with evicted snapshots. - pub fn put(&self, key: &str, snapshot: &SnapshotId) -> Vec { + /// The `workspace_dir` column is always written as `NULL`: the registry + /// stores system-state snapshots only. Workspace state is strictly + /// run-scoped and never persisted across runs (non-`NULL` values are + /// legacy rows kept solely so their directories can be reaped). + /// + /// Returns the evicted entries (snapshot ID and optional legacy workspace + /// directory) to keep the registry within its configured capacity. The + /// caller is responsible for cleaning up backend resources and legacy + /// workspace directories associated with evicted entries. + /// + /// Upsert + eviction run inside a single transaction so concurrent + /// writers (including other processes sharing the database file) can + /// never observe a partially-applied put. 
+ /// + /// # Errors + /// + /// Returns an error if the registry mutex is poisoned or any statement + /// fails; the caller must treat the snapshot as unregistered. + #[allow( + clippy::significant_drop_tightening, + reason = "the transaction borrows the guarded connection until commit" + )] + pub fn put( + &self, + key: &str, + snapshot: &SnapshotId, + ) -> Result)>> { let now = epoch_secs(); - let Ok(conn) = self.conn.lock() else { - return Vec::new(); - }; + let mut conn = self + .conn + .lock() + .map_err(|_| anyhow::anyhow!("registry mutex poisoned"))?; + let tx = conn.transaction()?; - // INSERT OR REPLACE handles both new and updated entries. let snapshot_id: &str = snapshot.as_ref(); - let _result = conn.execute( + tx.execute( "INSERT OR REPLACE INTO snapshots (key, snapshot_id, accessed_at) VALUES (?1, ?2, ?3)", rusqlite::params![key, snapshot_id, now], - ); + )?; - drop(conn); - self.evict_overflow() + let evicted = Self::evict_overflow_tx(&tx, self.capacity)?; + tx.commit()?; + Ok(evicted) } /// Remove a specific entry. /// - /// Returns the removed snapshot's ID so the caller can clean up backend - /// resources, or `None` if the key was not present. + /// Returns the removed snapshot's ID and workspace directory so the + /// caller can clean up backend resources, or `None` if the key was + /// not present. 
#[must_use] - pub fn invalidate(&self, key: &str) -> Option { + pub fn invalidate(&self, key: &str) -> Option<(SnapshotId, Option)> { let conn = self.conn.lock().ok()?; - let snapshot: Option = conn + let row: Option<(String, Option)> = conn .query_row( - "SELECT snapshot_id FROM snapshots WHERE key = ?1", + "SELECT snapshot_id, workspace_dir FROM snapshots WHERE key = ?1", [key], - |row| row.get(0), + |row| Ok((row.get(0)?, row.get(1)?)), ) .ok(); - if snapshot.is_some() { + if row.is_some() { let _ = conn.execute("DELETE FROM snapshots WHERE key = ?1", [key]); } drop(conn); - snapshot.map(SnapshotId::new) + row.map(|(snap, ws)| (SnapshotId::new(snap), ws)) + } + + /// Compare-and-delete: remove the entry for `key` only if it still maps + /// to `expected`. + /// + /// This closes the race where a stale entry is observed, the lock is + /// released for an async backend check, and a fresh entry is inserted + /// under the same key before the invalidation lands — a plain + /// [`Self::invalidate`] would destroy the fresh entry. + /// + /// Returns `Some(legacy_workspace_dir)` when the observed row was + /// deleted, or `None` when the key is absent or now holds a different + /// snapshot (the concurrently re-inserted entry survives). + #[must_use] + #[allow( + clippy::significant_drop_tightening, + reason = "the prepared statement borrows the guarded connection" + )] + pub fn invalidate_if(&self, key: &str, expected: &SnapshotId) -> Option> { + let conn = self.conn.lock().ok()?; + let mut stmt = conn + .prepare( + "DELETE FROM snapshots WHERE key = ?1 AND snapshot_id = ?2 + RETURNING workspace_dir", + ) + .ok()?; + let mut rows = stmt + .query_map(rusqlite::params![key, expected.as_ref()], |row| { + row.get::<_, Option>(0) + }) + .ok()?; + let legacy_ws = rows.next()?.ok()?; + Some(legacy_ws) + } + + /// Returns `true` if any entry currently maps to `snapshot`. 
+ /// + /// Used as a pre-removal guard for deferred eviction cleanup: a tag that + /// was evicted earlier may have been re-registered since (by a later step + /// in this run or by a concurrent process); Docker re-tagging means the + /// tag now names the *fresh* image, so removing it would destroy a live + /// cache entry. + #[must_use] + pub fn contains_snapshot(&self, snapshot: &SnapshotId) -> bool { + let Ok(conn) = self.conn.lock() else { + // A poisoned lock means we cannot prove the snapshot is unused; + // report it as referenced so callers err on the side of keeping it. + return true; + }; + conn.query_row( + "SELECT COUNT(*) FROM snapshots WHERE snapshot_id = ?1", + [snapshot.as_ref()], + |row| row.get::<_, i64>(0), + ) + .map_or(true, |n| n > 0) } /// Return every stored snapshot ID. @@ -194,46 +298,40 @@ impl ImageRegistry { } /// Evict the oldest entries (by `accessed_at`) when the registry exceeds - /// its capacity. Returns the snapshot IDs of evicted entries. - fn evict_overflow(&self) -> Vec { - let count = self.len(); - let capacity = self.capacity.get(); + /// its capacity. Runs inside the caller's transaction. + /// + /// A single `DELETE .. RETURNING` statement selects and removes the same + /// rows, with `key` as a deterministic tie-break for equal timestamps, so + /// the returned set can never diverge from the deleted set. 
+ fn evict_overflow_tx( + tx: &rusqlite::Transaction<'_>, + capacity: NonZeroU64, + ) -> Result)>> { + let count: i64 = tx.query_row("SELECT COUNT(*) FROM snapshots", [], |row| row.get(0))?; + let count = u64::try_from(count).unwrap_or(0); + let capacity = capacity.get(); if count <= capacity { - return Vec::new(); + return Ok(Vec::new()); } let overflow = count - capacity; - let Ok(conn) = self.conn.lock() else { - return Vec::new(); - }; - - let Ok(mut stmt) = - conn.prepare("SELECT snapshot_id FROM snapshots ORDER BY accessed_at ASC LIMIT ?1") - else { - return Vec::new(); - }; - - let evicted: Vec = stmt - .query_map([overflow], |row| { - row.get::<_, String>(0).map(SnapshotId::new) - }) - .ok() - .map(|rows| rows.filter_map(Result::ok).collect()) - .unwrap_or_default(); - - // Drop stmt before using conn again for the delete. - drop(stmt); - - // Delete those entries. - let _deleted = conn.execute( + let mut stmt = tx.prepare( "DELETE FROM snapshots WHERE key IN ( - SELECT key FROM snapshots ORDER BY accessed_at ASC LIMIT ?1 - )", - [overflow], - ); + SELECT key FROM snapshots ORDER BY accessed_at ASC, key ASC LIMIT ?1 + ) RETURNING snapshot_id, workspace_dir", + )?; - evicted + let evicted = stmt + .query_map([overflow], |row| { + Ok(( + SnapshotId::new(row.get::<_, String>(0)?), + row.get::<_, Option>(1)?, + )) + })? + .collect::, _>>()?; + + Ok(evicted) } } @@ -256,11 +354,23 @@ mod tests { assert!(reg.get("nonexistent").is_none()); } + /// Insert a legacy-style row with a non-NULL `workspace_dir`, as written + /// by pre-fix versions that persisted cached workspaces. 
+ fn insert_legacy_row(reg: &ImageRegistry, key: &str, snap: &str, ws: &str) { + let conn = reg.conn.lock().unwrap(); + conn.execute( + "INSERT OR REPLACE INTO snapshots (key, snapshot_id, accessed_at, workspace_dir) + VALUES (?1, ?2, ?3, ?4)", + rusqlite::params![key, snap, epoch_secs(), ws], + ) + .unwrap(); + } + #[test] fn put_then_get_returns_snapshot() { let (reg, _dir) = open_temp(10); let snap = SnapshotId::new("snap-abc"); - let evicted = reg.put("my-key", &snap); + let evicted = reg.put("my-key", &snap).expect("put"); assert!(evicted.is_empty()); let got = reg.get("my-key"); @@ -272,12 +382,12 @@ mod tests { let (reg, _dir) = open_temp(2); // Insert a, then b. "a" is older by insertion order. - reg.put("a", &SnapshotId::new("snap-a")); + reg.put("a", &SnapshotId::new("snap-a")).expect("put a"); // Tiny sleep so timestamps differ. std::thread::sleep(std::time::Duration::from_secs(1)); - reg.put("b", &SnapshotId::new("snap-b")); + reg.put("b", &SnapshotId::new("snap-b")).expect("put b"); // Touch "a" so it becomes the most recently accessed. std::thread::sleep(std::time::Duration::from_secs(1)); @@ -286,10 +396,10 @@ mod tests { // Now insert "c" -- capacity is 2, so one must be evicted. // "b" should be evicted since "a" was touched more recently. std::thread::sleep(std::time::Duration::from_secs(1)); - let evicted = reg.put("c", &SnapshotId::new("snap-c")); + let evicted = reg.put("c", &SnapshotId::new("snap-c")).expect("put c"); assert_eq!(evicted.len(), 1); - assert_eq!(evicted[0], SnapshotId::new("snap-b")); + assert_eq!(evicted[0].0, SnapshotId::new("snap-b")); // "a" should still be present. 
assert!(reg.get("a").is_some()); @@ -301,16 +411,16 @@ mod tests { fn eviction_returns_overflow_entries() { let (reg, _dir) = open_temp(2); - reg.put("x", &SnapshotId::new("snap-x")); + reg.put("x", &SnapshotId::new("snap-x")).expect("put x"); std::thread::sleep(std::time::Duration::from_secs(1)); - reg.put("y", &SnapshotId::new("snap-y")); + reg.put("y", &SnapshotId::new("snap-y")).expect("put y"); std::thread::sleep(std::time::Duration::from_secs(1)); // This third insert should evict the oldest ("x"). - let evicted = reg.put("z", &SnapshotId::new("snap-z")); + let evicted = reg.put("z", &SnapshotId::new("snap-z")).expect("put z"); assert_eq!(evicted.len(), 1); - assert_eq!(evicted[0], SnapshotId::new("snap-x")); + assert_eq!(evicted[0].0, SnapshotId::new("snap-x")); assert_eq!(reg.len(), 2); } @@ -323,7 +433,8 @@ mod tests { { let reg = ImageRegistry::open(&db_path, capacity).expect("open"); - reg.put("persistent", &SnapshotId::new("snap-persist")); + reg.put("persistent", &SnapshotId::new("snap-persist")) + .expect("put"); assert_eq!(reg.len(), 1); // reg is dropped here, closing the connection. 
} @@ -339,8 +450,8 @@ mod tests { let (reg, _dir) = open_temp(10); assert!(reg.all_snapshot_ids().is_empty()); - reg.put("k1", &SnapshotId::new("forever-a")); - reg.put("k2", &SnapshotId::new("forever-b")); + reg.put("k1", &SnapshotId::new("forever-a")).expect("put"); + reg.put("k2", &SnapshotId::new("forever-b")).expect("put"); let mut ids: Vec = reg .all_snapshot_ids() @@ -355,10 +466,10 @@ mod tests { fn invalidate_returns_removed_snapshot() { let (reg, _dir) = open_temp(10); let snap = SnapshotId::new("snap-rm"); - reg.put("to-remove", &snap); + reg.put("to-remove", &snap).expect("put"); let removed = reg.invalidate("to-remove"); - assert_eq!(removed, Some(SnapshotId::new("snap-rm"))); + assert_eq!(removed, Some((SnapshotId::new("snap-rm"), None))); assert!(reg.get("to-remove").is_none()); assert_eq!(reg.len(), 0); @@ -366,4 +477,96 @@ mod tests { let removed2 = reg.invalidate("to-remove"); assert!(removed2.is_none()); } + + #[test] + fn put_writes_null_workspace() { + let (reg, _dir) = open_temp(10); + reg.put("plain-key", &SnapshotId::new("snap-plain")) + .expect("put"); + + let (_, got_ws) = reg.get_with_workspace("plain-key").unwrap(); + assert!(got_ws.is_none()); + } + + #[test] + fn put_overwrites_legacy_workspace_with_null() { + let (reg, _dir) = open_temp(10); + insert_legacy_row(®, "k", "snap-old", "/ws/legacy"); + + reg.put("k", &SnapshotId::new("snap-new")).expect("put"); + + let (snap, ws) = reg.get_with_workspace("k").unwrap(); + assert_eq!(snap, SnapshotId::new("snap-new")); + assert!(ws.is_none()); + } + + #[test] + fn invalidate_if_matching_snapshot_deletes_row() { + let (reg, _dir) = open_temp(10); + let snap = SnapshotId::new("snap-cas"); + reg.put("cas-key", &snap).expect("put"); + + let removed = reg.invalidate_if("cas-key", &snap); + assert_eq!(removed, Some(None)); + assert!(reg.get("cas-key").is_none()); + } + + #[test] + fn invalidate_if_mismatched_snapshot_keeps_row() { + let (reg, _dir) = open_temp(10); + reg.put("cas-key", 
&SnapshotId::new("snap-fresh")) + .expect("put"); + + // A stale observer tries to invalidate with the snapshot it saw + // earlier; the fresh row must survive. + let removed = reg.invalidate_if("cas-key", &SnapshotId::new("snap-stale")); + assert!(removed.is_none()); + assert_eq!(reg.get("cas-key"), Some(SnapshotId::new("snap-fresh"))); + + // Absent key is also a no-op. + assert!( + reg.invalidate_if("missing", &SnapshotId::new("whatever")) + .is_none() + ); + } + + #[test] + fn invalidate_if_returns_legacy_workspace_for_cleanup() { + let (reg, _dir) = open_temp(10); + insert_legacy_row(®, "legacy", "snap-legacy", "/ws/legacy"); + + let removed = reg.invalidate_if("legacy", &SnapshotId::new("snap-legacy")); + assert_eq!(removed, Some(Some("/ws/legacy".into()))); + assert!(reg.get("legacy").is_none()); + } + + #[test] + fn contains_snapshot_tracks_rows() { + let (reg, _dir) = open_temp(10); + let snap = SnapshotId::new("snap-ref"); + assert!(!reg.contains_snapshot(&snap)); + + reg.put("k", &snap).expect("put"); + assert!(reg.contains_snapshot(&snap)); + + // The same snapshot under a second key still counts. 
+ reg.put("k2", &snap).expect("put"); + let _ = reg.invalidate("k"); + assert!(reg.contains_snapshot(&snap)); + + let _ = reg.invalidate("k2"); + assert!(!reg.contains_snapshot(&snap)); + } + + #[test] + fn eviction_returns_legacy_workspace_path() { + let (reg, _dir) = open_temp(1); + insert_legacy_row(®, "a", "snap-a", "/ws/a"); + std::thread::sleep(std::time::Duration::from_secs(1)); + + let evicted = reg.put("b", &SnapshotId::new("snap-b")).expect("put b"); + assert_eq!(evicted.len(), 1); + assert_eq!(evicted[0].0, SnapshotId::new("snap-a")); + assert_eq!(evicted[0].1.as_deref(), Some("/ws/a")); + } } diff --git a/crates/hm-vm/src/types.rs b/crates/hm-vm/src/types.rs index 2511f48a..b3cc6fa1 100644 --- a/crates/hm-vm/src/types.rs +++ b/crates/hm-vm/src/types.rs @@ -10,6 +10,13 @@ pub enum ImageSource { Snapshot(SnapshotId), } +/// Bind-mount specification for a host workspace directory. +#[derive(Clone, Debug)] +pub struct WorkspaceMount { + pub host_path: PathBuf, + pub guest_path: String, +} + /// What to execute inside a VM. #[derive(Clone, Debug)] pub struct Action { @@ -18,9 +25,12 @@ pub struct Action { pub env: Vec<(String, String)>, pub working_dir: String, pub timeout: Option, - /// Host directory to copy into `working_dir` before execution. - /// Skipped on cache hits (snapshot already contains prior state). - pub inject: Option, + /// Host workspace directory to bind-mount into the VM. + /// + /// Bind mounts are EXCLUDED from snapshots (`docker commit` captures + /// system state only), so workspace contents never persist into the + /// cache and are never consulted on the cache-hit path. + pub workspace: Option, } /// How to cache the result. @@ -86,6 +96,9 @@ pub struct ExecutionResult { pub exit_code: i32, pub snapshot: Option, pub cached: bool, + /// True when the snapshot is ephemeral (not registered in the cache) + /// and must be cleaned up by the caller after downstream steps finish. 
+ pub ephemeral_snapshot: bool, } /// VM resource configuration. diff --git a/crates/hm-vm/src/vm.rs b/crates/hm-vm/src/vm.rs index 07f00a9b..6b6addb2 100644 --- a/crates/hm-vm/src/vm.rs +++ b/crates/hm-vm/src/vm.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use anyhow::Result; +use tokio_util::sync::CancellationToken; use tracing::{instrument, warn}; use crate::backend::VmBackend; @@ -12,16 +13,41 @@ use crate::types::{ VmConfig, }; +/// Exit code reported when execution is cut short by cooperative +/// cancellation (Ctrl-C, sibling failure, per-step timeout). Mirrors the +/// conventional 128+SIGINT shell encoding; the scheduler maps it to a +/// `Canceled` step status. +const CANCELLED_EXIT_CODE: i32 = 130; + +/// The [`ExecutionResult`] returned when a step is cancelled before or +/// during execution: nothing was committed, nothing was cached. +const fn cancelled_result() -> ExecutionResult { + ExecutionResult { + exit_code: CANCELLED_EXIT_CODE, + snapshot: None, + cached: false, + ephemeral_snapshot: false, + } +} + /// High-level orchestrator that drives the VM lifecycle. /// /// `HmVm` composes a [`VmBackend`] with an [`ImageRegistry`] to provide /// cache-aware execution: if a cached snapshot already exists for a given -/// caching key the expensive create-inject-exec cycle is skipped entirely. +/// caching key the expensive create-exec cycle is skipped entirely. #[derive(Debug)] pub struct HmVm { backend: Arc, registry: ImageRegistry, config: VmConfig, + /// Snapshots (plus optional legacy workspace dirs) evicted from the + /// registry during this run. Their backend images are NOT removed at + /// eviction time: an in-flight step may still hold the evicted tag as + /// its `parent_snapshot` and restore from it later (cache-hit outcomes + /// propagate only the tag). Removal is deferred to + /// [`Self::cleanup_deferred_evictions`], which the local backend calls + /// strictly after the whole DAG has drained. 
+ deferred_evictions: std::sync::Mutex)>>, } impl HmVm { @@ -31,6 +57,116 @@ impl HmVm { backend, registry, config, + deferred_evictions: std::sync::Mutex::new(Vec::new()), + } + } + + /// Check whether a cached result exists for `key` without executing anything. + /// + /// Returns `Some(result)` on a valid hit (snapshot still exists in the + /// backend). Invalidates stale entries as a side-effect. Returns `None` + /// on miss. + /// + /// # Errors + /// + /// Returns an error if the backend's `snapshot_exists` check fails. + pub async fn peek_cache(&self, key: &str) -> Result> { + self.check_cache(key).await + } + + /// Single implementation of the cache-hit check shared by + /// [`Self::peek_cache`] and [`Self::execute`]. + /// + /// A hit requires only a registry row plus a live backend snapshot: the + /// cache stores system state (what `docker commit` captures) and nothing + /// else. A stale row (image removed out-of-band, e.g. `docker rmi`) is + /// invalidated with a compare-and-delete so a concurrently re-inserted + /// fresh entry is never destroyed; any legacy workspace directory the + /// row still referenced is reaped lazily. + async fn check_cache(&self, key: &str) -> Result> { + let Some((snap, _legacy_ws)) = self.registry.get_with_workspace(key) else { + return Ok(None); + }; + if self.backend.snapshot_exists(&snap).await? { + return Ok(Some(ExecutionResult { + exit_code: 0, + snapshot: Some(snap), + cached: true, + ephemeral_snapshot: false, + })); + } + // The image is already gone, so there is nothing to remove in the + // backend -- only the registry row (and any legacy workspace dir). + warn!(key, snapshot = %snap, "cached snapshot missing from backend; invalidating entry"); + if let Some(Some(legacy_ws)) = self.registry.invalidate_if(key, &snap) { + tokio::task::spawn_blocking(move || std::fs::remove_dir_all(legacy_ws).ok()) + .await + .ok(); + } + Ok(None) + } + + /// Remove backend images for registry entries evicted during this run. 
+ /// + /// Must be called strictly after every step of the run has finished + /// (the local backend invokes it once the scheduler's DAG has fully + /// drained), so no in-flight step can still restore from an evicted + /// tag. + /// + /// A tag that has been re-registered since its eviction is skipped: + /// the same key may have been rebuilt later in this run, or a + /// concurrent process may have re-inserted it, and backend re-tagging + /// means the tag now names the *fresh* image — removing it would + /// destroy a live cache entry. (A narrow cross-process window remains: + /// a concurrent run that observed the tag before the eviction — as its + /// own cache hit, whose children then carry the tag as their + /// `parent_snapshot` — and restores from it after this cleanup gets a + /// hard "no such image" restore failure and the step FAILS; the + /// `parent_snapshot` restore path never goes through `check_cache`, so + /// this does not degrade gracefully into re-execution. The stale row is + /// invalidated on that process's next `check_cache` of the key. Closing + /// the window entirely needs backend-level coordination between + /// processes, which the registry cannot provide.) + /// + /// Legacy workspace directories riding on evicted rows are always + /// reaped. Best-effort throughout: failures are logged, never + /// propagated. 
+ pub async fn cleanup_deferred_evictions(&self) { + let pending = std::mem::take( + &mut *self + .deferred_evictions + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner), + ); + for (snap, legacy_ws) in pending { + if self.registry.contains_snapshot(&snap) { + tracing::debug!(snapshot = %snap, "evicted tag was re-registered; keeping image"); + } else if let Err(e) = self.backend.remove_snapshot(&snap).await { + warn!(snapshot = %snap, error = %e, "failed to remove evicted snapshot"); + } + if let Some(ws_path) = legacy_ws { + tokio::task::spawn_blocking(move || std::fs::remove_dir_all(ws_path).ok()) + .await + .ok(); + } + } + } + + /// Remove `snapshot` from the backend unless the registry currently + /// maps some key to it. + /// + /// Guarded twin of a bare `VmBackend::remove_snapshot` for reaping + /// run-scoped (ephemeral or demoted-to-ephemeral) snapshots: if a + /// concurrent run re-registered the same tag after this run marked it + /// ephemeral — e.g. a `harmont-cache/*` tag demoted by a failed + /// `registry.put` that another process has since successfully cached — + /// removing the image would destroy a live cache entry. Best-effort: + /// failures are logged, never propagated. + pub async fn remove_snapshot_unless_registered(&self, snapshot: &SnapshotId) { + if self.registry.contains_snapshot(snapshot) { + tracing::debug!(snapshot = %snapshot, "snapshot was re-registered; keeping image"); + } else if let Err(e) = self.backend.remove_snapshot(snapshot).await { + warn!(snapshot = %snapshot, error = %e, "failed to remove ephemeral snapshot"); } } @@ -41,42 +177,78 @@ impl HmVm { /// When the policy is [`CachingPolicy::Cache`] the registry is consulted /// first. A cache hit that still exists in the backend returns immediately. /// On a successful (exit-code 0) execution the resulting snapshot is stored - /// in the registry; evicted entries are cleaned up in the backend. 
+ /// in the registry; entries evicted by the insert are queued for removal + /// via [`Self::cleanup_deferred_evictions`] once the run has drained. + /// + /// # Cancellation + /// + /// Cancellation is cooperative via `cancel`: when the token fires, the + /// in-flight command is abandoned and the VM is destroyed — and that + /// teardown (including the bind-mount ownership reclaim on native Linux + /// Docker) is *awaited* before this future resolves. Callers must + /// therefore never `select!`-drop this future on cancellation; awaiting + /// it guarantees the workspace directory is safe to read or remove the + /// moment it returns. A cancelled execution yields exit code 130 with + /// no snapshot. /// /// # Errors /// - /// Returns an error if the backend fails to create, restore, inject, or + /// Returns an error if the backend fails to create, restore, or /// execute. Best-effort cleanup is performed even on failure paths. - #[instrument(skip(self, action, sink), fields(cmd = %action.cmd))] + #[instrument(skip(self, action, sink, cancel), fields(cmd = %action.cmd))] pub async fn execute( &self, action: Action, policy: CachingPolicy, sink: &dyn OutputSink, + cancel: &CancellationToken, ) -> Result { - // 1. Cache check + // 1. Cache check. Callers typically `peek_cache` first; this second + // check is deliberately retained -- it is cheap (one SQLite read + + // one image list) and lets a concurrent fill between peek and + // execute still count as a hit, at worst wasting one COW copy. if let CachingPolicy::Cache { ref key } = policy - && let Some(snap) = self.registry.get(key) + && let Some(result) = self.check_cache(key).await? { - if self.backend.snapshot_exists(&snap).await? { - return Ok(ExecutionResult { - exit_code: 0, - snapshot: Some(snap), - cached: true, - }); - } - let _ = self.registry.invalidate(key); + return Ok(result); } - // 2. 
Create or restore VM - let mut vm = match &action.source { - ImageSource::Image(image) => self.backend.create(image, &self.config).await?, - ImageSource::Snapshot(snap) => self.backend.restore(snap, &self.config).await?, + // 2. Create or restore the VM, bailing cooperatively if the run is + // cancelled mid-boot (image pulls can take minutes). No container + // handle exists yet on the cancel branch, so there is nothing to + // tear down, and the workspace still holds only host-user-owned + // files, so the caller may delete it immediately. (`biased` makes + // an already-cancelled token win deterministically. Dropping a + // creation request mid-roundtrip can, in a sub-millisecond window, + // leave a never-started container behind on the daemon; it holds no + // processes and writes nothing, so it cannot affect the workspace.) + let create_fut = async { + match &action.source { + ImageSource::Image(image) => { + self.backend + .create(image, &self.config, action.workspace.as_ref()) + .await + } + ImageSource::Snapshot(snap) => { + self.backend + .restore(snap, &self.config, action.workspace.as_ref()) + .await + } + } + }; + let mut vm = tokio::select! { + biased; + () = cancel.cancelled() => return Ok(cancelled_result()), + vm = create_fut => vm?, }; - let result = self.run_in_vm(&mut *vm, &action, &policy, sink).await; + let result = self + .run_in_vm(&mut *vm, &action, &policy, sink, cancel) + .await; - // Always destroy the VM, even on error. + // Always destroy the VM -- on success, error AND cancellation -- + // and await it: teardown reclaims bind-mount ownership and must + // happen-before the caller touches or removes the workspace dir. vm.destroy().await.ok(); result @@ -96,57 +268,142 @@ impl HmVm { self.backend.remove_snapshot(snapshot).await } - /// Inner lifecycle: inject, exec, snapshot. Separated so the caller + /// Best-effort sweep of aged snapshot images that neither a registry + /// row nor this run's deferred-eviction queue references. 
+ /// + /// Intended for *run-scoped* tag namespaces (`harmont-ephemeral`, the + /// legacy `ephemeral`), whose images are created and removed within a + /// single run: an aged tag there is residue of a dead run. It must NOT + /// be pointed at the shared cache namespace (`harmont-cache/*`) by + /// anything that runs automatically: the registry DB is per-user while + /// the backend (Docker daemon) can be shared, so a missing row does not + /// prove an image is orphaned — another user's registry may still + /// reference it. The `older_than` floor protects a concurrent run's + /// freshly committed images that have not been registered yet. + /// + /// The keep predicate protects, in addition to live registry rows, any + /// tag queued in [`Self::deferred_evictions`]: an LRU-evicted row is + /// gone from `SQLite` immediately, but in-flight steps of this run may + /// still restore from the evicted tag until the DAG drains, so it must + /// survive any sweep that runs concurrently with the run. + /// + /// # Errors + /// + /// Returns an error only if the backend cannot be queried at all; + /// per-image removal failures are logged and skipped. + pub async fn gc_orphaned_snapshots( + &self, + reference: &str, + older_than: std::time::Duration, + ) -> Result { + self.backend + .gc_snapshots(reference, older_than, &|tag| { + self.registry + .contains_snapshot(&SnapshotId::new(tag.to_owned())) + || self.is_eviction_deferred(tag) + }) + .await + } + + /// Whether `tag` is queued for deferred end-of-run removal — i.e. an + /// in-flight step of this run may still restore from it even though its + /// registry row is already gone. + fn is_eviction_deferred(&self, tag: &str) -> bool { + self.deferred_evictions + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .iter() + .any(|(snap, _)| snap.as_ref() == tag) + } + + /// Inner lifecycle: exec, snapshot, register. Separated so the caller /// can guarantee `vm.destroy()` runs regardless of outcome. 
+ /// + /// The workspace bind mount is never persisted: snapshots capture system + /// state only, and workspace state is strictly run-scoped (owned by the + /// caller's temp directories). async fn run_in_vm( &self, vm: &mut dyn crate::backend::Vm, action: &Action, policy: &CachingPolicy, sink: &dyn OutputSink, + cancel: &CancellationToken, ) -> Result { - // 3. Inject workspace - if let Some(ref host_path) = action.inject { - vm.inject(host_path, &action.working_dir).await?; - } - - // 4. Execute command (with optional timeout) - let exec_fut = vm.exec(&action.cmd, &action.env, &action.working_dir, sink); - let exit_code = if let Some(timeout) = action.timeout { - match tokio::time::timeout(timeout, exec_fut).await { - Ok(result) => result?, - Err(_) => anyhow::bail!("command timed out after {timeout:?}"), + // 3. Execute command (with optional timeout), bailing cooperatively + // on cancellation. Dropping the exec future abandons only the + // output stream -- the in-container process may keep running until + // the caller's `vm.destroy()` quiesces the container (the backend + // tracks the interrupted exec and stops every process before + // reclaiming workspace ownership). + let exec_inner = vm.exec(&action.cmd, &action.env, &action.working_dir, sink); + let exec_fut = async { + if let Some(timeout) = action.timeout { + match tokio::time::timeout(timeout, exec_inner).await { + Ok(result) => result, + Err(_) => anyhow::bail!("command timed out after {timeout:?}"), + } + } else { + exec_inner.await } - } else { - exec_fut.await? + }; + let exit_code = tokio::select! { + biased; + () = cancel.cancelled() => return Ok(cancelled_result()), + exit = exec_fut => exit?, }; - // 5. Snapshot and cache on success - let snapshot = if exit_code == 0 { - let label = match policy { - CachingPolicy::Cache { key } => SnapshotLabel::Cached(key.clone()), - CachingPolicy::None => SnapshotLabel::Ephemeral, + // 4. 
Snapshot and cache on success + let (snapshot, ephemeral) = if exit_code == 0 { + let (label, mut is_ephemeral) = match &policy { + CachingPolicy::Cache { key } => (SnapshotLabel::Cached(key.clone()), false), + CachingPolicy::None => (SnapshotLabel::Ephemeral, true), }; let snap = vm.snapshot(&label).await?; - if let CachingPolicy::Cache { key } = policy { - let evicted = self.registry.put(key, &snap); - for old in &evicted { - if let Err(e) = self.backend.remove_snapshot(old).await { - warn!(snapshot = %old, error = %e, "failed to remove evicted snapshot"); + if let CachingPolicy::Cache { key } = &policy { + match self.registry.put(key, &snap) { + Ok(evicted) => { + // Do NOT remove evicted images here: a cache-hit + // outcome propagates only the snapshot tag to its + // children, and a child may restore from that tag + // long after a sibling's put evicts the row (the + // child can be parked on the parallelism semaphore). + // Queue the entries; the local backend drains them + // via `cleanup_deferred_evictions` strictly after + // the whole DAG has finished. + if !evicted.is_empty() { + self.deferred_evictions + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .extend(evicted); + } + } + Err(e) => { + // The snapshot exists but could not be registered. + // Demote it to ephemeral so the scheduler removes it + // after the run (same-run children still restore from + // it for lineage) instead of orphaning it forever. 
+ warn!( + key, + error = %e, + "failed to record snapshot in registry; demoting to ephemeral" + ); + is_ephemeral = true; } } } - Some(snap) + (Some(snap), is_ephemeral) } else { - None + (None, false) }; Ok(ExecutionResult { exit_code, snapshot, cached: false, + ephemeral_snapshot: ephemeral, }) } } @@ -156,9 +413,8 @@ impl HmVm { mod tests { use super::*; use crate::backend::Vm; - use crate::types::{NullSink, SnapshotId}; + use crate::types::{NullSink, SnapshotId, WorkspaceMount}; - use std::path::Path; use std::sync::Mutex; use async_trait::async_trait; @@ -174,6 +430,11 @@ mod tests { exit_code: i32, /// Whether `snapshot_exists` should return true. snapshot_exists: bool, + /// Artificial latency for `MockVm::exec` (lets cancellation tests + /// fire mid-execution). + exec_delay: Option, + /// Aged snapshot tags the mock GC considers removable candidates. + gc_candidates: Vec, } impl MockBackend { @@ -182,29 +443,43 @@ mod tests { calls: Arc::new(Mutex::new(Vec::new())), exit_code, snapshot_exists, + exec_delay: None, + gc_candidates: Vec::new(), } } } #[async_trait] impl VmBackend for MockBackend { - async fn create(&self, image: &str, _config: &VmConfig) -> Result> { + async fn create( + &self, + image: &str, + _config: &VmConfig, + _workspace: Option<&WorkspaceMount>, + ) -> Result> { self.calls .lock() .map_or_else(|_| {}, |mut c| c.push(format!("create:{image}"))); Ok(Box::new(MockVm { calls: Arc::clone(&self.calls), exit_code: self.exit_code, + exec_delay: self.exec_delay, })) } - async fn restore(&self, snapshot: &SnapshotId, _config: &VmConfig) -> Result> { + async fn restore( + &self, + snapshot: &SnapshotId, + _config: &VmConfig, + _workspace: Option<&WorkspaceMount>, + ) -> Result> { self.calls .lock() .map_or_else(|_| {}, |mut c| c.push(format!("restore:{snapshot}"))); Ok(Box::new(MockVm { calls: Arc::clone(&self.calls), exit_code: self.exit_code, + exec_delay: self.exec_delay, })) } @@ -223,23 +498,35 @@ mod tests { ); Ok(()) } + + async fn 
gc_snapshots( + &self, + _reference: &str, + _older_than: std::time::Duration, + keep: &(dyn for<'a> Fn(&'a str) -> bool + Send + Sync), + ) -> Result { + let mut removed = 0; + for tag in &self.gc_candidates { + if keep(tag) { + continue; + } + self.calls + .lock() + .map_or_else(|_| {}, |mut c| c.push(format!("gc_remove:{tag}"))); + removed += 1; + } + Ok(removed) + } } struct MockVm { calls: Arc>>, exit_code: i32, + exec_delay: Option, } #[async_trait] impl Vm for MockVm { - async fn inject(&self, host_path: &Path, guest_path: &str) -> Result<()> { - self.calls.lock().map_or_else( - |_| {}, - |mut c| c.push(format!("inject:{}:{guest_path}", host_path.display())), - ); - Ok(()) - } - async fn exec( &self, cmd: &str, @@ -250,6 +537,9 @@ mod tests { self.calls .lock() .map_or_else(|_| {}, |mut c| c.push(format!("exec:{cmd}"))); + if let Some(delay) = self.exec_delay { + tokio::time::sleep(delay).await; + } Ok(self.exit_code) } @@ -291,7 +581,10 @@ mod tests { env: vec![], working_dir: "/work".into(), timeout: None, - inject: Some(std::path::PathBuf::from("/host/src")), + workspace: Some(WorkspaceMount { + host_path: std::path::PathBuf::from("/host/src"), + guest_path: "/work".into(), + }), } } @@ -316,6 +609,7 @@ mod tests { key: "step-1".into(), }, &NullSink, + &CancellationToken::new(), ) .await .expect("execute should succeed"); @@ -326,7 +620,6 @@ mod tests { let log = calls(&backend); assert!(log.iter().any(|c| c.starts_with("create:"))); - assert!(log.iter().any(|c| c.starts_with("inject:"))); assert!(log.iter().any(|c| c.starts_with("exec:"))); assert!(log.iter().any(|c| c.starts_with("snapshot:"))); assert!(log.iter().any(|c| c == "destroy")); @@ -338,7 +631,9 @@ mod tests { let (registry, _dir) = open_temp_registry(10); // Pre-populate the registry. 
- registry.put("step-1", &SnapshotId::new("cached-snap")); + registry + .put("step-1", &SnapshotId::new("cached-snap")) + .expect("put"); let hm = HmVm::new(Arc::new(backend.clone()), registry, VmConfig::default()); @@ -349,6 +644,7 @@ mod tests { key: "step-1".into(), }, &NullSink, + &CancellationToken::new(), ) .await .expect("execute should succeed"); @@ -371,7 +667,12 @@ mod tests { let hm = HmVm::new(Arc::new(backend.clone()), registry, VmConfig::default()); let result = hm - .execute(make_action(), CachingPolicy::None, &NullSink) + .execute( + make_action(), + CachingPolicy::None, + &NullSink, + &CancellationToken::new(), + ) .await .expect("execute should succeed"); @@ -399,6 +700,7 @@ mod tests { key: "step-fail".into(), }, &NullSink, + &CancellationToken::new(), ) .await .expect("execute should succeed"); @@ -415,4 +717,347 @@ mod tests { // Registry should still be empty. assert!(hm.registry.is_empty()); } + + #[tokio::test] + async fn stale_entry_is_invalidated_and_step_reexecutes() { + // Registry has a row, but the backend image is gone (e.g. an + // out-of-band `docker rmi`). The step must re-execute and the stale + // row must be replaced -- with no remove_snapshot call, since the + // image is already absent. + let backend = MockBackend::new(0, false); + let (registry, _dir) = open_temp_registry(10); + registry + .put("step-1", &SnapshotId::new("gone-snap")) + .expect("put"); + + let hm = HmVm::new(Arc::new(backend.clone()), registry, VmConfig::default()); + + let result = hm + .execute( + make_action(), + CachingPolicy::Cache { + key: "step-1".into(), + }, + &NullSink, + &CancellationToken::new(), + ) + .await + .expect("execute should succeed"); + + assert!(!result.cached); + assert_eq!(result.exit_code, 0); + + let log = calls(&backend); + assert!(log.iter().any(|c| c.starts_with("exec:"))); + assert!(!log.iter().any(|c| c.starts_with("remove_snapshot:"))); + + // The stale row was replaced by the fresh snapshot. 
+ assert_eq!( + hm.registry.get("step-1"), + Some(SnapshotId::new("snap-step-1")) + ); + } + + #[tokio::test] + async fn eviction_defers_snapshot_removal_until_cleanup() { + // Capacity 1: caching a second key evicts the first. The evicted + // image must NOT be removed inline -- an in-flight step may still + // restore from the evicted tag -- only when the run has drained and + // `cleanup_deferred_evictions` is invoked. + let backend = MockBackend::new(0, false); + let (registry, _dir) = open_temp_registry(1); + let hm = HmVm::new(Arc::new(backend.clone()), registry, VmConfig::default()); + + for key in ["step-a", "step-b"] { + hm.execute( + make_action(), + CachingPolicy::Cache { key: key.into() }, + &NullSink, + &CancellationToken::new(), + ) + .await + .expect("execute should succeed"); + } + + // No inline removal while steps could still be in flight. + let log = calls(&backend); + assert!(!log.iter().any(|c| c.starts_with("remove_snapshot:"))); + assert_eq!(hm.registry.len(), 1); + assert!(hm.registry.get("step-b").is_some()); + + // End of run: the evicted snapshot is removed now. + // "step-a" ties with "step-b" on accessed_at; key ASC tie-break + // makes the eviction deterministic. + hm.cleanup_deferred_evictions().await; + let log = calls(&backend); + assert!(log.iter().any(|c| c == "remove_snapshot:snap-step-a")); + assert!(!log.iter().any(|c| c == "remove_snapshot:snap-step-b")); + } + + #[tokio::test] + async fn deferred_eviction_skips_reregistered_snapshot() { + // An evicted tag that is re-inserted before cleanup (same key + // rebuilt later this run, or a concurrent process) now names the + // fresh image; removing it would destroy a live cache entry. 
+ let backend = MockBackend::new(0, false); + let (registry, _dir) = open_temp_registry(1); + let hm = HmVm::new(Arc::new(backend.clone()), registry, VmConfig::default()); + + for key in ["step-a", "step-b"] { + hm.execute( + make_action(), + CachingPolicy::Cache { key: key.into() }, + &NullSink, + &CancellationToken::new(), + ) + .await + .expect("execute should succeed"); + } + // "step-a" was evicted and its removal deferred. Re-register the + // same snapshot tag (as a concurrent process or a later rebuild of + // the same key would after Docker re-tagging). The sleep gives the + // re-inserted row a strictly newer accessed_at so the capacity-1 + // eviction removes "step-b", not the fresh row (timestamps have + // one-second granularity). + std::thread::sleep(std::time::Duration::from_secs(1)); + hm.registry + .put("step-a", &SnapshotId::new("snap-step-a")) + .expect("put"); + + hm.cleanup_deferred_evictions().await; + + // The re-registered tag survives cleanup: it now names a live + // cache entry's image. + let log = calls(&backend); + assert!(!log.iter().any(|c| c == "remove_snapshot:snap-step-a")); + assert_eq!( + hm.registry.get("step-a"), + Some(SnapshotId::new("snap-step-a")) + ); + } + + #[tokio::test] + async fn remove_unless_registered_guards_live_entries() { + // A demoted-to-ephemeral `harmont-cache/*` tag that a concurrent + // run re-registered must survive the scheduler's ephemeral + // cleanup; an unregistered tag must be removed. 
+ let backend = MockBackend::new(0, false); + let (registry, _dir) = open_temp_registry(10); + let hm = HmVm::new(Arc::new(backend.clone()), registry, VmConfig::default()); + + hm.registry + .put("step-a", &SnapshotId::new("snap-live")) + .expect("put"); + + hm.remove_snapshot_unless_registered(&SnapshotId::new("snap-live")) + .await; + hm.remove_snapshot_unless_registered(&SnapshotId::new("snap-orphan")) + .await; + + let log = calls(&backend); + assert!(!log.iter().any(|c| c == "remove_snapshot:snap-live")); + assert!(log.iter().any(|c| c == "remove_snapshot:snap-orphan")); + } + + #[tokio::test] + async fn cleanup_with_no_evictions_is_a_noop() { + let backend = MockBackend::new(0, false); + let (registry, _dir) = open_temp_registry(10); + let hm = HmVm::new(Arc::new(backend.clone()), registry, VmConfig::default()); + + hm.execute( + make_action(), + CachingPolicy::Cache { key: "only".into() }, + &NullSink, + &CancellationToken::new(), + ) + .await + .expect("execute should succeed"); + hm.cleanup_deferred_evictions().await; + + let log = calls(&backend); + assert!(!log.iter().any(|c| c.starts_with("remove_snapshot:"))); + } + + #[tokio::test] + async fn cache_miss_from_snapshot_passes_workspace() { + let backend = MockBackend::new(0, false); + let (registry, _dir) = open_temp_registry(10); + let hm = HmVm::new(Arc::new(backend.clone()), registry, VmConfig::default()); + + let mut action = make_action(); + // Simulate child step: source is a snapshot, not an image. + action.source = ImageSource::Snapshot(SnapshotId::new("parent-snap")); + + let result = hm + .execute( + action, + CachingPolicy::Cache { + key: "child-step".into(), + }, + &NullSink, + &CancellationToken::new(), + ) + .await + .expect("execute should succeed"); + + assert_eq!(result.exit_code, 0); + assert!(!result.cached); + + let log = calls(&backend); + // Must restore from snapshot (not create from image). 
+ assert!(log.iter().any(|c| c.starts_with("restore:parent-snap"))); + assert!(log.iter().any(|c| c.starts_with("exec:"))); + assert!(log.iter().any(|c| c.starts_with("snapshot:"))); + } + + #[tokio::test] + async fn cancellation_mid_exec_destroys_vm_before_returning() { + // Cancellation must be cooperative: execute() resolves only after + // the VM has been destroyed (so the bind-mount ownership reclaim + // has run) and reports exit 130 with nothing snapshotted or cached. + let mut backend = MockBackend::new(0, false); + backend.exec_delay = Some(std::time::Duration::from_secs(30)); + let (registry, _dir) = open_temp_registry(10); + let hm = HmVm::new(Arc::new(backend.clone()), registry, VmConfig::default()); + + let cancel = CancellationToken::new(); + let trigger = cancel.clone(); + tokio::spawn(async move { + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + trigger.cancel(); + }); + + let result = hm + .execute( + make_action(), + CachingPolicy::Cache { + key: "step-1".into(), + }, + &NullSink, + &cancel, + ) + .await + .expect("execute should succeed"); + + assert_eq!(result.exit_code, 130); + assert!(!result.cached); + assert!(result.snapshot.is_none()); + assert!(!result.ephemeral_snapshot); + + let log = calls(&backend); + // The exec started, was abandoned, and the VM was still destroyed + // before execute() returned. 
+ assert!(log.iter().any(|c| c.starts_with("exec:"))); + assert!(log.iter().any(|c| c == "destroy")); + assert!(!log.iter().any(|c| c.starts_with("snapshot:"))); + assert!(hm.registry.is_empty()); + } + + #[tokio::test] + async fn pre_cancelled_token_skips_vm_creation() { + let backend = MockBackend::new(0, false); + let (registry, _dir) = open_temp_registry(10); + let hm = HmVm::new(Arc::new(backend.clone()), registry, VmConfig::default()); + + let cancel = CancellationToken::new(); + cancel.cancel(); + + let result = hm + .execute( + make_action(), + CachingPolicy::Cache { + key: "step-1".into(), + }, + &NullSink, + &cancel, + ) + .await + .expect("execute should succeed"); + + assert_eq!(result.exit_code, 130); + // The biased select bails before any container is created. + let log = calls(&backend); + assert!(!log.iter().any(|c| c.starts_with("create:"))); + assert!(!log.iter().any(|c| c.starts_with("exec:"))); + } + + #[tokio::test] + async fn gc_keeps_tags_queued_for_deferred_eviction() { + // An LRU-evicted row is gone from SQLite immediately, but in-flight + // steps of this run may still restore from the evicted tag until + // the DAG drains. A GC sweep running concurrently with the run must + // therefore keep any tag in the deferred-eviction queue even though + // no registry row references it any more. + let mut backend = MockBackend::new(0, false); + backend.gc_candidates = vec!["snap-step-a".into(), "snap-orphan".into()]; + let (registry, _dir) = open_temp_registry(1); + let hm = HmVm::new(Arc::new(backend.clone()), registry, VmConfig::default()); + + // Capacity 1: caching "step-b" evicts "step-a"'s row and queues + // "snap-step-a" for deferred removal. 
+ for key in ["step-a", "step-b"] { + hm.execute( + make_action(), + CachingPolicy::Cache { key: key.into() }, + &NullSink, + &CancellationToken::new(), + ) + .await + .expect("execute should succeed"); + } + assert!(hm.registry.get("step-a").is_none()); + + let removed = hm + .gc_orphaned_snapshots("harmont-cache/*", std::time::Duration::from_secs(0)) + .await + .expect("gc should succeed"); + + // Only the true orphan goes; the deferred-eviction tag survives. + assert_eq!(removed, 1); + let log = calls(&backend); + assert!(log.iter().any(|c| c == "gc_remove:snap-orphan")); + assert!(!log.iter().any(|c| c == "gc_remove:snap-step-a")); + + // End-of-run cleanup still removes it once the DAG has drained. + hm.cleanup_deferred_evictions().await; + let log = calls(&backend); + assert!(log.iter().any(|c| c == "remove_snapshot:snap-step-a")); + } + + #[tokio::test] + async fn gc_orphans_removes_unregistered_and_keeps_registered() { + // The registry is the source of truth: an aged tag with a live row + // must survive the sweep; an aged tag without one is an orphan and + // goes. 
+ let mut backend = MockBackend::new(0, false); + backend.gc_candidates = vec![ + "harmont-cache/live:aaaabbbbccccdddd".into(), + "harmont-cache/orphan:eeeeffff00001111".into(), + ]; + let (registry, _dir) = open_temp_registry(10); + registry + .put( + "harmont-cache/live:aaaabbbbccccdddd", + &SnapshotId::new("harmont-cache/live:aaaabbbbccccdddd"), + ) + .expect("put"); + let hm = HmVm::new(Arc::new(backend.clone()), registry, VmConfig::default()); + + let removed = hm + .gc_orphaned_snapshots("harmont-cache/*", std::time::Duration::from_secs(0)) + .await + .expect("gc should succeed"); + + assert_eq!(removed, 1); + let log = calls(&backend); + assert!( + log.iter() + .any(|c| c == "gc_remove:harmont-cache/orphan:eeeeffff00001111") + ); + assert!( + !log.iter() + .any(|c| c == "gc_remove:harmont-cache/live:aaaabbbbccccdddd") + ); + } } diff --git a/crates/hm-vm/src/workspace.rs b/crates/hm-vm/src/workspace.rs new file mode 100644 index 00000000..8f69f4ce --- /dev/null +++ b/crates/hm-vm/src/workspace.rs @@ -0,0 +1,86 @@ +//! Host-side workspace utilities for COW build directories. + +use std::path::Path; +use std::process::Command; + +use anyhow::{Context, Result, ensure}; + +/// Create a copy-on-write clone of `src` contents into `dst`. +/// +/// macOS: `cp -cpR` (APFS clonefile; `cp -c` itself falls back to +/// `copyfile(2)` when cloning is unsupported, so no manual retry is +/// needed). Linux: `cp --reflink=auto -a` (COW on btrfs/XFS, full copy +/// on ext4). Symlinks are copied as symlinks; mode and mtime are +/// preserved (incremental build tools depend on mtimes). +/// +/// # Errors +/// +/// Returns an error if `cp` cannot be spawned or exits with a non-zero +/// status; `cp`'s stderr is captured into the error rather than leaked +/// to the terminal. 
+pub fn cow_copy(src: &Path, dst: &Path) -> Result<()> {
+    // Build "<src>/." as a path (not via lossy `display()`) so non-UTF-8 names survive.
+    let src_dot = src.join(".");
+    let mut cmd = Command::new("cp");
+    if cfg!(target_os = "macos") {
+        cmd.arg("-cpR");
+    } else {
+        cmd.args(["--reflink=auto", "-a"]);
+    }
+    // `--` ends option parsing: operands starting with `-` must not be read as flags.
+    cmd.arg("--").arg(&src_dot).arg(dst);
+    let output = cmd
+        .output()
+        .with_context(|| format!("spawning cp: {} -> {}", src.display(), dst.display()))?;
+
+    ensure!(
+        output.status.success(),
+        "cp {} -> {} exited with {}: {}",
+        src.display(),
+        dst.display(),
+        output.status,
+        String::from_utf8_lossy(&output.stderr)
+    );
+    Ok(())
+}
+
+#[cfg(test)]
+#[allow(clippy::unwrap_used, clippy::expect_used)]
+mod tests {
+    use super::*;
+    use std::fs;
+    use tempfile::tempdir;
+
+    #[test]
+    fn cow_copy_produces_independent_clone() {
+        let src = tempdir().unwrap();
+        fs::write(src.path().join("file.txt"), "original").unwrap();
+        fs::create_dir(src.path().join("sub")).unwrap();
+        fs::write(src.path().join("sub/nested.txt"), "nested").unwrap();
+
+        let dst = tempdir().unwrap();
+        cow_copy(src.path(), dst.path()).unwrap();
+
+        assert_eq!(
+            fs::read_to_string(dst.path().join("file.txt")).unwrap(),
+            "original"
+        );
+        assert_eq!(
+            fs::read_to_string(dst.path().join("sub/nested.txt")).unwrap(),
+            "nested"
+        );
+
+        fs::write(dst.path().join("file.txt"), "modified").unwrap();
+        assert_eq!(
+            fs::read_to_string(src.path().join("file.txt")).unwrap(),
+            "original"
+        );
+    }
+
+    #[test]
+    fn cow_copy_empty_dir() {
+        let src = tempdir().unwrap();
+        let dst = tempdir().unwrap();
+        cow_copy(src.path(), dst.path()).unwrap();
+    }
+}
diff --git a/crates/hm/src/commands/cache/clean.rs b/crates/hm/src/commands/cache/clean.rs
index b9dcb80e..a8fa4735 100644
--- a/crates/hm/src/commands/cache/clean.rs
+++ b/crates/hm/src/commands/cache/clean.rs
@@ -55,7 +55,8 @@ pub async fn handle_clean() -> Result {
 /// cache index is reset.
async fn remove_registered_images(db_path: &std::path::Path) { // Capacity here is irrelevant — we only read existing rows, never insert. - let registry = match hm_vm::ImageRegistry::open(db_path, u64::MAX) { + const CAPACITY: std::num::NonZeroU64 = std::num::NonZeroU64::MAX; + let registry = match hm_vm::ImageRegistry::open(db_path, CAPACITY) { Ok(r) => r, Err(e) => { tracing::warn!(error = %e, "could not open image registry; skipping image removal"); diff --git a/crates/hm/tests/deep_cache_chain_workspace.rs b/crates/hm/tests/deep_cache_chain_workspace.rs new file mode 100644 index 00000000..5d146d27 --- /dev/null +++ b/crates/hm/tests/deep_cache_chain_workspace.rs @@ -0,0 +1,82 @@ +#![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)] +//! Verify workspace freshness across a deep cache chain. +//! +//! Pipeline: A (forever) → B (forever) → C (uncached, reads marker). +//! Two runs with different marker contents: C must see the updated +//! marker even when both A and B are cache hits. +//! +//! Requires Docker. 
`cargo test --test deep_cache_chain_workspace -- --ignored` + +use std::fs; +use std::path::Path; +use std::process::Command; + +use tempfile::tempdir; + +fn write_pipeline(dir: &Path, marker: &str) { + let hm = dir.join(".hm"); + fs::create_dir_all(&hm).expect("mkdir .hm"); + fs::write(dir.join("marker.txt"), marker).expect("marker.txt"); + fs::write( + hm.join("pipeline.py"), + r#" +import harmont as hm + + +@hm.pipeline("deep-cache-chain", default_image="alpine:3.20") +def build() -> hm.Step: + a = hm.scratch().sh("echo step-a", label="a", cache=hm.forever()) + b = a.sh("echo step-b", label="b", cache=hm.forever()) + return b.sh("cat /workspace/marker.txt", label="c") +"#, + ) + .expect("pipeline.py"); +} + +fn run_hm(repo: &Path) -> String { + let bin = env!("CARGO_BIN_EXE_hm"); + let out = Command::new(bin) + .args(["run", "--format", "human", "--logs"]) + .current_dir(repo) + .output() + .expect("spawn hm"); + let stdout = String::from_utf8_lossy(&out.stdout).into_owned(); + let stderr = String::from_utf8_lossy(&out.stderr).into_owned(); + assert!( + out.status.success(), + "hm run failed.\nstdout:\n{stdout}\nstderr:\n{stderr}" + ); + stderr +} + +#[test] +#[ignore = "requires Docker"] +fn deep_chain_child_sees_fresh_workspace() { + let dir = tempdir().expect("tempdir"); + + write_pipeline(dir.path(), "deep-v1"); + let out1 = run_hm(dir.path()); + assert!(out1.contains("deep-v1"), "run 1 missing marker:\n{out1}"); + + fs::write(dir.path().join("marker.txt"), "deep-v2").expect("rewrite"); + + let out2 = run_hm(dir.path()); + assert!( + out2.contains("deep-v2"), + "run 2 did not see fresh workspace through deep cache chain:\n{out2}" + ); + assert!( + !out2.contains("deep-v1"), + "run 2 leaked stale workspace through deep cache chain:\n{out2}" + ); + // Freshness must come from rebasing onto current source, not from + // silently re-executing the cached ancestors. 
+ assert!( + out2.contains("[a] cache hit"), + "run 2 expected step a to be a cache hit:\n{out2}" + ); + assert!( + out2.contains("[b] cache hit"), + "run 2 expected step b to be a cache hit:\n{out2}" + ); +} diff --git a/crates/hm/tests/local_fork_cache.rs b/crates/hm/tests/local_fork_cache.rs index 411a770d..6434f025 100644 --- a/crates/hm/tests/local_fork_cache.rs +++ b/crates/hm/tests/local_fork_cache.rs @@ -24,17 +24,18 @@ fn write_pipeline(dir: &Path, marker_contents: &str) { r#" import harmont as hm -def build(): - base = hm.scratch().run( + +@hm.pipeline("fork-cache", default_image="alpine:3.20") +def build() -> hm.Step: + base = hm.scratch().sh( "echo base-ran", label="base", cache=hm.forever(), ) - child = base.fork(label="child").run( + return base.fork(label="fork").sh( "cat /workspace/marker.txt", label="child", ) - return hm.pipeline(child, default_image="alpine:3.20") "#, ) .expect("pipeline.py"); @@ -43,7 +44,7 @@ def build(): fn run_harmont(repo: &Path) -> String { let bin = env!("CARGO_BIN_EXE_hm"); let output = Command::new(bin) - .args(["run"]) + .args(["run", "--format", "human", "--logs"]) .current_dir(repo) .output() .expect("spawn harmont"); diff --git a/crates/hm/tests/local_parallelism.rs b/crates/hm/tests/local_parallelism.rs index f18175a2..1204a79c 100644 --- a/crates/hm/tests/local_parallelism.rs +++ b/crates/hm/tests/local_parallelism.rs @@ -22,10 +22,12 @@ fn write_pipeline(dir: &Path) { r#" import harmont as hm -def build(): - a = hm.scratch().run("sleep 3", label="sleep-a") - b = hm.scratch().run("sleep 3", label="sleep-b") - return hm.pipeline(a, b, default_image="alpine:3.20") + +@hm.pipeline("parallelism", default_image="alpine:3.20") +def build() -> list[hm.Step]: + a = hm.scratch().sh("sleep 3", label="sleep-a") + b = hm.scratch().sh("sleep 3", label="sleep-b") + return [a, b] "#, ) .expect("pipeline.py");