Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 178 additions & 0 deletions app/features/demo/link_health.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
"""Soft-reference liveness probes for showcase workspaces (E2, issue #408).

A workspace row records everything its run created as OPAQUE SOFT REFERENCES
(no ForeignKeys -- see ``app/features/demo/models.py``), so referenced objects
can be deleted out from under it by design. This module turns that silent
staleness into a per-workspace health signal.

The demo slice may NOT import another feature slice (vertical-slice rule), so
liveness is checked through the public HTTP API **in-process** via
``httpx.ASGITransport`` -- the exact mechanism ``pipeline._Client`` already
uses (``app/features/demo/pipeline.py``). ``raise_app_exceptions=False`` is
load-bearing: an unhandled error inside a probed endpoint must surface as a
500 *response* (classified ``unknown``), never as a re-raised exception.

Classification table:

2xx -> "alive" (the referenced object still exists)
404 -> "dead" (deleted after the run -- expected, designed)
anything else -> "unknown" (5xx, timeout, transport error -- no false alarms)

A probe NEVER raises -- a flaky slice must not 500 the health route.
"""

from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any

import httpx

from app.features.demo.models import WORKSPACE_STATUS_COMPLETED, ShowcaseWorkspace
from app.features.demo.schemas import (
RefHealthStatus,
RefType,
WorkspaceHealthResponse,
WorkspaceRefHealth,
)

if TYPE_CHECKING:
from fastapi import FastAPI

# Probe budget -- generous for an in-process call; a hung dependency inside a
# probed endpoint classifies as "unknown" instead of hanging the health route.
_PROBE_TIMEOUT = httpx.Timeout(10.0, connect=5.0)


@dataclass(frozen=True)
class _ProbeTarget:
    """One probeable soft reference resolved from a workspace row.

    Immutable value object (``frozen=True``): ``build_probe_targets`` creates
    one per recorded reference and ``_probe_one`` reads it to issue the probe
    and to label the result.
    """

    # Label of the slot the id was read from. Scalar slots use the bare key
    # (e.g. "alias"); list slots carry the element index (e.g.
    # "scenario_plan_ids[0]", "job_ids[2]").
    key: str
    # Kind of object the id points at (one of the ``RefType`` literals).
    ref_type: RefType
    # The recorded soft-reference id, exactly as stored on the workspace.
    ref_id: str
    # Public API path whose status code decides liveness.
    probe_path: str


