Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
fbd6b5d
test(hm-vm): add test proving HmVm injects workspace for snapshot-sou…
markovejnovic Jun 9, 2026
0856af0
fix: always inject workspace into every executing step (CLI-28)
markovejnovic Jun 9, 2026
c9ee5a1
test: add deep cache chain workspace freshness test (CLI-28)
markovejnovic Jun 9, 2026
e836fa7
docs: document deleted-file edge case in workspace injection (CLI-28)
markovejnovic Jun 9, 2026
e28d564
feat(hm-vm): add workspace COW copy utility
markovejnovic Jun 9, 2026
b1f2f18
fix(hm-vm): improve macOS COW fallback to retry on nonzero exit
markovejnovic Jun 9, 2026
c2e051b
feat(hm-vm): extend ImageRegistry to store workspace directory paths
markovejnovic Jun 9, 2026
76156d0
refactor(hm-vm): replace inject() with bind-mount workspace, persist …
markovejnovic Jun 9, 2026
9f16b9d
feat(hm-exec): wire COW workspace through runner and scheduler DAG
markovejnovic Jun 9, 2026
939c83a
feat(dsl): add predicate cache policy for user-defined invalidation
markovejnovic Jun 9, 2026
1b0bdc9
fix(dsl): resolve type checker issues in predicate cache policy
markovejnovic Jun 9, 2026
007a230
style: cargo fmt + clippy fixes across workspace
markovejnovic Jun 9, 2026
511b5fa
fix: eliminate ephemeral workspace race and review fixes
markovejnovic Jun 9, 2026
4acc789
fix: harden COW workspace lifecycle and address review findings
markovejnovic Jun 9, 2026
a7663f5
refactor: run-local workspace lineage — registry caches system snapsh…
markovejnovic Jun 10, 2026
197a98e
Merge remote-tracking branch 'origin/main' into marko/cli-28-caching-…
markovejnovic Jun 10, 2026
177320d
Merge remote-tracking branch 'origin/main' into marko/cli-28-caching-…
markovejnovic Jun 10, 2026
fc98131
Merge remote-tracking branch 'origin/main' into marko/cli-28-caching-…
markovejnovic Jun 10, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions crates/hm-dsl-engine/harmont-py/harmont/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,15 @@
CacheNone,
CacheOnChange,
CachePolicy,
CachePredicate,
CacheTTL,
)
from .triggers import pull_request, push
from .triggers import pull_request as pr
from .types import Pipeline

if TYPE_CHECKING:
from collections.abc import Callable
from datetime import timedelta


Expand Down Expand Up @@ -158,6 +160,12 @@ def forever(env_keys: tuple[str, ...] = ()) -> CacheForever:
itself encodes the version (e.g. downloading a pinned binary). Do not
use for steps that fetch mutable remote resources.

On a cache hit only SYSTEM state (the snapshot) is restored — files the
step wrote into the workspace are not replayed across runs; downstream
steps see the current run's source tree instead. Write build outputs to
system paths (e.g. ``/usr/local``) or use ``on_change`` if the step's
workspace outputs must survive cache hits.

Args:
env_keys: Environment variable names whose values are folded into
the cache key. Use this when the command's behavior varies by
Expand Down Expand Up @@ -196,6 +204,28 @@ def compose(*policies: CachePolicy) -> CacheCompose:
return CacheCompose(policies=tuple(policies))


def predicate(fn: Callable[[], str]) -> CachePredicate:
    """Build a cache policy keyed on the value returned by ``fn``.

    ``fn`` is not invoked here; it is stored on the returned policy and
    called at plan time, when its return value is folded into the cache
    key. A new return value therefore invalidates the cache.

    Example::

        import json

        def package_manager():
            with open("package.json") as f:
                return json.load(f).get("packageManager", "")

        step.run("install", cache=hm.predicate(package_manager))

    Args:
        fn: Zero-argument callable whose return value identifies the
            cached state.

    Returns:
        A ``CachePredicate`` wrapping ``fn``.

    Raises:
        TypeError: If ``fn`` is not callable.
    """
    if callable(fn):
        return CachePredicate(fn=fn)
    # Fail at construction time so the mistake surfaces where the policy
    # is declared rather than later, when the callback would be invoked.
    msg = f"hm.predicate() requires a callable, got {type(fn).__name__}"
    raise TypeError(msg)


def timeout(duration: str | int | timedelta, step: Step) -> Step:
"""Apply a wall-clock timeout to a single step.

Expand Down Expand Up @@ -305,6 +335,7 @@ def group(steps: list[Step] | tuple[Step, ...]) -> tuple[Step, ...]:
"CacheNone",
"CacheOnChange",
"CachePolicy",
"CachePredicate",
"CacheTTL",
"JsProject",
"Pipeline",
Expand All @@ -324,6 +355,7 @@ def group(steps: list[Step] | tuple[Step, ...]) -> tuple[Step, ...]:
"pipeline",
"pipeline_to_json",
"pr",
"predicate",
"pull_request",
"push",
"py",
Expand Down
11 changes: 11 additions & 0 deletions crates/hm-dsl-engine/harmont-py/harmont/_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
CacheNone,
CacheOnChange,
CachePolicy,
CachePredicate,
CacheTTL,
)

Expand Down Expand Up @@ -245,6 +246,16 @@ def _cache_to_dict(policy: CachePolicy) -> dict[str, Any]:
"policy": "compose",
"sub_policies": [_cache_to_dict(p) for p in policy.policies],
}
if isinstance(policy, CachePredicate):
try:
value = policy.fn() # type: ignore[operator]
except Exception as exc:
msg = f"hm.predicate() callback raised {type(exc).__name__}: {exc}"
raise ValueError(msg) from exc
if value is None:
msg = "hm.predicate() callback returned None; it must return a string"
raise ValueError(msg)
return {"policy": "predicate", "value": str(value)}
msg = f"unknown CachePolicy: {type(policy).__name__}"
raise TypeError(msg)

Expand Down
17 changes: 17 additions & 0 deletions crates/hm-dsl-engine/harmont-py/harmont/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ class CacheForever(CachePolicy):
Use for pure computations whose only inputs are visible to the planner.
DO NOT use for installs that fetch the public internet — package repos
drift; manual cache busts will be needed.

On a cache hit only SYSTEM state (the snapshot) is restored — files the
step wrote into the workspace are not replayed across runs; downstream
steps see the current run's source tree instead. Write build outputs to
system paths (e.g. ``/usr/local``) or use ``on_change`` if the step's
workspace outputs must survive cache hits.
"""

env_keys: tuple[str, ...] = ()
Expand Down Expand Up @@ -78,3 +84,14 @@ class CacheCompose(CachePolicy):
"""

policies: tuple[CachePolicy, ...]


@dataclass(frozen=True)
class CachePredicate(CachePolicy):
    """Cache keyed on the return value of a user-supplied function.

    The function is called at plan time. If it returns a different value
    than the previous run, the cache key changes and the step re-runs.

    The dataclass is frozen, so ``fn`` cannot be reassigned after
    construction.
    """

    # Zero-argument callable supplied by the user. It is annotated as
    # ``object`` rather than a callable type, so code that invokes it has
    # to narrow (or silence the type checker) first.
    fn: object  # Callable[[], str] — called at plan time
2 changes: 2 additions & 0 deletions crates/hm-dsl-engine/harmont-py/harmont/keygen.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ def _resolve_policy(
for sub in subs
]
return "compose-" + _sha256_hex("".join(parts))
if kind == "predicate":
return "predicate-" + _sha256_hex(policy["value"])
msg = f"resolve-policy-key: unknown policy {kind!r}"
raise ValueError(msg)

Expand Down
129 changes: 129 additions & 0 deletions crates/hm-dsl-engine/harmont-py/tests/test_cache_predicate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
"""Tests for the ``predicate`` cache policy."""

from __future__ import annotations

import hashlib
from pathlib import Path

import harmont as hm
from harmont._pipeline import _cache_to_dict
from harmont.cache import CachePredicate
from harmont.keygen import resolve_pipeline_keys

NUL = "\x00"


def _sha256_hex(s: str) -> str:
return hashlib.sha256(s.encode("utf-8")).hexdigest()


def _make_graph(nodes, edges=None):
return {
"nodes": nodes,
"node_holes": [],
"edge_property": "directed",
"edges": edges or [],
}


# -- factory -----------------------------------------------------------------


def test_predicate_factory_returns_cache_predicate():
    """``hm.predicate`` wraps a callable in a ``CachePredicate``."""
    built = hm.predicate(lambda: "v1")
    assert isinstance(built, CachePredicate)


# -- lowering ----------------------------------------------------------------


def test_predicate_lowering_calls_fn_and_produces_dict():
    """Lowering a predicate policy yields its callback's value in a dict."""
    lowered = _cache_to_dict(hm.predicate(lambda: "v1"))
    expected = {"policy": "predicate", "value": "v1"}
    assert lowered == expected


