Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 114 additions & 2 deletions src/decision/policy/stop_nudge.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,117 @@ def _check_never_surfaced(state: SessionState) -> str | None:
return None # Never break Claude Code


def _scan_assistant_decisions(state: SessionState) -> list[str]:
    """Scan the session transcript for decision language in assistant messages.

    Reads the tail of the JSONL transcript, extracts text blocks from
    assistant messages, and applies the same corroboration bar as
    capture_nudge before accepting a phrase. Returns at most 3 matched
    decision phrases; returns [] on any error (best-effort, never raises).
    """
    # Local imports keep transcript scanning off the hot path for other hooks.
    from ..utils.constants import TRANSCRIPT_MAX_BLOCKS, TRANSCRIPT_TAIL_BYTES
    from ..utils.helpers import _discover_transcript
    from .capture_nudge import (
        _DECISION_PHRASE,
        _REASONING_SIGNAL,
        _has_nearby_technical,
        _is_false_positive,
    )

    try:
        transcript = _discover_transcript()
        if transcript is None:
            return []

        # Only read the tail of the file to bound scan time.
        size = transcript.stat().st_size
        with open(transcript, encoding="utf-8", errors="replace") as fh:
            if size > TRANSCRIPT_TAIL_BYTES:
                fh.seek(size - TRANSCRIPT_TAIL_BYTES)
                fh.readline()  # drop the partial line the seek landed in
            raw_lines = fh.readlines()

        # Walk newest-first and collect assistant text blocks, capped per line.
        texts: list[str] = []
        for raw in reversed(raw_lines):
            if len(texts) >= TRANSCRIPT_MAX_BLOCKS:
                break
            try:
                entry = json.loads(raw)
            except (json.JSONDecodeError, ValueError):
                # Malformed/partial lines are expected in a live JSONL file.
                continue
            if entry.get("type") != "assistant":
                continue
            blocks = entry.get("message", {}).get("content", [])
            if not isinstance(blocks, list):
                continue
            for block in blocks:
                if not (isinstance(block, dict) and block.get("type") == "text"):
                    continue
                snippet = block.get("text", "")
                if snippet:
                    texts.append(snippet)

        if not texts:
            return []

        # Phrases capture_nudge already flagged from user input this session.
        prior = state.load_data("_capture-nudge-pending").lower()

        found: list[str] = []
        emitted: set[str] = set()
        for text in texts:
            lowered = text.lower()
            matches = [
                m
                for m in _DECISION_PHRASE.finditer(lowered)
                if not _is_false_positive(lowered, m.end())
            ]
            if not matches:
                continue

            # Corroboration: same bar as capture_nudge neutral context —
            # nearby technical token, a reasoning signal, or >=2 distinct phrases.
            corroborated = (
                any(_has_nearby_technical(text, m.start(), m.end()) for m in matches)
                or _REASONING_SIGNAL.search(lowered) is not None
                or len({m.group(0) for m in matches}) >= 2
            )
            if not corroborated:
                continue

            for m in matches:
                phrase = m.group(0)
                if phrase in emitted:
                    continue
                # Skip if capture_nudge already detected this phrase from user input.
                if prior and phrase in prior:
                    continue
                emitted.add(phrase)
                found.append(phrase)
                if len(found) >= 3:
                    return found

        return found
    except Exception as exc:
        print(f"decision: _scan_assistant_decisions error: {exc}", file=sys.stderr)
        return []


def _assistant_decision_summary(state: SessionState) -> str | None:
    """Check for decision language in assistant messages via transcript scanning.

    Returns a one-line nudge string, or None when nudges are dismissed,
    decisions were already captured recently, or no phrase is found.
    """
    if state.nudges_dismissed():
        return None

    # Decisions already written this session make the nudge redundant.
    if state.has_recent_decisions(state.get_store().decisions_dir):
        return None

    phrases = _scan_assistant_decisions(state)
    if not phrases:
        return None

    if len(phrases) == 1:
        return f'Assistant stated a choice ("{phrases[0]}") — write to `.claude/decisions/` to preserve context'
    return f"Assistant stated {len(phrases)} uncaptured choices — write to `.claude/decisions/` to preserve context"


