Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions fileglancer/apps/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,12 @@ def _poll_jobs(settings):
if not jobs_to_poll:
return True # zombie jobs still pending, keep polling

# Local executor: poll by checking PID files instead of spawning
# a worker subprocess (which would create a fresh executor with
# no knowledge of the running processes).
if settings.cluster.executor == "local":
return _poll_local_jobs(session, jobs_to_poll)

# Pick any user to run bjobs as (bjobs -u all sees all users' jobs)
poll_username = jobs_to_poll[0].username
# Pass current known statuses so stubs are seeded correctly.
Expand Down Expand Up @@ -982,6 +988,81 @@ def _poll_jobs(settings):
return True


def _poll_local_jobs(session, jobs_to_poll: list) -> bool:
    """Poll local executor jobs by checking PID files and process liveness.

    The local executor runs jobs as bash subprocesses. The submit worker
    writes the PID to ``{work_dir}/job.pid``, and the script writes its
    exit code to ``{work_dir}/exit_code`` via an EXIT trap.

    Args:
        session: Open DB session used for status updates.
        jobs_to_poll: DB job rows (exposing ``id``, ``status`` and
            ``work_dir``) that are currently PENDING or RUNNING.

    Returns:
        True if at least one job is still active (keep polling),
        False once every job has reached a terminal state.
    """
    still_active = False

    for db_job in jobs_to_poll:
        pid = _read_job_pid(db_job)
        if pid is None:
            # No work_dir, or the submit worker has not written (or wrote a
            # corrupt) job.pid yet -- keep polling until it shows up.
            still_active = True
            continue

        old_status = db_job.status
        if _pid_is_alive(pid):
            still_active = True
            if old_status == "PENDING":
                db.update_job_status(
                    session, db_job.id, "RUNNING",
                    started_at=datetime.now(UTC),
                )
                logger.info(f"Job {db_job.id} status updated: PENDING -> RUNNING")
        else:
            # Process has exited -- read exit code from the trap file.
            # A missing/unreadable exit code reads as None => FAILED.
            exit_code = _read_exit_code(Path(db_job.work_dir))
            new_status = "DONE" if exit_code == 0 else "FAILED"
            now = datetime.now(UTC)
            db.update_job_status(
                session, db_job.id, new_status,
                exit_code=exit_code,
                finished_at=now,
                # Backfill started_at for jobs that finished before we
                # ever observed them in the RUNNING state.
                started_at=now if old_status == "PENDING" else None,
            )
            logger.info(f"Job {db_job.id} status updated: {old_status} -> {new_status}")

    return still_active


def _read_job_pid(db_job) -> int | None:
    """Return the PID recorded for *db_job*, or None if not yet available."""
    if not db_job.work_dir:
        return None
    pid_file = Path(db_job.work_dir) / "job.pid"
    try:
        pid = int(pid_file.read_text().strip())
    except (ValueError, OSError):
        return None
    # Guard against a corrupt PID file: os.kill(0, 0) and os.kill(-n, 0)
    # target process *groups* and would falsely report the job as alive.
    return pid if pid > 0 else None


def _pid_is_alive(pid: int) -> bool:
    """Return True if a process with *pid* currently exists.

    Uses signal 0, which performs the existence/permission check without
    delivering a signal.
    """
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # The process exists but is owned by another user.
        return True
    return True


def _read_exit_code(work_dir: Path) -> int | None:
"""Read the exit code written by the EXIT trap in the job script."""
exit_code_file = work_dir / "exit_code"
if not exit_code_file.exists():
return None
try:
return int(exit_code_file.read_text().strip())
except (ValueError, OSError):
return None


