diff --git a/tests/integration/endpoints/test_jobs.py b/tests/integration/endpoints/test_jobs.py
index b37cd67..546982c 100644
--- a/tests/integration/endpoints/test_jobs.py
+++ b/tests/integration/endpoints/test_jobs.py
@@ -13,6 +13,7 @@
 from workerfacing_api.core.filesystem import FileSystem, LocalFilesystem, S3Filesystem
 from workerfacing_api.core.queue import RDSJobQueue
 from workerfacing_api.crud import job_tracking
+from workerfacing_api.exceptions import JobDeletedException
 from workerfacing_api.schemas.queue_jobs import (
     AppSpecs,
     EnvironmentTypes,
@@ -277,7 +278,7 @@ def test_put_job_status_canceled(
         client.get(self.endpoint, params={"memory": 1})
 
         def mock_update_job(*args: Any, **kwargs: Any) -> None:
-            raise ValueError("Job not found")
+            raise JobDeletedException("Job not found")
 
         monkeypatch.setattr(job_tracking, "update_job", mock_update_job)
         res = client.put(f"{self.endpoint}/1/status", params={"status": "running"})
diff --git a/tests/unit/crud/__init__.py b/tests/unit/crud/__init__.py
new file mode 100644
index 0000000..deae50a
--- /dev/null
+++ b/tests/unit/crud/__init__.py
@@ -0,0 +1 @@
+# Empty init file
diff --git a/tests/unit/crud/test_job_tracking.py b/tests/unit/crud/test_job_tracking.py
new file mode 100644
index 0000000..9625282
--- /dev/null
+++ b/tests/unit/crud/test_job_tracking.py
@@ -0,0 +1,27 @@
+"""Tests for job_tracking module."""
+
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from workerfacing_api.crud.job_tracking import update_job
+from workerfacing_api.exceptions import JobDeletedException
+from workerfacing_api.schemas.rds_models import JobStates
+
+
+@patch("workerfacing_api.crud.job_tracking.requests.put")
+def test_update_job_raises_job_deleted_exception(mock_put: MagicMock) -> None:
+    """Test that update_job raises JobDeletedException on 404 response."""
+    # Mock a 404 response
+    mock_response = MagicMock()
+    mock_response.status_code = 404
+    mock_put.return_value = mock_response
+
+    job_id = 789
+
+    with pytest.raises(JobDeletedException) as exc_info:
+        update_job(job_id, JobStates.running)
+
+    assert f"Job {job_id} not found; it was probably deleted by the user." in str(
+        exc_info.value
+    )
diff --git a/workerfacing_api/core/queue.py b/workerfacing_api/core/queue.py
index dcbd91b..a9e668b 100644
--- a/workerfacing_api/core/queue.py
+++ b/workerfacing_api/core/queue.py
@@ -18,6 +18,7 @@
 
 from workerfacing_api import settings
 from workerfacing_api.crud import job_tracking
+from workerfacing_api.exceptions import JobDeletedException, JobNotAssignedException
 from workerfacing_api.schemas.queue_jobs import (
     EnvironmentTypes,
     JobFilter,
@@ -457,7 +458,7 @@ def pop(self, environment: EnvironmentTypes, receipt_handle: str) -> bool:
                 job.workers = ";".join(job.workers.split(";") + [hostname])
                 try:
                     self._update_job_status(session, job, status=JobStates.pulled)
-                except ValueError:
+                except JobDeletedException:
                     # job probably deleted by user
                     return False
             return True
@@ -484,7 +485,7 @@ def get_job(
             if hostname:
                 workers = job.workers.split(";")
                 if not workers or hostname != workers[-1]:
-                    raise ValueError(
+                    raise JobNotAssignedException(
                         f"Job with id {job_id} is not assigned to worker {hostname}"
                     )
             return job
@@ -505,11 +506,11 @@ def _update_job_status(
             job_id = job.job["meta"]["job_id"]
             assert isinstance(job_id, int)
             job_tracking.update_job(job_id, status, runtime_details)
-        except ValueError as e:
+        except JobDeletedException:
             # job probably deleted by user
             session.delete(job)
             session.commit()
-            raise ValueError(f"Could not update job, probably deleted by user: {e}")
+            raise  # re-raise the caught exception unchanged after local cleanup
 
     def update_job_status(
         self,
@@ -545,19 +546,27 @@ def handle_timeouts(
                 # TODO: increase priority?
                 job.num_retries += 1
                 session.add(job)
-                self.update_job_status(
-                    job.id,
-                    JobStates.queued,
-                    f"timeout {job.num_retries} (workers tried: {job.workers})",
-                )
-                n_retry += 1
+                try:
+                    self.update_job_status(
+                        job.id,
+                        JobStates.queued,
+                        f"timeout {job.num_retries} (workers tried: {job.workers})",
+                    )
+                    n_retry += 1
+                except JobDeletedException:
+                    # job probably deleted by user, skip updating status
+                    pass
             jobs_failed = jobs_timeout.filter(QueuedJob.num_retries >= max_retries)
             for job in jobs_failed:
-                self.update_job_status(
-                    job.id,
-                    JobStates.error,
-                    "max retries reached",
-                )
-                n_failed += 1
+                try:
+                    self.update_job_status(
+                        job.id,
+                        JobStates.error,
+                        "max retries reached",
+                    )
+                    n_failed += 1
+                except JobDeletedException:
+                    # job probably deleted by user, skip updating status
+                    pass
             session.commit()
             return n_retry, n_failed
diff --git a/workerfacing_api/crud/job_tracking.py b/workerfacing_api/crud/job_tracking.py
index a332ddc..233d835 100644
--- a/workerfacing_api/crud/job_tracking.py
+++ b/workerfacing_api/crud/job_tracking.py
@@ -2,6 +2,7 @@
 from fastapi.encoders import jsonable_encoder
 
 import workerfacing_api.settings as settings
+from workerfacing_api.exceptions import JobDeletedException
 from workerfacing_api.schemas.rds_models import JobStates
 
 
@@ -19,7 +20,7 @@ def update_job(
         headers={"x-api-key": settings.internal_api_key_secret},
     )
     if resp.status_code == 404:
-        raise ValueError(
+        raise JobDeletedException(
             f"Job {job_id} not found; it was probably deleted by the user."
         )
     resp.raise_for_status()
diff --git a/workerfacing_api/endpoints/jobs.py b/workerfacing_api/endpoints/jobs.py
index 6c52ba4..06eea1f 100644
--- a/workerfacing_api/endpoints/jobs.py
+++ b/workerfacing_api/endpoints/jobs.py
@@ -17,6 +17,7 @@
 from workerfacing_api.core.filesystem import FileSystem
 from workerfacing_api.core.queue import RDSJobQueue
 from workerfacing_api.dependencies import filesystem_dep, queue_dep
+from workerfacing_api.exceptions import JobDeletedException, JobNotAssignedException
 from workerfacing_api.schemas.files import FileHTTPRequest
 from workerfacing_api.schemas.queue_jobs import (
     EnvironmentTypes,
@@ -111,7 +112,7 @@ async def put_job_status(
     hostname = request.state.current_user.username
     try:
         queue.update_job_status(job_id, status, runtime_details, hostname=hostname)
-    except ValueError:
+    except (JobDeletedException, JobNotAssignedException):
         # acts as a "cancel job" signal to worker
         raise HTTPException(status_code=httpstatus.HTTP_404_NOT_FOUND)
 
diff --git a/workerfacing_api/exceptions.py b/workerfacing_api/exceptions.py
new file mode 100644
index 0000000..354cde8
--- /dev/null
+++ b/workerfacing_api/exceptions.py
@@ -0,0 +1,13 @@
+"""Custom exceptions for the workerfacing API."""
+
+
+class JobDeletedException(Exception):
+    """Exception raised when a job has been deleted by the user."""
+
+    pass
+
+
+class JobNotAssignedException(Exception):
+    """Exception raised when a job is not assigned to the requesting worker."""
+
+    pass