Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
666 changes: 666 additions & 0 deletions PRPs/PRP-showcase-workspace-E1-persistence-backbone.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions alembic/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from app.features.batch import models as batch_models # noqa: F401
from app.features.config import models as config_models # noqa: F401
from app.features.data_platform import models as data_platform_models # noqa: F401
from app.features.demo import models as demo_models # noqa: F401
from app.features.explainability import models as explainability_models # noqa: F401
from app.features.jobs import models as jobs_models # noqa: F401
from app.features.model_selection import models as model_selection_models # noqa: F401
Expand Down
103 changes: 103 additions & 0 deletions alembic/versions/324a2fa37fcc_create_showcase_workspace_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
"""create showcase_workspace table

Revision ID: 324a2fa37fcc
Revises: e4f5a6b7c8d9
Create Date: 2026-06-12 10:00:00.000000

E1 of the showcase-workspace initiative (umbrella #389, epic #390). First
table owned by the demo slice: one row per preserved showcase run -- its
configuration (replay inputs) plus the soft-reference ids of every object the
pipeline created. Deliberately NO ForeignKey to ``model_run`` /
``scenario_plan`` / ``batch_job`` / ``agent_session`` -- recorded ids are
opaque soft references so cross-slice schema coupling stays zero and the
referenced rows remain independently deletable.
"""

from collections.abc import Sequence

import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

from alembic import op

