diff --git a/app/core/config.py b/app/core/config.py index 9f51f5c..997eed3 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -33,6 +33,8 @@ class Settings(BaseSettings): POSTGRES_HOST: str = "localhost" POSTGRES_PORT: int = 5432 + PHOTO_APPROVAL_TIMEOUT_DAYS: int = 7 + # Mobile auth/session defaults MOBILE_SESSION_LIMIT: int = 3 MOBILE_SESSION_TTL_SECONDS: int = 180 diff --git a/app/main.py b/app/main.py index 1e06dcb..981aa8f 100644 --- a/app/main.py +++ b/app/main.py @@ -8,6 +8,7 @@ from fastapi.middleware.cors import CORSMiddleware from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint from app.core.config import settings +from app.infra.database import engine from app.infra.minio import init_minio_client from app.infra.nats import NatsClient from app.infra.redis import RedisClient @@ -48,6 +49,18 @@ async def dispatch( +async def _approval_expiry_loop() -> None: + while True: + await asyncio.sleep(3600) + try: + async with engine.begin() as conn: + from app.container import Container + container = Container(conn) + await container.photo_approval_service.expire_stale(settings.PHOTO_APPROVAL_TIMEOUT_DAYS) + except Exception as exc: + logger.warning("Approval expiry task failed: %s", exc) + + MAX_RETRIES = 5 RETRY_DELAY = 2 # seconds @asynccontextmanager @@ -77,8 +90,11 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]: await NatsClient.connect() get_face_embedding_service() + expiry_task = asyncio.create_task(_approval_expiry_loop()) + yield + expiry_task.cancel() await RedisClient.get_instance().close() await NatsClient.close() diff --git a/app/service/photo_approval.py b/app/service/photo_approval.py index b6cb870..7b8617f 100644 --- a/app/service/photo_approval.py +++ b/app/service/photo_approval.py @@ -64,6 +64,14 @@ async def decide( await self._photo_querier.update_photo_status(id=photo_id, status="approved") return "approved" + async def expire_stale(self, timeout_days: int) -> int: + count = 0 + async for _ in self._approval_querier.expire_stale_approvals(timeout_days=timeout_days): + count += 1 + if count: + logger.info("Auto-expired %d stale pending photo(s)", count) + return count + async def _delete_photo_storage(self, photo_id: UUID) -> None: photo = await self._photo_querier.get_photo_by_id(id=photo_id) if photo is None: diff --git a/app/worker/photo_worker/main.py b/app/worker/photo_worker/main.py index 651b606..892e50b 100644 --- a/app/worker/photo_worker/main.py +++ b/app/worker/photo_worker/main.py @@ -123,6 +123,8 @@ async def _handle_single_face(self, event: PhotoProcessEvent, face: DetectedFace async def _handle_group_photo(self, event: PhotoProcessEvent, faces: list[DetectedFace]) -> None: logger.info("Processing group photo %s with %d faces", event.photo_id, len(faces)) + approvals_created = 0 + for face_index, face in enumerate(faces): bbox_json = json.dumps({ "x1": float(face.bbox[0]), @@ -155,6 +157,8 @@ async def _handle_group_photo(self, event: PhotoProcessEvent, faces: list[Detect logger.info("No match for face %d in photo %s", face_index, event.photo_id) continue + approvals_created += 1 + try: await self._notification_service.create_notification( user_id=approval.user_id, @@ -178,6 +182,11 @@ async def _handle_group_photo(self, event: PhotoProcessEvent, faces: list[Detect approval.user_id, event.photo_id, exc, ) + if approvals_created == 0: + logger.info("No users matched in group photo %s, auto-approving as public", event.photo_id) + await self._photo_querier.update_photo_status(id=event.photo_id, status="approved") + await self._photo_querier.update_photo_visibility(id=event.photo_id, visibility="public") + async def _create_job(self, event: PhotoProcessEvent) -> models.ProcessingJob | None: if self._pj_querier is None: diff --git a/db/generated/photo_approvals.py b/db/generated/photo_approvals.py index f712392..d3776c3 100644 --- a/db/generated/photo_approvals.py +++ b/db/generated/photo_approvals.py @@ -11,6 +11,25 @@ from db.generated import models +EXPIRE_STALE_APPROVALS = """-- name: expire_stale_approvals \\:many +WITH stale_photos AS ( + SELECT id FROM photos + WHERE status = 'pending' + AND created_at < now() - make_interval(days => :p1::int) +), +_update_approvals AS ( + UPDATE photo_approvals + SET decision = 'approved', decided_at = now() + WHERE photo_id IN (SELECT id FROM stale_photos) + AND decision = 'pending' +) +UPDATE photos +SET status = 'approved' +WHERE id IN (SELECT id FROM stale_photos) +RETURNING id +""" + + CREATE_PHOTO_APPROVAL = """-- name: create_photo_approval \\:one INSERT INTO photo_approvals ( photo_id, @@ -49,6 +68,11 @@ class AsyncQuerier: def __init__(self, conn: sqlalchemy.ext.asyncio.AsyncConnection): self._conn = conn + async def expire_stale_approvals(self, *, timeout_days: int) -> AsyncIterator[uuid.UUID]: + result = await self._conn.stream(sqlalchemy.text(EXPIRE_STALE_APPROVALS), {"p1": timeout_days}) + async for row in result: + yield row[0] + async def create_photo_approval(self, *, photo_id: uuid.UUID, user_id: uuid.UUID, decision: str) -> Optional[models.PhotoApproval]: row = (await self._conn.execute(sqlalchemy.text(CREATE_PHOTO_APPROVAL), {"p1": photo_id, "p2": user_id, "p3": decision})).first() if row is None: diff --git a/db/queries/photo_approvals.sql b/db/queries/photo_approvals.sql index ac6b787..298f1fc 100644 --- a/db/queries/photo_approvals.sql +++ b/db/queries/photo_approvals.sql @@ -17,6 +17,23 @@ RETURNING *; -- name: GetPhotoApprovalsByPhotoId :many SELECT * FROM photo_approvals WHERE photo_id = $1; +-- name: ExpireStaleApprovals :many +WITH stale_photos AS ( + SELECT id FROM photos + WHERE status = 'pending' + AND created_at < now() - make_interval(days => $1::int) +), +_update_approvals AS ( + UPDATE photo_approvals + SET decision = 'approved', decided_at = now() + WHERE photo_id IN (SELECT id FROM stale_photos) + AND decision = 'pending' +) +UPDATE photos +SET status = 'approved' +WHERE id IN (SELECT id FROM stale_photos) +RETURNING id; + -- name: ListApprovalsByUserAndStatus :many SELECT * FROM photo_approvals WHERE user_id = $1