Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions app/features/demo/hitl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
"""HITL decision relay for the showcase pipeline (E5, issue #411).

A single-slot, in-memory store that lets the Showcase HITL step card relay an
operator's Approve/Reject decision back to the in-flight pipeline. The browser
POSTs ``/demo/hitl-decision`` (demo slice); :func:`resolve` records the decision
and wakes the waiting step, which then forwards the real decision to the agents
HITL gate (``POST /agents/sessions/{id}/approve`` with ``approved=true|false``).

This is module-level mutable state. It is SAFE because
``app.features.demo.service._pipeline_lock`` enforces exactly one pipeline per
process, and ``step_agent_hitl_flow`` registers at most one pending action per
run (precedent for module-level demo state: the lock itself, ``service.py:19``).
Defensive anyway: :func:`register` overwrites any stale slot from a crashed run
so the next run can never wedge, and the step clears the slot in a ``finally``.
"""

from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from typing import Literal

from app.core.logging import get_logger

logger = get_logger(__name__)

Decision = Literal["approved", "rejected"]
ResolveOutcome = Literal["applied", "already_decided", "not_found"]


@dataclass
class _PendingDecision:
"""The one open decision window, or ``None`` when no step is awaiting."""

action_id: str
event: asyncio.Event = field(default_factory=asyncio.Event)
decision: Decision | None = None
reason: str | None = None


_slot: _PendingDecision | None = None # module-level; one pipeline at a time


def register(action_id: str) -> None:
"""Open the decision window for ``action_id``.

Overwrites any stale slot left by a crashed run so a wedged slot can never
block the next run. Called by ``step_agent_hitl_flow`` immediately before it
yields the intermediate ``awaiting_approval`` event.
"""
global _slot
_slot = _PendingDecision(action_id=action_id)


def resolve(action_id: str, decision: Decision, reason: str | None = None) -> ResolveOutcome:
"""Record the operator's decision; called by ``POST /demo/hitl-decision``.

Returns ``"not_found"`` when no window is open for ``action_id`` (nothing
pending under that id), ``"already_decided"`` when a decision already
landed, and ``"applied"`` on success.
"""
if _slot is None or _slot.action_id != action_id:
return "not_found"
if _slot.decision is not None:
return "already_decided"
_slot.decision = decision
_slot.reason = reason
_slot.event.set()
logger.info("demo.hitl_decision_resolved", action_id=action_id, decision=decision)
return "applied"


async def wait_for_decision(action_id: str, timeout: float) -> tuple[Decision, str | None] | None:
"""Block up to ``timeout`` seconds for an operator decision.

Returns the ``(decision, reason)`` pair when the operator decided in time,
or ``None`` when the window lapsed (the caller then auto-approves) or when
no slot is open for ``action_id`` (defensive -- the step always registers
first).
"""
if _slot is None or _slot.action_id != action_id:
return None
try:
await asyncio.wait_for(_slot.event.wait(), timeout=timeout)
except TimeoutError:
return None
if _slot.decision is None: # defensive: event set without a decision
return None
return (_slot.decision, _slot.reason)


def clear() -> None:
"""Close the decision window (called from the step's ``finally``)."""
global _slot
_slot = None
27 changes: 20 additions & 7 deletions app/features/demo/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,12 @@ class ShowcaseWorkspace(TimestampMixin, Base):
)
# Version of the workspace config + story-slot schema (umbrella #406
# junk-drawer mitigation). Bump the ORM default when a slot shape changes.
# E5 (#411) bumped 1 -> 2: it widened the approval_events.decision enum
# (+timed_out), added "probe" to rag_events.event, and added the additive
# entry keys documented below. server_default stays text("1") -- no
# migration; old rows legitimately read 1.
config_schema_version: Mapped[int] = mapped_column(
Integer, nullable=False, default=1, server_default=text("1")
Integer, nullable=False, default=2, server_default=text("1")
)

# ── E1 (#407) — replay provenance ─────────────────────────────────────
Expand All @@ -152,13 +156,22 @@ class ShowcaseWorkspace(TimestampMixin, Base):
# user_scope (E3 #409 writes) — dict: operator-selected focus,
# {"store_id": int, "product_id": int} (additive keys
# allowed later).
# approval_events (E5 #411 writes) — list[dict], append-only:
# approval_events (E5 #411 writes — schema v2) — list[dict], append-only:
# {"action_id": str, "tool_name": str,
# "decision": "approved"|"rejected",
# "decided_at": iso8601-str, "session_id": str}.
# rag_events (E5 #411 writes) — list[dict], append-only:
# {"event": "index"|"retrieve"|"skip", "detail": str,
# "count": int, "occurred_at": iso8601-str}.
# "decision": "approved"|"rejected"|"timed_out",
# "decided_at": iso8601-str, "session_id": str,
# # v2 additive (config_schema_version >= 2):
# "auto_approved": bool, "reason": str|None,
# "execution_status": str|None,
# "tool_call_summary": {"description": str,
# "arguments_keys": list[str]},
# "transcript_summary": str, "tokens_used": int,
# "tool_calls_count": int}.
# rag_events (E5 #411 writes — schema v2) — list[dict], append-only:
# {"event": "probe"|"index"|"retrieve"|"skip",
# "status": "pass"|"warn"|"skip", "detail": str,
# "count": int, "occurred_at": iso8601-str,
# "provider": str|None, "reachable": bool|None}.
# job_ids (later parallel epic) — list[str]: job / batch
# sub-job ids the run submitted (soft references).
# phase_summaries (later parallel epic) — list[dict], one per phase:
Expand Down
Loading