def _parse_iso_dt(s: str | None) -> datetime | None:
"""Parse an ISO 8601 datetime string, or return None."""
if not s:
Expand Down Expand Up @@ -1192,6 +1273,12 @@ async def submit_job(
"unset PIXI_PROJECT_MANIFEST",
f"export FG_WORK_DIR={shlex.quote(str(work_dir))}",
]
# For local executor, trap EXIT to write the exit code to a file so
# PID-based polling can determine the final status after the process exits.
if settings.cluster.executor == "local":
preamble_lines.append(
'trap \'echo $? > "$FG_WORK_DIR/exit_code"\' EXIT'
)
if settings.apps.extra_paths:
path_suffix = os.pathsep.join(shlex.quote(p) for p in settings.apps.extra_paths)
preamble_lines.append(f"export PATH=$PATH:{path_suffix}")
Expand Down
10 changes: 10 additions & 0 deletions fileglancer/apps/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ async def _submit(request: dict) -> dict:
resources=resource_spec,
)

# For local executor, write the subprocess PID to disk so the poll
# loop can check process liveness across worker invocations.
# Only LocalExecutor has a _processes dict; HPC executors don't.
processes = getattr(executor, "_processes", None)
if processes is not None:
proc = processes.get(job.job_id)
if proc is not None:
pid_file = work_dir / "job.pid"
pid_file.write_text(str(proc.pid))

return {"job_id": job.job_id, "script_path": job.script_path}


Expand Down
141 changes: 134 additions & 7 deletions tests/test_poll.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,28 @@
import asyncio
import fcntl
import multiprocessing
import subprocess
import time
from datetime import datetime, UTC
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import patch, MagicMock, call

from fileglancer.apps.core import _poll_jobs, _POLL_LOCK_PATH
from fileglancer.apps.core import _poll_jobs, _poll_local_jobs, _POLL_LOCK_PATH


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _make_settings(**overrides):
def _make_settings(executor="lsf", **overrides):
"""Return a minimal Settings-like object for poll tests."""
cluster = SimpleNamespace(
poll_interval=0.05,
zombie_timeout_minutes=30.0,
**{k: v for k, v in {
"executor": "local",
}.items()},
executor=executor,
)
cluster.model_dump = lambda exclude_none=False: {"executor": "local"}
cluster.model_dump = lambda exclude_none=False: {"executor": executor}
settings = SimpleNamespace(
cluster=cluster,
db_url="sqlite:///unused",
Expand All @@ -33,14 +33,15 @@ def _make_settings(**overrides):


def _make_db_job(job_id, cluster_job_id, status, username="alice",
created_at=None):
created_at=None, work_dir=None):
"""Return a mock DB job row."""
job = SimpleNamespace(
id=job_id,
cluster_job_id=cluster_job_id,
status=status,
username=username,
created_at=created_at or datetime.now(UTC),
work_dir=work_dir,
)
return job

Expand Down Expand Up @@ -245,3 +246,129 @@ def test_concurrent_processes_only_one_wins(self):
p2.join(timeout=5)

assert won_count.value == 1, f"Expected 1 poll winner, got {won_count.value}"


# ---------------------------------------------------------------------------
# Test: local executor PID-based polling
# ---------------------------------------------------------------------------