def build_probe_targets(ws: ShowcaseWorkspace) -> list[_ProbeTarget]:
    """Map a workspace's soft references to probeable public API paths.

    Targets are emitted in a fixed order: the three model-run ids, the
    scenario plan ids, the alias, the batch id, the agent session id, and
    finally the entries of the ``job_ids`` column.

    Non-probeable ``created_objects`` keys (``v2_model_path``,
    ``scenario_artifact_key``, ``train_model_types``) are skipped -- they have
    no HTTP identity to check. The E1 ``job_ids`` story slot (CONTRACT(E1)-6)
    probes through ``GET /jobs/{job_id}`` when present; pre-backfill rows
    where the slot is NULL are silently skipped.

    Everything read from the row is treated as untrusted JSON:

    * a ``created_objects`` value that is not a dict, or a ``job_ids`` value
      that is not a list/tuple, yields no targets instead of raising (or, for
      a stray string, being iterated character by character);
    * any id that is not a non-empty string is skipped;
    * each id is percent-encoded before it is placed in the probe path, so
      characters such as ``?``, ``#``, ``%`` or control characters are sent as
      part of the id rather than being parsed as URL syntax.

    Args:
        ws: The workspace row; only ``created_objects`` and ``job_ids`` are read.

    Returns:
        One ``_ProbeTarget`` per probeable reference; empty when nothing
        probeable was recorded.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import quote

    raw_objects = ws.created_objects
    objects: dict[str, Any] = raw_objects if isinstance(raw_objects, dict) else {}
    targets: list[_ProbeTarget] = []

    def _add(key: str, ref_type: RefType, ref_id: object, prefix: str) -> None:
        """Append a target when ``ref_id`` is a non-empty string; otherwise skip."""
        if isinstance(ref_id, str) and ref_id:
            # safe="" encodes every reserved character, so the id always
            # occupies exactly one path segment of the probed URL.
            path = f"{prefix}/{quote(ref_id, safe='')}"
            targets.append(_ProbeTarget(key, ref_type, ref_id, path))

    for key in ("winning_run_id", "v2_run_id", "stale_alias_run_id"):
        _add(key, "model_run", objects.get(key), "/registry/runs")

    plan_ids = objects.get("scenario_plan_ids")
    if isinstance(plan_ids, list):
        for index, plan_id in enumerate(plan_ids):
            _add(f"scenario_plan_ids[{index}]", "scenario_plan", plan_id, "/scenarios")

    _add("alias", "alias", objects.get("alias"), "/registry/aliases")
    _add("batch_id", "batch", objects.get("batch_id"), "/batch")
    _add(
        "agent_session_id",
        "agent_session",
        objects.get("agent_session_id"),
        "/agents/sessions",
    )

    # The ORM types job_ids as list[str], but JSONB enforces nothing at
    # runtime -- apply the same shape and entry guards as for created_objects.
    raw_jobs = ws.job_ids
    if isinstance(raw_jobs, (list, tuple)):
        for index, job_id in enumerate(raw_jobs):
            _add(f"job_ids[{index}]", "job", job_id, "/jobs")

    return targets


async def _probe_one(client: httpx.AsyncClient, target: _ProbeTarget) -> WorkspaceRefHealth:
    """Probe one reference and classify the outcome. NEVER raises.

    Issues ``GET target.probe_path`` on ``client`` and maps the result to a
    ``RefHealthStatus``:

    * 2xx           -> ``"alive"``
    * 404           -> ``"dead"``
    * anything else -> ``"unknown"`` (other status codes, or a failed request)

    Args:
        client: Async HTTP client the probe is sent through.
        target: The reference to probe.

    Returns:
        A ``WorkspaceRefHealth`` echoing the target's key, type, id and probe
        path together with the classified status.
    """
    status: RefHealthStatus
    try:
        response = await client.get(target.probe_path)
    except (httpx.HTTPError, httpx.InvalidURL, OSError):
        # httpx.InvalidURL is NOT an httpx.HTTPError subclass: a stored id
        # that makes the probe path unparseable (e.g. a control character)
        # must classify as "unknown", not escape and fail the whole request.
        status = "unknown"
    else:
        code = response.status_code
        if 200 <= code < 300:
            status = "alive"
        elif code == 404:
            status = "dead"
        else:
            status = "unknown"
    return WorkspaceRefHealth(
        key=target.key,
        ref_type=target.ref_type,
        ref_id=target.ref_id,
        status=status,
        probe_path=target.probe_path,
    )


async def probe_workspace_links(app: FastAPI, ws: ShowcaseWorkspace) -> WorkspaceHealthResponse:
    """Probe each soft reference recorded on a workspace and summarise the results.

    All probes are awaited together through one in-process HTTP client. When
    the workspace has no probeable references, no client is created and the
    response carries an empty reference list with zero counts.

    ``partial_run`` is true whenever the row's status is anything other than
    ``WORKSPACE_STATUS_COMPLETED`` -- such a run never settled, so its
    artifacts may be missing regardless of what the probes report.

    Args:
        app: The live FastAPI app (``request.app`` -- the slice never imports
            ``app.main``).
        ws: The workspace row whose references are probed.

    Returns:
        The per-reference results plus alive/dead/unknown counts.
    """
    probe_targets = build_probe_targets(ws)

    if not probe_targets:
        results: list[WorkspaceRefHealth] = []
    else:
        # raise_app_exceptions=False: an unhandled error inside a probed
        # endpoint comes back as a 500 response rather than being re-raised.
        # NOTE(review): confirm that ASGITransport enforces the client timeout
        # for in-process calls; if it does not, a hung endpoint stalls the gather.
        in_process = httpx.ASGITransport(app=app, raise_app_exceptions=False)
        async with httpx.AsyncClient(
            transport=in_process,
            base_url="http://demo.internal",
            timeout=_PROBE_TIMEOUT,
        ) as client:
            pending = [_probe_one(client, item) for item in probe_targets]
            results = list(await asyncio.gather(*pending))

    statuses = [result.status for result in results]
    return WorkspaceHealthResponse(
        workspace_id=ws.workspace_id,
        workspace_status=ws.status,
        partial_run=ws.status != WORKSPACE_STATUS_COMPLETED,
        references=results,
        alive=statuses.count("alive"),
        dead=statuses.count("dead"),
        unknown=statuses.count("unknown"),
    )
101 changes: 94 additions & 7 deletions app/features/demo/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
- ``POST /demo/run`` -- synchronous; runs the whole pipeline, returns a result.
- ``WS /demo/stream`` -- streams one StepEvent per step for the live UI.
- ``GET /demo/workspaces`` -- E4 (#393): list saved workspaces.
E2 (#408): ``q`` name search, repeated ``tags`` containment,
``include_archived`` (default false), allow-listed ``sort_by``/``sort_order``;
pinned rows always order first.
- ``GET /demo/workspaces/{workspace_id}`` -- E4 (#393): one workspace's detail.
- ``GET /demo/workspaces/{workspace_id}/health`` -- E2 (#408): probe the
workspace's soft references in-process; per-ref alive/dead/unknown + counts.
- ``PATCH /demo/workspaces/{workspace_id}`` -- E1 (#407): partial lifecycle
update (rename / notes / tags / archive / pin); ``status`` is not patchable.
- ``DELETE /demo/workspaces/{workspace_id}`` -- delete the workspace METADATA
Expand Down Expand Up @@ -35,12 +40,13 @@
from app.core.database import get_db
from app.core.exceptions import ConflictError, NotFoundError
from app.core.logging import get_logger
from app.features.demo import service, workspace
from app.features.demo import link_health, service, workspace
from app.features.demo.schemas import (
DemoRunRequest,
DemoRunResult,
StepEvent,
WorkspaceDetailResponse,
WorkspaceHealthResponse,
WorkspaceListItem,
WorkspaceListResponse,
WorkspaceUpdateRequest,
Expand Down Expand Up @@ -84,26 +90,70 @@ async def run_demo_pipeline(request: Request, params: DemoRunRequest) -> DemoRun
"/workspaces",
response_model=WorkspaceListResponse,
summary="List saved showcase workspaces",
description="List saved showcase workspaces, newest first. Returns 200 + "
"an empty list when no workspaces exist.",
description=(
"List saved showcase workspaces, newest first (pinned rows always "
"order first). E2 (#408): `q` searches names case-insensitively, "
"repeated `tags` params filter by containment, archived rows are "
"hidden unless `include_archived=true`, and `sort_by`/`sort_order` "
"are allow-listed (unknown values use the default order). Returns "
"200 + an empty list when nothing matches."
),
)
async def list_showcase_workspaces(
db: AsyncSession = Depends(get_db),
limit: int = Query(default=20, ge=1, le=100, description="Maximum workspaces to return."),
offset: int = Query(default=0, ge=0, description="Number of workspaces to skip."),
q: str | None = Query(
default=None,
min_length=2,
description="Search in workspace name (case-insensitive).",
),
tags: list[str] | None = Query(
default=None,
description="Repeatable tag filter -- a workspace matches when it "
"carries every listed tag.",
),
include_archived: bool = Query(
default=False,
description="Include archived workspaces (hidden by default).",
),
sort_by: str | None = Query(
default=None,
description="Sort column: created_at, name, seed, or status. "
"Unknown values fall back to the default order (created_at desc).",
),
sort_order: str = Query(
default="desc",
pattern="^(asc|desc)$",
description="Sort direction: asc or desc.",
),
) -> WorkspaceListResponse:
"""List saved showcase workspaces (E4, issue #393).
"""List saved showcase workspaces (E4 #393; filters/sort E2 #408).

Args:
db: Async database session from dependency.
limit: Maximum workspaces to return (1-100).
offset: Number of workspaces to skip.
q: Case-insensitive name search.
tags: Repeatable tag containment filter.
include_archived: Include archived workspaces.
sort_by: Allow-listed sort column (unknown values use default order).
sort_order: Sort direction (asc or desc).

Returns:
A page of saved workspaces plus the total count.
A page of saved workspaces plus the filtered total count.
"""
rows = await workspace.list_workspaces(db, limit=limit, offset=offset)
total = await workspace.count_workspaces(db)
rows = await workspace.list_workspaces(
db,
limit=limit,
offset=offset,
q=q,
tags=tags,
include_archived=include_archived,
sort_by=sort_by,
sort_order=sort_order,
)
total = await workspace.count_workspaces(db, q=q, tags=tags, include_archived=include_archived)
return WorkspaceListResponse(
workspaces=[WorkspaceListItem.model_validate(row) for row in rows],
total=total,
Expand Down Expand Up @@ -138,6 +188,43 @@ async def get_showcase_workspace(
return WorkspaceDetailResponse.model_validate(row)


@router.get(
    "/workspaces/{workspace_id}/health",
    response_model=WorkspaceHealthResponse,
    summary="Probe a workspace's soft-reference link health",
    description=(
        "Probe every soft reference the workspace recorded (model runs, "
        "scenario plans, alias, batch, agent session, job ids) through the "
        "public API in-process. Each reference classifies as alive (2xx), "
        "dead (404 -- deleted after the run), or unknown (anything else). "
        "`partial_run` flags a row whose pipeline never completed."
    ),
)
async def get_workspace_health(
    workspace_id: str,
    request: Request,
    db: AsyncSession = Depends(get_db),
) -> WorkspaceHealthResponse:
    """Return link-health for one saved workspace (E2, issue #408).

    Loads the workspace row, then hands it to
    ``link_health.probe_workspace_links`` together with the running app so
    the probes can be issued in-process.

    Args:
        workspace_id: External identifier of the workspace.
        request: The incoming request; only ``request.app`` is used.
        db: Async database session from dependency.

    Returns:
        Per-reference liveness plus aggregate counts.

    Raises:
        NotFoundError: When no workspace matches ``workspace_id``.
    """
    stored = await workspace.get_workspace(db, workspace_id)
    if stored is not None:
        return await link_health.probe_workspace_links(request.app, stored)
    raise NotFoundError(message=f"Workspace not found: {workspace_id}")


@router.patch(
"/workspaces/{workspace_id}",
response_model=WorkspaceDetailResponse,
Expand Down
46 changes: 46 additions & 0 deletions app/features/demo/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,3 +314,49 @@ class WorkspaceListResponse(BaseModel):
..., description="Saved workspaces for the current page; empty when none."
)
total: int = Field(..., ge=0, description="Total saved workspaces.")


# E2 (#408) -- link-health classification of one probed soft reference:
# "alive" (probe returned 2xx), "dead" (404), "unknown" (anything else).
RefHealthStatus = Literal["alive", "dead", "unknown"]
# E2 (#408) -- kind of soft-referenced object a workspace can record; one
# value per family of probeable reference.
RefType = Literal["model_run", "scenario_plan", "alias", "batch", "agent_session", "job"]


class WorkspaceRefHealth(BaseModel):
    """Liveness of one soft reference recorded on a workspace (E2, #408).

    One instance is produced per probed reference and returned inside
    ``WorkspaceHealthResponse.references``.

    Response model -- plain ``BaseModel``, NOT strict (``StepEvent``
    precedent above; strict mode is request-body-only policy).
    """

    # Label of the slot the id was read from; list slots carry the element
    # index in brackets.
    key: str = Field(
        ...,
        description="created_objects key, e.g. 'winning_run_id' or 'scenario_plan_ids[0]'.",
    )
    # Constrained to the RefType literals.
    ref_type: RefType = Field(..., description="Kind of referenced object.")
    # The id exactly as stored on the workspace row.
    ref_id: str = Field(..., description="The recorded soft-reference id.")
    # Constrained to the RefHealthStatus literals.
    status: RefHealthStatus = Field(
        ..., description="alive (2xx) / dead (404) / unknown (anything else)."
    )
    # Path that was requested to decide ``status``.
    probe_path: str = Field(..., description="The public API path probed.")


class WorkspaceHealthResponse(BaseModel):
    """Per-workspace link-health summary (E2, #408).

    Carries the individual probe results plus the three aggregate counts and
    a timestamp. Only ``references`` and ``checked_at`` have defaults; every
    other field must be supplied by the caller.

    Response model -- plain ``BaseModel``, NOT strict.
    """

    workspace_id: str = Field(..., description="The probed workspace's external id.")
    workspace_status: str = Field(..., description="running / completed / failed.")
    # Derived from workspace_status by the caller; not computed by this model.
    partial_run: bool = Field(
        ..., description="True when workspace_status != 'completed' (the run never settled)."
    )
    # Defaults to a fresh empty list per instance.
    references: list[WorkspaceRefHealth] = Field(
        default_factory=list,
        description="Per-reference probe results; empty when nothing was recorded.",
    )
    # The three counts are validated as non-negative (ge=0).
    alive: int = Field(..., ge=0, description="Count of references that probed alive.")
    dead: int = Field(..., ge=0, description="Count of references that probed dead (404).")
    unknown: int = Field(..., ge=0, description="Count of references whose probe was inconclusive.")
    # Filled at model construction by ``_utc_now`` (defined elsewhere in this
    # module -- presumably the current UTC time; confirm against its definition).
    checked_at: datetime = Field(default_factory=_utc_now, description="When the probes ran (UTC).")
Loading