Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 98 additions & 8 deletions app/features/demo/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@
from app.core.logging import get_logger
from app.core.problem_details import EMBEDDING_AUTH_CODE, ERROR_TYPES
from app.features.demo import workspace
from app.features.demo.schemas import DemoRunRequest, StepEvent, StepStatus
from app.features.demo.schemas import DemoRunRequest, StepEvent, StepStatus, UserScope
from app.shared.seeder.config import ScenarioPreset
from app.shared.seeder.overrides import SeederOverrides

logger = get_logger(__name__)

Expand Down Expand Up @@ -261,6 +262,12 @@ class DemoContext:
# E3 (#392) -- workspace label for plan tagging. Set alongside
# workspace_id in run_pipeline's keep-branch; None on ephemeral runs.
workspace_name: str | None = None
# E3 (#409) -- additive Optional start-frame config. seed_overrides is
# forwarded verbatim to /seeder/generate by step_seed (None on legacy
# frames); user_scope is the operator-selected focus pair step_status
# validates and adopts (warn + fallback to discovery when dangling).
seed_overrides: SeederOverrides | None = None
user_scope: UserScope | None = None


# =============================================================================
Expand Down Expand Up @@ -546,12 +553,33 @@ async def step_seed(ctx: DemoContext, client: _Client) -> StepResult:
ctx.scenario,
_SeedProfile(DEMO_SEED_STORES, DEMO_SEED_PRODUCTS, DEMO_SEED_SPAN_DAYS),
)
stores, products = profile.stores, profile.products
if profile.window is not None:
# E3 (#409) -- effective dims = override-or-profile, used for BOTH the POST
# scalars and the detail line so the step card tells the truth. The nested
# object is ALSO forwarded verbatim; the seeder applies it last (wins).
overrides = ctx.seed_overrides
stores = (
overrides.stores
if overrides is not None and overrides.stores is not None
else profile.stores
)
products = (
overrides.products
if overrides is not None and overrides.products is not None
else profile.products
)
if overrides is not None and overrides.window_days is not None:
# The DemoRunRequest validator guarantees window_days is never set on
# the calendar-pinned holiday_rush preset, so today-anchored is safe.
seed_end = datetime.now(UTC).date()
seed_start = seed_end - timedelta(days=overrides.window_days)
elif profile.window is not None:
seed_start, seed_end = profile.window
else:
seed_end = datetime.now(UTC).date()
seed_start = seed_end - timedelta(days=profile.span_days)
# Scalar sparsity stays 0.0 (preserves preset character per the
# `if params.sparsity > 0` guard); overrides.sparsity is the only way the
# demo overrides sparsity.
body = await client.request(
"seed",
"POST",
Expand All @@ -565,17 +593,33 @@ async def step_seed(ctx: DemoContext, client: _Client) -> StepResult:
"end_date": seed_end.isoformat(),
"sparsity": 0.0,
"dry_run": False,
**(
{"overrides": overrides.model_dump(exclude_none=True)}
if overrides is not None
else {}
),
},
)
raw_records: dict[str, Any] = body.get("records_created", {})
records = {k: int(v) for k, v in raw_records.items() if isinstance(v, int)}
ctx.seed_records = records
# GenerateResult.records_created uses "sales" (singular), not "sales_daily".
sales = records.get("sales", records.get("sales_daily", 0))
overrides_applied = (
sorted(overrides.model_dump(exclude_none=True)) if overrides is not None else []
)
detail = f"{ctx.scenario.value}: {stores} stores x {products} products, {sales} sales rows"
if overrides_applied:
detail += f" (overrides: {', '.join(overrides_applied)})"
return (
"pass",
f"{ctx.scenario.value}: {stores} stores x {products} products, {sales} sales rows",
{"records_created": records, "scenario": ctx.scenario.value},
detail,
{
"records_created": records,
"scenario": ctx.scenario.value,
# E3 (#409) -- additive echo of the applied override knobs.
"overrides_applied": overrides_applied,
},
)


Expand All @@ -593,6 +637,46 @@ async def step_status(ctx: DemoContext, client: _Client) -> StepResult:
return ("fail", "no date_range in /seeder/status -- seed the database first", {})
ctx.date_start = date.fromisoformat(raw_start)
ctx.date_end = date.fromisoformat(raw_end)
sales = body.get("sales", 0)

