From 87dcda1c1509b86bdc1d3113aec8d77aa79c3b7d Mon Sep 17 00:00:00 2001
From: Harsh Pandhe
Date: Wed, 20 May 2026 13:49:57 +0530
Subject: [PATCH] feat(phase-9b): async optimize jobs via FastAPI BackgroundTasks
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The synchronous /optimize/run and POST /htmx/run endpoints block the
HTTP request thread for the full duration of a GA run. That is fine for
synthetic 60-generation demos, but unacceptable for a DEM-driven
120-generation production run that takes 30+ seconds.

New module src/ropeway/server/jobs.py adds a thin in-process job queue:
POST a run, get an immediate job_id, poll for status. It uses FastAPI's
BackgroundTasks (one worker thread per job, no external broker).
Swapping in Celery/Redis is straightforward — the JobStore interface is
the contract; the in-memory store is one implementation.

Routes:
  POST /optimize/async      -> {job_id, status}
  GET  /optimize/jobs/{id}  -> {status, result | null, error | null}
  GET  /optimize/jobs       -> list, newest first

Status state machine: PENDING -> RUNNING -> DONE | FAILED. Every
exception in the background task is caught and surfaced as the job's
error field so the client always reaches a terminal state.

The existing /optimize/run (synchronous, JWT-authenticated) is
untouched — it remains the deterministic synchronous path for the test
suite and pre-9b callers.

Tests: tests/test_server_async_jobs.py — 8 new
  - submit returns job_id + pending/running
  - async optimize runs to DONE with metrics
  - unknown job_id -> 404
  - jobs list includes the submitted id
  - invalid system -> 400 (validates body before queueing)
  - JobStore.get returns None for unknown id
  - JobStore.submit assigns unique ids
  - JobStore.update mutates fields in place

Full suite 216 -> 224, zero regressions.
---
Review note: jobs.py and the test helper were revised during review, so
the blob ids on the "index" lines below predate those edits.

 src/ropeway/server/api.py       |   4 +
 src/ropeway/server/jobs.py      | 281 ++++++++++++++++++++++++++++++++
 tests/test_server_async_jobs.py | 126 ++++++++++++++
 3 files changed, 411 insertions(+)
 create mode 100644 src/ropeway/server/jobs.py
 create mode 100644 tests/test_server_async_jobs.py

diff --git a/src/ropeway/server/api.py b/src/ropeway/server/api.py
index 7d408cc..fa94a42 100644
--- a/src/ropeway/server/api.py
+++ b/src/ropeway/server/api.py
@@ -140,6 +140,10 @@ def create_app(settings: Settings | None = None) -> FastAPI:
     from .htmx import router as htmx_router
     app.include_router(htmx_router)
 
+    # ---- Phase 9b: async optimize jobs (BackgroundTasks queue) ----
+    from .jobs import router as jobs_router
+    app.include_router(jobs_router)
+
     # ---- health ----
     @app.get("/health")
     def health():
