diff --git a/app/features/demo/link_health.py b/app/features/demo/link_health.py new file mode 100644 index 00000000..d3748fd4 --- /dev/null +++ b/app/features/demo/link_health.py @@ -0,0 +1,178 @@ +"""Soft-reference liveness probes for showcase workspaces (E2, issue #408). + +A workspace row records everything its run created as OPAQUE SOFT REFERENCES +(no ForeignKeys -- see ``app/features/demo/models.py``), so referenced objects +can be deleted out from under it by design. This module turns that silent +staleness into a per-workspace health signal. + +The demo slice may NOT import another feature slice (vertical-slice rule), so +liveness is checked through the public HTTP API **in-process** via +``httpx.ASGITransport`` -- the exact mechanism ``pipeline._Client`` already +uses (``app/features/demo/pipeline.py``). ``raise_app_exceptions=False`` is +load-bearing: an unhandled error inside a probed endpoint must surface as a +500 *response* (classified ``unknown``), never as a re-raised exception. + +Classification table: + + 2xx -> "alive" (the referenced object still exists) + 404 -> "dead" (deleted after the run -- expected, designed) + anything else -> "unknown" (5xx, timeout, transport error -- no false alarms) + +A probe NEVER raises -- a flaky slice must not 500 the health route. +""" + +from __future__ import annotations + +import asyncio +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any + +import httpx + +from app.features.demo.models import WORKSPACE_STATUS_COMPLETED, ShowcaseWorkspace +from app.features.demo.schemas import ( + RefHealthStatus, + RefType, + WorkspaceHealthResponse, + WorkspaceRefHealth, +) + +if TYPE_CHECKING: + from fastapi import FastAPI + +# Probe budget -- generous for an in-process call; a hung dependency inside a +# probed endpoint classifies as "unknown" instead of hanging the health route. 
# Per-probe budget. The value is still handed to the httpx client, and
# ``_probe_one`` additionally enforces ``_PROBE_TIMEOUT.read`` as an asyncio
# deadline, so the budget holds even when the in-process transport applies no
# network timeout of its own (there is no socket to time out).
_PROBE_TIMEOUT = httpx.Timeout(10.0, connect=5.0)


@dataclass(frozen=True)
class _ProbeTarget:
    """One probeable soft reference resolved from a workspace row."""

    key: str  # created_objects key (list keys carry an index, e.g. "scenario_plan_ids[0]")
    ref_type: RefType
    ref_id: str  # the id exactly as recorded (never encoded) -- echoed back to the caller
    probe_path: str  # public API path whose status code decides liveness


def _probe_path(prefix: str, ref_id: str) -> str:
    """Build ``prefix/<ref_id>`` with the id percent-encoded as ONE path segment.

    Recorded ids are untrusted JSONB. Interpolated raw, an id containing
    ``?``, ``#`` or ``../`` would be parsed as a query string, a fragment or a
    dot-segment and could make the probe hit a different resource (and report
    a false "alive"). ``safe=""`` also encodes ``/``. Ordinary ids
    (alphanumerics, ``-``, ``_``, ``.``, ``~``) are left unchanged.
    """
    # Function-scope import: keeps the module's import block untouched.
    from urllib.parse import quote

    return f"{prefix}/{quote(ref_id, safe='')}"


def build_probe_targets(ws: ShowcaseWorkspace) -> list[_ProbeTarget]:
    """Map a workspace's soft references to probeable public API paths.

    Non-probeable ``created_objects`` keys (``v2_model_path``,
    ``scenario_artifact_key``, ``train_model_types``) are skipped -- they have
    no HTTP identity to check. ``job_ids`` entries probe through
    ``GET /jobs/{job_id}``; a NULL slot yields no targets.

    Both containers and every entry are treated as untrusted: a value of the
    wrong type (or an empty string) is skipped rather than raised on.

    Args:
        ws: The workspace row whose ``created_objects`` / ``job_ids`` are read.

    Returns:
        Targets in a stable order: run ids, scenario plans, alias, batch,
        agent session, then job ids.
    """
    targets: list[_ProbeTarget] = []

    # JSONB enforces no shape at runtime -- a non-dict payload means "nothing
    # recorded" instead of an AttributeError on ``.get``.
    raw_objects: Any = ws.created_objects
    objects: dict[str, Any] = raw_objects if isinstance(raw_objects, dict) else {}

    def _str_value(key: str) -> str | None:
        value = objects.get(key)
        return value if isinstance(value, str) and value else None

    def _add(key: str, ref_type: RefType, ref_id: str, prefix: str) -> None:
        targets.append(_ProbeTarget(key, ref_type, ref_id, _probe_path(prefix, ref_id)))

    for key in ("winning_run_id", "v2_run_id", "stale_alias_run_id"):
        run_id = _str_value(key)
        if run_id:
            _add(key, "model_run", run_id, "/registry/runs")

    plan_ids = objects.get("scenario_plan_ids")
    if isinstance(plan_ids, list):
        for index, plan_id in enumerate(plan_ids):
            if isinstance(plan_id, str) and plan_id:
                _add(f"scenario_plan_ids[{index}]", "scenario_plan", plan_id, "/scenarios")

    alias = _str_value("alias")
    if alias:
        _add("alias", "alias", alias, "/registry/aliases")

    batch_id = _str_value("batch_id")
    if batch_id:
        _add("batch_id", "batch", batch_id, "/batch")

    session_id = _str_value("agent_session_id")
    if session_id:
        _add("agent_session_id", "agent_session", session_id, "/agents/sessions")

    # The ORM types job_ids as list[str], but JSONB enforces nothing at
    # runtime. Require a real list: ``list()`` over a stray string or dict
    # would otherwise fan out into one bogus target per character / key.
    raw_job_ids: Any = ws.job_ids
    job_ids: list[Any] = raw_job_ids if isinstance(raw_job_ids, list) else []
    for index, job_id in enumerate(job_ids):
        if isinstance(job_id, str) and job_id:
            _add(f"job_ids[{index}]", "job", job_id, "/jobs")

    return targets


