Skip to content

Commit 38ed8e1

Browse files
committed
♻️ Refactor DSS mock API for clarity
1 parent 4c62a96 commit 38ed8e1

File tree

1 file changed

+128
-89
lines changed

1 file changed

+128
-89
lines changed

dss-api/main.py

Lines changed: 128 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -3,31 +3,56 @@
33
import os
44
import uuid
55
from datetime import datetime
6-
from typing import Any, Dict
6+
from typing import Any, Dict, Optional
77

88
import requests
99
import uvicorn
1010
from fastapi import BackgroundTasks, Depends, FastAPI, HTTPException, status
1111
from fastapi.security import APIKeyHeader
1212
from pydantic import BaseModel
1313

14+
15+
# Configuration
16+
class Config:
17+
API_KEY_HEADER = "X-API-Key"
18+
REQUEST_TIMEOUT = 5
19+
JOB_PROCESSING_STEPS = 5
20+
STEP_DURATION = 2
21+
22+
# Mock optimization results
23+
MOCK_RESULTS = {
24+
"energy_savings_kwh": 245.8,
25+
"cost_reduction_eur": 48.72,
26+
"co2_reduction_kg": 98.32,
27+
"optimization_score": 0.85,
28+
"recommended_actions": [
29+
"Reduce HVAC temperature by 2°C during off-peak hours",
30+
"Implement smart lighting controls in zone B",
31+
"Schedule high-energy equipment during low-cost periods",
32+
],
33+
}
34+
35+
1436
# Configure logging
1537
logging.basicConfig(level=logging.INFO)
1638
logger = logging.getLogger(__name__)
1739

40+
# In-memory job storage
41+
jobs_storage: Dict[str, Dict[str, Any]] = {}
42+
43+
# FastAPI app
1844
app = FastAPI(
1945
title="DSS Mock API",
20-
description="Mock Decision Support System - Functionality 1 (Energy Optimization) for proof-of-concept integration",
46+
description="Mock Decision Support System for energy optimization integration",
2147
version="1.0.0",
2248
)
2349

2450
# API Key authentication
25-
API_KEY_HEADER = "X-API-Key"
26-
api_key_header = APIKeyHeader(name=API_KEY_HEADER, auto_error=False)
51+
api_key_header = APIKeyHeader(name=Config.API_KEY_HEADER, auto_error=False)
2752

2853

29-
def get_api_key(api_key: str = Depends(api_key_header)):
30-
"""Get the API key from the environment variable"""
54+
def get_api_key(api_key: str = Depends(api_key_header)) -> str:
55+
"""Validate API key from request header."""
3156

3257
expected_key = os.getenv("BACKEND_API_KEY")
3358

@@ -39,10 +64,7 @@ def get_api_key(api_key: str = Depends(api_key_header)):
3964
return api_key
4065

4166

42-
# In-memory job storage
43-
jobs_storage: Dict[str, Dict[str, Any]] = {}
44-
45-
67+
# Pydantic models
4668
class F1JobRequest(BaseModel):
4769
building_id: str = "building_001"
4870
optimization_type: str = "energy_efficiency"
@@ -60,86 +82,120 @@ class F1JobStatus(BaseModel):
6082
job_id: str
6183
status: str
6284
progress: int
63-
result: Dict[str, Any] = None
85+
result: Optional[Dict[str, Any]] = None
6486
created_at: str
65-
completed_at: str = None
66-
67-
68-
async def simulate_job_processing(job_id: str, callback_url: str = None):
69-
"""Simulate DSS F1 (Energy Optimization) job processing with status updates"""
87+
completed_at: Optional[str] = None
7088

71-
try:
72-
# Update job status to running
73-
jobs_storage[job_id]["status"] = "running"
74-
jobs_storage[job_id]["progress"] = 10
75-
76-
# Simulate processing time (10 minutes -> 10 seconds for demo)
77-
await asyncio.sleep(2)
78-
jobs_storage[job_id]["progress"] = 25
7989

80-
await asyncio.sleep(2)
81-
jobs_storage[job_id]["progress"] = 50
90+
# Job processing functions
91+
def update_job_progress(job_id: str, status: str, progress: int) -> None:
92+
"""Update job status and progress in storage."""
8293

