Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 94 additions & 3 deletions src/tether/comply/audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from statistics import median
from typing import Any, Iterable, Iterator

from tether.runtime.record import verify_record_chain


def _canonical_record_bytes(record: dict[str, Any]) -> bytes:
return json.dumps(record, sort_keys=True, separators=(",", ":"), default=str).encode("utf-8")
Expand All @@ -36,7 +38,18 @@ def _open_text(path: Path):
return path.open("r", encoding="utf-8")


def _iter_records(files: Iterable[Path]) -> Iterator[tuple[Path, dict[str, Any]]]:
def _iter_records(
files: Iterable[Path], stats: dict[str, int] | None = None
) -> Iterator[tuple[Path, dict[str, Any]]]:
"""Yield (file, record) for every JSON-object line.

Skipped lines are *counted* into ``stats`` (parse_error_count,
non_dict_count, unreadable_file_count) rather than silently dropped — for
an audit summarizer, a vanished (possibly tampered) line is an integrity
signal the report must surface, not hide.
"""
if stats is None:
stats = {}
for file in files:
try:
with _open_text(file) as fh:
Expand All @@ -47,10 +60,14 @@ def _iter_records(files: Iterable[Path]) -> Iterator[tuple[Path, dict[str, Any]]
try:
rec = json.loads(line)
except json.JSONDecodeError:
stats["parse_error_count"] = stats.get("parse_error_count", 0) + 1
continue
if isinstance(rec, dict):
yield file, rec
else:
stats["non_dict_count"] = stats.get("non_dict_count", 0) + 1
except OSError:
stats["unreadable_file_count"] = stats.get("unreadable_file_count", 0) + 1
continue


Expand Down Expand Up @@ -94,8 +111,39 @@ def summarize_audit_log(path: str | Path | None) -> dict[str, Any]:
redaction_modes: dict[str, set[str]] = {"image": set(), "instruction": set()}
safety_samples: list[dict[str, Any]] = []

parse_stats: dict[str, int] = {}

# Per-file tamper-evident chain verification. The runtime recorder writes a
# prev_record_hash/record_hash chain per session/file (record.py); we verify
# each file's chain with the SAME verifier the recorder is tested against,
# instead of merely re-hashing and asserting nothing. _iter_records groups
# by file in order, so we accumulate the current file's records and verify
# at each file boundary.
chain_results: list[dict[str, Any]] = []
_chain_file: Path | None = None
_chain_records: list[dict[str, Any]] = []

def _flush_chain() -> None:
nonlocal _chain_records
if _chain_file is None:
return
is_chained = any("record_hash" in r for r in _chain_records)
if is_chained:
ok, broken_index = verify_record_chain(_chain_records)
chain_results.append({
"file": _chain_file.name,
"verified": ok,
"broken_index": broken_index,
})
_chain_records = []

prev_hash = "0" * 64
for file, rec in _iter_records(files):
for file, rec in _iter_records(files, parse_stats):
if file is not _chain_file:
_flush_chain()
_chain_file = file
_chain_records.append(rec)

record_count += 1
schema = rec.get("schema_version")
if schema is not None:
Expand Down Expand Up @@ -180,6 +228,33 @@ def summarize_audit_log(path: str | Path | None) -> dict[str, Any]:
"clamped": bool(rec.get("clamped")),
})

_flush_chain()

# Aggregate the per-file chain verdicts. chain_verified is True only if
# every chained file verifies; None when no file carried a chain (e.g.
# standalone ActionGuard logs) — we don't claim verification we didn't do.
chained_file_count = len(chain_results)
broken = [r for r in chain_results if not r["verified"]]
if chained_file_count == 0:
chain_verified: bool | None = None
else:
chain_verified = len(broken) == 0
first_broken_file = broken[0]["file"] if broken else ""
first_broken_index = broken[0]["broken_index"] if broken else None
unchained_file_count = max(0, len(files) - chained_file_count)

