diff --git a/src/baseline.py b/src/baseline.py index e9b98d2..e021cac 100644 --- a/src/baseline.py +++ b/src/baseline.py @@ -4,7 +4,7 @@ from src.job_management import ( process_ongoing_jobs, add_new_jobs, - assign_jobs_to_available_nodes, + assign_jobs_with_backlog_refill, fill_queue_from_backlog, age_backlog_queue, ) @@ -54,9 +54,10 @@ def baseline_step(baseline_state, baseline_cores_available, baseline_running_job metrics.baseline_jobs_submitted += new_jobs_count metrics.episode_baseline_jobs_submitted += new_jobs_count - _, baseline_next_empty_slot, _, next_job_id = assign_jobs_to_available_nodes( + _, baseline_next_empty_slot, _, next_job_id, _ = assign_jobs_with_backlog_refill( job_queue_2d, baseline_state['nodes'], baseline_cores_available, - baseline_running_jobs, baseline_next_empty_slot, next_job_id, metrics, is_baseline=True + baseline_running_jobs, baseline_next_empty_slot, next_job_id, metrics, + baseline_backlog_queue, is_baseline=True ) num_used_nodes = np.sum(baseline_state['nodes'] > 0) diff --git a/src/callbacks.py b/src/callbacks.py index 409d0a2..8c0647a 100644 --- a/src/callbacks.py +++ b/src/callbacks.py @@ -38,6 +38,8 @@ def _on_step(self) -> bool: self.logger.record("metrics/jobs_completed", env.metrics.episode_jobs_completed) self.logger.record("metrics/completion_rate", completion_rate) self.logger.record("metrics/avg_wait_hours", avg_wait) + self.logger.record("metrics/on_nodes", env.metrics.episode_on_nodes[-1] if env.metrics.episode_on_nodes else 0) + self.logger.record("metrics/used_nodes", env.metrics.episode_used_nodes[-1] if env.metrics.episode_used_nodes else 0) self.logger.record("metrics/max_queue_size", env.metrics.episode_max_queue_size_reached) self.logger.record("metrics/max_backlog_size", env.metrics.episode_max_backlog_size_reached) self.logger.record("metrics/jobs_dropped", env.metrics.episode_jobs_dropped) diff --git a/src/environment.py b/src/environment.py index 06b9564..8b12573 100644 --- a/src/environment.py +++ b/src/environment.py @@ -22,7 +22,7 @@ ) from src.job_management import ( process_ongoing_jobs, add_new_jobs, - assign_jobs_to_available_nodes, fill_queue_from_backlog, age_backlog_queue + assign_jobs_with_backlog_refill, fill_queue_from_backlog, age_backlog_queue ) from src.node_management import adjust_nodes from src.reward_calculation import RewardCalculator @@ -158,6 +158,18 @@ def __init__(self, shape=(MAX_QUEUE_SIZE * 4,), dtype=np.int32 ), + 'backlog_size': spaces.Box( + low=0, + high=np.iinfo(np.int32).max, + shape=(1,), + dtype=np.int32 + ), + 'backlog_assigned': spaces.Box( + low=0, + high=np.iinfo(np.int32).max, + shape=(1,), + dtype=np.int32 + ), # predicted prices for the next 24h 'predicted_prices': spaces.Box( low=-1000, @@ -175,6 +187,8 @@ def _reset_timeline_state(self, start_index): 'nodes': np.zeros(MAX_NODES, dtype=np.int32), # Initialize job queue to be empty 'job_queue': np.zeros((MAX_QUEUE_SIZE * 4), dtype=np.int32), + 'backlog_size': np.array([0], dtype=np.int32), + 'backlog_assigned': np.array([0], dtype=np.int32), # Initialize predicted prices array 'predicted_prices': self.prices.predicted_prices.copy(), } @@ -303,12 +317,17 @@ def step(self, action): # Assign jobs to available nodes self.env_print(f"[4] Assigning jobs to available nodes...") - num_launched_jobs, self.next_empty_slot, num_dropped_this_step, self.next_job_id = assign_jobs_to_available_nodes( - job_queue_2d, self.state['nodes'], self.cores_available, self.running_jobs, - self.next_empty_slot, self.next_job_id, self.metrics, is_baseline=False + num_launched_jobs, self.next_empty_slot, num_dropped_this_step, self.next_job_id, backlog_assigned = ( + assign_jobs_with_backlog_refill( + job_queue_2d, self.state['nodes'], self.cores_available, self.running_jobs, + self.next_empty_slot, self.next_job_id, self.metrics, self.backlog_queue, + is_baseline=False + ) ) self.env_print(f" {num_launched_jobs} jobs launched") + self.state['backlog_size'][0] = len(self.backlog_queue) + self.state['backlog_assigned'][0] = backlog_assigned # Calculate node utilization stats num_used_nodes = np.sum(self.state['nodes'] > 0) diff --git a/src/job_management.py b/src/job_management.py index 547baa2..df9feb7 100644 --- a/src/job_management.py +++ b/src/job_management.py @@ -163,7 +163,8 @@ def add_new_jobs(job_queue_2d, new_jobs_count, new_jobs_durations, new_jobs_node def assign_jobs_to_available_nodes(job_queue_2d, nodes, cores_available, running_jobs, - next_empty_slot, next_job_id, metrics, is_baseline=False): + next_empty_slot, next_job_id, metrics, is_baseline=False, + age_waiting_jobs=True): """ Assign jobs from queue to available nodes. @@ -250,6 +251,49 @@ def assign_jobs_to_available_nodes(job_queue_2d, nodes, cores_available, running # metrics.jobs_dropped += 1 # metrics.episode_jobs_dropped += 1 #else: - job_queue_2d[job_idx][1] = new_age + if age_waiting_jobs: + job_queue_2d[job_idx][1] = new_age return num_processed_jobs, next_empty_slot, num_dropped, next_job_id + + +def assign_jobs_with_backlog_refill(job_queue_2d, nodes, cores_available, running_jobs, + next_empty_slot, next_job_id, metrics, backlog_queue, + is_baseline=False): + """ + Assign jobs, refilling the queue from backlog when it becomes empty. + Loop until no progress is made or resources/backlog are exhausted. + """ + total_launched_jobs = 0 + total_dropped_jobs = 0 + backlog_assigned = 0 + backlog_loaded_remaining = 0 + + while True: + num_launched_jobs, next_empty_slot, num_dropped, next_job_id = assign_jobs_to_available_nodes( + job_queue_2d, nodes, cores_available, running_jobs, + next_empty_slot, next_job_id, metrics, is_baseline=is_baseline, + age_waiting_jobs=(backlog_loaded_remaining == 0) + ) + total_launched_jobs += num_launched_jobs + total_dropped_jobs += num_dropped + if backlog_loaded_remaining > 0 and num_launched_jobs > 0: + assigned_from_backlog = min(num_launched_jobs, backlog_loaded_remaining) + backlog_assigned += assigned_from_backlog + backlog_loaded_remaining -= assigned_from_backlog + + queue_empty = np.all(job_queue_2d[:, 0] == 0) + backlog_has_jobs = len(backlog_queue) > 0 + resources_available = np.any((nodes >= 0) & (cores_available > 0)) + + moved_from_backlog = 0 + if queue_empty and backlog_has_jobs and resources_available: + next_empty_slot, moved_from_backlog = fill_queue_from_backlog( + job_queue_2d, backlog_queue, next_empty_slot + ) + backlog_loaded_remaining += moved_from_backlog + + if moved_from_backlog == 0: + break + + return total_launched_jobs, next_empty_slot, total_dropped_jobs, next_job_id, backlog_assigned