# E3 (#409) -- operator-selected focus pair: validate both ids against the
# dimensions endpoints and adopt them. A dangling pair (e.g. after a
# reset+reseed re-issued ids -- sequences never reset) WARNS and falls back
# to discovery so a replayed reset=true workspace can never hard-fail here.
scope_warning = ""
if ctx.user_scope is not None:
try:
await client.request(
"status[scope-store]",
"GET",
f"/dimensions/stores/{ctx.user_scope.store_id}",
)
await client.request(
"status[scope-product]",
"GET",
f"/dimensions/products/{ctx.user_scope.product_id}",
)
except _StepError:
scope_warning = (
f"user_scope (store={ctx.user_scope.store_id}, "
f"product={ctx.user_scope.product_id}) not found -- "
"fell back to discovered pair; "
)
else:
ctx.store_id = ctx.user_scope.store_id
ctx.product_id = ctx.user_scope.product_id
return (
"pass",
f"date_range={raw_start}..{raw_end} sales={sales} "
f"store_id={ctx.store_id} product_id={ctx.product_id} (user-selected)",
{
"store_id": ctx.store_id,
"product_id": ctx.product_id,
"date_range_start": raw_start,
"date_range_end": raw_end,
"user_scope_applied": True,
},
)

stores_body = await client.request(
"status[stores]", "GET", "/dimensions/stores?page=1&page_size=1"
Expand All @@ -617,16 +701,19 @@ async def step_status(ctx: DemoContext, client: _Client) -> StepResult:
ctx.store_id = store_id_raw
ctx.product_id = product_id_raw

sales = body.get("sales", 0)
return (
"pass",
f"date_range={raw_start}..{raw_end} sales={sales} "
# E3 (#409) -- "warn" (never "fail") when a requested scope dangled:
# only "fail" stops the run, so the pipeline proceeds on the
# discovered pair with the divergence visible on the step card.
"warn" if scope_warning else "pass",
f"{scope_warning}date_range={raw_start}..{raw_end} sales={sales} "
f"store_id={ctx.store_id} product_id={ctx.product_id}",
{
"store_id": ctx.store_id,
"product_id": ctx.product_id,
"date_range_start": raw_start,
"date_range_end": raw_end,
"user_scope_applied": False,
},
)

Expand Down Expand Up @@ -2648,6 +2735,9 @@ async def run_pipeline(app: FastAPI, req: DemoRunRequest) -> AsyncIterator[StepE
skip_seed=req.skip_seed,
reset=req.reset,
scenario=req.scenario,
# E3 (#409) -- thread the validated start-frame config verbatim.
seed_overrides=req.seed_overrides,
user_scope=req.user_scope,
)
# E1 (#390) -- create the workspace row BEFORE the first step executes so
# even an early failure records the run config. create_workspace is
Expand Down
86 changes: 79 additions & 7 deletions app/features/demo/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator

from app.shared.seeder.config import ScenarioPreset
from app.shared.seeder.overrides import SeederOverrides

# One pipeline step's outcome.
StepStatus = Literal["running", "pass", "fail", "skip", "warn"]
Expand All @@ -26,6 +27,22 @@ def _utc_now() -> datetime:
return datetime.now(UTC)


class UserScope(BaseModel):
    """Operator-selected (store, product) focus pair (E3, issue #409).

    Ids are REAL discovered ids (Postgres sequences never reset -- ids are not
    1-based); ``step_status`` validates them against ``/dimensions/*/{id}``
    and warn-falls-back to discovery when the pair dangles (e.g. after a
    reset+reseed re-issued ids). ``extra="forbid"`` keeps the slot schema
    closed; additive keys need a documented schema change.
    """

    # Strict mode: no type coercion; unknown keys are rejected outright.
    model_config = ConfigDict(strict=True, extra="forbid")

    # Both ids are required and must be >= 1.
    store_id: int = Field(..., ge=1, description="Real store id from /dimensions/stores.")
    product_id: int = Field(..., ge=1, description="Real product id from /dimensions/products.")


class DemoRunRequest(BaseModel):
"""Request body for ``POST /demo/run`` and the ``WS /demo/stream`` start frame.

Expand All @@ -34,7 +51,10 @@ class DemoRunRequest(BaseModel):
override -- there is no ``date`` / ``datetime`` / ``UUID`` / ``Decimal``
field (see ``.claude/rules/security-patterns.md`` and
``test_strict_mode_policy.py``). The sole exception is ``scenario``, whose
enum-on-the-wire form carries its own override (PRP-38).
enum-on-the-wire form carries its own override (PRP-38). The nested
``seed_overrides`` / ``user_scope`` models are themselves all-JSON-native
and validate from the JSON-parsed dict under the parent's strict mode
(runtime-verified on pydantic 2.12.5 -- E3 #409).
"""

model_config = ConfigDict(strict=True)
Expand Down Expand Up @@ -85,6 +105,25 @@ class DemoRunRequest(BaseModel):
pattern=r"^[0-9a-f]{32}$", # uuid4().hex shape of workspace_id
description="workspace_id this run replays; requires preservation='keep'.",
)
# E3 (#409): curated seed overrides + operator-selected focus pair. Both
# additive Optional with None defaults so legacy frames stay byte-identical.
# The nested models carry their own ConfigDict(strict=True, extra="forbid").
seed_overrides: SeederOverrides | None = Field(
default=None,
description=(
"Curated seeder overrides (allow-listed knobs); requires "
"skip_seed=false (Re-seed first). Forwarded verbatim to "
"POST /seeder/generate and recorded on a kept workspace row."
),
)
user_scope: UserScope | None = Field(
default=None,
description=(
"Operator-selected (store, product) focus pair the pipeline models "
"instead of the auto-discovered first pair; validated by the status "
"step (warn + fallback to discovery on a dangling pair)."
),
)