parse_error_count = parse_stats.get("parse_error_count", 0)
non_dict_count = parse_stats.get("non_dict_count", 0)
unreadable_file_count = parse_stats.get("unreadable_file_count", 0)

# The log is integrity-clean only if no chain failed verification AND nothing
# was dropped. A False chain, or any skipped input (parse error, non-object
# record, unreadable file), flips status to "tampered" so downstream
# (regulatory mapping, technical file) can react.
integrity_ok = chain_verified is not False and (
parse_error_count + non_dict_count + unreadable_file_count
) == 0
status = "ok" if integrity_ok else "tampered"

first_ts = min(timestamps) if timestamps else ""
last_ts = max(timestamps) if timestamps else ""
return {
Expand Down Expand Up @@ -218,9 +293,25 @@ def summarize_audit_log(path: str | Path | None) -> dict[str, Any]:
"alg": "sha256(prev_hash || canonical_json_record)",
"head": prev_hash if record_count else "",
"record_count": record_count,
# Authoritative tamper check: per-file verification of the runtime
# recorder's prev_record_hash/record_hash chain via
# record.verify_record_chain. True = all chained files intact,
# False = a chain broke (edit/insert/reorder/truncate),
# None = no file carried a chain (nothing to verify).
"chain_verified": chain_verified,
"chained_file_count": chained_file_count,
"unchained_file_count": unchained_file_count,
"first_broken_file": first_broken_file,
"first_broken_index": first_broken_index,
},
"integrity": {
"chain_verified": chain_verified,
"parse_error_count": parse_error_count,
"non_dict_record_count": non_dict_count,
"unreadable_file_count": unreadable_file_count,
},
"safety_violations_sample": safety_samples,
"status": "ok",
"status": status,
}


Expand Down
20 changes: 20 additions & 0 deletions src/tether/comply/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,25 @@ def _render_status(status: str) -> str:
return "Gap"


def _chain_status_text(audit: dict[str, Any]) -> str:
"""Human-readable verdict for the recorder hash-chain verification.

Reports the *result* of record.verify_record_chain, not just the head hash —
so the technical file states whether tamper-evidence was actually checked
and passed, broke (with location), or wasn't applicable.
"""
te = audit.get("tamper_evidence") or {}
verified = te.get("chain_verified")
if verified is True:
n = te.get("chained_file_count", 0)
return f"PASS ({n} chained file(s) verified)"
if verified is False:
loc = te.get("first_broken_file", "")
idx = te.get("first_broken_index")
return f"FAIL — chain broken at {loc} record #{idx} (edit/insert/reorder/truncate)"
return "n/a (no hash-chained records present)"