async def _probe_one(client: httpx.AsyncClient, target: _ProbeTarget) -> WorkspaceRefHealth:
    """Probe one reference and classify the outcome; inconclusive means "unknown".

    Classification: 2xx -> "alive", 404 -> "dead", anything else -> "unknown".
    Transport errors, un-parseable URLs and an exceeded deadline are all
    swallowed and reported as "unknown" so one bad reference cannot fail the
    whole health response.

    Args:
        client: Client used to issue the GET (only ``get`` is called).
        target: The reference to probe.

    Returns:
        The reference's identity plus its classified status.
    """
    status: RefHealthStatus
    try:
        # Hard deadline around the request itself, independent of whatever
        # timeout handling the transport does or does not implement.
        response = await asyncio.wait_for(
            client.get(target.probe_path),
            timeout=_PROBE_TIMEOUT.read,
        )
    except (httpx.HTTPError, httpx.InvalidURL, OSError):
        # httpx.InvalidURL is NOT an HTTPError subclass, so it is listed
        # explicitly. The deadline surfaces as the builtin TimeoutError,
        # which is an OSError subclass on Python 3.11+.
        status = "unknown"
    else:
        if 200 <= response.status_code < 300:
            status = "alive"
        elif response.status_code == 404:
            status = "dead"
        else:
            status = "unknown"
    return WorkspaceRefHealth(
        key=target.key,
        ref_type=target.ref_type,
        ref_id=target.ref_id,
        status=status,
        probe_path=target.probe_path,
    )


async def probe_workspace_links(app: FastAPI, ws: ShowcaseWorkspace) -> WorkspaceHealthResponse:
    """Probe every soft reference a workspace recorded; aggregate the counts.

    Probes run concurrently through an in-process ASGI client.
    ``raise_app_exceptions=False`` makes an unhandled error inside a probed
    endpoint come back as a 500 response (classified "unknown") instead of
    being re-raised here. ``partial_run`` is True whenever the row's status is
    not ``WORKSPACE_STATUS_COMPLETED``.

    Args:
        app: The live FastAPI app the probes are dispatched to.
        ws: The workspace row whose references are probed.

    Returns:
        The per-reference results plus alive/dead/unknown counts. A workspace
        with no probeable references yields an empty list and zero counts
        without constructing a client.
    """
    targets = build_probe_targets(ws)
    references: list[WorkspaceRefHealth] = []
    if targets:
        async with httpx.AsyncClient(
            transport=httpx.ASGITransport(app=app, raise_app_exceptions=False),
            base_url="http://demo.internal",
            timeout=_PROBE_TIMEOUT,
        ) as client:
            references = list(
                await asyncio.gather(*(_probe_one(client, target) for target in targets))
            )
    return WorkspaceHealthResponse(
        workspace_id=ws.workspace_id,
        workspace_status=ws.status,
        partial_run=ws.status != WORKSPACE_STATUS_COMPLETED,
        references=references,
        alive=sum(1 for ref in references if ref.status == "alive"),
        dead=sum(1 for ref in references if ref.status == "dead"),
        unknown=sum(1 for ref in references if ref.status == "unknown"),
    )