def _stop_nudge_condition(data: dict[str, Any], state: SessionState) -> PolicyResult | None:
"""Show a compact one-line summary at session end. Never a wall of text."""
# Persist surfacing analytics before building summary
Expand Down Expand Up @@ -240,9 +351,10 @@ def _stop_nudge_condition(data: dict[str, Any], state: SessionState) -> PolicyRe
suppress_coaching = _should_suppress_coaching()

# Pick the single highest-priority secondary hint (one sentence max).
# Priority: impl session > plan session > staleness > never-surfaced.
# Priority: assistant scan > impl session > plan session > staleness > never-surfaced.
secondary_msg: str | None = None
if not suppress_coaching:
secondary_msg = _assistant_decision_summary(state)
if secondary_msg is None and not suppress_coaching:
secondary_msg = _impl_session_summary(state)
if secondary_msg is None and not suppress_coaching:
secondary_msg = _plan_session_summary(state)
Expand Down
4 changes: 4 additions & 0 deletions src/decision/utils/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@
# Session activity
MAX_SESSION_EDITS = 500 # Cap unique file paths tracked per session

# Transcript scanning (assistant decision detection at Stop time)
TRANSCRIPT_TAIL_BYTES = 65_536 # Read last 64KB of JSONL
TRANSCRIPT_MAX_BLOCKS = 20 # Max assistant text blocks to scan

# ── Type aliases ─────────────────────────────────────────────────────

StrPath = Union[str, Path]
9 changes: 9 additions & 0 deletions src/decision/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ def _file_lock(lock_path: Path) -> Generator[None, None, None]:
fcntl.flock(fd, fcntl.LOCK_UN)


def _discover_transcript(session_id: str | None = None) -> Path | None:
    """Return the session JSONL transcript path, or None if unavailable.

    Falls back to the CLAUDE_SESSION_ID environment variable when no
    explicit session id is supplied; only returns paths that exist.
    """
    sid = session_id if session_id else os.environ.get("CLAUDE_SESSION_ID")
    if not sid:
        return None
    candidate = Path.home() / ".claude" / "projects" / _project_key() / f"{sid}.jsonl"
    if candidate.is_file():
        return candidate
    return None


def _path_to_keywords(path: str) -> str:
"""Extract searchable words from a file path."""
parts = Path(path).parts
Expand Down
163 changes: 163 additions & 0 deletions tests/test_stop_nudge.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,31 @@
import json
import os
import time
from unittest.mock import patch

import decision
from conftest import make_session_state, make_decision, make_store


def _write_jsonl(path, entries):
    """Write a list of dicts as JSONL lines."""
    path.parent.mkdir(parents=True, exist_ok=True)
    payload = "".join(json.dumps(entry) + "\n" for entry in entries)
    with open(path, "w") as f:
        f.write(payload)


def _assistant_entry(text):
    """Build a minimal assistant JSONL entry with a text block."""
    content = [{"type": "text", "text": text}]
    return {"type": "assistant", "message": {"role": "assistant", "content": content}}


# ── stop-nudge tests ──────────────────────────────────────────────


Expand Down Expand Up @@ -476,3 +496,146 @@ def test_stop_nudge_cleans_up_session_dir(tmp_path):

# Session dir should be removed after stop
assert not state._dir.is_dir()


# ── Assistant transcript scanning ──────────────────────────────────


def test_assistant_scan_detects_decision_phrase(tmp_path):
    """Transcript scan detects decision language with corroboration in assistant text."""
    from decision.policy.stop_nudge import _scan_assistant_decisions

    _, store = make_store(tmp_path)
    state = make_session_state("ascan-detect", store=store)

    # Decision phrase plus backtick-technical token — passes corroboration.
    transcript = tmp_path / "session.jsonl"
    _write_jsonl(transcript, [
        _assistant_entry("Going with `serde_json::Value` for tool inputs because they vary wildly."),
    ])

    # Patching the helpers module works because stop_nudge imports the
    # helper at call time, not at module load.
    with patch("decision.utils.helpers._discover_transcript", return_value=transcript):
        found = _scan_assistant_decisions(state)

    assert found
    assert "going with" in found[0]


