Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/ropeway/server/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ def create_app(settings: Settings | None = None) -> FastAPI:
from .htmx import router as htmx_router
app.include_router(htmx_router)

# ---- Phase 9b: async optimize jobs (BackgroundTasks queue) ----
from .jobs import router as jobs_router
app.include_router(jobs_router)

# ---- health ----
@app.get("/health")
def health():
Expand Down
187 changes: 187 additions & 0 deletions src/ropeway/server/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
"""Phase 9b — async optimize jobs.

The synchronous `/optimize/run` and `POST /htmx/run` endpoints block the
HTTP request thread for the full duration of a GA run. That is fine for
synthetic 60-generation demos but unacceptable for a DEM-driven 120-
generation production run that takes 30+ seconds.

This module adds a thin in-process job queue that lets a client POST a
run, get an immediate ``job_id``, and poll for status. The queue uses
FastAPI's ``BackgroundTasks`` (one worker thread per submitted job, no
external broker). Swap-out for Celery/Redis is straight-forward once
deployment requires multi-process scale — the ``JobStore`` interface is
the contract; the in-memory store is one implementation.

Routes registered:

POST /optimize/async -> {"job_id": "...", "status": "pending"}
GET /optimize/jobs/{id}
-> {"status": "...", "result": {...} | null,
"error": "..." | null}
GET /optimize/jobs -> [{"id": ..., "status": ..., "created_at": ...}]
"""

from __future__ import annotations

import threading
import time
import uuid
from dataclasses import asdict, dataclass, field
from enum import Enum
from typing import Any, Optional

from fastapi import APIRouter, BackgroundTasks, HTTPException
from pydantic import BaseModel, Field


class JobStatus(str, Enum):
PENDING = "pending"
RUNNING = "running"
DONE = "done"
FAILED = "failed"


@dataclass
class Job:
id: str
status: JobStatus = JobStatus.PENDING
created_at: float = field(default_factory=time.time)
started_at: Optional[float] = None
finished_at: Optional[float] = None
result: Optional[dict] = None
error: Optional[str] = None

def to_dict(self) -> dict:
out = asdict(self)
out["status"] = self.status.value
return out


class JobStore:
"""Thread-safe in-process job table.

Production deployments swap this for a Redis-backed implementation
that shares state across worker processes; the interface here is
deliberately small so that swap is cheap.
"""

def __init__(self) -> None:
self._jobs: dict[str, Job] = {}
self._lock = threading.Lock()

def submit(self) -> Job:
job = Job(id=uuid.uuid4().hex[:12])
with self._lock:
self._jobs[job.id] = job
return job

def get(self, job_id: str) -> Optional[Job]:
with self._lock:
return self._jobs.get(job_id)

def list(self) -> list[Job]:
with self._lock:
return sorted(self._jobs.values(), key=lambda j: -j.created_at)

def update(self, job_id: str, **fields: Any) -> None:
with self._lock:
job = self._jobs.get(job_id)
if job is None:
return
for k, v in fields.items():
setattr(job, k, v)


# Module-level singleton — one queue per FastAPI process.
_STORE = JobStore()


class AsyncOptimizeRequest(BaseModel):
"""Body for ``POST /optimize/async``."""
length_m: float = Field(default=3000.0, ge=500.0, le=20_000.0)
seed_terrain: int = 42
system: str = "mgd"
max_intermediate_towers: int = Field(default=8, ge=1, le=20)
population_size: int = Field(default=80, ge=10, le=500)
generations: int = Field(default=60, ge=10, le=400)


router = APIRouter(prefix="/optimize", tags=["optimize-async"])


def _run_job(job_id: str, body: AsyncOptimizeRequest) -> None:
"""Background-thread entry point. Catches every exception and
surfaces it as the job's ``error`` field so the client always sees a
terminal state."""
from ..dem import synthetic_profile
from ..multi_rope import RopewaySystemType, system_defaults
from ..optimizer import GAConfig, optimize

_STORE.update(job_id, status=JobStatus.RUNNING, started_at=time.time())
try:
sys_type = RopewaySystemType(body.system)
cfg = system_defaults(sys_type)
profile = synthetic_profile(length_m=body.length_m,
seed=body.seed_terrain)
ga = GAConfig(
max_intermediate_towers=body.max_intermediate_towers,
population_size=body.population_size,
generations=body.generations,
seed=body.seed_terrain,
)
result = optimize(profile.as_function(), profile.total_length,
cfg=cfg, ga=ga, verbose=False)
align = result.best_alignment
rep = result.best_result.report
_STORE.update(
job_id,
status=JobStatus.DONE,
finished_at=time.time(),
result={
"feasible": result.best_result.feasible,
"intermediate_towers": max(0, len(align.towers) - 2),
"cable_length_m": rep.total_cable_length_m,
"min_clearance_m": rep.min_clearance_m,
"max_tension_kn": rep.max_tension_n / 1e3,
"max_break_over_deg": rep.max_break_over_deg,
"cost": result.best_result.cost,
},
)
except Exception as exc: # noqa: BLE001 — surface anything to the client
_STORE.update(
job_id,
status=JobStatus.FAILED,
finished_at=time.time(),
error=f"{type(exc).__name__}: {exc}",
)


