diff --git a/src/tether/comply/audit.py b/src/tether/comply/audit.py
index 979e29d..89e1432 100644
--- a/src/tether/comply/audit.py
+++ b/src/tether/comply/audit.py
@@ -13,6 +13,8 @@ from statistics import median
 from typing import Any, Iterable, Iterator
 
+from tether.runtime.record import verify_record_chain
+
 
 def _canonical_record_bytes(record: dict[str, Any]) -> bytes:
     return json.dumps(record, sort_keys=True, separators=(",", ":"), default=str).encode("utf-8")
 
@@ -36,7 +38,18 @@ def _open_text(path: Path):
     return path.open("r", encoding="utf-8")
 
 
-def _iter_records(files: Iterable[Path]) -> Iterator[tuple[Path, dict[str, Any]]]:
+def _iter_records(
+    files: Iterable[Path], stats: dict[str, int] | None = None
+) -> Iterator[tuple[Path, dict[str, Any]]]:
+    """Yield (file, record) for every JSON-object line.
+
+    Skipped lines are *counted* into ``stats`` (parse_error_count,
+    non_dict_count, unreadable_file_count) rather than silently dropped — for
+    an audit summarizer, a vanished (possibly tampered) line is an integrity
+    signal the report must surface, not hide.
+    """
+    if stats is None:
+        stats = {}
     for file in files:
         try:
             with _open_text(file) as fh:
@@ -47,10 +60,14 @@ def _iter_records(files: Iterable[Path]) -> Iterator[tuple[Path, dict[str, Any]]
                     try:
                         rec = json.loads(line)
                     except json.JSONDecodeError:
+                        stats["parse_error_count"] = stats.get("parse_error_count", 0) + 1
                         continue
                     if isinstance(rec, dict):
                         yield file, rec
+                    else:
+                        stats["non_dict_count"] = stats.get("non_dict_count", 0) + 1
         except OSError:
+            stats["unreadable_file_count"] = stats.get("unreadable_file_count", 0) + 1
             continue
 
 
@@ -94,8 +111,39 @@ def summarize_audit_log(path: str | Path | None) -> dict[str, Any]:
     redaction_modes: dict[str, set[str]] = {"image": set(), "instruction": set()}
     safety_samples: list[dict[str, Any]] = []
 
+    parse_stats: dict[str, int] = {}
+
+    # Per-file tamper-evident chain verification. The runtime recorder writes a
+    # prev_record_hash/record_hash chain per session/file (record.py); we verify
+    # each file's chain with the SAME verifier the recorder is tested against,
+    # instead of merely re-hashing and asserting nothing. _iter_records groups
+    # by file in order, so we accumulate the current file's records and verify
+    # at each file boundary.
+    chain_results: list[dict[str, Any]] = []
+    _chain_file: Path | None = None
+    _chain_records: list[dict[str, Any]] = []
+
+    def _flush_chain() -> None:
+        nonlocal _chain_records
+        if _chain_file is None:
+            return
+        is_chained = any("record_hash" in r for r in _chain_records)
+        if is_chained:
+            ok, broken_index = verify_record_chain(_chain_records)
+            chain_results.append({
+                "file": _chain_file.name,
+                "verified": ok,
+                "broken_index": broken_index,
+            })
+        _chain_records = []
+
     prev_hash = "0" * 64
-    for file, rec in _iter_records(files):
+    for file, rec in _iter_records(files, parse_stats):
+        if file is not _chain_file:
+            _flush_chain()
+            _chain_file = file
+        _chain_records.append(rec)
+
         record_count += 1
         schema = rec.get("schema_version")
         if schema is not None:
@@ -180,6 +228,33 @@ def summarize_audit_log(path: str | Path | None) -> dict[str, Any]:
                     "clamped": bool(rec.get("clamped")),
                 })
 
