Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/baseline.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from src.job_management import (
process_ongoing_jobs,
add_new_jobs,
assign_jobs_to_available_nodes,
assign_jobs_with_backlog_refill,
fill_queue_from_backlog,
age_backlog_queue,
)
Expand Down Expand Up @@ -54,9 +54,10 @@ def baseline_step(baseline_state, baseline_cores_available, baseline_running_job
metrics.baseline_jobs_submitted += new_jobs_count
metrics.episode_baseline_jobs_submitted += new_jobs_count

_, baseline_next_empty_slot, _, next_job_id = assign_jobs_to_available_nodes(
_, baseline_next_empty_slot, _, next_job_id, _ = assign_jobs_with_backlog_refill(
job_queue_2d, baseline_state['nodes'], baseline_cores_available,
baseline_running_jobs, baseline_next_empty_slot, next_job_id, metrics, is_baseline=True
baseline_running_jobs, baseline_next_empty_slot, next_job_id, metrics,
baseline_backlog_queue, is_baseline=True
)

num_used_nodes = np.sum(baseline_state['nodes'] > 0)
Expand Down
2 changes: 2 additions & 0 deletions src/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ def _on_step(self) -> bool:
self.logger.record("metrics/jobs_completed", env.metrics.episode_jobs_completed)
self.logger.record("metrics/completion_rate", completion_rate)
self.logger.record("metrics/avg_wait_hours", avg_wait)
self.logger.record("metrics/on_nodes", env.metrics.episode_on_nodes[-1] if env.metrics.episode_on_nodes else 0)
self.logger.record("metrics/used_nodes", env.metrics.episode_used_nodes[-1] if env.metrics.episode_used_nodes else 0)
self.logger.record("metrics/max_queue_size", env.metrics.episode_max_queue_size_reached)
self.logger.record("metrics/max_backlog_size", env.metrics.episode_max_backlog_size_reached)
self.logger.record("metrics/jobs_dropped", env.metrics.episode_jobs_dropped)
Expand Down
27 changes: 23 additions & 4 deletions src/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
)
from src.job_management import (
process_ongoing_jobs, add_new_jobs,
assign_jobs_to_available_nodes, fill_queue_from_backlog, age_backlog_queue
assign_jobs_with_backlog_refill, fill_queue_from_backlog, age_backlog_queue
)
from src.node_management import adjust_nodes
from src.reward_calculation import RewardCalculator
Expand Down Expand Up @@ -158,6 +158,18 @@ def __init__(self,
shape=(MAX_QUEUE_SIZE * 4,),
dtype=np.int32
),
'backlog_size': spaces.Box(
low=0,
high=np.iinfo(np.int32).max,
shape=(1,),
dtype=np.int32
),
'backlog_assigned': spaces.Box(
low=0,
high=np.iinfo(np.int32).max,
shape=(1,),
dtype=np.int32
),
Comment on lines +161 to +172
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Add time features to the observation space.

The observation still lacks explicit time information (e.g., hour-of-episode or hour-of-day), which is required for the state representation. Please add a time feature to observation_space and update it each step/reset.

✅ Suggested addition (example)
             'predicted_prices': spaces.Box(
                 low=-1000,
                 high=1000,
                 shape=(24,),
                 dtype=np.float32
             ),
+            'time': spaces.Box(
+                low=0,
+                high=EPISODE_HOURS - 1,
+                shape=(1,),
+                dtype=np.int32
+            ),
         })
Based on learnings: State space representation should include node counts, job queue status, electricity prices, and time information.
🤖 Prompt for AI Agents
In `@src/environment.py` around lines 161 - 172, Observation lacks explicit time
info: add a time feature to the observation_space dict (e.g., 'time_of_day' or
'episode_hour') alongside existing keys like 'backlog_size' and
'backlog_assigned', and ensure the environment updates this feature on every
reset() and step() (or inside the existing _get_observation/_observe method) so
the returned observation contains the current time-of-day or hour-of-episode
value; update observation_space dtype/shape appropriately (e.g., Box(low=0,
high=23, shape=(1,), dtype=np.int32) or normalized float) and set the value
during reset() initialization and each step() before returning observations.

# predicted prices for the next 24h
'predicted_prices': spaces.Box(
low=-1000,
Expand All @@ -175,6 +187,8 @@ def _reset_timeline_state(self, start_index):
'nodes': np.zeros(MAX_NODES, dtype=np.int32),
# Initialize job queue to be empty
'job_queue': np.zeros((MAX_QUEUE_SIZE * 4), dtype=np.int32),
'backlog_size': np.array([0], dtype=np.int32),
'backlog_assigned': np.array([0], dtype=np.int32),
# Initialize predicted prices array
'predicted_prices': self.prices.predicted_prices.copy(),
}
Expand Down Expand Up @@ -303,12 +317,17 @@ def step(self, action):
# Assign jobs to available nodes
self.env_print(f"[4] Assigning jobs to available nodes...")