class TestPollLocalJobs:
    """_poll_local_jobs checks PID files and process liveness to determine
    job status, bypassing the worker subprocess that can't track local
    executor processes across invocations.

    Each test writes a ``job.pid`` (and optionally ``exit_code``) file into
    ``tmp_path``, mirroring what the submit worker and the EXIT trap produce,
    then patches ``fileglancer.apps.core.db`` to capture status updates.
    """

    def test_running_process_transitions_to_running(self, tmp_path):
        """A PENDING job whose PID is still alive should become RUNNING."""
        # Start a long-running process so the PID is alive for the poll.
        proc = subprocess.Popen(["sleep", "60"])
        pid_file = tmp_path / "job.pid"
        pid_file.write_text(str(proc.pid))

        try:
            job = _make_db_job(1, "1", "PENDING", work_dir=str(tmp_path))
            mock_session = MagicMock()

            with patch("fileglancer.apps.core.db") as mock_db:
                result = _poll_local_jobs(mock_session, [job])

            assert result is True # still active
            # Positional args are (session, job_id, status, ...); index 2
            # is the new status.
            mock_db.update_job_status.assert_called_once()
            assert mock_db.update_job_status.call_args[0][2] == "RUNNING"
        finally:
            # Always reap the sleeper so the test leaves no stray process.
            proc.terminate()
            proc.wait()

    def test_exited_process_transitions_to_done(self, tmp_path):
        """A job whose process has exited with code 0 should become DONE."""
        # Start and immediately wait for a process that exits successfully.
        # NOTE(review): assumes the reaped PID is not reused before the poll
        # runs -- acceptable for a short-lived test.
        proc = subprocess.Popen(["true"])
        proc.wait()
        pid_file = tmp_path / "job.pid"
        pid_file.write_text(str(proc.pid))
        exit_code_file = tmp_path / "exit_code"
        exit_code_file.write_text("0")

        job = _make_db_job(1, "1", "RUNNING", work_dir=str(tmp_path))
        mock_session = MagicMock()

        with patch("fileglancer.apps.core.db") as mock_db:
            result = _poll_local_jobs(mock_session, [job])

        assert result is False # no more active jobs
        mock_db.update_job_status.assert_called_once()
        args = mock_db.update_job_status.call_args[0]
        assert args[2] == "DONE"

    def test_exited_process_with_error_transitions_to_failed(self, tmp_path):
        """A job whose process exited with non-zero code should become FAILED."""
        # Same PID-reuse assumption as the DONE test above.
        proc = subprocess.Popen(["false"])
        proc.wait()
        pid_file = tmp_path / "job.pid"
        pid_file.write_text(str(proc.pid))
        exit_code_file = tmp_path / "exit_code"
        exit_code_file.write_text("1")

        job = _make_db_job(1, "1", "RUNNING", work_dir=str(tmp_path))
        mock_session = MagicMock()

        with patch("fileglancer.apps.core.db") as mock_db:
            result = _poll_local_jobs(mock_session, [job])

        assert result is False
        args = mock_db.update_job_status.call_args[0]
        assert args[2] == "FAILED"

    def test_already_running_not_updated_again(self, tmp_path):
        """A RUNNING job whose PID is still alive should not be updated."""
        proc = subprocess.Popen(["sleep", "60"])
        pid_file = tmp_path / "job.pid"
        pid_file.write_text(str(proc.pid))

        try:
            job = _make_db_job(1, "1", "RUNNING", work_dir=str(tmp_path))
            mock_session = MagicMock()

            with patch("fileglancer.apps.core.db") as mock_db:
                _poll_local_jobs(mock_session, [job])

            # No state change: RUNNING -> RUNNING must not touch the DB.
            mock_db.update_job_status.assert_not_called()
        finally:
            proc.terminate()
            proc.wait()

    def test_missing_pid_file_keeps_polling(self, tmp_path):
        """A job with no PID file should be treated as still active."""
        # The submit worker may not have written job.pid yet; the poller
        # must not mark the job terminal in that window.
        job = _make_db_job(1, "1", "PENDING", work_dir=str(tmp_path))
        mock_session = MagicMock()

        with patch("fileglancer.apps.core.db") as mock_db:
            result = _poll_local_jobs(mock_session, [job])

        assert result is True
        mock_db.update_job_status.assert_not_called()

    def test_poll_jobs_routes_local_executor(self, tmp_path):
        """_poll_jobs should route to _poll_local_jobs when executor is local."""
        settings = _make_settings(executor="local")

        proc = subprocess.Popen(["sleep", "60"])
        pid_file = tmp_path / "job.pid"
        pid_file.write_text(str(proc.pid))

        try:
            job = _make_db_job(1, "1", "PENDING", work_dir=str(tmp_path))
            mock_session = MagicMock()

            with patch("fileglancer.apps.core.db") as mock_db:
                # Wire the patched db module's session context manager to
                # hand back our mock session inside _poll_jobs.
                mock_db.get_db_session.return_value.__enter__ = lambda _: mock_session
                mock_db.get_db_session.return_value.__exit__ = MagicMock(return_value=False)
                mock_db.get_active_jobs.return_value = [job]

                result = _poll_jobs(settings)

            assert result is True
            # Should NOT have called _run_as_user (cluster-based polling);
            # the single update_job_status call proves the local path ran.
            mock_db.update_job_status.assert_called_once()
        finally:
            proc.terminate()
            proc.wait()
Loading