Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/baseline.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,15 @@ def baseline_step(baseline_state, baseline_cores_available, baseline_running_job
job_queue_2d, baseline_backlog_queue, baseline_next_empty_slot
)

_new_baseline_jobs, baseline_next_empty_slot = add_new_jobs(
_new_baseline_jobs, baseline_next_empty_slot, baseline_backlog_dropped = add_new_jobs(
job_queue_2d, new_jobs_count, new_jobs_durations,
new_jobs_nodes, new_jobs_cores, baseline_next_empty_slot, baseline_backlog_queue
)
metrics.baseline_jobs_submitted += new_jobs_count
metrics.episode_baseline_jobs_submitted += new_jobs_count
if baseline_backlog_dropped > 0:
metrics.baseline_jobs_dropped += baseline_backlog_dropped
metrics.episode_baseline_jobs_dropped += baseline_backlog_dropped

num_launched, baseline_next_empty_slot, _, next_job_id = assign_jobs_to_available_nodes(
job_queue_2d, baseline_state['nodes'], baseline_cores_available,
Expand Down
1 change: 1 addition & 0 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

MAX_NODES = 335 # Maximum number of nodes
MAX_QUEUE_SIZE = 2500 # Maximum number of jobs in the queue
MAX_BACKLOG_SIZE = 50000 # Maximum number of jobs in the backlog (overflow) queue
MAX_CHANGE = MAX_NODES
MAX_JOB_DURATION = 170 # maximum job runtime in hours
# Use a very high cap; age-based dropping is temporarily disabled in code.
Expand Down
9 changes: 7 additions & 2 deletions src/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,13 @@ def step(self, action):

# Add new jobs to the queue (overflow goes to the backlog queue; jobs are dropped once the backlog is full)
self.env_print(f"[2] Adding {new_jobs_count} new jobs to the queue...")
new_jobs, self.next_empty_slot = add_new_jobs(
new_jobs, self.next_empty_slot, backlog_dropped = add_new_jobs(
job_queue_2d, new_jobs_count, new_jobs_durations,
new_jobs_nodes, new_jobs_cores, self.next_empty_slot, self.backlog_queue
)
if backlog_dropped > 0:
self.metrics.jobs_dropped += backlog_dropped
self.metrics.episode_jobs_dropped += backlog_dropped
self.metrics.jobs_submitted += new_jobs_count
self.metrics.episode_jobs_submitted += new_jobs_count

Expand All @@ -350,10 +353,12 @@ def step(self, action):
# Assign jobs to available nodes
self.env_print(f"[4] Assigning jobs to available nodes...")

num_launched_jobs, self.next_empty_slot, num_dropped_this_step, self.next_job_id = assign_jobs_to_available_nodes(
num_dropped_this_step = backlog_dropped
num_launched_jobs, self.next_empty_slot, queue_dropped, self.next_job_id = assign_jobs_to_available_nodes(
job_queue_2d, self.state['nodes'], self.cores_available, self.running_jobs,
self.next_empty_slot, self.next_job_id, self.metrics, is_baseline=False
)
num_dropped_this_step += queue_dropped

self.env_print(f" {num_launched_jobs} jobs launched")

Expand Down
11 changes: 8 additions & 3 deletions src/job_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import numpy as np
from src.config import (
MAX_NODES, CORES_PER_NODE
MAX_NODES, CORES_PER_NODE, MAX_BACKLOG_SIZE
)


Expand Down Expand Up @@ -125,16 +125,21 @@ def add_new_jobs(job_queue_2d, new_jobs_count, new_jobs_durations, new_jobs_node
new_jobs_nodes: List of nodes required per job
new_jobs_cores: List of cores per node required per job
next_empty_slot: Index of next empty slot in queue
backlog_queue: Optional deque for overflow jobs

Returns:
Tuple of (list of added jobs (real queue + backlog queue), updated next_empty_slot)
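Tuple of (list of added jobs, updated next_empty_slot, num_dropped), where num_dropped counts incoming jobs discarded because the backlog queue had already reached MAX_BACKLOG_SIZE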
"""
new_jobs = []
num_dropped = 0
for i in range(new_jobs_count):
# Check if we have space in the queue
if next_empty_slot >= len(job_queue_2d):
if backlog_queue is None:
break # Queue is full
if len(backlog_queue) >= MAX_BACKLOG_SIZE:
num_dropped += 1
continue # Backlog full, drop incoming job
job_entry = [
new_jobs_durations[i],
0, # Age starts at 0
Expand All @@ -159,7 +164,7 @@ def add_new_jobs(job_queue_2d, new_jobs_count, new_jobs_durations, new_jobs_node
while next_empty_slot < len(job_queue_2d) and job_queue_2d[next_empty_slot][0] != 0:
next_empty_slot += 1

return new_jobs, next_empty_slot
return new_jobs, next_empty_slot, num_dropped


def assign_jobs_to_available_nodes(job_queue_2d, nodes, cores_available, running_jobs,
Expand Down