num_launched_jobs, self.next_empty_slot, num_dropped_this_step, self.next_job_id = assign_jobs_to_available_nodes(
job_queue_2d, self.state['nodes'], self.cores_available, self.running_jobs,
self.next_empty_slot, self.next_job_id, self.metrics, is_baseline=False
num_launched_jobs, self.next_empty_slot, num_dropped_this_step, self.next_job_id, backlog_assigned = (
assign_jobs_with_backlog_refill(
job_queue_2d, self.state['nodes'], self.cores_available, self.running_jobs,
self.next_empty_slot, self.next_job_id, self.metrics, self.backlog_queue,
is_baseline=False
)
)

self.env_print(f" {num_launched_jobs} jobs launched")
self.state['backlog_size'][0] = len(self.backlog_queue)
self.state['backlog_assigned'][0] = backlog_assigned

# Calculate node utilization stats
num_used_nodes = np.sum(self.state['nodes'] > 0)
Expand Down
48 changes: 46 additions & 2 deletions src/job_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ def add_new_jobs(job_queue_2d, new_jobs_count, new_jobs_durations, new_jobs_node


def assign_jobs_to_available_nodes(job_queue_2d, nodes, cores_available, running_jobs,
next_empty_slot, next_job_id, metrics, is_baseline=False):
next_empty_slot, next_job_id, metrics, is_baseline=False,
age_waiting_jobs=True):
"""
Assign jobs from queue to available nodes.

Expand Down Expand Up @@ -250,6 +251,49 @@ def assign_jobs_to_available_nodes(job_queue_2d, nodes, cores_available, running
# metrics.jobs_dropped += 1
# metrics.episode_jobs_dropped += 1
#else:
job_queue_2d[job_idx][1] = new_age
if age_waiting_jobs:
job_queue_2d[job_idx][1] = new_age

return num_processed_jobs, next_empty_slot, num_dropped, next_job_id


def assign_jobs_with_backlog_refill(job_queue_2d, nodes, cores_available, running_jobs,
                                    next_empty_slot, next_job_id, metrics, backlog_queue,
                                    is_baseline=False):
    """
    Assign queued jobs to nodes, refilling the queue from the backlog whenever
    the queue empties and resources remain.

    Repeatedly delegates to assign_jobs_to_available_nodes, then — if the queue
    is now empty, the backlog still holds jobs, and at least one node with free
    cores exists — pulls jobs from the backlog into the queue and tries again.
    Stops as soon as a refill moves nothing (backlog exhausted, queue non-empty,
    or no capacity left).

    Returns a 5-tuple:
        (total jobs launched, updated next_empty_slot, total jobs dropped,
         updated next_job_id, number of launched jobs attributed to the backlog)
    """
    launched_total = 0
    dropped_total = 0
    backlog_assigned = 0
    # Jobs moved out of the backlog into the queue but not yet launched.
    pending_backlog_jobs = 0

    keep_going = True
    while keep_going:
        # Freshly refilled backlog jobs must not be aged again within the same
        # step, so aging is suppressed while backlog-origin jobs are pending.
        launched, next_empty_slot, dropped, next_job_id = assign_jobs_to_available_nodes(
            job_queue_2d, nodes, cores_available, running_jobs,
            next_empty_slot, next_job_id, metrics, is_baseline=is_baseline,
            age_waiting_jobs=(pending_backlog_jobs == 0)
        )
        launched_total += launched
        dropped_total += dropped

        # Attribute launches to backlog-origin jobs first, capped at the count
        # still pending from earlier refills.
        if pending_backlog_jobs > 0 and launched > 0:
            from_backlog = launched if launched < pending_backlog_jobs else pending_backlog_jobs
            backlog_assigned += from_backlog
            pending_backlog_jobs -= from_backlog

        # NOTE(review): `nodes >= 0` reads as "node exists/is on" — presumably
        # negative values mark unavailable nodes; confirm against node encoding.
        queue_is_empty = np.all(job_queue_2d[:, 0] == 0)
        capacity_left = np.any((nodes >= 0) & (cores_available > 0))

        keep_going = False
        if queue_is_empty and len(backlog_queue) > 0 and capacity_left:
            next_empty_slot, moved = fill_queue_from_backlog(
                job_queue_2d, backlog_queue, next_empty_slot
            )
            if moved > 0:
                pending_backlog_jobs += moved
                keep_going = True

    return launched_total, next_empty_slot, dropped_total, next_job_id, backlog_assigned