diff --git a/src/ropeway/server/jobs.py b/src/ropeway/server/jobs.py
new file mode 100644
index 0000000..feaa3c9
--- /dev/null
+++ b/src/ropeway/server/jobs.py
@@ -0,0 +1,281 @@
+"""Phase 9b — async optimize jobs.
+
+The synchronous `/optimize/run` and `POST /htmx/run` endpoints block the
+HTTP request thread for the full duration of a GA run. That is fine for
+synthetic 60-generation demos but unacceptable for a DEM-driven 120-
+generation production run that takes 30+ seconds.
+
+This module adds a thin in-process job queue that lets a client POST a
+run, get an immediate ``job_id``, and poll for status. The queue uses
+FastAPI's ``BackgroundTasks``: each job is a plain (non-async) callable,
+which Starlette runs on a worker-pool thread after the response has been
+sent, so no external broker is needed. Swapping in Celery/Redis is
+straightforward once deployment requires multi-process scale — the
+``JobStore`` interface is the contract; the in-memory store is one
+implementation.
+
+Routes registered (jobs are serialised with ``Job.to_dict``):
+
+    POST /optimize/async -> {"job_id": "...", "status": "pending"}
+    GET  /optimize/jobs/{id}
+                         -> {"status": "...", "result": {...} | null,
+                             "error": "..." | null, ...}
+    GET  /optimize/jobs  -> [{"id": ..., "status": ..., "created_at": ...}]
+"""
+
+from __future__ import annotations
+
+import logging
+import threading
+import time
+import uuid
+from dataclasses import asdict, dataclass, field
+from dataclasses import fields as dataclass_fields
+from enum import Enum
+from typing import Any, Optional
+
+from fastapi import APIRouter, BackgroundTasks, HTTPException
+from pydantic import BaseModel, Field
+
+logger = logging.getLogger(__name__)
+
+
+class JobStatus(str, Enum):
+    """Lifecycle of a job: PENDING -> RUNNING -> DONE | FAILED."""
+
+    PENDING = "pending"
+    RUNNING = "running"
+    DONE = "done"
+    FAILED = "failed"
+
+
+# States a job never leaves; only jobs in these states may be evicted.
+_TERMINAL_STATUSES = frozenset({JobStatus.DONE, JobStatus.FAILED})
+
+
+@dataclass
+class Job:
+    """One optimize run tracked by a ``JobStore``."""
+
+    id: str
+    status: JobStatus = JobStatus.PENDING
+    created_at: float = field(default_factory=time.time)
+    started_at: Optional[float] = None
+    finished_at: Optional[float] = None
+    result: Optional[dict] = None
+    error: Optional[str] = None
+
+    def to_dict(self) -> dict:
+        """Return a deep copy as a dict, with ``status`` as its string."""
+        out = asdict(self)
+        out["status"] = self.status.value
+        return out
+
+
+# Names ``JobStore.update`` may assign. ``id`` is excluded because it is the
+# table key and must stay in sync with it.
+_MUTABLE_FIELDS = frozenset(f.name for f in dataclass_fields(Job)) - {"id"}
+
+
+class JobStore:
+    """Thread-safe in-process job table.
+
+    Production deployments swap this for a Redis-backed implementation
+    that shares state across worker processes; the interface here is
+    deliberately small so that swap is cheap.
+
+    ``max_jobs`` bounds memory use: when a ``submit`` pushes the table
+    past it, the oldest finished (DONE/FAILED) jobs are dropped. Pending
+    and running jobs are never evicted, so the table can exceed the cap
+    while more than ``max_jobs`` jobs are in flight.
+    """
+
+    def __init__(self, max_jobs: int = 1000) -> None:
+        if max_jobs < 1:
+            raise ValueError("max_jobs must be >= 1")
+        self._jobs: dict[str, Job] = {}
+        self._lock = threading.Lock()
+        self._max_jobs = max_jobs
+
+    def submit(self) -> Job:
+        """Register a new PENDING job and return it."""
+        job = Job(id=uuid.uuid4().hex[:12])
+        with self._lock:
+            self._jobs[job.id] = job
+            self._evict_finished()
+        return job
+
+    def get(self, job_id: str) -> Optional[Job]:
+        """Return the live ``Job`` (not a copy), or ``None`` if unknown.
+
+        ``update`` may mutate the returned object concurrently; use
+        ``snapshot`` when a consistent view is needed.
+        """
+        with self._lock:
+            return self._jobs.get(job_id)
+
+    def list(self) -> list[Job]:
+        """Return the live jobs, newest first."""
+        with self._lock:
+            return sorted(self._jobs.values(), key=lambda j: -j.created_at)
+
+    def snapshot(self, job_id: str) -> Optional[dict]:
+        """Return the job's ``to_dict()`` taken under the lock, or ``None``.
+
+        Serialising under the lock means the copy can never show a
+        half-applied ``update`` (e.g. status "done" with no result yet).
+        """
+        with self._lock:
+            job = self._jobs.get(job_id)
+            return None if job is None else job.to_dict()
+
+    def snapshots(self) -> list[dict]:
+        """Return consistent ``to_dict()`` copies of all jobs, newest first."""
+        with self._lock:
+            ordered = sorted(self._jobs.values(),
+                             key=lambda j: -j.created_at)
+            return [job.to_dict() for job in ordered]
+
+    def update(self, job_id: str, **fields: Any) -> None:
+        """Assign ``fields`` on the job in one locked step.
+
+        Unknown job ids are ignored.
+
+        Raises:
+            TypeError: if a keyword is not a mutable ``Job`` field, so a
+                typo cannot silently leave a job in a non-terminal state.
+        """
+        unknown = fields.keys() - _MUTABLE_FIELDS
+        if unknown:
+            raise TypeError(f"unknown Job field(s): {sorted(unknown)}")
+        with self._lock:
+            job = self._jobs.get(job_id)
+            if job is None:
+                return
+            for k, v in fields.items():
+                setattr(job, k, v)
+
+    def _evict_finished(self) -> None:
+        """Evict the oldest finished jobs over the cap (lock held)."""
+        excess = len(self._jobs) - self._max_jobs
+        if excess <= 0:
+            return
+        finished = sorted(
+            (j for j in self._jobs.values()
+             if j.status in _TERMINAL_STATUSES),
+            key=lambda j: j.created_at,
+        )
+        for job in finished[:excess]:
+            del self._jobs[job.id]
+
+
+# Module-level singleton — one queue per FastAPI process.
+_STORE = JobStore()
+
+
+class AsyncOptimizeRequest(BaseModel):
+    """Body for ``POST /optimize/async``."""
+    length_m: float = Field(default=3000.0, ge=500.0, le=20_000.0)
+    seed_terrain: int = 42
+    system: str = "mgd"
+    max_intermediate_towers: int = Field(default=8, ge=1, le=20)
+    population_size: int = Field(default=80, ge=10, le=500)
+    generations: int = Field(default=60, ge=10, le=400)
+
+
+# NOTE(review): no auth dependency is attached to this router — confirm
+# that leaving these routes open is intentional.
+router = APIRouter(prefix="/optimize", tags=["optimize-async"])
+
+
+def _run_job(job_id: str, body: AsyncOptimizeRequest) -> None:
+    """Background-thread entry point for one optimize run.
+
+    Catches every exception — including a failed lazy import — logs it
+    with its traceback, and surfaces it as the job's ``error`` field so
+    the client always sees a terminal state.
+    """
+    try:
+        from ..dem import synthetic_profile
+        from ..multi_rope import RopewaySystemType, system_defaults
+        from ..optimizer import GAConfig, optimize
+
+        _STORE.update(job_id, status=JobStatus.RUNNING,
+                      started_at=time.time())
+        sys_type = RopewaySystemType(body.system)
+        cfg = system_defaults(sys_type)
+        profile = synthetic_profile(length_m=body.length_m,
+                                    seed=body.seed_terrain)
+        ga = GAConfig(
+            max_intermediate_towers=body.max_intermediate_towers,
+            population_size=body.population_size,
+            generations=body.generations,
+            seed=body.seed_terrain,  # the GA reuses the terrain seed
+        )
+        result = optimize(profile.as_function(), profile.total_length,
+                          cfg=cfg, ga=ga, verbose=False)
+        align = result.best_alignment
+        rep = result.best_result.report
+        _STORE.update(
+            job_id,
+            status=JobStatus.DONE,
+            finished_at=time.time(),
+            result={
+                "feasible": result.best_result.feasible,
+                "intermediate_towers": max(0, len(align.towers) - 2),
+                "cable_length_m": rep.total_cable_length_m,
+                "min_clearance_m": rep.min_clearance_m,
+                "max_tension_kn": rep.max_tension_n / 1e3,
+                "max_break_over_deg": rep.max_break_over_deg,
+                "cost": result.best_result.cost,
+            },
+        )
+    except Exception as exc:  # noqa: BLE001 — surface anything to the client
+        logger.exception("optimize job %s failed", job_id)
+        _STORE.update(
+            job_id,
+            status=JobStatus.FAILED,
+            finished_at=time.time(),
+            error=f"{type(exc).__name__}: {exc}",
+        )
+
+
+@router.post("/async")
+def submit_async(body: AsyncOptimizeRequest,
+                 background: BackgroundTasks) -> dict:
+    """Kick off an optimize run and return its job id immediately.
+
+    Raises:
+        HTTPException: 400 if ``body.system`` is not a known system type.
+    """
+    from ..multi_rope import RopewaySystemType
+
+    # Validate before queueing so a bad request never creates a job.
+    try:
+        RopewaySystemType(body.system)
+    except ValueError:
+        raise HTTPException(
+            status_code=400,
+            detail=f"unknown system {body.system!r}",
+        ) from None
+    job = _STORE.submit()
+    background.add_task(_run_job, job.id, body)
+    return {"job_id": job.id, "status": job.status.value}
+
+
+@router.get("/jobs/{job_id}")
+def read_job(job_id: str) -> dict:
+    """Return the job's current state; 404 if the id is unknown."""
+    snapshot = _STORE.snapshot(job_id)
+    if snapshot is None:
+        raise HTTPException(status_code=404, detail="job not found")
+    return snapshot
+
+
+@router.get("/jobs")
+def list_jobs() -> list[dict]:
+    """Return every tracked job, newest first."""
+    return _STORE.snapshots()
+
+
+__all__ = ["router", "JobStore", "Job", "JobStatus", "AsyncOptimizeRequest"]
diff --git a/tests/test_server_async_jobs.py b/tests/test_server_async_jobs.py
new file mode 100644
index 0000000..57030b4
--- /dev/null
+++ b/tests/test_server_async_jobs.py
@@ -0,0 +1,126 @@
+"""Phase 9b — async optimize jobs via FastAPI BackgroundTasks."""
+
+from __future__ import annotations
+
+import time
+
+import pytest
+from fastapi.testclient import TestClient
+
+
+@pytest.fixture()
+def client(tmp_path, monkeypatch):
+    db_file = tmp_path / "jobs.sqlite"
+    monkeypatch.setenv("ROPEWAY_DATABASE_URL", f"sqlite:///{db_file}")
+    monkeypatch.setenv("ROPEWAY_JWT_SECRET", "test-secret-jobs")
+    import importlib
+
+    from ropeway.server import api as api_module
+    importlib.reload(api_module)
+    # Reset the in-process job store so tests don't see each other's jobs.
+    from ropeway.server import jobs as jobs_module
+    importlib.reload(jobs_module)
+    importlib.reload(api_module)  # re-pick up the freshly-loaded store
+    return TestClient(api_module.app)
+
+
+def _wait_for_terminal(client, job_id: str, timeout: float = 30.0) -> dict:
+    """Poll the job endpoint until status is done or failed (or timeout)."""
+    deadline = time.monotonic() + timeout
+    last = None
+    while time.monotonic() < deadline:
+        r = client.get(f"/optimize/jobs/{job_id}")
+        assert r.status_code == 200
+        last = r.json()
+        if last["status"] in {"done", "failed"}:
+            return last
+        time.sleep(0.1)
+    raise AssertionError(f"job {job_id} did not finish; last status: {last}")
+
+
+# ---------------------------------------------------------------------------
+# Submit / read / list
+# ---------------------------------------------------------------------------
+
+
+def test_submit_returns_job_id_and_pending_status(client):
+    r = client.post("/optimize/async", json={
+        "length_m": 1000.0, "seed_terrain": 3,
+        "system": "mgd", "generations": 15,
+    })
+    assert r.status_code == 200
+    body = r.json()
+    assert "job_id" in body and len(body["job_id"]) == 12
+    assert body["status"] in {"pending", "running"}
+
+
+def test_async_optimize_runs_to_completion_and_returns_metrics(client):
+    r = client.post("/optimize/async", json={
+        "length_m": 1200.0, "seed_terrain": 5,
+        "system": "mgd", "generations": 20,
+        "population_size": 40,
+    })
+    job_id = r.json()["job_id"]
+    final = _wait_for_terminal(client, job_id)
+    assert final["status"] == "done", f"job failed: {final.get('error')}"
+    metrics = final["result"]
+    assert metrics is not None
+    assert {"feasible", "cable_length_m", "min_clearance_m", "max_tension_kn"} \
+        .issubset(metrics)
+    assert isinstance(metrics["cable_length_m"], float)
+
+
+def test_unknown_job_id_returns_404(client):
+    r = client.get("/optimize/jobs/deadbeefcafe")
+    assert r.status_code == 404
+
+
+def test_jobs_list_includes_submitted_job(client):
+    r = client.post("/optimize/async", json={
+        "length_m": 1000.0, "seed_terrain": 7,
+        "system": "mgd", "generations": 15,
+    })
+    job_id = r.json()["job_id"]
+    _wait_for_terminal(client, job_id)
+    rows = client.get("/optimize/jobs").json()
+    ids = [row["id"] for row in rows]
+    assert job_id in ids
+
+
+def test_invalid_system_returns_400(client):
+    r = client.post("/optimize/async", json={
+        "length_m": 1000.0, "system": "monorail", "generations": 10,
+    })
+    assert r.status_code == 400
+    assert "unknown system" in r.json()["detail"]
+
+
+# ---------------------------------------------------------------------------
+# JobStore internals (no FastAPI roundtrip)
+# ---------------------------------------------------------------------------
+
+
+def test_jobstore_returns_none_for_unknown_id():
+    from ropeway.server.jobs import JobStore
+    store = JobStore()
+    assert store.get("nope") is None
+
+
+def test_jobstore_submit_assigns_unique_ids():
+    from ropeway.server.jobs import JobStore
+    store = JobStore()
+    a = store.submit()
+    b = store.submit()
+    assert a.id != b.id
+    assert store.get(a.id) is a
+    assert store.get(b.id) is b
+
+
+def test_jobstore_update_changes_fields_in_place():
+    from ropeway.server.jobs import JobStatus, JobStore
+    store = JobStore()
+    job = store.submit()
+    store.update(job.id, status=JobStatus.DONE, result={"ok": True})
+    fetched = store.get(job.id)
+    assert fetched.status == JobStatus.DONE
+    assert fetched.result == {"ok": True}