def test_assistant_scan_requires_corroboration(tmp_path):
    """Bare decision phrase without tech/reasoning signal is not detected."""
    from decision.policy.stop_nudge import _scan_assistant_decisions

    _, store = make_store(tmp_path)
    state = make_session_state("ascan-nocorr", store=store)

    # "going with" alone: no technical token nearby, no reasoning signal.
    transcript = tmp_path / "session.jsonl"
    _write_jsonl(transcript, [_assistant_entry("Going with the simpler option here.")])

    with patch("decision.utils.helpers._discover_transcript", return_value=transcript):
        assert _scan_assistant_decisions(state) == []


def test_assistant_scan_skips_user_detected(tmp_path):
    """Phrases already detected by capture_nudge are deduplicated."""
    from decision.policy.stop_nudge import _scan_assistant_decisions

    _, store = make_store(tmp_path)
    state = make_session_state("ascan-dedup", store=store)
    # Simulate capture_nudge having already flagged "going with".
    state.mark_fired("_capture-nudge-pending")
    state.store_data("_capture-nudge-pending", "going with")

    transcript = tmp_path / "session.jsonl"
    _write_jsonl(transcript, [
        _assistant_entry("Going with `serde_json` because it's the standard."),
    ])

    with patch("decision.utils.helpers._discover_transcript", return_value=transcript):
        assert _scan_assistant_decisions(state) == []


def test_assistant_scan_no_transcript(tmp_path):
    """Graceful empty result when transcript doesn't exist."""
    from decision.policy.stop_nudge import _scan_assistant_decisions

    _, store = make_store(tmp_path)
    state = make_session_state("ascan-none", store=store)

    with patch("decision.utils.helpers._discover_transcript", return_value=None):
        assert _scan_assistant_decisions(state) == []


def test_assistant_scan_corrupt_jsonl(tmp_path):
    """Malformed JSON lines are skipped gracefully."""
    from decision.policy.stop_nudge import _scan_assistant_decisions

    _, store = make_store(tmp_path)
    state = make_session_state("ascan-corrupt", store=store)

    # One valid entry sandwiched between two unparseable lines.
    good_line = json.dumps(_assistant_entry("Going with `Redis` because it's faster."))
    transcript = tmp_path / "session.jsonl"
    transcript.write_text("{broken json!!!\n" + good_line + "\n" + "another broken line\n")

    with patch("decision.utils.helpers._discover_transcript", return_value=transcript):
        found = _scan_assistant_decisions(state)

    assert found
    assert "going with" in found[0]


def test_assistant_scan_respects_dismissed(tmp_path):
    """No assistant scan when nudges are dismissed."""
    from decision.policy.stop_nudge import _assistant_decision_summary

    _, store = make_store(tmp_path)
    state = make_session_state("ascan-dismissed", store=store)
    state.mark_nudges_dismissed()

    assert _assistant_decision_summary(state) is None


def test_assistant_scan_in_stop_nudge(tmp_path):
    """End-to-end: assistant decision phrase appears in stop-nudge output."""
    from decision.policy.stop_nudge import _stop_nudge_condition

    _, store = make_store(tmp_path)
    state = make_session_state("ascan-e2e", store=store)
    # An edit makes the session eligible for a stop-time summary.
    state.record_edit("src/db/connection.py")

    transcript = tmp_path / "session.jsonl"
    _write_jsonl(
        transcript,
        [_assistant_entry("I chose `PostgreSQL` over SQLite because we need concurrent writes.")],
    )

    with patch("decision.utils.helpers._discover_transcript", return_value=transcript):
        outcome = _stop_nudge_condition({}, state)

    assert outcome is not None
    assert "assistant stated" in outcome.system_message.lower()


def test_assistant_scan_silent_when_decisions_captured(tmp_path):
    """No assistant scan nudge when decisions were already captured this session."""
    from decision.policy.stop_nudge import _assistant_decision_summary

    decisions_dir, store = make_store(tmp_path)

    # Bump the decision file's mtime past session start so it counts as
    # captured during this session.
    captured = make_decision(decisions_dir, "test-captured")
    stamp = time.time() + 1
    os.utime(captured, (stamp, stamp))

    state = make_session_state("ascan-captured", store=store)

    assert _assistant_decision_summary(state) is None