From 0a9b2fe0af9633b254834e681e0c177f88502e53 Mon Sep 17 00:00:00 2001 From: Allison Truhlar Date: Tue, 21 Apr 2026 20:52:11 +0000 Subject: [PATCH 1/2] fix: resolve job status never updating for local executor The LocalExecutor tracks running processes in-memory, but the worker subprocess architecture creates a fresh executor for each submit/poll invocation. The submit worker's process references are lost when it exits, so the poll worker's executor.poll() never detects status changes. Fix by bypassing the worker subprocess for local executor polling: - Write the subprocess PID to {work_dir}/job.pid during submit - Add a bash EXIT trap to capture exit code in {work_dir}/exit_code - Poll by checking PID liveness via os.kill(pid, 0) instead of spawning a worker that creates a new (empty) executor --- fileglancer/apps/core.py | 87 ++++++++++++++++++++++ fileglancer/apps/worker.py | 10 +++ tests/test_poll.py | 144 +++++++++++++++++++++++++++++++++++-- 3 files changed, 234 insertions(+), 7 deletions(-) diff --git a/fileglancer/apps/core.py b/fileglancer/apps/core.py index 9df90077..0f47f524 100644 --- a/fileglancer/apps/core.py +++ b/fileglancer/apps/core.py @@ -937,6 +937,12 @@ def _poll_jobs(settings): if not jobs_to_poll: return True # zombie jobs still pending, keep polling + # Local executor: poll by checking PID files instead of spawning + # a worker subprocess (which would create a fresh executor with + # no knowledge of the running processes). + if settings.cluster.executor == "local": + return _poll_local_jobs(session, jobs_to_poll) + # Pick any user to run bjobs as (bjobs -u all sees all users' jobs) poll_username = jobs_to_poll[0].username # Pass current known statuses so stubs are seeded correctly. @@ -982,6 +988,81 @@ def _poll_jobs(settings): return True +def _poll_local_jobs(session, jobs_to_poll: list) -> bool: + """Poll local executor jobs by checking PID files and process liveness. + + The local executor runs jobs as bash subprocesses. The submit worker + writes the PID to ``{work_dir}/job.pid``, and the script writes its + exit code to ``{work_dir}/exit_code`` via an EXIT trap. + + Returns True if there are still active jobs, False otherwise. + """ + still_active = False + + for db_job in jobs_to_poll: + work_dir = Path(db_job.work_dir) if db_job.work_dir else None + if not work_dir: + still_active = True + continue + + pid_file = work_dir / "job.pid" + if not pid_file.exists(): + still_active = True + continue + + try: + pid = int(pid_file.read_text().strip()) + except (ValueError, OSError): + still_active = True + continue + + old_status = db_job.status + try: + os.kill(pid, 0) + # Process is still alive + still_active = True + if old_status == "PENDING": + db.update_job_status( + session, db_job.id, "RUNNING", + started_at=datetime.now(UTC), + ) + logger.info(f"Job {db_job.id} status updated: PENDING -> RUNNING") + except ProcessLookupError: + # Process has exited — read exit code from the trap file + exit_code = _read_exit_code(work_dir) + new_status = "DONE" if exit_code == 0 else "FAILED" + now = datetime.now(UTC) + db.update_job_status( + session, db_job.id, new_status, + exit_code=exit_code, + finished_at=now, + started_at=now if old_status == "PENDING" else None, + ) + logger.info(f"Job {db_job.id} status updated: {old_status} -> {new_status}") + except PermissionError: + # Process exists but owned by another user — still running + still_active = True + if old_status == "PENDING": + db.update_job_status( + session, db_job.id, "RUNNING", + started_at=datetime.now(UTC), + ) + logger.info(f"Job {db_job.id} status updated: PENDING -> RUNNING") + + return still_active + + +def _read_exit_code(work_dir: Path) -> int | None: + """Read the exit code written by the EXIT trap in the job script.""" + exit_code_file = work_dir / "exit_code" + if not exit_code_file.exists(): + return None + try: + return int(exit_code_file.read_text().strip()) + except (ValueError, OSError): + return None + + def _parse_iso_dt(s: str | None) -> datetime | None: """Parse an ISO 8601 datetime string, or return None.""" if not s: @@ -1192,6 +1273,12 @@ async def submit_job( "unset PIXI_PROJECT_MANIFEST", f"export FG_WORK_DIR={shlex.quote(str(work_dir))}", ] + # For local executor, trap EXIT to write the exit code to a file so + # PID-based polling can determine the final status after the process exits. + if settings.cluster.executor == "local": + preamble_lines.append( + 'trap \'echo $? > "$FG_WORK_DIR/exit_code"\' EXIT' + ) if settings.apps.extra_paths: path_suffix = os.pathsep.join(shlex.quote(p) for p in settings.apps.extra_paths) preamble_lines.append(f"export PATH=$PATH:{path_suffix}") diff --git a/fileglancer/apps/worker.py b/fileglancer/apps/worker.py index cf418d94..f5170bdb 100644 --- a/fileglancer/apps/worker.py +++ b/fileglancer/apps/worker.py @@ -71,6 +71,16 @@ async def _submit(request: dict) -> dict: resources=resource_spec, ) + # For local executor, write the subprocess PID to disk so the poll + # loop can check process liveness across worker invocations. + # Only LocalExecutor has a _processes dict; HPC executors don't. + processes = getattr(executor, "_processes", None) + if processes is not None: + proc = processes.get(job.job_id) + if proc is not None: + pid_file = work_dir / "job.pid" + pid_file.write_text(str(proc.pid)) + return {"job_id": job.job_id, "script_path": job.script_path} diff --git a/tests/test_poll.py b/tests/test_poll.py index c60fb787..2ffcf7ae 100644 --- a/tests/test_poll.py +++ b/tests/test_poll.py @@ -3,28 +3,30 @@ import asyncio import fcntl import multiprocessing +import os +import signal +import subprocess import time from datetime import datetime, UTC +from pathlib import Path from types import SimpleNamespace from unittest.mock import patch, MagicMock, call -from fileglancer.apps.core import _poll_jobs, _POLL_LOCK_PATH +from fileglancer.apps.core import _poll_jobs, _poll_local_jobs, _POLL_LOCK_PATH # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- -def _make_settings(**overrides): +def _make_settings(executor="lsf", **overrides): """Return a minimal Settings-like object for poll tests.""" cluster = SimpleNamespace( poll_interval=0.05, zombie_timeout_minutes=30.0, - **{k: v for k, v in { - "executor": "local", - }.items()}, + executor=executor, ) - cluster.model_dump = lambda exclude_none=False: {"executor": "local"} + cluster.model_dump = lambda exclude_none=False: {"executor": executor} settings = SimpleNamespace( cluster=cluster, db_url="sqlite:///unused", @@ -33,7 +35,7 @@ def _make_settings(**overrides): def _make_db_job(job_id, cluster_job_id, status, username="alice", - created_at=None): + created_at=None, work_dir=None): """Return a mock DB job row.""" job = SimpleNamespace( id=job_id, @@ -41,6 +43,7 @@ def _make_db_job(job_id, cluster_job_id, status, username="alice", status=status, username=username, created_at=created_at or datetime.now(UTC), + work_dir=work_dir, ) return job @@ -245,3 +248,130 @@ def test_concurrent_processes_only_one_wins(self): p2.join(timeout=5) assert won_count.value == 1, f"Expected 1 poll winner, got {won_count.value}" + + +# --------------------------------------------------------------------------- +# Test: local executor PID-based polling +# --------------------------------------------------------------------------- + +class TestPollLocalJobs: + """_poll_local_jobs checks PID files and process liveness to determine + job status, bypassing the worker subprocess that can't track local + executor processes across invocations.""" + + def test_running_process_transitions_to_running(self, tmp_path): + """A PENDING job whose PID is still alive should become RUNNING.""" + # Start a long-running process + proc = subprocess.Popen(["sleep", "60"]) + pid_file = tmp_path / "job.pid" + pid_file.write_text(str(proc.pid)) + + try: + job = _make_db_job(1, "1", "PENDING", work_dir=str(tmp_path)) + mock_session = MagicMock() + + with patch("fileglancer.apps.core.db") as mock_db: + result = _poll_local_jobs(mock_session, [job]) + + assert result is True # still active + mock_db.update_job_status.assert_called_once() + _, kwargs = mock_db.update_job_status.call_args + assert kwargs.get("status") or mock_db.update_job_status.call_args[0][2] == "RUNNING" + finally: + proc.terminate() + proc.wait() + + def test_exited_process_transitions_to_done(self, tmp_path): + """A job whose process has exited with code 0 should become DONE.""" + # Start and immediately wait for a process that exits successfully + proc = subprocess.Popen(["true"]) + proc.wait() + pid_file = tmp_path / "job.pid" + pid_file.write_text(str(proc.pid)) + exit_code_file = tmp_path / "exit_code" + exit_code_file.write_text("0") + + job = _make_db_job(1, "1", "RUNNING", work_dir=str(tmp_path)) + mock_session = MagicMock() + + with patch("fileglancer.apps.core.db") as mock_db: + result = _poll_local_jobs(mock_session, [job]) + + assert result is False # no more active jobs + mock_db.update_job_status.assert_called_once() + args = mock_db.update_job_status.call_args[0] + assert args[2] == "DONE" + + def test_exited_process_with_error_transitions_to_failed(self, tmp_path): + """A job whose process exited with non-zero code should become FAILED.""" + proc = subprocess.Popen(["false"]) + proc.wait() + pid_file = tmp_path / "job.pid" + pid_file.write_text(str(proc.pid)) + exit_code_file = tmp_path / "exit_code" + exit_code_file.write_text("1") + + job = _make_db_job(1, "1", "RUNNING", work_dir=str(tmp_path)) + mock_session = MagicMock() + + with patch("fileglancer.apps.core.db") as mock_db: + result = _poll_local_jobs(mock_session, [job]) + + assert result is False + args = mock_db.update_job_status.call_args[0] + assert args[2] == "FAILED" + + def test_already_running_not_updated_again(self, tmp_path): + """A RUNNING job whose PID is still alive should not be updated.""" + proc = subprocess.Popen(["sleep", "60"]) + pid_file = tmp_path / "job.pid" + pid_file.write_text(str(proc.pid)) + + try: + job = _make_db_job(1, "1", "RUNNING", work_dir=str(tmp_path)) + mock_session = MagicMock() + + with patch("fileglancer.apps.core.db") as mock_db: + _poll_local_jobs(mock_session, [job]) + + mock_db.update_job_status.assert_not_called() + finally: + proc.terminate() + proc.wait() + + def test_missing_pid_file_keeps_polling(self, tmp_path): + """A job with no PID file should be treated as still active.""" + job = _make_db_job(1, "1", "PENDING", work_dir=str(tmp_path)) + mock_session = MagicMock() + + with patch("fileglancer.apps.core.db") as mock_db: + result = _poll_local_jobs(mock_session, [job]) + + assert result is True + mock_db.update_job_status.assert_not_called() + + def test_poll_jobs_routes_local_executor(self, tmp_path): + """_poll_jobs should route to _poll_local_jobs when executor is local.""" + settings = _make_settings(executor="local") + + proc = subprocess.Popen(["sleep", "60"]) + pid_file = tmp_path / "job.pid" + pid_file.write_text(str(proc.pid)) + + try: + job = _make_db_job(1, "1", "PENDING", work_dir=str(tmp_path)) + mock_session = MagicMock() + + with patch("fileglancer.apps.core.db") as mock_db: + mock_db.get_db_session.return_value.__enter__ = lambda _: mock_session + mock_db.get_db_session.return_value.__exit__ = MagicMock(return_value=False) + mock_db.get_active_jobs.return_value = [job] + + result = _poll_jobs(settings) + + assert result is True + # Should NOT have called _run_as_user (cluster-based polling) + mock_db.update_job_status.assert_called_once() + finally: + proc.terminate() + proc.wait() From 4cd3145f0ef39d26a7aeb54be66807a2eb8179d9 Mon Sep 17 00:00:00 2001 From: Allison Truhlar Date: Thu, 23 Apr 2026 14:48:16 -0400 Subject: [PATCH 2/2] tests: fix incorrect test assertion - any string would cause kwargs.get("status") to be true, and the second condition to check for "RUNNING" would never be checked. --- tests/test_poll.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/test_poll.py b/tests/test_poll.py index 2ffcf7ae..da52ae76 100644 --- a/tests/test_poll.py +++ b/tests/test_poll.py @@ -3,8 +3,6 @@ import asyncio import fcntl import multiprocessing -import os -import signal import subprocess import time from datetime import datetime, UTC @@ -275,8 +273,7 @@ def test_running_process_transitions_to_running(self, tmp_path): assert result is True # still active mock_db.update_job_status.assert_called_once() - _, kwargs = mock_db.update_job_status.call_args - assert kwargs.get("status") or mock_db.update_job_status.call_args[0][2] == "RUNNING" + assert mock_db.update_job_status.call_args[0][2] == "RUNNING" finally: proc.terminate() proc.wait()