From a1910ed424220b22415d2a2049c4f69f853588fa Mon Sep 17 00:00:00 2001 From: Enis Lorenz Date: Fri, 30 Jan 2026 13:33:48 +0100 Subject: [PATCH 1/3] Backlogger: After assigning jobs, if queue is empty but backlogger has jobs, refill queue and continue assigning until resources are filled up. Add backlog refill loop helper for job assignment; centralize refill/assign loop in job_management; use progress-based break (launched or moved from backlog); update environment and baseline to call helper --- src/baseline.py | 7 ++++--- src/environment.py | 11 +++++++---- src/job_management.py | 34 ++++++++++++++++++++++++++++++++++ 3 files changed, 45 insertions(+), 7 deletions(-) diff --git a/src/baseline.py b/src/baseline.py index e9b98d2..9a9c890 100644 --- a/src/baseline.py +++ b/src/baseline.py @@ -4,7 +4,7 @@ from src.job_management import ( process_ongoing_jobs, add_new_jobs, - assign_jobs_to_available_nodes, + assign_jobs_with_backlog_refill, fill_queue_from_backlog, age_backlog_queue, ) @@ -54,9 +54,10 @@ def baseline_step(baseline_state, baseline_cores_available, baseline_running_job metrics.baseline_jobs_submitted += new_jobs_count metrics.episode_baseline_jobs_submitted += new_jobs_count - _, baseline_next_empty_slot, _, next_job_id = assign_jobs_to_available_nodes( + _, baseline_next_empty_slot, _, next_job_id = assign_jobs_with_backlog_refill( job_queue_2d, baseline_state['nodes'], baseline_cores_available, - baseline_running_jobs, baseline_next_empty_slot, next_job_id, metrics, is_baseline=True + baseline_running_jobs, baseline_next_empty_slot, next_job_id, metrics, + baseline_backlog_queue, is_baseline=True ) num_used_nodes = np.sum(baseline_state['nodes'] > 0) diff --git a/src/environment.py b/src/environment.py index 06b9564..6530c32 100644 --- a/src/environment.py +++ b/src/environment.py @@ -22,7 +22,7 @@ ) from src.job_management import ( process_ongoing_jobs, add_new_jobs, - assign_jobs_to_available_nodes, fill_queue_from_backlog, age_backlog_queue + 
assign_jobs_with_backlog_refill, fill_queue_from_backlog, age_backlog_queue ) from src.node_management import adjust_nodes from src.reward_calculation import RewardCalculator @@ -303,9 +303,12 @@ def step(self, action): # Assign jobs to available nodes self.env_print(f"[4] Assigning jobs to available nodes...") - num_launched_jobs, self.next_empty_slot, num_dropped_this_step, self.next_job_id = assign_jobs_to_available_nodes( - job_queue_2d, self.state['nodes'], self.cores_available, self.running_jobs, - self.next_empty_slot, self.next_job_id, self.metrics, is_baseline=False + num_launched_jobs, self.next_empty_slot, num_dropped_this_step, self.next_job_id = ( + assign_jobs_with_backlog_refill( + job_queue_2d, self.state['nodes'], self.cores_available, self.running_jobs, + self.next_empty_slot, self.next_job_id, self.metrics, self.backlog_queue, + is_baseline=False + ) ) self.env_print(f" {num_launched_jobs} jobs launched") diff --git a/src/job_management.py b/src/job_management.py index 547baa2..5338f4f 100644 --- a/src/job_management.py +++ b/src/job_management.py @@ -253,3 +253,37 @@ def assign_jobs_to_available_nodes(job_queue_2d, nodes, cores_available, running job_queue_2d[job_idx][1] = new_age return num_processed_jobs, next_empty_slot, num_dropped, next_job_id + + +def assign_jobs_with_backlog_refill(job_queue_2d, nodes, cores_available, running_jobs, + next_empty_slot, next_job_id, metrics, backlog_queue, + is_baseline=False): + """ + Assign jobs, refilling the queue from backlog when it becomes empty. + Loop until no progress is made or resources/backlog are exhausted. 
+ """ + total_launched_jobs = 0 + total_dropped_jobs = 0 + + while True: + num_launched_jobs, next_empty_slot, num_dropped, next_job_id = assign_jobs_to_available_nodes( + job_queue_2d, nodes, cores_available, running_jobs, + next_empty_slot, next_job_id, metrics, is_baseline=is_baseline + ) + total_launched_jobs += num_launched_jobs + total_dropped_jobs += num_dropped + + queue_empty = np.all(job_queue_2d[:, 0] == 0) + backlog_has_jobs = len(backlog_queue) > 0 + resources_available = np.any((nodes >= 0) & (cores_available > 0)) + + moved_from_backlog = 0 + if queue_empty and backlog_has_jobs and resources_available: + next_empty_slot, moved_from_backlog = fill_queue_from_backlog( + job_queue_2d, backlog_queue, next_empty_slot + ) + + if num_launched_jobs == 0 and moved_from_backlog == 0: + break + + return total_launched_jobs, next_empty_slot, total_dropped_jobs, next_job_id From 4d706f6782ccea87e0c90e1e250f30267704a39e Mon Sep 17 00:00:00 2001 From: Enis Lorenz Date: Fri, 30 Jan 2026 13:39:10 +0100 Subject: [PATCH 2/3] Callbacks: Added current on and used nodes to track agent behavior. 
--- src/callbacks.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/callbacks.py b/src/callbacks.py index 409d0a2..8c0647a 100644 --- a/src/callbacks.py +++ b/src/callbacks.py @@ -38,6 +38,8 @@ def _on_step(self) -> bool: self.logger.record("metrics/jobs_completed", env.metrics.episode_jobs_completed) self.logger.record("metrics/completion_rate", completion_rate) self.logger.record("metrics/avg_wait_hours", avg_wait) + self.logger.record("metrics/on_nodes", env.metrics.episode_on_nodes[-1] if env.metrics.episode_on_nodes else 0) + self.logger.record("metrics/used_nodes", env.metrics.episode_used_nodes[-1] if env.metrics.episode_used_nodes else 0) self.logger.record("metrics/max_queue_size", env.metrics.episode_max_queue_size_reached) self.logger.record("metrics/max_backlog_size", env.metrics.episode_max_backlog_size_reached) self.logger.record("metrics/jobs_dropped", env.metrics.episode_jobs_dropped) From 2112bc5501a972e4dc97cfda40ff2f3acb844227 Mon Sep 17 00:00:00 2001 From: Enis Lorenz Date: Fri, 30 Jan 2026 17:02:47 +0100 Subject: [PATCH 3/3] Observation Space: Added backlog_size and backlog_assigned. Agent now has two extra integer variables in observation space to know of the existence and the length of the backlog_queue. 
Flagged assign_jobs so it only ages waiting jobs once per step --- src/baseline.py | 2 +- src/environment.py | 18 +++++++++++++++++- src/job_management.py | 20 +++++++++++++++----- 3 files changed, 33 insertions(+), 7 deletions(-) diff --git a/src/baseline.py b/src/baseline.py index 9a9c890..e021cac 100644 --- a/src/baseline.py +++ b/src/baseline.py @@ -54,7 +54,7 @@ def baseline_step(baseline_state, baseline_cores_available, baseline_running_job metrics.baseline_jobs_submitted += new_jobs_count metrics.episode_baseline_jobs_submitted += new_jobs_count - _, baseline_next_empty_slot, _, next_job_id = assign_jobs_with_backlog_refill( + _, baseline_next_empty_slot, _, next_job_id, _ = assign_jobs_with_backlog_refill( job_queue_2d, baseline_state['nodes'], baseline_cores_available, baseline_running_jobs, baseline_next_empty_slot, next_job_id, metrics, baseline_backlog_queue, is_baseline=True diff --git a/src/environment.py b/src/environment.py index 6530c32..8b12573 100644 --- a/src/environment.py +++ b/src/environment.py @@ -158,6 +158,18 @@ def __init__(self, shape=(MAX_QUEUE_SIZE * 4,), dtype=np.int32 ), + 'backlog_size': spaces.Box( + low=0, + high=np.iinfo(np.int32).max, + shape=(1,), + dtype=np.int32 + ), + 'backlog_assigned': spaces.Box( + low=0, + high=np.iinfo(np.int32).max, + shape=(1,), + dtype=np.int32 + ), # predicted prices for the next 24h 'predicted_prices': spaces.Box( low=-1000, @@ -175,6 +187,8 @@ def _reset_timeline_state(self, start_index): 'nodes': np.zeros(MAX_NODES, dtype=np.int32), # Initialize job queue to be empty 'job_queue': np.zeros((MAX_QUEUE_SIZE * 4), dtype=np.int32), + 'backlog_size': np.array([0], dtype=np.int32), + 'backlog_assigned': np.array([0], dtype=np.int32), # Initialize predicted prices array 'predicted_prices': self.prices.predicted_prices.copy(), } @@ -303,7 +317,7 @@ def step(self, action): # Assign jobs to available nodes self.env_print(f"[4] Assigning jobs to available nodes...") - num_launched_jobs, 
self.next_empty_slot, num_dropped_this_step, self.next_job_id = ( + num_launched_jobs, self.next_empty_slot, num_dropped_this_step, self.next_job_id, backlog_assigned = ( assign_jobs_with_backlog_refill( job_queue_2d, self.state['nodes'], self.cores_available, self.running_jobs, self.next_empty_slot, self.next_job_id, self.metrics, self.backlog_queue, @@ -312,6 +326,8 @@ def step(self, action): ) self.env_print(f" {num_launched_jobs} jobs launched") + self.state['backlog_size'][0] = len(self.backlog_queue) + self.state['backlog_assigned'][0] = backlog_assigned # Calculate node utilization stats num_used_nodes = np.sum(self.state['nodes'] > 0) diff --git a/src/job_management.py b/src/job_management.py index 5338f4f..df9feb7 100644 --- a/src/job_management.py +++ b/src/job_management.py @@ -163,7 +163,8 @@ def add_new_jobs(job_queue_2d, new_jobs_count, new_jobs_durations, new_jobs_node def assign_jobs_to_available_nodes(job_queue_2d, nodes, cores_available, running_jobs, - next_empty_slot, next_job_id, metrics, is_baseline=False): + next_empty_slot, next_job_id, metrics, is_baseline=False, + age_waiting_jobs=True): """ Assign jobs from queue to available nodes. 
@@ -250,7 +251,8 @@ def assign_jobs_to_available_nodes(job_queue_2d, nodes, cores_available, running # metrics.jobs_dropped += 1 # metrics.episode_jobs_dropped += 1 #else: - job_queue_2d[job_idx][1] = new_age + if age_waiting_jobs: + job_queue_2d[job_idx][1] = new_age return num_processed_jobs, next_empty_slot, num_dropped, next_job_id @@ -264,14 +266,21 @@ def assign_jobs_with_backlog_refill(job_queue_2d, nodes, cores_available, runnin """ total_launched_jobs = 0 total_dropped_jobs = 0 + backlog_assigned = 0 + backlog_loaded_remaining = 0 while True: num_launched_jobs, next_empty_slot, num_dropped, next_job_id = assign_jobs_to_available_nodes( job_queue_2d, nodes, cores_available, running_jobs, - next_empty_slot, next_job_id, metrics, is_baseline=is_baseline + next_empty_slot, next_job_id, metrics, is_baseline=is_baseline, + age_waiting_jobs=(backlog_loaded_remaining == 0) ) total_launched_jobs += num_launched_jobs total_dropped_jobs += num_dropped + if backlog_loaded_remaining > 0 and num_launched_jobs > 0: + assigned_from_backlog = min(num_launched_jobs, backlog_loaded_remaining) + backlog_assigned += assigned_from_backlog + backlog_loaded_remaining -= assigned_from_backlog queue_empty = np.all(job_queue_2d[:, 0] == 0) backlog_has_jobs = len(backlog_queue) > 0 @@ -282,8 +291,9 @@ def assign_jobs_with_backlog_refill(job_queue_2d, nodes, cores_available, runnin next_empty_slot, moved_from_backlog = fill_queue_from_backlog( job_queue_2d, backlog_queue, next_empty_slot ) + backlog_loaded_remaining += moved_from_backlog - if num_launched_jobs == 0 and moved_from_backlog == 0: + if moved_from_backlog == 0: break - return total_launched_jobs, next_empty_slot, total_dropped_jobs, next_job_id + return total_launched_jobs, next_empty_slot, total_dropped_jobs, next_job_id, backlog_assigned