def _render_technical_file(
*,
deployment: DeploymentMetadata,
Expand Down Expand Up @@ -121,6 +140,7 @@ def _render_technical_file(
f"- Model hashes observed: {', '.join(audit.get('model_hashes', [])) or 'none'}",
f"- Config hashes observed: {', '.join(audit.get('config_hashes', [])) or 'none'}",
f"- Tamper-evidence head: {(audit.get('tamper_evidence') or {}).get('head', '')}",
f"- Tamper-evidence chain verified: {_chain_status_text(audit)}",
f"- Safety violations: {audit.get('safety_violation_count', 0)}",
f"- Errors: {audit.get('error_count', 0)}",
"",
Expand Down
11 changes: 9 additions & 2 deletions src/tether/comply/mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ def build_regulatory_mapping(evidence: dict[str, Any], *, sbom_present: bool) ->
audit = evidence.get("audit_summary") or {}
actionguard = evidence.get("actionguard") or {}
audit_present = bool(audit.get("present") and audit.get("record_count", 0) > 0)
# Art-12 record-keeping evidence is only credible if the trace's tamper-
# evident chain actually verifies and nothing was silently dropped. A
# broken chain (edit/insert/reorder/truncate) or parse-skipped lines flip
# the audit summary's status to "tampered" — which must NOT count as
# covered record-keeping. chain_verified is None (n/a) for unchained logs.
audit_chain_broken = (audit.get("tamper_evidence") or {}).get("chain_verified") is False
audit_trustworthy = audit_present and audit.get("status") != "tampered" and not audit_chain_broken
actionguard_present = bool(actionguard.get("present"))
parity_present = bool(parity_cert)
parity_signed = bool(parity_cert.get("signature"))
Expand All @@ -43,9 +50,9 @@ def build_regulatory_mapping(evidence: dict[str, Any], *, sbom_present: bool) ->
regulation="EU AI Act (Regulation (EU) 2024/1689)",
article="Article 12 - record keeping / logging",
requirement="Keep automatic logs sufficient for traceability of high-risk AI operation.",
tether_evidence=["audit_summary.json", "tamper-evidence hash-chain head", "model/config hashes"],
tether_evidence=["audit_summary.json", "verified tamper-evidence hash-chain", "model/config hashes"],
customer_gap="Manufacturer/operator must define retention policy, access controls, and operational log-review procedure.",
status=_status(audit_present, customer_gap="yes"),
status=_status(audit_trustworthy, customer_gap="yes"),
),
RegulatoryControl(
control_id="eu-ai-act.art-14",
Expand Down
115 changes: 115 additions & 0 deletions tests/test_comply_chain_verification.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
"""Comply audit-log tamper-evidence is *verified*, not merely asserted.

Guards the fix that wires record.verify_record_chain into summarize_audit_log,
so an edited / reordered / truncated trace is actually detected instead of
producing a clean signed bundle.
"""
from __future__ import annotations

import json
from pathlib import Path

from tether.comply.audit import summarize_audit_log
from tether.runtime.record import _chain_hash


def _write_chained(path: Path, records: list[dict]) -> None:
    """Write ``records`` to ``path`` as a recorder-style hash-chained JSONL file.

    Each record is copied, linked to its predecessor via ``prev_record_hash``
    (the first link is 64 zeros), and stamped with a ``record_hash`` produced
    by ``_chain_hash``. The input dicts are not modified.
    """
    previous_hash = "0" * 64
    serialized: list[str] = []
    for original in records:
        linked = {**original, "prev_record_hash": previous_hash}
        # The hash is computed before ``record_hash`` is attached to the copy.
        previous_hash = _chain_hash(previous_hash, linked)
        linked["record_hash"] = previous_hash
        serialized.append(json.dumps(linked, separators=(",", ":")))
    path.write_text("\n".join(serialized) + "\n", encoding="utf-8")


def _sample_records() -> list[dict]:
return [
{"kind": "header", "schema_version": 1, "session_id": "s1", "started_at": "2026-06-10T00:00:00Z"},
{"kind": "request", "seq": 0, "timestamp": "2026-06-10T00:00:01Z",
"request": {"image_sha256": "a" * 64}, "guard": {"violations": []}},
{"kind": "request", "seq": 1, "timestamp": "2026-06-10T00:00:02Z",
"request": {"image_sha256": "b" * 64}, "guard": {"violations": []}},
]


def test_valid_chain_verifies(tmp_path: Path) -> None:
    """An untouched chained trace verifies and yields a clean status."""
    trace = tmp_path / "trace.jsonl"
    _write_chained(trace, _sample_records())

    summary = summarize_audit_log(tmp_path)
    tamper = summary["tamper_evidence"]

    assert tamper["chain_verified"] is True
    assert tamper["chained_file_count"] == 1
    assert summary["status"] == "ok"
    assert summary["integrity"]["chain_verified"] is True


def test_edited_record_breaks_chain(tmp_path: Path) -> None:
    """Editing a record's payload without re-hashing is reported as tampering."""
    f = tmp_path / "trace.jsonl"
    _write_chained(f, _sample_records())

    # Tamper: edit the second record's payload, leaving its stale record_hash.
    # The file is read and written explicitly as UTF-8 to match how
    # _write_chained produced it, instead of relying on the locale default.
    lines = f.read_text(encoding="utf-8").splitlines()
    rec = json.loads(lines[1])
    rec["request"]["image_sha256"] = "f" * 64  # silently altered evidence
    lines[1] = json.dumps(rec, separators=(",", ":"))
    f.write_text("\n".join(lines) + "\n", encoding="utf-8")

    s = summarize_audit_log(tmp_path)
    assert s["tamper_evidence"]["chain_verified"] is False
    assert s["tamper_evidence"]["first_broken_index"] == 1
    assert s["tamper_evidence"]["first_broken_file"] == "trace.jsonl"
    assert s["status"] == "tampered"


def test_reordered_records_break_chain(tmp_path: Path) -> None:
    """Swapping two records in a chained trace is reported as tampering."""
    f = tmp_path / "trace.jsonl"
    _write_chained(f, _sample_records())

    # Read and write explicitly as UTF-8, matching _write_chained, rather
    # than depending on the platform's default text encoding.
    lines = f.read_text(encoding="utf-8").splitlines()
    lines[1], lines[2] = lines[2], lines[1]  # swap two requests
    f.write_text("\n".join(lines) + "\n", encoding="utf-8")

    s = summarize_audit_log(tmp_path)
    assert s["tamper_evidence"]["chain_verified"] is False
    assert s["status"] == "tampered"


def test_truncated_record_breaks_chain(tmp_path: Path) -> None:
    """Dropping a record from the middle of a chained trace is detected.

    The test has two phases. Tail truncation is summarized first; only the
    surviving record count is asserted there, because a shortened prefix is
    expected to remain a valid chain (detecting a missing tail is left to the
    recorder's footer/expected-count check). The second phase removes a
    middle record, which must break the chain.
    """
    f = tmp_path / "trace.jsonl"

    # Phase 1: drop the last record. The summary must still be produced and
    # must reflect exactly the records that remain.
    _write_chained(f, _sample_records())
    lines = f.read_text(encoding="utf-8").splitlines()
    f.write_text("\n".join(lines[:-1]) + "\n", encoding="utf-8")
    tail_cut = summarize_audit_log(tmp_path)
    assert tail_cut["tamper_evidence"]["record_count"] == len(lines) - 1

    # Phase 2: drop a record from the middle; its successor's
    # prev_record_hash no longer matches, so verification must fail.
    _write_chained(f, _sample_records())
    lines = f.read_text(encoding="utf-8").splitlines()
    del lines[1]
    f.write_text("\n".join(lines) + "\n", encoding="utf-8")
    mid_cut = summarize_audit_log(tmp_path)
    assert mid_cut["tamper_evidence"]["chain_verified"] is False
    assert mid_cut["status"] == "tampered"


def test_corrupt_line_counted_and_flips_status(tmp_path: Path) -> None:
    """An unparseable line is counted and marks the log as tampered."""
    f = tmp_path / "trace.jsonl"
    _write_chained(f, _sample_records())

    # Append with an explicit encoding so the file stays UTF-8 regardless of
    # the platform's default text encoding.
    with f.open("a", encoding="utf-8") as fh:
        fh.write("{not valid json\n")

    s = summarize_audit_log(tmp_path)
    assert s["integrity"]["parse_error_count"] == 1
    assert s["status"] == "tampered"


def test_unchained_log_reports_none(tmp_path: Path) -> None:
    """Standalone ActionGuard logs (no chain fields) → chain_verified None."""
    guard_log = tmp_path / "guard.jsonl"
    record = {"violations": [], "actions_safe": True, "latency_ms": 5}
    guard_log.write_text(json.dumps(record) + "\n", encoding="utf-8")

    summary = summarize_audit_log(tmp_path)
    tamper = summary["tamper_evidence"]

    assert tamper["chain_verified"] is None
    assert tamper["chained_file_count"] == 0
    # Nothing to verify and nothing dropped, so the status stays clean.
    assert summary["status"] == "ok"
Loading