From 4d633ae585c211826d60c75f453fdaa3d80f208d Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Wed, 11 Feb 2026 11:58:10 +0100 Subject: [PATCH] Limit backlog size to a reasonable value and count drops there --- src/baseline.py | 5 ++++- src/config.py | 1 + src/environment.py | 9 +++++++-- src/job_management.py | 11 ++++++++--- 4 files changed, 20 insertions(+), 6 deletions(-) diff --git a/src/baseline.py b/src/baseline.py index 5f91e0f..4b8057e 100644 --- a/src/baseline.py +++ b/src/baseline.py @@ -47,12 +47,15 @@ def baseline_step(baseline_state, baseline_cores_available, baseline_running_job job_queue_2d, baseline_backlog_queue, baseline_next_empty_slot ) - _new_baseline_jobs, baseline_next_empty_slot = add_new_jobs( + _new_baseline_jobs, baseline_next_empty_slot, baseline_backlog_dropped = add_new_jobs( job_queue_2d, new_jobs_count, new_jobs_durations, new_jobs_nodes, new_jobs_cores, baseline_next_empty_slot, baseline_backlog_queue ) metrics.baseline_jobs_submitted += new_jobs_count metrics.episode_baseline_jobs_submitted += new_jobs_count + if baseline_backlog_dropped > 0: + metrics.baseline_jobs_dropped += baseline_backlog_dropped + metrics.episode_baseline_jobs_dropped += baseline_backlog_dropped num_launched, baseline_next_empty_slot, _, next_job_id = assign_jobs_to_available_nodes( job_queue_2d, baseline_state['nodes'], baseline_cores_available, diff --git a/src/config.py b/src/config.py index 7d1a01b..6e0d7d9 100644 --- a/src/config.py +++ b/src/config.py @@ -4,6 +4,7 @@ MAX_NODES = 335 # Maximum number of nodes MAX_QUEUE_SIZE = 2500 # Maximum number of jobs in the queue +MAX_BACKLOG_SIZE = 50000 # Maximum number of jobs in the backlog (overflow) queue MAX_CHANGE = MAX_NODES MAX_JOB_DURATION = 170 # maximum job runtime in hours # Use a very high cap; age-based dropping is temporarily disabled in code. diff --git a/src/environment.py b/src/environment.py index 2353bd8..01a08e1 100644 --- a/src/environment.py +++ b/src/environment.py @@ -328,10 +328,13 @@ def step(self, action): # Add new jobs to queue (overflow goes to helper) self.env_print(f"[2] Adding {new_jobs_count} new jobs to the queue...") - new_jobs, self.next_empty_slot = add_new_jobs( + new_jobs, self.next_empty_slot, backlog_dropped = add_new_jobs( job_queue_2d, new_jobs_count, new_jobs_durations, new_jobs_nodes, new_jobs_cores, self.next_empty_slot, self.backlog_queue ) + if backlog_dropped > 0: + self.metrics.jobs_dropped += backlog_dropped + self.metrics.episode_jobs_dropped += backlog_dropped self.metrics.jobs_submitted += new_jobs_count self.metrics.episode_jobs_submitted += new_jobs_count @@ -350,10 +353,12 @@ def step(self, action): # Assign jobs to available nodes self.env_print(f"[4] Assigning jobs to available nodes...") - num_launched_jobs, self.next_empty_slot, num_dropped_this_step, self.next_job_id = assign_jobs_to_available_nodes( + num_dropped_this_step = backlog_dropped + num_launched_jobs, self.next_empty_slot, queue_dropped, self.next_job_id = assign_jobs_to_available_nodes( job_queue_2d, self.state['nodes'], self.cores_available, self.running_jobs, self.next_empty_slot, self.next_job_id, self.metrics, is_baseline=False ) + num_dropped_this_step += queue_dropped self.env_print(f" {num_launched_jobs} jobs launched") diff --git a/src/job_management.py b/src/job_management.py index 547baa2..7febc49 100644 --- a/src/job_management.py +++ b/src/job_management.py @@ -2,7 +2,7 @@ import numpy as np from src.config import ( - MAX_NODES, CORES_PER_NODE + MAX_NODES, CORES_PER_NODE, MAX_BACKLOG_SIZE ) @@ -125,16 +125,21 @@ def add_new_jobs(job_queue_2d, new_jobs_count, new_jobs_durations, new_jobs_node new_jobs_nodes: List of nodes required per job new_jobs_cores: List of cores per node required per job next_empty_slot: Index of next empty slot in queue + backlog_queue: Optional deque for overflow jobs Returns: - Tuple of (list of added jobs (real queue + backlog queue), updated next_empty_slot) + Tuple of (list of added jobs, updated next_empty_slot, num_dropped) """ new_jobs = [] + num_dropped = 0 for i in range(new_jobs_count): # Check if we have space in the queue if next_empty_slot >= len(job_queue_2d): if backlog_queue is None: break # Queue is full + if len(backlog_queue) >= MAX_BACKLOG_SIZE: + num_dropped += 1 + continue # Backlog full, drop incoming job job_entry = [ new_jobs_durations[i], 0, # Age starts at 0 @@ -159,7 +164,7 @@ def add_new_jobs(job_queue_2d, new_jobs_count, new_jobs_durations, new_jobs_node while next_empty_slot < len(job_queue_2d) and job_queue_2d[next_empty_slot][0] != 0: next_empty_slot += 1 - return new_jobs, next_empty_slot + return new_jobs, next_empty_slot, num_dropped def assign_jobs_to_available_nodes(job_queue_2d, nodes, cores_available, running_jobs,