- ``DELETE /demo/workspaces/{workspace_id}`` -- delete the workspace METADATA @@ -35,12 +40,13 @@ from app.core.database import get_db from app.core.exceptions import ConflictError, NotFoundError from app.core.logging import get_logger -from app.features.demo import service, workspace +from app.features.demo import link_health, service, workspace from app.features.demo.schemas import ( DemoRunRequest, DemoRunResult, StepEvent, WorkspaceDetailResponse, + WorkspaceHealthResponse, WorkspaceListItem, WorkspaceListResponse, WorkspaceUpdateRequest, @@ -84,26 +90,70 @@ async def run_demo_pipeline(request: Request, params: DemoRunRequest) -> DemoRun "/workspaces", response_model=WorkspaceListResponse, summary="List saved showcase workspaces", - description="List saved showcase workspaces, newest first. Returns 200 + " - "an empty list when no workspaces exist.", + description=( + "List saved showcase workspaces, newest first (pinned rows always " + "order first). E2 (#408): `q` searches names case-insensitively, " + "repeated `tags` params filter by containment, archived rows are " + "hidden unless `include_archived=true`, and `sort_by`/`sort_order` " + "are allow-listed (unknown values use the default order). Returns " + "200 + an empty list when nothing matches." 
+ ), ) async def list_showcase_workspaces( db: AsyncSession = Depends(get_db), limit: int = Query(default=20, ge=1, le=100, description="Maximum workspaces to return."), offset: int = Query(default=0, ge=0, description="Number of workspaces to skip."), + q: str | None = Query( + default=None, + min_length=2, + description="Search in workspace name (case-insensitive).", + ), + tags: list[str] | None = Query( + default=None, + description="Repeatable tag filter -- a workspace matches when it " + "carries every listed tag.", + ), + include_archived: bool = Query( + default=False, + description="Include archived workspaces (hidden by default).", + ), + sort_by: str | None = Query( + default=None, + description="Sort column: created_at, name, seed, or status. " + "Unknown values fall back to the default order (created_at desc).", + ), + sort_order: str = Query( + default="desc", + pattern="^(asc|desc)$", + description="Sort direction: asc or desc.", + ), ) -> WorkspaceListResponse: - """List saved showcase workspaces (E4, issue #393). + """List saved showcase workspaces (E4 #393; filters/sort E2 #408). Args: db: Async database session from dependency. limit: Maximum workspaces to return (1-100). offset: Number of workspaces to skip. + q: Case-insensitive name search. + tags: Repeatable tag containment filter. + include_archived: Include archived workspaces. + sort_by: Allow-listed sort column (unknown values use default order). + sort_order: Sort direction (asc or desc). Returns: - A page of saved workspaces plus the total count. + A page of saved workspaces plus the filtered total count. 
""" - rows = await workspace.list_workspaces(db, limit=limit, offset=offset) - total = await workspace.count_workspaces(db) + rows = await workspace.list_workspaces( + db, + limit=limit, + offset=offset, + q=q, + tags=tags, + include_archived=include_archived, + sort_by=sort_by, + sort_order=sort_order, + ) + total = await workspace.count_workspaces(db, q=q, tags=tags, include_archived=include_archived) return WorkspaceListResponse( workspaces=[WorkspaceListItem.model_validate(row) for row in rows], total=total, @@ -138,6 +188,43 @@ async def get_showcase_workspace( return WorkspaceDetailResponse.model_validate(row) +@router.get( + "/workspaces/{workspace_id}/health", + response_model=WorkspaceHealthResponse, + summary="Probe a workspace's soft-reference link health", + description=( + "Probe every soft reference the workspace recorded (model runs, " + "scenario plans, alias, batch, agent session, job ids) through the " + "public API in-process. Each reference classifies as alive (2xx), " + "dead (404 -- deleted after the run), or unknown (anything else). " + "`partial_run` flags a row whose pipeline never completed." + ), +) +async def get_workspace_health( + workspace_id: str, + request: Request, + db: AsyncSession = Depends(get_db), +) -> WorkspaceHealthResponse: + """Probe a saved workspace's soft references (E2, issue #408). + + Args: + workspace_id: External identifier of the workspace. + request: The incoming request (used to obtain the live FastAPI app + for the in-process probes). + db: Async database session from dependency. + + Returns: + Per-reference liveness plus aggregate counts. + + Raises: + NotFoundError: When no workspace matches ``workspace_id``. 
+ """ + row = await workspace.get_workspace(db, workspace_id) + if row is None: + raise NotFoundError(message=f"Workspace not found: {workspace_id}") + return await link_health.probe_workspace_links(request.app, row) + + @router.patch( "/workspaces/{workspace_id}", response_model=WorkspaceDetailResponse, diff --git a/app/features/demo/schemas.py b/app/features/demo/schemas.py index 66bf202b..58daf891 100644 --- a/app/features/demo/schemas.py +++ b/app/features/demo/schemas.py @@ -314,3 +314,49 @@ class WorkspaceListResponse(BaseModel): ..., description="Saved workspaces for the current page; empty when none." ) total: int = Field(..., ge=0, description="Total saved workspaces.") + + +# E2 (#408) -- link-health classification of one probed soft reference. +RefHealthStatus = Literal["alive", "dead", "unknown"] +# E2 (#408) -- kind of soft-referenced object a workspace can record. +RefType = Literal["model_run", "scenario_plan", "alias", "batch", "agent_session", "job"] + + +class WorkspaceRefHealth(BaseModel): + """Liveness of one soft reference recorded on a workspace (E2, #408). + + Response model -- plain ``BaseModel``, NOT strict (``StepEvent`` + precedent above; strict mode is request-body-only policy). + """ + + key: str = Field( + ..., + description="created_objects key, e.g. 'winning_run_id' or 'scenario_plan_ids[0]'.", + ) + ref_type: RefType = Field(..., description="Kind of referenced object.") + ref_id: str = Field(..., description="The recorded soft-reference id.") + status: RefHealthStatus = Field( + ..., description="alive (2xx) / dead (404) / unknown (anything else)." + ) + probe_path: str = Field(..., description="The public API path probed.") + + +class WorkspaceHealthResponse(BaseModel): + """Per-workspace link-health summary (E2, #408). + + Response model -- plain ``BaseModel``, NOT strict. 
+ """ + + workspace_id: str = Field(..., description="The probed workspace's external id.") + workspace_status: str = Field(..., description="running / completed / failed.") + partial_run: bool = Field( + ..., description="True when workspace_status != 'completed' (the run never settled)." + ) + references: list[WorkspaceRefHealth] = Field( + default_factory=list, + description="Per-reference probe results; empty when nothing was recorded.", + ) + alive: int = Field(..., ge=0, description="Count of references that probed alive.") + dead: int = Field(..., ge=0, description="Count of references that probed dead (404).") + unknown: int = Field(..., ge=0, description="Count of references whose probe was inconclusive.") + checked_at: datetime = Field(default_factory=_utc_now, description="When the probes ran (UTC).") diff --git a/app/features/demo/tests/test_link_health.py b/app/features/demo/tests/test_link_health.py new file mode 100644 index 00000000..dd27f828 --- /dev/null +++ b/app/features/demo/tests/test_link_health.py @@ -0,0 +1,204 @@ +"""Unit tests for the link-health probe module (E2, issue #408). + +Probes run against a THROWAWAY FastAPI stub app -- no database, no real +slices. The stub returns 200 / 404 / 500 (and one raising endpoint) at the +probed paths so every classification branch is exercised. Workspace rows are +constructed in memory (never persisted) -- Python-side column defaults apply +at INSERT time, so every consumed field is passed explicitly. 
+""" + +from typing import Any + +from fastapi import FastAPI, Response + +from app.features.demo import link_health +from app.features.demo.link_health import _ProbeTarget, build_probe_targets +from app.features.demo.models import ShowcaseWorkspace + + +def _make_workspace(**overrides: Any) -> ShowcaseWorkspace: + """An in-memory (unpersisted) ShowcaseWorkspace with explicit fields.""" + base: dict[str, Any] = { + "workspace_id": "a" * 32, + "name": "e2-health", + "status": "completed", + "seed": 42, + "scenario": "demo_minimal", + "reset": False, + "skip_seed": True, + "created_objects": {}, + "job_ids": None, + } + base.update(overrides) + return ShowcaseWorkspace(**base) + + +def _stub_app() -> FastAPI: + """A throwaway ASGI app standing in for the probed public surface.""" + app = FastAPI() + + @app.get("/registry/runs/{run_id}") + def get_run(run_id: str) -> Response: + if run_id == "run-alive": + return Response(status_code=200, content="{}", media_type="application/json") + return Response(status_code=404) + + @app.get("/scenarios/{scenario_id}") + def get_scenario(scenario_id: str) -> Response: + return Response(status_code=404) + + @app.get("/registry/aliases/{alias_name}") + def get_alias(alias_name: str) -> Response: + return Response(status_code=200, content="{}", media_type="application/json") + + @app.get("/batch/{batch_id}") + def get_batch(batch_id: str) -> Response: + return Response(status_code=500) + + @app.get("/agents/sessions/{session_id}") + def get_session(session_id: str) -> Response: + raise RuntimeError("probed endpoint blew up") # -> 500 response, never re-raised + + @app.get("/jobs/{job_id}") + def get_job(job_id: str) -> Response: + if job_id == "job-alive": + return Response(status_code=200, content="{}", media_type="application/json") + return Response(status_code=404) + + return app + + +# ============================================================================= +# build_probe_targets +# 
============================================================================= + + +def test_build_probe_targets_covers_every_probeable_key() -> None: + """Every probeable created_objects key + the job_ids slot map to a path.""" + ws = _make_workspace( + created_objects={ + "winning_run_id": "run-1", + "v2_run_id": "run-2", + "stale_alias_run_id": "run-3", + "scenario_plan_ids": ["sp-1", "sp-2"], + "alias": "demo-production", + "batch_id": "batch-1", + "agent_session_id": "sess-1", + # Non-probeable keys -- no HTTP identity; must be skipped. + "v2_model_path": "artifacts/models/model_x.joblib", + "scenario_artifact_key": "abc123", + "train_model_types": ["naive", "seasonal_naive"], + }, + job_ids=["job-1", "job-2"], + ) + targets = build_probe_targets(ws) + by_key = {t.key: t for t in targets} + + assert by_key["winning_run_id"].probe_path == "/registry/runs/run-1" + assert by_key["winning_run_id"].ref_type == "model_run" + assert by_key["v2_run_id"].probe_path == "/registry/runs/run-2" + assert by_key["stale_alias_run_id"].probe_path == "/registry/runs/run-3" + assert by_key["scenario_plan_ids[0]"].probe_path == "/scenarios/sp-1" + assert by_key["scenario_plan_ids[1]"].probe_path == "/scenarios/sp-2" + assert by_key["scenario_plan_ids[0]"].ref_type == "scenario_plan" + assert by_key["alias"].probe_path == "/registry/aliases/demo-production" + assert by_key["batch_id"].probe_path == "/batch/batch-1" + assert by_key["agent_session_id"].probe_path == "/agents/sessions/sess-1" + assert by_key["job_ids[0]"].probe_path == "/jobs/job-1" + assert by_key["job_ids[1]"].probe_path == "/jobs/job-2" + assert by_key["job_ids[0]"].ref_type == "job" + # 3 run ids + 2 plans + alias + batch + session + 2 jobs -- and nothing + # for the non-probeable keys. 
+ assert len(targets) == 10 + assert not any("model_path" in t.key or "artifact" in t.key for t in targets) + + +def test_build_probe_targets_empty_objects() -> None: + """No recorded references (and a NULL job_ids slot) -> no targets.""" + assert build_probe_targets(_make_workspace()) == [] + + +def test_build_probe_targets_skips_non_string_values() -> None: + """Malformed JSONB values (non-strings, empties) are skipped, not raised.""" + ws = _make_workspace( + created_objects={ + "winning_run_id": 123, # not a str + "alias": "", # empty + "scenario_plan_ids": ["sp-1", 7, None, ""], + "batch_id": None, + }, + job_ids=["job-1", 42], + ) + targets = build_probe_targets(ws) + assert [t.key for t in targets] == ["scenario_plan_ids[0]", "job_ids[0]"] + + +# ============================================================================= +# probe_workspace_links (against the stub app) +# ============================================================================= + + +async def test_probe_classification_alive_dead_unknown() -> None: + """2xx -> alive, 404 -> dead, 5xx/exception -> unknown; counts add up.""" + ws = _make_workspace( + created_objects={ + "winning_run_id": "run-alive", # 200 -> alive + "v2_run_id": "run-gone", # 404 -> dead + "scenario_plan_ids": ["sp-gone"], # 404 -> dead + "alias": "demo-production", # 200 -> alive + "batch_id": "batch-1", # 500 -> unknown + "agent_session_id": "sess-1", # raises -> 500 response -> unknown + }, + job_ids=["job-alive", "job-gone"], # 200 + 404 + ) + health = await link_health.probe_workspace_links(_stub_app(), ws) + + by_key = {r.key: r.status for r in health.references} + assert by_key["winning_run_id"] == "alive" + assert by_key["v2_run_id"] == "dead" + assert by_key["scenario_plan_ids[0]"] == "dead" + assert by_key["alias"] == "alive" + assert by_key["batch_id"] == "unknown" + assert by_key["agent_session_id"] == "unknown" + assert by_key["job_ids[0]"] == "alive" + assert by_key["job_ids[1]"] == "dead" + + assert 
health.alive == 3 + assert health.dead == 3 + assert health.unknown == 2 + assert health.workspace_id == ws.workspace_id + assert health.partial_run is False + + +async def test_probe_empty_workspace_short_circuits() -> None: + """No references -> empty result, zero counts, no client construction.""" + health = await link_health.probe_workspace_links(_stub_app(), _make_workspace()) + assert health.references == [] + assert (health.alive, health.dead, health.unknown) == (0, 0, 0) + + +async def test_partial_run_flag_tracks_status() -> None: + """partial_run is True exactly when the row never reached 'completed'.""" + app = _stub_app() + for status, expected in (("completed", False), ("failed", True), ("running", True)): + health = await link_health.probe_workspace_links(app, _make_workspace(status=status)) + assert health.partial_run is expected + assert health.workspace_status == status + + +async def test_probe_transport_error_classifies_unknown() -> None: + """A transport-level failure classifies as unknown -- never raises.""" + + class _ExplodingClient: + async def get(self, _path: str) -> Response: + raise OSError("transport down") + + target = _ProbeTarget( + key="winning_run_id", + ref_type="model_run", + ref_id="run-1", + probe_path="/registry/runs/run-1", + ) + result = await link_health._probe_one(_ExplodingClient(), target) # type: ignore[arg-type] + assert result.status == "unknown" + assert result.ref_id == "run-1" diff --git a/app/features/demo/tests/test_routes.py b/app/features/demo/tests/test_routes.py index 1934c018..f5813d79 100644 --- a/app/features/demo/tests/test_routes.py +++ b/app/features/demo/tests/test_routes.py @@ -236,10 +236,10 @@ def _orm_like_row(workspace_id: str = "a" * 32, **overrides: object) -> SimpleNa async def test_list_workspaces_empty(client, monkeypatch): """E4 (#393) -- empty table yields 200 + an empty page (no 404).""" - async def fake_list(_db, *, limit: int, offset: int) -> list[SimpleNamespace]: + async def 
fake_list(_db, **_kwargs: object) -> list[SimpleNamespace]: return [] - async def fake_count(_db) -> int: + async def fake_count(_db, **_kwargs: object) -> int: return 0 monkeypatch.setattr(workspace, "list_workspaces", fake_list) @@ -252,14 +252,13 @@ async def fake_count(_db) -> int: async def test_list_workspaces_passes_pagination(client, monkeypatch): """E4 (#393) -- limit/offset query params reach the helper.""" - seen: dict[str, int] = {} + seen: dict[str, object] = {} - async def fake_list(_db, *, limit: int, offset: int) -> list[SimpleNamespace]: - seen["limit"] = limit - seen["offset"] = offset + async def fake_list(_db, **kwargs: object) -> list[SimpleNamespace]: + seen.update(kwargs) return [_orm_like_row()] - async def fake_count(_db) -> int: + async def fake_count(_db, **_kwargs: object) -> int: return 5 monkeypatch.setattr(workspace, "list_workspaces", fake_list) @@ -267,7 +266,8 @@ async def fake_count(_db) -> int: resp = await client.get("/demo/workspaces", params={"limit": 2, "offset": 3}) assert resp.status_code == 200 - assert seen == {"limit": 2, "offset": 3} + assert seen["limit"] == 2 + assert seen["offset"] == 3 body = resp.json() assert body["total"] == 5 assert body["workspaces"][0]["workspace_id"] == "a" * 32 @@ -283,6 +283,136 @@ async def test_list_workspaces_rejects_bad_pagination(client): assert resp.status_code == 422 +# ============================================================================= +# E2 (#408) -- list filters / sort + GET /demo/workspaces/{id}/health (unit) +# ============================================================================= + + +async def test_list_workspaces_passes_filters_and_sort(client, monkeypatch): + """E2 (#408) -- q/tags/include_archived/sort params reach BOTH helpers.""" + seen_list: dict[str, object] = {} + seen_count: dict[str, object] = {} + + async def fake_list(_db, **kwargs: object) -> list[SimpleNamespace]: + seen_list.update(kwargs) + return [] + + async def fake_count(_db, **kwargs: 
object) -> int: + seen_count.update(kwargs) + return 0 + + monkeypatch.setattr(workspace, "list_workspaces", fake_list) + monkeypatch.setattr(workspace, "count_workspaces", fake_count) + + resp = await client.get( + "/demo/workspaces", + params=[ + ("q", "demo"), + ("tags", "smoke"), + ("tags", "e2"), + ("include_archived", "true"), + ("sort_by", "name"), + ("sort_order", "asc"), + ], + ) + assert resp.status_code == 200 + assert seen_list["q"] == "demo" + assert seen_list["tags"] == ["smoke", "e2"] + assert seen_list["include_archived"] is True + assert seen_list["sort_by"] == "name" + assert seen_list["sort_order"] == "asc" + # The count helper gets the SAME filters -- total respects them. + assert seen_count["q"] == "demo" + assert seen_count["tags"] == ["smoke", "e2"] + assert seen_count["include_archived"] is True + + +async def test_list_workspaces_defaults_hide_archived(client, monkeypatch): + """E2 (#408) -- a legacy no-param call defaults to include_archived=False.""" + seen: dict[str, object] = {} + + async def fake_list(_db, **kwargs: object) -> list[SimpleNamespace]: + seen.update(kwargs) + return [] + + async def fake_count(_db, **_kwargs: object) -> int: + return 0 + + monkeypatch.setattr(workspace, "list_workspaces", fake_list) + monkeypatch.setattr(workspace, "count_workspaces", fake_count) + + resp = await client.get("/demo/workspaces") + assert resp.status_code == 200 + assert seen["include_archived"] is False + assert seen["q"] is None + assert seen["tags"] is None + assert seen["sort_by"] is None + + +async def test_list_workspaces_rejects_bad_sort_order(client): + """E2 (#408) -- sort_order is pattern-constrained (asc|desc only).""" + resp = await client.get("/demo/workspaces", params={"sort_order": "sideways"}) + assert resp.status_code == 422 + assert resp.headers["content-type"].startswith("application/problem+json") + + +async def test_workspace_health_404(client, monkeypatch): + """E2 (#408) -- health on a missing workspace is a 404 
problem+json.""" + + async def fake_get(_db, _workspace_id: str) -> None: + return None + + monkeypatch.setattr(workspace, "get_workspace", fake_get) + + resp = await client.get("/demo/workspaces/" + "0" * 32 + "/health") + assert resp.status_code == 404 + assert resp.headers["content-type"].startswith("application/problem+json") + assert "Workspace not found" in resp.json()["detail"] + + +async def test_workspace_health_happy_path(client, monkeypatch): + """E2 (#408) -- the route resolves the row and returns the probe result.""" + from app.features.demo import link_health + from app.features.demo.schemas import WorkspaceHealthResponse, WorkspaceRefHealth + + row = _orm_like_row(status="failed") + + async def fake_get(_db, workspace_id: str) -> SimpleNamespace: + return row + + async def fake_probe(_app, ws) -> WorkspaceHealthResponse: + assert ws is row # the route passes the resolved ORM row through + return WorkspaceHealthResponse( + workspace_id="a" * 32, + workspace_status="failed", + partial_run=True, + references=[ + WorkspaceRefHealth( + key="winning_run_id", + ref_type="model_run", + ref_id="run-abc", + status="dead", + probe_path="/registry/runs/run-abc", + ) + ], + alive=0, + dead=1, + unknown=0, + ) + + monkeypatch.setattr(workspace, "get_workspace", fake_get) + monkeypatch.setattr(link_health, "probe_workspace_links", fake_probe) + + resp = await client.get("/demo/workspaces/" + "a" * 32 + "/health") + assert resp.status_code == 200 + body = resp.json() + assert body["workspace_id"] == "a" * 32 + assert body["partial_run"] is True + assert body["dead"] == 1 + assert body["references"][0]["status"] == "dead" + assert body["references"][0]["probe_path"] == "/registry/runs/run-abc" + + async def test_get_workspace_404(client, monkeypatch): """E4 (#393) -- unknown workspace_id is a 404 problem+json.""" @@ -585,3 +715,110 @@ async def test_delete_workspace_integration_keeps_created_objects(client, db_ses assert still_there.status_code == 200 finally: await 
client.delete(f"/agents/sessions/{agent_session_id}") + + +# ============================================================================= +# E2 (#408) -- list filters / sort + health against real Postgres (integration) +# ============================================================================= + + +@pytest.mark.integration +async def test_list_workspaces_integration_filters_and_sort(client, db_session: AsyncSession): + """Filters, sort, pinned-first ordering, and filtered totals on real rows.""" + ids: dict[str, str] = {} + # Creation order matters for the default created_at sort assertions. + for name in ("alpha-match", "beta", "zeta-pinned"): + workspace_id = await workspace.create_workspace( + DemoRunRequest.model_validate({"preservation": "keep", "workspace_name": name}) + ) + assert workspace_id is not None + ids[name] = workspace_id + unnamed = await workspace.create_workspace( + DemoRunRequest.model_validate({"preservation": "keep"}) + ) + assert unnamed is not None + + # Curate via the PATCH surface (E1): pin zeta, archive beta, tag alpha. + assert ( + await client.patch(f"/demo/workspaces/{ids['zeta-pinned']}", json={"pinned": True}) + ).status_code == 200 + assert ( + await client.patch(f"/demo/workspaces/{ids['beta']}", json={"archived": True}) + ).status_code == 200 + assert ( + await client.patch(f"/demo/workspaces/{ids['alpha-match']}", json={"tags": ["smoke", "e2"]}) + ).status_code == 200 + + # Default list: archived hidden, pinned first, then newest-first. + resp = await client.get("/demo/workspaces") + assert resp.status_code == 200 + body = resp.json() + assert body["total"] == 3 # beta (archived) excluded from the total too + listed = [w["workspace_id"] for w in body["workspaces"]] + assert ids["beta"] not in listed + assert listed == [ids["zeta-pinned"], unnamed, ids["alpha-match"]] + + # include_archived=true surfaces the archived row again. 
+ resp = await client.get("/demo/workspaces", params={"include_archived": "true"}) + assert resp.json()["total"] == 4 + assert ids["beta"] in [w["workspace_id"] for w in resp.json()["workspaces"]] + + # q: case-insensitive name substring; total respects the filter. + resp = await client.get("/demo/workspaces", params={"q": "ALPHA"}) + body = resp.json() + assert body["total"] == 1 + assert [w["workspace_id"] for w in body["workspaces"]] == [ids["alpha-match"]] + + # tags: containment -- ALL listed tags must match. + resp = await client.get("/demo/workspaces", params=[("tags", "smoke"), ("tags", "e2")]) + assert [w["workspace_id"] for w in resp.json()["workspaces"]] == [ids["alpha-match"]] + resp = await client.get("/demo/workspaces", params=[("tags", "smoke"), ("tags", "nope")]) + assert resp.json()["total"] == 0 + + # sort_by=name asc: pinned row STILL first, unnamed row sinks (NULLS LAST). + resp = await client.get("/demo/workspaces", params={"sort_by": "name", "sort_order": "asc"}) + names = [w["name"] for w in resp.json()["workspaces"]] + assert names == ["zeta-pinned", "alpha-match", None] + + # Unknown sort_by silently falls back to the default order (no 422). 
@pytest.mark.integration
async def test_workspace_health_integration_alive_and_dead(client, db_session: AsyncSession):
    """Health route marks an existing agent session alive and a made-up run id dead (E2, #408)."""
    # Create a real agent session so one soft reference resolves to a live object.
    created = await client.post("/agents/sessions", json={"agent_type": "experiment"})
    assert created.status_code == 201
    agent_session_id = created.json()["session_id"]
    try:
        run_request = DemoRunRequest.model_validate(
            {"preservation": "keep", "workspace_name": "e2-health"}
        )
        workspace_id = await workspace.create_workspace(run_request)
        assert workspace_id is not None

        ws_row = await workspace.get_workspace(db_session, workspace_id)
        assert ws_row is not None
        # One reference that exists (the session above) and one that never did.
        ws_row.created_objects = {
            "agent_session_id": agent_session_id,
            "winning_run_id": "run-dangling-never-created",
        }
        await db_session.commit()

        health = await client.get(f"/demo/workspaces/{workspace_id}/health")
        assert health.status_code == 200
        payload = health.json()

        status_by_key = {}
        for ref in payload["references"]:
            status_by_key[ref["key"]] = ref["status"]
        assert status_by_key["agent_session_id"] == "alive"
        assert status_by_key["winning_run_id"] == "dead"

        assert payload["alive"] == 1
        assert payload["dead"] == 1
        assert payload["unknown"] == 0
        # The row was inserted as 'running' and never finalized, so the
        # endpoint must flag it as a partial run.
        assert payload["partial_run"] is True
    finally:
        # Always remove the agent session, even when an assertion above fails.
        await client.delete(f"/agents/sessions/{agent_session_id}")
+_SORT_COLUMNS: dict[str, InstrumentedAttribute[Any]] = { + "created_at": ShowcaseWorkspace.created_at, + "name": ShowcaseWorkspace.name, + "seed": ShowcaseWorkspace.seed, + "status": ShowcaseWorkspace.status, +} + + +def _apply_filters[SelectT: Select[Any]]( + stmt: SelectT, + *, + q: str | None = None, + tags: list[str] | None = None, + include_archived: bool = False, +) -> SelectT: + """Apply the E2 list filters to a select statement. + + Shared by :func:`list_workspaces` and :func:`count_workspaces` so the + page's ``total`` always respects the active filters (scenarios precedent: + ``app/features/scenarios/service.py`` applies the same ``.where`` chain to + both the count and rows statements). + """ + if not include_archived: + stmt = stmt.where(ShowcaseWorkspace.archived.is_(False)) + if q: + # Case-insensitive name search (dimensions ILIKE precedent). NAME only + # -- workspace_id prefixes are copy-paste handles, not search terms. + stmt = stmt.where(ShowcaseWorkspace.name.ilike(f"%{q}%")) + if tags: + # JSONB @> containment -- a workspace matches when it carries every + # listed tag (scenario_plan.tags precedent; GIN-indexed since E1 #407). + stmt = stmt.where(ShowcaseWorkspace.tags.contains(tags)) + return stmt + async def create_workspace(req: DemoRunRequest) -> str | None: """Insert a ``running`` workspace row for a ``preservation="keep"`` run. @@ -217,20 +258,44 @@ async def list_workspaces( *, limit: int = 50, offset: int = 0, + q: str | None = None, + tags: list[str] | None = None, + include_archived: bool = False, + sort_by: str | None = None, + sort_order: str = "desc", ) -> list[ShowcaseWorkspace]: - """List workspace rows, newest first (tie-broken by id, descending). + """List workspace rows with E2 (#408) filters; pinned rows always first. + + Default order is newest first (tie-broken by id, descending). ``sort_by`` + is allow-listed (created_at / name / seed / status); unknown values fall + back to the default order. 
async def count_workspaces(
    db: AsyncSession,
    *,
    q: str | None = None,
    tags: list[str] | None = None,
    include_archived: bool = False,
) -> int:
    """Return how many workspace rows match the active filters (E4 #393, E2 #408).

    The count statement is narrowed by :func:`_apply_filters` -- the same
    helper :func:`list_workspaces` uses -- so the ``total`` reported next to a
    filtered page describes that filtered set.

    Args:
        db: An open async session (caller-owned).
        q: Case-insensitive name search (ILIKE substring).
        tags: Tag containment filter -- a row must carry every listed tag.
        include_archived: Include archived rows (hidden by default).

    Returns:
        The number of saved workspaces matching the filters.
    """
    unfiltered_count = select(func.count()).select_from(ShowcaseWorkspace)
    filtered_count = _apply_filters(
        unfiltered_count,
        q=q,
        tags=tags,
        include_archived=include_archived,
    )
    matched = await db.scalar(filtered_count)
    # A missing scalar result (None) is reported as zero rows.
    return int(matched or 0)
**E2 (#391)** — `scenario` accepts all 8 `ScenarioPreset` values (`retail_standard` / `holiday_rush` / `high_variance` / `stockout_heavy` / `new_launches` / `sparse` / `demo_minimal` / `showcase_rich`); only `showcase_rich` changes the step table (24 rows), every other preset runs the legacy 11-row flow. **E1 (#407)** — body accepts additive Optional `replayed_from_workspace_id: str \| null` (`^[0-9a-f]{32}$`); requires `preservation='keep'` (else `422 application/problem+json`); recorded verbatim on the new `showcase_workspace` row as a SOFT reference (no existence check — dangles are designed). | | demo | WS | `/demo/stream` | Stream one `StepEvent` per pipeline step for the live Showcase page | -| demo | GET | `/demo/workspaces` | **E4 (#393)** — list saved showcase workspaces, newest first (`limit` 1-100 default 20 / `offset`); `200` + empty list on an empty table. **E1 (#407)** — list items additively carry `archived`, `pinned`, `tags`, `replayed_from_workspace_id`; archived rows still list (default-filtering is E2 #408) | +| demo | GET | `/demo/workspaces` | **E4 (#393)** — list saved showcase workspaces, newest first (`limit` 1-100 default 20 / `offset`); `200` + empty list on an empty table. **E1 (#407)** — list items additively carry `archived`, `pinned`, `tags`, `replayed_from_workspace_id`. **E2 (#408)** — additive query params: `q` (name ILIKE search, min 2 chars), repeated `tags` (JSONB containment — all listed tags must match), `include_archived` (default `false` — archived rows are now HIDDEN by default), allow-listed `sort_by` (`created_at`/`name`/`seed`/`status`; unknown → default `created_at desc`, no 422) + `sort_order` (`asc`/`desc`); pinned rows always order first; `total` respects the active filters | | demo | GET | `/demo/workspaces/{workspace_id}` | **E4 (#393)** — full workspace row incl. `created_objects` soft references + grain/window columns; `404 application/problem+json` when missing. 
**E1 (#407)** — response additively carries the list-item lifecycle fields plus `notes`, `config_schema_version`, and the six story slots (`seed_overrides` / `user_scope` / `approval_events` / `rag_events` / `job_ids` / `phase_summaries` — all `null` until their writer epic lands; schemas in `docs/_base/DOMAIN_MODEL.md`) | +| demo | GET | `/demo/workspaces/{workspace_id}/health` | **E2 (#408)** — probe the workspace's soft references in-process (model runs, scenario plans, alias, batch, agent session, `job_ids` slot) via `httpx.ASGITransport`; per-reference `status` ∈ `alive` (2xx) / `dead` (404 — deleted after the run) / `unknown` (anything else — never a 500), plus `alive`/`dead`/`unknown` counts and `partial_run` (true when the row's status ≠ `completed`); non-probeable keys (`v2_model_path`, `scenario_artifact_key`, `train_model_types`) are skipped; `404 application/problem+json` when the workspace is missing | | demo | PATCH | `/demo/workspaces/{workspace_id}` | **E1 (#407)** — partial lifecycle update (`name` / `notes` / `tags` / `archived` / `pinned`; `exclude_unset` semantics — only provided fields change; explicit `null` clears `name`/`notes`; explicit `null` on `archived`/`pinned`/`tags` → `422` (send `[]` to clear tags); `status` NOT patchable — the pipeline owns it); returns the updated `WorkspaceDetailResponse`; empty body = `200` no-op; `404 application/problem+json` when missing; `422` on unknown keys / bad name pattern / >20 tags | | demo | DELETE | `/demo/workspaces/{workspace_id}` | Delete one saved workspace METADATA row; `204` on success, `404 application/problem+json` when missing. The run's created objects (model runs, scenario plans, aliases, jobs, artifacts) are soft references and are NOT deleted | | config | GET | `/config/ai` | Effective AI-model config (agent LLM + RAG embeddings); API keys masked, never raw | @@ -98,7 +99,7 @@ Drives the end-to-end demo pipeline for the dashboard Showcase page. 
Verified ag - PRP-38 — `scenario="showcase_rich"` extends the data phase with `phase2_enrichment` + `historical_backfill` steps and the modeling phase with `v2_train` (one V2 `prophet_like` run). Phase ids are `data` / `modeling` / `decision` / `verify` / `agent` / `cleanup` (6 phases). - PRP-40 — `scenario="showcase_rich"` ALSO adds two phases inserted BEFORE `verify`: `planning` (2 steps — `scenario_simulate_and_save`, `multi_plan_compare`) and `knowledge` (3 steps — `embedding_provider_probe`, `rag_index_subset`, `rag_retrieve_probe`). Total step count: 19 for `showcase_rich`, 11 for `demo_minimal` and `sparse`. Phase ids on `showcase_rich` are `data` / `modeling` / `decision` / `planning` / `knowledge` / `verify` / `agent` / `cleanup` (8 phases). The knowledge steps SKIP gracefully when the embedding provider is unreachable; the pipeline still goes green. - E3 (#392) — the planning-phase steps tag the plans they save: pipeline-saved plans now carry `source:showcase` (alongside the legacy `showcase` + `price`/`holiday` tags), and on `preservation="keep"` runs additionally `workspace:` — retrievable via `GET /scenarios?tags=workspace: