diff --git a/fileglancer/apps/core.py b/fileglancer/apps/core.py
index 9df90077..0f47f524 100644
--- a/fileglancer/apps/core.py
+++ b/fileglancer/apps/core.py
@@ -937,6 +937,12 @@ def _poll_jobs(settings):
         if not jobs_to_poll:
             return True  # zombie jobs still pending, keep polling
 
+        # Local executor: poll by checking PID files instead of spawning
+        # a worker subprocess (which would create a fresh executor with
+        # no knowledge of the running processes).
+        if settings.cluster.executor == "local":
+            return _poll_local_jobs(session, jobs_to_poll)
+
         # Pick any user to run bjobs as (bjobs -u all sees all users' jobs)
         poll_username = jobs_to_poll[0].username
         # Pass current known statuses so stubs are seeded correctly.
@@ -982,6 +988,81 @@ def _poll_jobs(settings):
     return True
 
 
+def _poll_local_jobs(session, jobs_to_poll: list) -> bool:
+    """Poll local executor jobs by checking PID files and process liveness.
+
+    The local executor runs jobs as bash subprocesses. The submit worker
+    writes the PID to ``{work_dir}/job.pid``, and the script writes its
+    exit code to ``{work_dir}/exit_code`` via an EXIT trap.
+
+    Returns True if there are still active jobs, False otherwise.
+    """
+    still_active = False
+
+    for db_job in jobs_to_poll:
+        work_dir = Path(db_job.work_dir) if db_job.work_dir else None
+        if not work_dir:
+            still_active = True
+            continue
+
+        pid_file = work_dir / "job.pid"
+        if not pid_file.exists():
+            still_active = True
+            continue
+
+        try:
+            pid = int(pid_file.read_text().strip())
+        except (ValueError, OSError):
+            still_active = True
+            continue
+
+        old_status = db_job.status
+        try:
+            os.kill(pid, 0)
+            # Process is still alive
+            still_active = True
+            if old_status == "PENDING":
+                db.update_job_status(
+                    session, db_job.id, "RUNNING",
+                    started_at=datetime.now(UTC),
+                )
+                logger.info(f"Job {db_job.id} status updated: PENDING -> RUNNING")
+        except ProcessLookupError:
+            # Process has exited — read exit code from the trap file
+            exit_code = _read_exit_code(work_dir)
+            new_status = "DONE" if exit_code == 0 else "FAILED"
+            now = datetime.now(UTC)
+            db.update_job_status(
+                session, db_job.id, new_status,
+                exit_code=exit_code,
+                finished_at=now,
+                started_at=now if old_status == "PENDING" else None,
+            )
+            logger.info(f"Job {db_job.id} status updated: {old_status} -> {new_status}")
+        except PermissionError:
+            # Process exists but owned by another user — still running
+            still_active = True
+            if old_status == "PENDING":
+                db.update_job_status(
+                    session, db_job.id, "RUNNING",
+                    started_at=datetime.now(UTC),
+                )
+                logger.info(f"Job {db_job.id} status updated: PENDING -> RUNNING")
+
+    return still_active
+
+
+def _read_exit_code(work_dir: Path) -> int | None:
+    """Read the exit code written by the EXIT trap in the job script."""
+    exit_code_file = work_dir / "exit_code"
+    if not exit_code_file.exists():
+        return None
+    try:
+        return int(exit_code_file.read_text().strip())
+    except (ValueError, OSError):
+        return None
+
+
 def _parse_iso_dt(s: str | None) -> datetime | None:
     """Parse an ISO 8601 datetime string, or return None."""
     if not s:
@@ -1192,6 +1273,12 @@ async def submit_job(
         "unset PIXI_PROJECT_MANIFEST",
         f"export FG_WORK_DIR={shlex.quote(str(work_dir))}",
     ]
+    # For local executor, trap EXIT to write the exit code to a file so
+    # PID-based polling can determine the final status after the process exits.
+    if settings.cluster.executor == "local":
+        preamble_lines.append(
+            'trap \'echo $? > "$FG_WORK_DIR/exit_code"\' EXIT'
+        )
     if settings.apps.extra_paths:
         path_suffix = os.pathsep.join(shlex.quote(p) for p in settings.apps.extra_paths)
         preamble_lines.append(f"export PATH=$PATH:{path_suffix}")
diff --git a/fileglancer/apps/worker.py b/fileglancer/apps/worker.py
index cf418d94..f5170bdb 100644
--- a/fileglancer/apps/worker.py
+++ b/fileglancer/apps/worker.py
@@ -71,6 +71,16 @@ async def _submit(request: dict) -> dict:
         resources=resource_spec,
     )
 
+    # For local executor, write the subprocess PID to disk so the poll
+    # loop can check process liveness across worker invocations.
+    # Only LocalExecutor has a _processes dict; HPC executors don't.
+    processes = getattr(executor, "_processes", None)
+    if processes is not None:
+        proc = processes.get(job.job_id)
+        if proc is not None:
+            pid_file = work_dir / "job.pid"
+            pid_file.write_text(str(proc.pid))
+
     return {"job_id": job.job_id, "script_path": job.script_path}
 
 
diff --git a/tests/test_poll.py b/tests/test_poll.py
index c60fb787..da52ae76 100644
--- a/tests/test_poll.py
+++ b/tests/test_poll.py
@@ -3,28 +3,28 @@
 import asyncio
 import fcntl
 import multiprocessing
+import subprocess
 import time
 from datetime import datetime, UTC
+from pathlib import Path
 from types import SimpleNamespace
 from unittest.mock import patch, MagicMock, call
 
-from fileglancer.apps.core import _poll_jobs, _POLL_LOCK_PATH
+from fileglancer.apps.core import _poll_jobs, _poll_local_jobs, _POLL_LOCK_PATH
 
 
 # ---------------------------------------------------------------------------
 # Helpers
 # ---------------------------------------------------------------------------
 
-def _make_settings(**overrides):
+def _make_settings(executor="lsf", **overrides):
     """Return a minimal Settings-like object for poll tests."""
     cluster = SimpleNamespace(
         poll_interval=0.05,
         zombie_timeout_minutes=30.0,
-        **{k: v for k, v in {
-            "executor": "local",
-        }.items()},
+        executor=executor,
     )
-    cluster.model_dump = lambda exclude_none=False: {"executor": "local"}
+    cluster.model_dump = lambda exclude_none=False: {"executor": executor}
     settings = SimpleNamespace(
         cluster=cluster,
         db_url="sqlite:///unused",
@@ -33,7 +33,7 @@
 
 
 def _make_db_job(job_id, cluster_job_id, status, username="alice",
-                 created_at=None):
+                 created_at=None, work_dir=None):
     """Return a mock DB job row."""
     job = SimpleNamespace(
         id=job_id,
@@ -41,6 +41,7 @@
         status=status,
         username=username,
         created_at=created_at or datetime.now(UTC),
+        work_dir=work_dir,
     )
     return job
 
@@ -245,3 +246,129 @@
         p2.join(timeout=5)
 
         assert won_count.value == 1, f"Expected 1 poll winner, got {won_count.value}"
+
+
+# ---------------------------------------------------------------------------
+# Test: local executor PID-based polling
+# ---------------------------------------------------------------------------
+
+class TestPollLocalJobs:
+    """_poll_local_jobs checks PID files and process liveness to determine
+    job status, bypassing the worker subprocess that can't track local
+    executor processes across invocations."""
+
+    def test_running_process_transitions_to_running(self, tmp_path):
+        """A PENDING job whose PID is still alive should become RUNNING."""
+        # Start a long-running process
+        proc = subprocess.Popen(["sleep", "60"])
+        pid_file = tmp_path / "job.pid"
+        pid_file.write_text(str(proc.pid))
+
+        try:
+            job = _make_db_job(1, "1", "PENDING", work_dir=str(tmp_path))
+            mock_session = MagicMock()
+
+            with patch("fileglancer.apps.core.db") as mock_db:
+                result = _poll_local_jobs(mock_session, [job])
+
+            assert result is True  # still active
+            mock_db.update_job_status.assert_called_once()
+            assert mock_db.update_job_status.call_args[0][2] == "RUNNING"
+        finally:
+            proc.terminate()
+            proc.wait()
+
+    def test_exited_process_transitions_to_done(self, tmp_path):
+        """A job whose process has exited with code 0 should become DONE."""
+        # Start and immediately wait for a process that exits successfully
+        proc = subprocess.Popen(["true"])
+        proc.wait()
+        pid_file = tmp_path / "job.pid"
+        pid_file.write_text(str(proc.pid))
+        exit_code_file = tmp_path / "exit_code"
+        exit_code_file.write_text("0")
+
+        job = _make_db_job(1, "1", "RUNNING", work_dir=str(tmp_path))
+        mock_session = MagicMock()
+
+        with patch("fileglancer.apps.core.db") as mock_db:
+            result = _poll_local_jobs(mock_session, [job])
+
+        assert result is False  # no more active jobs
+        mock_db.update_job_status.assert_called_once()
+        args = mock_db.update_job_status.call_args[0]
+        assert args[2] == "DONE"
+
+    def test_exited_process_with_error_transitions_to_failed(self, tmp_path):
+        """A job whose process exited with non-zero code should become FAILED."""
+        proc = subprocess.Popen(["false"])
+        proc.wait()
+        pid_file = tmp_path / "job.pid"
+        pid_file.write_text(str(proc.pid))
+        exit_code_file = tmp_path / "exit_code"
+        exit_code_file.write_text("1")
+
+        job = _make_db_job(1, "1", "RUNNING", work_dir=str(tmp_path))
+        mock_session = MagicMock()
+
+        with patch("fileglancer.apps.core.db") as mock_db:
+            result = _poll_local_jobs(mock_session, [job])
+
+        assert result is False
+        args = mock_db.update_job_status.call_args[0]
+        assert args[2] == "FAILED"
+
+    def test_already_running_not_updated_again(self, tmp_path):
+        """A RUNNING job whose PID is still alive should not be updated."""
+        proc = subprocess.Popen(["sleep", "60"])
+        pid_file = tmp_path / "job.pid"
+        pid_file.write_text(str(proc.pid))
+
+        try:
+            job = _make_db_job(1, "1", "RUNNING", work_dir=str(tmp_path))
+            mock_session = MagicMock()
+
+            with patch("fileglancer.apps.core.db") as mock_db:
+                _poll_local_jobs(mock_session, [job])
+
+            mock_db.update_job_status.assert_not_called()
+        finally:
+            proc.terminate()
+            proc.wait()
+
+    def test_missing_pid_file_keeps_polling(self, tmp_path):
+        """A job with no PID file should be treated as still active."""
+        job = _make_db_job(1, "1", "PENDING", work_dir=str(tmp_path))
+        mock_session = MagicMock()
+
+        with patch("fileglancer.apps.core.db") as mock_db:
+            result = _poll_local_jobs(mock_session, [job])
+
+        assert result is True
+        mock_db.update_job_status.assert_not_called()
+
+    def test_poll_jobs_routes_local_executor(self, tmp_path):
+        """_poll_jobs should route to _poll_local_jobs when executor is local."""
+        settings = _make_settings(executor="local")
+
+        proc = subprocess.Popen(["sleep", "60"])
+        pid_file = tmp_path / "job.pid"
+        pid_file.write_text(str(proc.pid))
+
+        try:
+            job = _make_db_job(1, "1", "PENDING", work_dir=str(tmp_path))
+            mock_session = MagicMock()
+
+            with patch("fileglancer.apps.core.db") as mock_db:
+                mock_db.get_db_session.return_value.__enter__ = lambda _: mock_session
+                mock_db.get_db_session.return_value.__exit__ = MagicMock(return_value=False)
+                mock_db.get_active_jobs.return_value = [job]
+
+                result = _poll_jobs(settings)
+
+            assert result is True
+            # Status update came from _poll_local_jobs directly, not the cluster path
+            mock_db.update_job_status.assert_called_once()
+        finally:
+            proc.terminate()
+            proc.wait()