# revision identifiers, used by Alembic.
revision: str = "324a2fa37fcc"
down_revision: str | None = "e4f5a6b7c8d9"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def upgrade() -> None:
"""Apply migration -- create the showcase_workspace table."""
op.create_table(
"showcase_workspace",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("workspace_id", sa.String(length=32), nullable=False),
sa.Column("name", sa.String(length=100), nullable=True),
sa.Column("status", sa.String(length=20), nullable=False),
sa.Column("seed", sa.Integer(), nullable=False),
sa.Column("scenario", sa.String(length=40), nullable=False),
sa.Column("reset", sa.Boolean(), nullable=False),
sa.Column("skip_seed", sa.Boolean(), nullable=False),
sa.Column("store_id", sa.Integer(), nullable=True),
sa.Column("product_id", sa.Integer(), nullable=True),
sa.Column("date_start", sa.Date(), nullable=True),
sa.Column("date_end", sa.Date(), nullable=True),
sa.Column(
"created_objects",
postgresql.JSONB(astext_type=sa.Text()),
server_default=sa.text("'{}'::jsonb"),
nullable=False,
),
sa.Column("result_summary", postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.CheckConstraint(
"status IN ('running', 'completed', 'failed')",
name="ck_showcase_workspace_status",
),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(
op.f("ix_showcase_workspace_workspace_id"),
"showcase_workspace",
["workspace_id"],
unique=True,
)
op.create_index(
op.f("ix_showcase_workspace_name"),
"showcase_workspace",
["name"],
unique=False,
)
op.create_index(
op.f("ix_showcase_workspace_status"),
"showcase_workspace",
["status"],
unique=False,
)
op.create_index(
"ix_showcase_workspace_status_created",
"showcase_workspace",
["status", "created_at"],
unique=False,
)


def downgrade() -> None:
"""Revert migration -- drop the showcase_workspace table."""
op.drop_index("ix_showcase_workspace_status_created", table_name="showcase_workspace")
op.drop_index(op.f("ix_showcase_workspace_status"), table_name="showcase_workspace")
op.drop_index(op.f("ix_showcase_workspace_name"), table_name="showcase_workspace")
op.drop_index(op.f("ix_showcase_workspace_workspace_id"), table_name="showcase_workspace")
op.drop_table("showcase_workspace")
89 changes: 89 additions & 0 deletions app/features/demo/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
"""Showcase workspace ORM model.

First table owned by the demo slice (precedent: ``app/features/batch/models.py``).
A row = one preserved showcase run: its configuration (replay inputs) plus the
ids of every object the pipeline created. All recorded ids are OPAQUE SOFT
REFERENCES -- deliberately NO ForeignKey to ``model_run`` / ``scenario_plan`` /
``batch_job`` / ``agent_session``: a cross-slice FK would couple the demo
slice's schema to four other slices and break independent deletion (e.g.
``DELETE /registry/runs/{id}`` must keep working while a workspace row still
references the run). E1 of the showcase-workspace initiative (umbrella #389,
epic #390).

GOTCHA: SQLAlchemy reserves the declarative attribute name ``metadata``; the
JSONB columns are therefore named ``created_objects`` and ``result_summary``.
"""

from __future__ import annotations

import datetime as _dt
from typing import Any

from sqlalchemy import CheckConstraint, Date, Index, Integer, String, text
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column

from app.core.database import Base
from app.shared.models import TimestampMixin

# Workspace lifecycle states -- guarded by a CHECK constraint. ``running`` is
# written at creation (before the first pipeline step executes); the finalize
# hook settles the row to ``completed`` or ``failed``.
WORKSPACE_STATUS_RUNNING = "running"
WORKSPACE_STATUS_COMPLETED = "completed"
WORKSPACE_STATUS_FAILED = "failed"


class ShowcaseWorkspace(TimestampMixin, Base):
"""A preserved showcase run.

Attributes:
id: Surrogate primary key.
workspace_id: Unique external identifier (UUID hex, 32 chars).
name: Optional human label from ``DemoRunRequest.workspace_name``.
status: Lifecycle state -- running / completed / failed (CHECK-constrained).
seed: Seeder seed the run was started with (replay input).
scenario: Seeder scenario preset value (replay input).
reset: Whether the run wiped the database before seeding (replay input).
skip_seed: Whether the run skipped the seed step (replay input).
store_id: Showcase grain store id; NULL when the run failed early.
product_id: Showcase grain product id; NULL when the run failed early.
date_start: Seeded data window start; NULL when unknown.
date_end: Seeded data window end; NULL when unknown.
created_objects: Soft-reference ids of everything the run created (JSONB).
result_summary: Winner / WAPE / wall-clock display payload (JSONB).
"""

__tablename__ = "showcase_workspace"

id: Mapped[int] = mapped_column(Integer, primary_key=True)
workspace_id: Mapped[str] = mapped_column(String(32), unique=True, index=True)
name: Mapped[str | None] = mapped_column(String(100), nullable=True, index=True)
status: Mapped[str] = mapped_column(
String(20), default=WORKSPACE_STATUS_RUNNING, nullable=False, index=True
)
# Run configuration -- replay inputs (E4 restore/replay reads these verbatim).
seed: Mapped[int] = mapped_column(Integer, nullable=False)
scenario: Mapped[str] = mapped_column(String(40), nullable=False)
reset: Mapped[bool] = mapped_column(nullable=False, default=False)
skip_seed: Mapped[bool] = mapped_column(nullable=False, default=True)
# Grain + window discovered by the status/seed steps (NULL on early failure).
store_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
product_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
date_start: Mapped[_dt.date | None] = mapped_column(Date, nullable=True)
date_end: Mapped[_dt.date | None] = mapped_column(Date, nullable=True)
# Everything the run created -- flexible JSONB of soft references (see the
# module docstring for the deliberate no-FK decision).
created_objects: Mapped[dict[str, Any]] = mapped_column(
JSONB, nullable=False, default=dict, server_default=text("'{}'::jsonb")
)
# winner_model_type / winner_wape / wall_clock_s -- display payload.
result_summary: Mapped[dict[str, Any] | None] = mapped_column(JSONB, nullable=True)

__table_args__ = (
CheckConstraint(
"status IN ('running', 'completed', 'failed')",
name="ck_showcase_workspace_status",
),
Index("ix_showcase_workspace_status_created", "status", "created_at"),
)
19 changes: 19 additions & 0 deletions app/features/demo/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from app.core.config import get_settings
from app.core.logging import get_logger
from app.core.problem_details import EMBEDDING_AUTH_CODE, ERROR_TYPES
from app.features.demo import workspace
from app.features.demo.schemas import DemoRunRequest, StepEvent, StepStatus
from app.shared.seeder.config import ScenarioPreset

Expand Down Expand Up @@ -254,6 +255,9 @@ class DemoContext:
# step_agent_hitl_flow on SHOWCASE_RICH. Remain None on every other path.
approval_action_id: str | None = None
agent_approval_decision: str | None = None # "executed"|"rejected"|"expired"|"timed_out"
# E1 (#390) -- workspace persistence. Set only on preservation="keep" runs
# (and only when the row insert succeeded); None on ephemeral runs.
workspace_id: str | None = None


# =============================================================================
Expand Down Expand Up @@ -2585,6 +2589,11 @@ async def run_pipeline(app: FastAPI, req: DemoRunRequest) -> AsyncIterator[StepE
reset=req.reset,
scenario=req.scenario,
)
# E1 (#390) -- create the workspace row BEFORE the first step executes so
# even an early failure records the run config. create_workspace is
# warn-and-continue: a DB failure returns None and the run proceeds.
if req.preservation == "keep":
ctx.workspace_id = await workspace.create_workspace(req)
wall_start = time.monotonic()
any_fail = False
# PRP-41 — buffer for intermediate events the HITL step emits via
Expand Down Expand Up @@ -2668,6 +2677,13 @@ async def run_pipeline(app: FastAPI, req: DemoRunRequest) -> AsyncIterator[StepE
break

wall = time.monotonic() - wall_start
# E1 (#390) -- settle the workspace row BEFORE the final yield so the
# mid-run-failure path records partial created_objects too.
# finalize_workspace is warn-and-continue: it never raises.
if ctx.workspace_id is not None:
await workspace.finalize_workspace(
ctx.workspace_id, ctx, failed=any_fail, wall_clock_s=wall
)
yield StepEvent(
event_type="pipeline_complete",
step_name="summary",
Expand All @@ -2687,5 +2703,8 @@ async def run_pipeline(app: FastAPI, req: DemoRunRequest) -> AsyncIterator[StepE
# PRP-38 — expose the V2 run id when set so the Inspect deep
# link can target /explorer/runs/{v2_run_id}.
"v2_run_id": ctx.v2_run_id,
# E1 (#390) -- additive; a string on preservation='keep' runs,
# None otherwise (legacy clients ignore unknown keys).
"workspace_id": ctx.workspace_id,
},
)
40 changes: 35 additions & 5 deletions app/features/demo/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from datetime import UTC, datetime
from typing import Any, Literal

from pydantic import BaseModel, ConfigDict, Field
from pydantic import BaseModel, ConfigDict, Field, model_validator

from app.shared.seeder.config import ScenarioPreset

Expand All @@ -29,10 +29,12 @@ def _utc_now() -> datetime:
class DemoRunRequest(BaseModel):
"""Request body for ``POST /demo/run`` and the ``WS /demo/stream`` start frame.

Every field is JSON-native (``int`` / ``bool``), so ``ConfigDict(strict=True)``
is safe with no ``Field(strict=False)`` override -- there is no
``date`` / ``datetime`` / ``UUID`` / ``Decimal`` field (see
``.claude/rules/security-patterns.md`` and ``test_strict_mode_policy.py``).
Every field is JSON-native (``int`` / ``bool`` / ``str`` / ``Literal``), so
``ConfigDict(strict=True)`` is safe with no ``Field(strict=False)``
override -- there is no ``date`` / ``datetime`` / ``UUID`` / ``Decimal``
field (see ``.claude/rules/security-patterns.md`` and
``test_strict_mode_policy.py``). The sole exception is ``scenario``, whose
enum-on-the-wire form carries its own override (PRP-38).
"""

model_config = ConfigDict(strict=True)
Expand All @@ -59,6 +61,28 @@ class DemoRunRequest(BaseModel):
strict=False,
description="Seeder scenario preset that drives the pipeline shape.",
)
# E1 (#390): preservation policy. Default "ephemeral" keeps legacy
# behaviour byte-identical (no workspace row). Both new fields are
# JSON-native (Literal[str] / str), so the model-level ``strict=True``
# needs no per-field override.
preservation: Literal["ephemeral", "keep"] = Field(
default="ephemeral",
description="'keep' records this run as a showcase_workspace row.",
)
workspace_name: str | None = Field(
default=None,
max_length=100,
# Same pattern as the registry alias_name (registry/schemas.py).
pattern=r"^[a-z0-9][a-z0-9\-_]*$",
description="Optional workspace label; requires preservation='keep'.",
)

@model_validator(mode="after")
def _workspace_name_requires_keep(self) -> DemoRunRequest:
"""Reject a workspace_name on a run that does not keep a workspace."""
if self.workspace_name is not None and self.preservation != "keep":
raise ValueError("workspace_name requires preservation='keep'")
return self


class StepEvent(BaseModel):
Expand Down Expand Up @@ -134,3 +158,9 @@ class DemoRunResult(BaseModel):
default=0.0,
description="Total pipeline wall-clock in seconds.",
)
# E1 (#390): additive Optional field mirroring ``winning_run_id`` --
# ``None`` on ephemeral runs, the workspace_id on preservation='keep' runs.
workspace_id: str | None = Field(
default=None,
description="showcase_workspace id recorded for this run, if kept.",
)
2 changes: 2 additions & 0 deletions app/features/demo/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,6 @@ async def run_pipeline_sync(app: FastAPI, req: DemoRunRequest) -> DemoRunResult:
winning_run_id=final.data.get("winning_run_id"),
alias=final.data.get("alias"),
wall_clock_s=float(wall_clock) if isinstance(wall_clock, (int, float)) else 0.0,
# E1 (#390) -- additive; mirrors the winning_run_id passthrough.
workspace_id=final.data.get("workspace_id"),
)
28 changes: 28 additions & 0 deletions app/features/demo/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@

import pytest
from httpx import ASGITransport, AsyncClient
from sqlalchemy import delete
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

from app.core.config import get_settings
from app.features.demo.models import ShowcaseWorkspace
from app.main import app


Expand All @@ -20,3 +24,27 @@ async def client() -> AsyncGenerator[AsyncClient, None]:
base_url="http://demo-test",
) as ac:
yield ac


@pytest.fixture
async def db_session() -> AsyncGenerator[AsyncSession, None]:
"""Yield an async session; wipe every showcase_workspace row on teardown.

E1 (#390) integration fixture (pattern:
``app/features/scenarios/tests/conftest.py``). ``showcase_workspace`` is a
slice-private table -- no seeder or other slice writes it -- so the
teardown safely wipes it whole rather than relying on a row marker.
"""
settings = get_settings()
engine = create_async_engine(settings.database_url, echo=False)
session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

async with session_maker() as session:
try:
yield session
finally:
await session.rollback()
await session.execute(delete(ShowcaseWorkspace))
await session.commit()

await engine.dispose()
Loading