+    _flush_chain()
+
+    # Aggregate the per-file chain verdicts. chain_verified is True only if
+    # every chained file verifies; None when no file carried a chain (e.g.
+    # standalone ActionGuard logs) — we don't claim verification we didn't do.
+    chained_file_count = len(chain_results)
+    broken = [r for r in chain_results if not r["verified"]]
+    if chained_file_count == 0:
+        chain_verified: bool | None = None
+    else:
+        chain_verified = len(broken) == 0
+    first_broken_file = broken[0]["file"] if broken else ""
+    first_broken_index = broken[0]["broken_index"] if broken else None
+    unchained_file_count = max(0, len(files) - chained_file_count)
+
+    parse_error_count = parse_stats.get("parse_error_count", 0)
+    non_dict_count = parse_stats.get("non_dict_count", 0)
+    unreadable_file_count = parse_stats.get("unreadable_file_count", 0)
+
+    # The log is integrity-clean only if every chain verifies AND nothing was
+    # dropped. A False chain or any parse/unreadable skip flips status to
+    # "tampered" so downstream (regulatory mapping, technical file) can react.
+    integrity_ok = chain_verified is not False and (
+        parse_error_count + non_dict_count + unreadable_file_count
+    ) == 0
+    status = "ok" if integrity_ok else "tampered"
+
     first_ts = min(timestamps) if timestamps else ""
     last_ts = max(timestamps) if timestamps else ""
     return {
@@ -218,9 +293,25 @@ def summarize_audit_log(path: str | Path | None) -> dict[str, Any]:
             "alg": "sha256(prev_hash || canonical_json_record)",
             "head": prev_hash if record_count else "",
             "record_count": record_count,
+            # Authoritative tamper check: per-file verification of the runtime
+            # recorder's prev_record_hash/record_hash chain via
+            # record.verify_record_chain. True = all chained files intact,
+            # False = a chain broke (edit/insert/reorder/truncate),
+            # None = no file carried a chain (nothing to verify).
+            "chain_verified": chain_verified,
+            "chained_file_count": chained_file_count,
+            "unchained_file_count": unchained_file_count,
+            "first_broken_file": first_broken_file,
+            "first_broken_index": first_broken_index,
+        },
+        "integrity": {
+            "chain_verified": chain_verified,
+            "parse_error_count": parse_error_count,
+            "non_dict_record_count": non_dict_count,
+            "unreadable_file_count": unreadable_file_count,
         },
         "safety_violations_sample": safety_samples,
-        "status": "ok",
+        "status": status,
     }
 
 
diff --git a/src/tether/comply/export.py b/src/tether/comply/export.py
index 4a07dcb..e32379a 100644
--- a/src/tether/comply/export.py
+++ b/src/tether/comply/export.py
@@ -75,6 +75,25 @@ def _render_status(status: str) -> str:
     return "Gap"
 
 
+def _chain_status_text(audit: dict[str, Any]) -> str:
+    """Human-readable verdict for the recorder hash-chain verification.
+
+    Reports the *result* of record.verify_record_chain, not just the head hash —
+    so the technical file states whether tamper-evidence was actually checked
+    and passed, broke (with location), or wasn't applicable.
+    """
+    te = audit.get("tamper_evidence") or {}
+    verified = te.get("chain_verified")
+    if verified is True:
+        n = te.get("chained_file_count", 0)
+        return f"PASS ({n} chained file(s) verified)"
+    if verified is False:
+        loc = te.get("first_broken_file", "")
+        idx = te.get("first_broken_index")
+        return f"FAIL — chain broken at {loc} record #{idx} (edit/insert/reorder/truncate)"
+    return "n/a (no hash-chained records present)"
+
+
 def _render_technical_file(
     *,
     deployment: DeploymentMetadata,
@@ -121,6 +140,7 @@ def _render_technical_file(
         f"- Model hashes observed: {', '.join(audit.get('model_hashes', [])) or 'none'}",
         f"- Config hashes observed: {', '.join(audit.get('config_hashes', [])) or 'none'}",
         f"- Tamper-evidence head: {(audit.get('tamper_evidence') or {}).get('head', '')}",
+        f"- Tamper-evidence chain verified: {_chain_status_text(audit)}",
         f"- Safety violations: {audit.get('safety_violation_count', 0)}",
         f"- Errors: {audit.get('error_count', 0)}",
         "",
diff --git a/src/tether/comply/mapping.py b/src/tether/comply/mapping.py
index a325902..c7fa2ce 100644
--- a/src/tether/comply/mapping.py
+++ b/src/tether/comply/mapping.py
@@ -20,6 +20,13 @@ def build_regulatory_mapping(evidence: dict[str, Any], *, sbom_present: bool) ->
     audit = evidence.get("audit_summary") or {}
     actionguard = evidence.get("actionguard") or {}
     audit_present = bool(audit.get("present") and audit.get("record_count", 0) > 0)
+    # Art-12 record-keeping evidence is only credible if the trace's tamper-
+    # evident chain actually verifies and nothing was silently dropped. A
+    # broken chain (edit/insert/reorder/truncate) or parse-skipped lines flip
+    # the audit summary's status to "tampered" — which must NOT count as
+    # covered record-keeping. chain_verified is None (n/a) for unchained logs.
+    audit_chain_broken = (audit.get("tamper_evidence") or {}).get("chain_verified") is False
+    audit_trustworthy = audit_present and audit.get("status") != "tampered" and not audit_chain_broken
     actionguard_present = bool(actionguard.get("present"))
     parity_present = bool(parity_cert)
     parity_signed = bool(parity_cert.get("signature"))
@@ -43,9 +50,9 @@ def build_regulatory_mapping(evidence: dict[str, Any], *, sbom_present: bool) ->
             regulation="EU AI Act (Regulation (EU) 2024/1689)",
             article="Article 12 - record keeping / logging",
             requirement="Keep automatic logs sufficient for traceability of high-risk AI operation.",
-            tether_evidence=["audit_summary.json", "tamper-evidence hash-chain head", "model/config hashes"],
+            tether_evidence=["audit_summary.json", "verified tamper-evidence hash-chain", "model/config hashes"],
             customer_gap="Manufacturer/operator must define retention policy, access controls, and operational log-review procedure.",
-            status=_status(audit_present, customer_gap="yes"),
+            status=_status(audit_trustworthy, customer_gap="yes"),
         ),
         RegulatoryControl(
             control_id="eu-ai-act.art-14",
diff --git a/tests/test_comply_chain_verification.py b/tests/test_comply_chain_verification.py
new file mode 100644
index 0000000..7984075
--- /dev/null
+++ b/tests/test_comply_chain_verification.py
@@ -0,0 +1,115 @@
+"""Comply audit-log tamper-evidence is *verified*, not merely asserted.
+
+Guards the fix that wires record.verify_record_chain into summarize_audit_log,
+so an edited / reordered / record-deleted trace is actually detected instead of
+producing a clean signed bundle.
+"""
+from __future__ import annotations
+
+import json
+from pathlib import Path
+
+from tether.comply.audit import summarize_audit_log
+from tether.runtime.record import _chain_hash
+
+
+def _write_chained(path: Path, records: list[dict]) -> None:
+    """Write a valid recorder-style hash-chained JSONL file (mirrors _emit)."""
+    prev = "0" * 64
+    lines = []
+    for rec in records:
+        rec = dict(rec)
+        rec["prev_record_hash"] = prev
+        rec["record_hash"] = _chain_hash(prev, rec)
+        prev = rec["record_hash"]
+        lines.append(json.dumps(rec, separators=(",", ":")))
+    path.write_text("\n".join(lines) + "\n", encoding="utf-8")
+
+
+def _sample_records() -> list[dict]:
+    return [
+        {"kind": "header", "schema_version": 1, "session_id": "s1", "started_at": "2026-06-10T00:00:00Z"},
+        {"kind": "request", "seq": 0, "timestamp": "2026-06-10T00:00:01Z",
+         "request": {"image_sha256": "a" * 64}, "guard": {"violations": []}},
+        {"kind": "request", "seq": 1, "timestamp": "2026-06-10T00:00:02Z",
+         "request": {"image_sha256": "b" * 64}, "guard": {"violations": []}},
+    ]
+
+
+def test_valid_chain_verifies(tmp_path: Path) -> None:
+    f = tmp_path / "trace.jsonl"
+    _write_chained(f, _sample_records())
+    s = summarize_audit_log(tmp_path)
+    assert s["tamper_evidence"]["chain_verified"] is True
+    assert s["tamper_evidence"]["chained_file_count"] == 1
+    assert s["status"] == "ok"
+    assert s["integrity"]["chain_verified"] is True
+
+
+def test_edited_record_breaks_chain(tmp_path: Path) -> None:
+    f = tmp_path / "trace.jsonl"
+    _write_chained(f, _sample_records())
+    # Tamper: edit the second record's payload, leaving its stale record_hash.
+    lines = f.read_text(encoding="utf-8").splitlines()
+    rec = json.loads(lines[1])
+    rec["request"]["image_sha256"] = "f" * 64  # silently altered evidence
+    lines[1] = json.dumps(rec, separators=(",", ":"))
+    f.write_text("\n".join(lines) + "\n", encoding="utf-8")
+
+    s = summarize_audit_log(tmp_path)
+    assert s["tamper_evidence"]["chain_verified"] is False
+    assert s["tamper_evidence"]["first_broken_index"] == 1
+    assert s["tamper_evidence"]["first_broken_file"] == "trace.jsonl"
+    assert s["status"] == "tampered"
+
+
+def test_reordered_records_break_chain(tmp_path: Path) -> None:
+    f = tmp_path / "trace.jsonl"
+    _write_chained(f, _sample_records())
+    lines = f.read_text(encoding="utf-8").splitlines()
+    lines[1], lines[2] = lines[2], lines[1]  # swap two requests
+    f.write_text("\n".join(lines) + "\n", encoding="utf-8")
+    s = summarize_audit_log(tmp_path)
+    assert s["tamper_evidence"]["chain_verified"] is False
+    assert s["status"] == "tampered"
+
+
+def test_truncated_record_breaks_chain(tmp_path: Path) -> None:
+    """Deleting a record from the MIDDLE of a chained file breaks the chain.
+
+    Tail truncation is intentionally not asserted: the surviving prefix is
+    presumably still a valid chain (TODO confirm against verify_record_chain),
+    so catching it needs a footer/expected-count check, not hash chaining.
+    """
+    f = tmp_path / "trace.jsonl"
+    _write_chained(f, _sample_records())
+    lines = f.read_text(encoding="utf-8").splitlines()
+    del lines[1]  # drop the first request so its successor's prev hash dangles
+    f.write_text("\n".join(lines) + "\n", encoding="utf-8")
+    s = summarize_audit_log(tmp_path)
+    assert s["tamper_evidence"]["chain_verified"] is False
+    assert s["tamper_evidence"]["first_broken_file"] == "trace.jsonl"
+    assert s["status"] == "tampered"
+
+
+def test_corrupt_line_counted_and_flips_status(tmp_path: Path) -> None:
+    f = tmp_path / "trace.jsonl"
+    _write_chained(f, _sample_records())
+    with f.open("a", encoding="utf-8") as fh:
+        fh.write("{not valid json\n")
+    s = summarize_audit_log(tmp_path)
+    assert s["integrity"]["parse_error_count"] == 1
+    assert s["status"] == "tampered"
+
+
+def test_unchained_log_reports_none(tmp_path: Path) -> None:
+    """Standalone ActionGuard logs (no chain fields) → chain_verified None."""
+    f = tmp_path / "guard.jsonl"
+    f.write_text(
+        json.dumps({"violations": [], "actions_safe": True, "latency_ms": 5}) + "\n",
+        encoding="utf-8",
+    )
+    s = summarize_audit_log(tmp_path)
+    assert s["tamper_evidence"]["chain_verified"] is None
+    assert s["tamper_evidence"]["chained_file_count"] == 0
+    assert s["status"] == "ok"  # nothing to verify, nothing dropped