From 61317c1ad05451bca14aaf96915ac2c8214a0ea5 Mon Sep 17 00:00:00 2001 From: Victor Verhaert Date: Mon, 23 Mar 2026 09:57:19 +0100 Subject: [PATCH 1/3] fixes to status management in JobManager --- CHANGELOG.md | 2 ++ openeo/extra/job_management/_manager.py | 8 +++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 350808503..2f631c86a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,10 +12,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Add `is_one_of()` to `property_filter` for compact building of an allow-list based property filter. ### Changed +- `MultiBackendJobManager`: "created" and "queued_for_start" are now also considered for the queue limit of a backend. ### Removed ### Fixed +- `MultiBackendJobManager`: status "queued_for_start" is no longer overwritten to "created" to allow consistent tracking of the job lifecycle and more accurate handling of job starting. ## [0.48.0] - 2026-02-17 diff --git a/openeo/extra/job_management/_manager.py b/openeo/extra/job_management/_manager.py index 5b0c1dba1..68d0021db 100644 --- a/openeo/extra/job_management/_manager.py +++ b/openeo/extra/job_management/_manager.py @@ -548,7 +548,9 @@ def _job_update_loop( # TODO: should "created" be included in here? Calling this "running" is quite misleading then. 
# apparently (see #839/#840) this seemingly simple change makes a lot of MultiBackendJobManager tests flaky running = job_db.get_by_status(statuses=["created", "queued", "queued_for_start", "running"]) - queued = running[running["status"] == "queued"] + queued = running[ + running["status"] != "running" + ] # The queue limit is for non-running jobs, so we need to exclude running jobs from the queued count running_per_backend = running.groupby("backend_name").size().to_dict() queued_per_backend = queued.groupby("backend_name").size().to_dict() _log.info(f"{running_per_backend=} {queued_per_backend=}") @@ -870,6 +872,10 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] = self._cancel_prolonged_job(the_job, active.loc[i]) + if new_status == "created" and previous_status == "queued_for_start": + # This means the backend needs more time to start the job + new_status = "queued_for_start" + active.loc[i, "status"] = new_status # TODO: there is well hidden coupling here with "cpu", "memory" and "duration" from `_normalize_df` From 21cdfff0179f48e7b1c2124f9353f1d15155485b Mon Sep 17 00:00:00 2001 From: Victor Verhaert Date: Mon, 23 Mar 2026 12:06:27 +0100 Subject: [PATCH 2/3] removed created from queue limit --- CHANGELOG.md | 2 +- openeo/extra/job_management/_manager.py | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2f631c86a..87821d70d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Add `is_one_of()` to `property_filter` for compact building of an allow-list based property filter. ### Changed -- `MultiBackendJobManager`: "created" and "queued_for_start" are now also considered for the queue limit of a backend. +- `MultiBackendJobManager`: "queued_for_start" is now also considered for the queue limit of a backend. 
### Removed diff --git a/openeo/extra/job_management/_manager.py b/openeo/extra/job_management/_manager.py index 68d0021db..fbde6c880 100644 --- a/openeo/extra/job_management/_manager.py +++ b/openeo/extra/job_management/_manager.py @@ -548,9 +548,7 @@ def _job_update_loop( # TODO: should "created" be included in here? Calling this "running" is quite misleading then. # apparently (see #839/#840) this seemingly simple change makes a lot of MultiBackendJobManager tests flaky running = job_db.get_by_status(statuses=["created", "queued", "queued_for_start", "running"]) - queued = running[ - running["status"] != "running" - ] # The queue limit is for non-running jobs, so we need to exclude running jobs from the queued count + queued = running[running["status"].isin(["queued", "queued_for_start"])] running_per_backend = running.groupby("backend_name").size().to_dict() queued_per_backend = queued.groupby("backend_name").size().to_dict() _log.info(f"{running_per_backend=} {queued_per_backend=}") From a537e1e3eda160fea61a4700a0b401c0fd23f27c Mon Sep 17 00:00:00 2001 From: Victor Verhaert Date: Mon, 23 Mar 2026 15:56:44 +0100 Subject: [PATCH 3/3] moved queued_for_start check to start of track_statuses --- openeo/extra/job_management/_manager.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/openeo/extra/job_management/_manager.py b/openeo/extra/job_management/_manager.py index fbde6c880..034500ded 100644 --- a/openeo/extra/job_management/_manager.py +++ b/openeo/extra/job_management/_manager.py @@ -844,6 +844,10 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] = f"Status of job {job_id!r} (on backend {backend_name}) is {new_status!r} (previously {previous_status!r})" ) + if new_status == "created" and previous_status == "queued_for_start": + # This means the backend needs more time to start the job, but we keep the internal status as "queued_for_start" to reflect that we have already triggered the start. 
+ new_status = "queued_for_start" + if previous_status != "finished" and new_status == "finished": stats["job finished"] += 1 jobs_done.append((the_job, active.loc[i])) @@ -870,10 +874,6 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] = self._cancel_prolonged_job(the_job, active.loc[i]) - if new_status == "created" and previous_status == "queued_for_start": - # This means the backend needs more time to start the job - new_status = "queued_for_start" - active.loc[i, "status"] = new_status # TODO: there is well hidden coupling here with "cpu", "memory" and "duration" from `_normalize_df`