83-
await asyncio.sleep(2)
84-
jobs_storage[job_id]["progress"] = 75
94+
if job_id in jobs_storage:
95+
jobs_storage[job_id]["status"] = status
96+
jobs_storage[job_id]["progress"] = progress
8597

86-
await asyncio.sleep(2)
87-
jobs_storage[job_id]["progress"] = 90
8898

89-
await asyncio.sleep(2)
99+
def complete_job(job_id: str) -> None:
100+
"""Mark job as completed with results."""
90101

91-
# Complete the job
102+
if job_id in jobs_storage:
92103
jobs_storage[job_id]["status"] = "completed"
93104
jobs_storage[job_id]["progress"] = 100
94105
jobs_storage[job_id]["completed_at"] = datetime.now().isoformat()
95-
jobs_storage[job_id]["result"] = {
96-
"energy_savings_kwh": 245.8,
97-
"cost_reduction_eur": 48.72,
98-
"co2_reduction_kg": 98.32,
99-
"optimization_score": 0.85,
100-
"recommended_actions": [
101-
"Reduce HVAC temperature by 2°C during off-peak hours",
102-
"Implement smart lighting controls in zone B",
103-
"Schedule high-energy equipment during low-cost periods",
104-
],
106+
jobs_storage[job_id]["result"] = Config.MOCK_RESULTS
107+
108+
109+
def fail_job(job_id: str, error: str) -> None:
110+
"""Mark job as failed with error message."""
111+
112+
if job_id in jobs_storage:
113+
jobs_storage[job_id]["status"] = "failed"
114+
jobs_storage[job_id]["error"] = error
115+
116+
117+
async def send_webhook(callback_url: str, job_id: str) -> None:
118+
"""Send webhook notification for completed job."""
119+
120+
try:
121+
callback_data = {
122+
"job_id": job_id,
123+
"status": "completed",
124+
"result": jobs_storage[job_id]["result"],
105125
}
106126

107-
# Send callback if provided (webhook simulation)
108-
if callback_url:
109-
callback_data = {
110-
"job_id": job_id,
111-
"status": "completed",
112-
"result": jobs_storage[job_id]["result"],
113-
}
127+
requests.post(callback_url, json=callback_data, timeout=Config.REQUEST_TIMEOUT)
128+
logger.info(f"Webhook sent to {callback_url} for job {job_id}")
129+
except Exception as e:
130+
logger.error(f"Webhook failed for job {job_id}: {e}")
114131

115-
requests.post(callback_url, json=callback_data, timeout=5)
116-
logger.info(f"Sent callback to {callback_url} for job {job_id}")
132+
133+
async def simulate_job_processing(
134+
job_id: str, callback_url: Optional[str] = None
135+
) -> None:
136+
"""Simulate energy optimization job processing."""
137+
138+
try:
139+
update_job_progress(job_id, "running", 10)
140+
141+
# Simulate processing steps
142+
progress_steps = [25, 50, 75, 90]
143+
for progress in progress_steps:
144+
await asyncio.sleep(Config.STEP_DURATION)
145+
update_job_progress(job_id, "running", progress)
146+
147+
await asyncio.sleep(Config.STEP_DURATION)
148+
complete_job(job_id)
149+
150+
if callback_url:
151+
await send_webhook(callback_url, job_id)
117152

118153
except Exception as e:
119-
logger.error(f"Job processing failed: {e}")
120-
jobs_storage[job_id]["status"] = "failed"
121-
jobs_storage[job_id]["error"] = str(e)
154+
logger.error(f"Job processing failed for {job_id}: {e}")
155+
fail_job(job_id, str(e))
122156

123157

158+
# Utility functions
159+
def validate_job_exists(job_id: str) -> Dict[str, Any]:
160+
"""Validate job exists and return job data."""
161+
162+
if job_id not in jobs_storage:
163+
raise HTTPException(
164+
status_code=status.HTTP_404_NOT_FOUND, detail="Job not found"
165+
)
166+
167+
return jobs_storage[job_id]
168+
169+
170+
def validate_job_cancellable(job_data: Dict[str, Any]) -> None:
171+
"""Validate job can be cancelled."""
172+
173+
if job_data["status"] in ["completed", "failed"]:
174+
raise HTTPException(
175+
status_code=status.HTTP_400_BAD_REQUEST,
176+
detail="Cannot cancel completed or failed job",
177+
)
178+
179+
180+
# API endpoints
124181
@app.get("/health")
125182
async def health_check():
126-
"""Health check endpoint"""
183+
"""Health check endpoint."""
127184

128-
return {"status": "healthy", "service": "DSS F1 (Energy Optimization) Mock API"}
185+
return {"status": "healthy", "service": "DSS Mock API"}
129186

130187

131188
@app.post("/f1/jobs", response_model=F1JobResponse)
132189
async def create_f1_job(
133190
job_request: F1JobRequest,
134191
background_tasks: BackgroundTasks,
135-
callback_url: str = None,
136-
api_key: str = Depends(get_api_key),
192+
callback_url: Optional[str] = None,
193+
_: str = Depends(get_api_key),
137194
):
138-
"""Create a new DSS F1 (Energy Optimization) analysis job"""
195+
"""Create energy optimization analysis job."""
139196

140197
job_id = str(uuid.uuid4())
141198

142-
# Store job in memory
143199
job_data = {
144200
"job_id": job_id,
145201
"status": "pending",
@@ -152,30 +208,23 @@ async def create_f1_job(
152208
}
153209

154210
jobs_storage[job_id] = job_data
155-
156-
# Start background processing
157211
background_tasks.add_task(simulate_job_processing, job_id, callback_url)
158212

159-
logger.info(f"Created DSS F1 job {job_id} for building {job_request.building_id}")
213+
logger.info(f"Created job {job_id} for building {job_request.building_id}")
160214

161215
return F1JobResponse(
162216
job_id=job_id,
163217
status="pending",
164-
message=f"DSS F1 energy optimization job created for building {job_request.building_id} ({job_request.optimization_type})",
218+
message=f"Energy optimization job created for building {job_request.building_id} ({job_request.optimization_type})",
165219
created_at=job_data["created_at"],
166220
)
167221

168222

169223
@app.get("/f1/jobs/{job_id}", response_model=F1JobStatus)
170-
async def get_job_status(job_id: str, api_key: str = Depends(get_api_key)):
171-
"""Get the status of a specific DSS F1 energy optimization job"""
224+
async def get_job_status(job_id: str, _: str = Depends(get_api_key)):
225+
"""Get energy optimization job status."""
172226

173-
if job_id not in jobs_storage:
174-
raise HTTPException(
175-
status_code=status.HTTP_404_NOT_FOUND, detail="Job not found"
176-
)
177-
178-
job_data = jobs_storage[job_id]
227+
job_data = validate_job_exists(job_id)
179228

180229
return F1JobStatus(
181230
job_id=job_id,
@@ -188,31 +237,21 @@ async def get_job_status(job_id: str, api_key: str = Depends(get_api_key)):
188237

189238

190239
@app.get("/f1/jobs")
191-
async def list_jobs(api_key: str = Depends(get_api_key)):
192-
"""List all DSS F1 energy optimization jobs"""
240+
async def list_jobs(_: str = Depends(get_api_key)):
241+
"""List all energy optimization jobs."""
193242

194243
return {"jobs": list(jobs_storage.values())}
195244

196245

197246
@app.delete("/f1/jobs/{job_id}")
198-
async def cancel_job(job_id: str, api_key: str = Depends(get_api_key)):
199-
"""Cancel a running DSS F1 energy optimization job"""
200-
201-
if job_id not in jobs_storage:
202-
raise HTTPException(
203-
status_code=status.HTTP_404_NOT_FOUND, detail="Job not found"
204-
)
247+
async def cancel_job(job_id: str, _: str = Depends(get_api_key)):
248+
"""Cancel energy optimization job."""
205249

206-
job_data = jobs_storage[job_id]
207-
208-
if job_data["status"] in ["completed", "failed"]:
209-
raise HTTPException(
210-
status_code=status.HTTP_400_BAD_REQUEST,
211-
detail="Cannot cancel completed or failed job",
212-
)
250+
job_data = validate_job_exists(job_id)
251+
validate_job_cancellable(job_data)
213252

214253
jobs_storage[job_id]["status"] = "cancelled"
215-
logger.info(f"Cancelled DSS F1 job {job_id}")
254+
logger.info(f"Cancelled job {job_id}")
216255

217256
return {"message": f"Job {job_id} cancelled"}
218257

0 commit comments

Comments
 (0)