@model_validator(mode="after")
def _workspace_name_requires_keep(self) -> DemoRunRequest:
Expand All @@ -100,6 +139,34 @@ def _replayed_from_requires_keep(self) -> DemoRunRequest:
raise ValueError("replayed_from_workspace_id requires preservation='keep'")
return self

@model_validator(mode="after")
def _seed_overrides_require_reseed(self) -> DemoRunRequest:
    """Reject overrides on a run that skips the seed step (silent no-op trap).

    An empty overrides object (``{}`` on the wire) normalizes to ``None``
    so downstream code has a single "no overrides" representation. The
    normalization runs BEFORE the ``skip_seed`` check, so an empty object
    combined with ``skip_seed=true`` is accepted (it carries no knobs).

    Returns:
        The same request instance, with ``seed_overrides`` possibly reset
        to ``None``.

    Raises:
        ValueError: when non-empty ``seed_overrides`` accompany
            ``skip_seed=true``.
    """
    # Collapse "present but empty" into the canonical "absent" form first.
    if self.seed_overrides is not None and self.seed_overrides.is_empty():
        self.seed_overrides = None
    # Anything still set here carries at least one knob and needs a re-seed.
    if self.seed_overrides is not None and self.skip_seed:
        raise ValueError("seed_overrides requires skip_seed=false (Re-seed first)")
    return self

@model_validator(mode="after")
def _window_days_forbidden_on_holiday_rush(self) -> DemoRunRequest:
    """Reject window_days on the calendar-pinned holiday_rush preset.

    The preset's HolidayConfig spikes are fixed 2024 dates -- a shifted
    window would silently drop every holiday spike, so this fails loudly.

    Returns:
        The same request instance, unchanged.

    Raises:
        ValueError: when ``seed_overrides.window_days`` is set and
            ``scenario`` is ``ScenarioPreset.HOLIDAY_RUSH``.
    """
    # All three conditions must hold; requests without overrides or without
    # a window_days knob pass through untouched.
    if (
        self.seed_overrides is not None
        and self.seed_overrides.window_days is not None
        and self.scenario is ScenarioPreset.HOLIDAY_RUSH
    ):
        raise ValueError("window_days cannot override the calendar-pinned holiday_rush window")
    return self


class WorkspaceUpdateRequest(BaseModel):
"""Partial lifecycle update for ``PATCH /demo/workspaces/{workspace_id}``.
Expand Down Expand Up @@ -266,6 +333,15 @@ class WorkspaceListItem(BaseModel):
default=None,
description="workspace_id this run replayed (soft reference; may dangle).",
)
# E3 (#409) -- the two replay-relevant story slots live on the LIST item
# (not detail-only): the frontend Replay reads list rows, and the
# replay-verbatim contract includes both slots.
seed_overrides: dict[str, Any] | None = Field(
default=None, description="Story slot (E3 #409): seeder-override payload."
)
user_scope: dict[str, Any] | None = Field(
default=None, description="Story slot (E3 #409): operator-selected focus."
)


class WorkspaceDetailResponse(WorkspaceListItem):
Expand All @@ -285,12 +361,8 @@ class WorkspaceDetailResponse(WorkspaceListItem):
config_schema_version: int = Field(
default=1, description="Version of the config + story-slot schema."
)
seed_overrides: dict[str, Any] | None = Field(
default=None, description="Story slot (E3 #409 writes): seeder-override payload."
)
user_scope: dict[str, Any] | None = Field(
default=None, description="Story slot (E3 #409 writes): operator-selected focus."
)
# E3 (#409) -- seed_overrides / user_scope moved UP to WorkspaceListItem
# (replay reads list rows); the four remaining story slots stay detail-only.
approval_events: list[dict[str, Any]] | None = Field(
default=None, description="Story slot (E5 #411 writes): HITL approval audit."
)
Expand Down
Loading