def test_predicate_lowering_stringifies_return_value():
    """A non-string callback result is converted to ``str`` when lowered."""
    lowered = _cache_to_dict(hm.predicate(lambda: 42))
    expected = {"policy": "predicate", "value": "42"}
    assert lowered == expected


def test_predicate_fn_is_called_at_lowering_time():
    """The callback runs during lowering, not when the policy is built."""
    invocations: list[int] = []

    def counter():
        invocations.append(1)
        return "x"

    policy = hm.predicate(counter)
    # Constructing the policy must not have triggered the callback.
    assert not invocations

    _cache_to_dict(policy)
    # Lowering invokes it exactly once.
    assert invocations == [1]


# -- keygen ------------------------------------------------------------------


def test_predicate_keygen_produces_deterministic_key():
    """The resolved key equals the hash of the NUL-joined key components."""
    node = {
        "step": {
            "key": "a",
            "cmd": "echo",
            "cache": {"policy": "predicate", "value": "v1"},
        },
        "env": {},
    }
    out = resolve_pipeline_keys(
        _make_graph([node]),
        pipeline_org="default",
        pipeline_slug="default",
        now=0,
        base_path=Path("/tmp"),  # noqa: S108
        env={},
    )
    policy_res = "predicate-" + _sha256_hex("v1")
    components = ("default", "default", "a", "scratch", policy_res)
    expected = _sha256_hex(NUL.join(components))
    assert out["nodes"][0]["step"]["cache"]["key"] == expected


def test_different_predicate_values_produce_different_keys():
    """Changing only the predicate value changes the resolved cache key."""
    graphs = [
        _make_graph(
            [
                {
                    "step": {
                        "key": "a",
                        "cmd": "echo",
                        "cache": {"policy": "predicate", "value": value},
                    },
                    "env": {},
                },
            ]
        )
        for value in ("v1", "v2")
    ]
    for graph in graphs:
        # The return value is ignored; the key is read back from the
        # graph that was passed in.
        resolve_pipeline_keys(
            graph,
            pipeline_org="o",
            pipeline_slug="s",
            now=0,
            base_path=Path("/tmp"),  # noqa: S108
            env={},
        )
    first_key, second_key = (
        graph["nodes"][0]["step"]["cache"]["key"] for graph in graphs
    )
    assert first_key != second_key
22 changes: 20 additions & 2 deletions crates/hm-dsl-engine/harmont-ts/src/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,22 @@ export interface CacheCompose {
readonly policies: readonly CachePolicy[];
}

export type CachePolicy = CacheForever | CacheTTL | CacheOnChange | CacheCompose;

/**
 * Cache policy keyed on a caller-supplied string.
 *
 * `kind` is the literal tag that distinguishes this shape from the other
 * members of the `CachePolicy` union.
 */
export interface CachePredicate {
  readonly kind: "predicate";
  /** String carried by the policy, as passed to `predicate(value)`. */
  readonly value: string;
}

/** Union of the supported cache policy shapes, each tagged by its `kind` field. */
export type CachePolicy = CacheForever | CacheTTL | CacheOnChange | CacheCompose | CachePredicate;

/**
 * Build a permanent cache policy, keyed on (command, parent, envKeys).
 *
 * A cache hit restores only SYSTEM state (the snapshot). Files the step
 * wrote into the workspace are not replayed across runs, so downstream
 * steps see the current run's source tree instead. Write build outputs to
 * system paths (e.g. `/usr/local`), or use `onChange` if the step's
 * workspace outputs must survive cache hits.
 *
 * @param opts - Optional settings; `envKeys` defaults to an empty list.
 * @returns A `CacheForever` policy carrying the resolved `envKeys`.
 */
export function forever(opts?: { envKeys?: string[] }): CacheForever {
  const envKeys = opts?.envKeys ?? [];
  return { kind: "forever", envKeys };
}
Expand All @@ -39,3 +53,7 @@ export function onChange(...paths: string[]): CacheOnChange {
/**
 * Combine several cache policies into a single `CacheCompose` policy.
 *
 * @param policies - Sub-policies, kept in the order given.
 * @returns A policy of kind `"compose"` holding `policies`.
 */
export function compose(...policies: CachePolicy[]): CacheCompose {
  const composed: CacheCompose = { kind: "compose", policies };
  return composed;
}

/**
 * Create a cache policy keyed on a caller-computed string.
 *
 * The string is carried on the policy as `value`. Passing a different
 * string yields a different policy and therefore a different cache key.
 *
 * @param value - String identifying the cached state.
 * @returns A `CachePredicate` carrying `value`.
 * @throws TypeError if `value` is not a string at runtime (for example when
 *   called from untyped JavaScript), so the mistake surfaces where the
 *   policy is declared instead of later during key generation.
 */
export function predicate(value: string): CachePredicate {
  // The static type already says `string`; this guards callers that bypass
  // the type checker.
  if (typeof value !== "string") {
    throw new TypeError(`predicate() requires a string, got ${typeof value}`);
  }
  return { kind: "predicate", value };
}
2 changes: 2 additions & 0 deletions crates/hm-dsl-engine/harmont-ts/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ export {
type CacheTTL,
type CacheOnChange,
type CacheCompose,
type CachePredicate,
forever,
ttl,
onChange,
compose,
predicate,
} from "./cache.js";
export {
type Trigger,
Expand Down
4 changes: 4 additions & 0 deletions crates/hm-dsl-engine/harmont-ts/src/keygen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ function resolvePolicy(
return "compose-" + sha256hex(parts.join(""));
}

if (policy === "predicate") {
return "predicate-" + sha256hex(cache.value as string);
}

throw new Error(`resolve-policy-key: unknown policy "${policy}"`);
}

Expand Down
2 changes: 2 additions & 0 deletions crates/hm-dsl-engine/harmont-ts/src/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,5 +204,7 @@ function cachePolicyToDict(policy: CachePolicy): Record<string, unknown> {
policy: "compose",
sub_policies: policy.policies.map(cachePolicyToDict),
};
case "predicate":
return { policy: "predicate", value: policy.value };
}
}
15 changes: 14 additions & 1 deletion crates/hm-dsl-engine/harmont-ts/tests/cache.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { describe, expect, it } from "vitest";
import { forever, ttl, onChange, compose, type CachePolicy } from "../src/cache.js";
import { forever, ttl, onChange, compose, predicate, type CachePolicy } from "../src/cache.js";

describe("forever", () => {
it("creates a forever policy with no env keys", () => {
Expand Down Expand Up @@ -42,6 +42,19 @@ describe("compose", () => {
});
});

describe("predicate", () => {
  it("creates a predicate policy with the given value", () => {
    const p = predicate("v1");
    expect(p).toEqual({ kind: "predicate", value: "v1" });
  });

  it("different values produce distinct objects", () => {
    const p1 = predicate("v1");
    const p2 = predicate("v2");
    // Compare the policies themselves: checking only `.value` would just
    // compare the two string literals passed in above.
    expect(p1).not.toEqual(p2);
    expect(p1.value).not.toBe(p2.value);
  });
});

describe("type discrimination", () => {
it("kind field enables type narrowing", () => {
const p: CachePolicy = forever();
Expand Down
Loading
Loading