@router.post("/async")
def submit_async(body: AsyncOptimizeRequest,
background: BackgroundTasks) -> dict:
"""Kick off an optimize run and return its job id immediately."""
try:
from ..multi_rope import RopewaySystemType
RopewaySystemType(body.system)
except ValueError:
raise HTTPException(status_code=400,
detail=f"unknown system {body.system!r}")
job = _STORE.submit()
background.add_task(_run_job, job.id, body)
return {"job_id": job.id, "status": job.status.value}


@router.get("/jobs/{job_id}")
def read_job(job_id: str) -> dict:
job = _STORE.get(job_id)
if job is None:
raise HTTPException(status_code=404, detail="job not found")
return job.to_dict()


@router.get("/jobs")
def list_jobs() -> list[dict]:
return [j.to_dict() for j in _STORE.list()]


__all__ = ["router", "JobStore", "Job", "JobStatus", "AsyncOptimizeRequest"]
126 changes: 126 additions & 0 deletions tests/test_server_async_jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
"""Phase 9b — async optimize jobs via FastAPI BackgroundTasks."""

from __future__ import annotations

import time

import pytest
from fastapi.testclient import TestClient


@pytest.fixture()
def client(tmp_path, monkeypatch):
db_file = tmp_path / "jobs.sqlite"
monkeypatch.setenv("ROPEWAY_DATABASE_URL", f"sqlite:///{db_file}")
monkeypatch.setenv("ROPEWAY_JWT_SECRET", "test-secret-jobs")
import importlib

from ropeway.server import api as api_module
importlib.reload(api_module)
# Reset the in-process job store so tests don't see each other's jobs.
from ropeway.server import jobs as jobs_module
importlib.reload(jobs_module)
importlib.reload(api_module) # re-pick up the freshly-loaded store
return TestClient(api_module.app)


def _wait_for_terminal(client, job_id: str, timeout: float = 30.0) -> dict:
"""Poll the job endpoint until status is done or failed (or timeout)."""
deadline = time.time() + timeout
last = None
while time.time() < deadline:
r = client.get(f"/optimize/jobs/{job_id}")
assert r.status_code == 200
last = r.json()
if last["status"] in {"done", "failed"}:
return last
time.sleep(0.1)
raise AssertionError(f"job {job_id} did not finish; last status: {last}")


# ---------------------------------------------------------------------------
# Submit / read / list
# ---------------------------------------------------------------------------


def test_submit_returns_job_id_and_pending_status(client):
r = client.post("/optimize/async", json={
"length_m": 1000.0, "seed_terrain": 3,
"system": "mgd", "generations": 15,
})
assert r.status_code == 200
body = r.json()
assert "job_id" in body and len(body["job_id"]) == 12
assert body["status"] in {"pending", "running"}


def test_async_optimize_runs_to_completion_and_returns_metrics(client):
r = client.post("/optimize/async", json={
"length_m": 1200.0, "seed_terrain": 5,
"system": "mgd", "generations": 20,
"population_size": 40,
})
job_id = r.json()["job_id"]
final = _wait_for_terminal(client, job_id)
assert final["status"] == "done", f"job failed: {final.get('error')}"
metrics = final["result"]
assert metrics is not None
assert {"feasible", "cable_length_m", "min_clearance_m", "max_tension_kn"} \
.issubset(metrics)
assert isinstance(metrics["cable_length_m"], float)


def test_unknown_job_id_returns_404(client):
r = client.get("/optimize/jobs/deadbeefcafe")
assert r.status_code == 404


def test_jobs_list_includes_submitted_job(client):
r = client.post("/optimize/async", json={
"length_m": 1000.0, "seed_terrain": 7,
"system": "mgd", "generations": 15,
})
job_id = r.json()["job_id"]
_wait_for_terminal(client, job_id)
rows = client.get("/optimize/jobs").json()
ids = [row["id"] for row in rows]
assert job_id in ids


def test_invalid_system_returns_400(client):
r = client.post("/optimize/async", json={
"length_m": 1000.0, "system": "monorail", "generations": 10,
})
assert r.status_code == 400
assert "unknown system" in r.json()["detail"]


# ---------------------------------------------------------------------------
# JobStore internals (no FastAPI roundtrip)
# ---------------------------------------------------------------------------


def test_jobstore_returns_none_for_unknown_id():
from ropeway.server.jobs import JobStore
store = JobStore()
assert store.get("nope") is None


def test_jobstore_submit_assigns_unique_ids():
from ropeway.server.jobs import JobStore
store = JobStore()
a = store.submit()
b = store.submit()
assert a.id != b.id
assert store.get(a.id) is a
assert store.get(b.id) is b


def test_jobstore_update_changes_fields_in_place():
from ropeway.server.jobs import JobStatus, JobStore
store = JobStore()
job = store.submit()
store.update(job.id, status=JobStatus.DONE, result={"ok": True})
fetched = store.get(job.id)
assert fetched.status == JobStatus.DONE
assert fetched.result == {"ok": True}
Loading