From 84e0e27f77108abe644de0a8b963024d22409158 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Wed, 29 Apr 2026 20:34:15 +0200 Subject: [PATCH] Add LiquidOracle and ContiguousOracle --- src/environment.py | 29 ++++- src/metrics_tracker.py | 19 +++ src/oracle.py | 264 +++++++++++++++++++++++++++++++++++++++++ src/oracles.md | 178 +++++++++++++++++++++++++++ train.py | 61 ++++++++-- train_iter.py | 8 +- 6 files changed, 546 insertions(+), 13 deletions(-) create mode 100644 src/oracle.py create mode 100644 src/oracles.md diff --git a/src/environment.py b/src/environment.py index 034e3f3..ec75fb6 100644 --- a/src/environment.py +++ b/src/environment.py @@ -32,6 +32,7 @@ from src.baseline import baseline_step from src.workload_generator import generate_jobs from src.metrics_tracker import MetricsTracker +from src.oracle import LiquidOracle, ContiguousOracle init() # Initialize colorama @@ -75,7 +76,8 @@ def __init__(self, workload_gen: WorkloadGenerator | None = None, job_arrival_scale: float = 1.0, jobs_exact_replay: bool = False, - output_dir: str = "sessions") -> None: + output_dir: str = "sessions", + enable_oracle: bool = False) -> None: super().__init__() self.weights = weights @@ -90,6 +92,8 @@ def __init__(self, self.evaluation_mode = evaluation_mode self.job_arrival_scale = float(job_arrival_scale) self.jobs_exact_replay = bool(jobs_exact_replay) + self.oracle = LiquidOracle() if enable_oracle else None + self.contiguous_oracle = ContiguousOracle() if enable_oracle else None self.next_plot_save = self.steps_per_iteration @@ -289,6 +293,15 @@ def reset(self, seed: int | None = None, options: dict[str, Any] | None = None) self.episode_idx += 1 self.metrics.reset_episode_metrics() + if self.oracle is not None or self.contiguous_oracle is not None: + job_queue_2d = self.state['job_queue'].reshape(-1, 4) + active_rows = job_queue_2d[job_queue_2d[:, 0] > 0] + carried: list = [(int(r[1]), int(r[0]), int(r[2]), int(r[3])) for r in active_rows] + carried += [(int(j[1]), int(j[0]), int(j[2]), int(j[3])) for j in self.backlog_queue] + if self.oracle is not None: + self.oracle.reset(carried_jobs=carried if carried else None) + if self.contiguous_oracle is not None: + self.contiguous_oracle.reset(carried_jobs=carried if carried else None) if "price_start_index" in options: if self.prices is not None and self.prices.external_prices is not None: n_prices = len(self.prices.external_prices) @@ -354,6 +367,12 @@ def step(self, action: np.ndarray) -> tuple[dict[str, np.ndarray], float, bool, jobs_exact_replay=self.jobs_exact_replay, ) + # Record arriving jobs for oracles (same data the baseline receives) + if self.oracle is not None: + self.oracle.record(current_price, new_jobs_durations, new_jobs_nodes, new_jobs_cores) + if self.contiguous_oracle is not None: + self.contiguous_oracle.record(current_price, new_jobs_durations, new_jobs_nodes, new_jobs_cores) + # Add new jobs to queue (overflow goes to helper) self.env_print(f"[2] Adding {new_jobs_count} new jobs to the queue...") new_jobs, self.next_empty_slot, backlog_dropped = add_new_jobs( @@ -535,6 +554,14 @@ def step(self, action: np.ndarray) -> tuple[dict[str, np.ndarray], float, bool, truncated = True terminated = False + # Solve oracles for this episode before recording completion metrics + if self.oracle is not None: + self.metrics.episode_oracle_cost = self.oracle.solve() + if self.contiguous_oracle is not None: + self.metrics.episode_oracle_contiguous_cost = self.contiguous_oracle.solve() + self.metrics.episode_oracle_contiguous_unscheduled = self.contiguous_oracle.unscheduled_count + self.metrics.episode_oracle_contiguous_spillover = self.contiguous_oracle.spillover_count + # Record episode costs for long-term analysis self.metrics.record_episode_completion(self.current_episode) diff --git a/src/metrics_tracker.py b/src/metrics_tracker.py index ef2dc71..04532b2 100644 --- a/src/metrics_tracker.py +++ b/src/metrics_tracker.py @@ -31,6 +31,10 @@ def reset_timeline_metrics(self) -> None: self.total_cost: float = 0.0 self.baseline_cost: float = 0.0 self.baseline_cost_off: float = 0.0 + self.oracle_cost: float = 0.0 + self.oracle_contiguous_cost: float = 0.0 + self.oracle_contiguous_unscheduled: int = 0 + self.oracle_contiguous_spillover: int = 0 self.total_power_consumption_mwh: float = 0.0 self.baseline_power_consumption_mwh: float = 0.0 self.baseline_power_consumption_off_mwh: float = 0.0 @@ -72,6 +76,10 @@ def reset_episode_metrics(self) -> None: self.episode_total_cost: float = 0.0 self.episode_baseline_cost: float = 0.0 self.episode_baseline_cost_off: float = 0.0 + self.episode_oracle_cost: float = 0.0 + self.episode_oracle_contiguous_cost: float = 0.0 + self.episode_oracle_contiguous_unscheduled: int = 0 + self.episode_oracle_contiguous_spillover: int = 0 self.episode_total_power_consumption_mwh: float = 0.0 self.episode_baseline_power_consumption_mwh: float = 0.0 self.episode_baseline_power_consumption_off_mwh: float = 0.0 @@ -178,6 +186,7 @@ def record_episode_completion(self, current_episode: int) -> dict[str, float | i ) savings_vs_baseline: float = self.episode_baseline_cost - self.episode_total_cost savings_vs_baseline_off: float = self.episode_baseline_cost_off - self.episode_total_cost + savings_vs_oracle: float = self.episode_total_cost - self.episode_oracle_cost dropped_jobs_per_saved_euro: float = self._safe_ratio( float(self.episode_jobs_dropped), savings_vs_baseline @@ -186,11 +195,21 @@ def record_episode_completion(self, current_episode: int) -> dict[str, float | i float(self.episode_jobs_dropped), savings_vs_baseline_off ) if savings_vs_baseline_off > 0.0 else float("nan") + self.oracle_cost += self.episode_oracle_cost + self.oracle_contiguous_cost += self.episode_oracle_contiguous_cost + self.oracle_contiguous_unscheduled += self.episode_oracle_contiguous_unscheduled + self.oracle_contiguous_spillover += self.episode_oracle_contiguous_spillover + episode_data: dict[str, float | int] = { 'episode': current_episode, 'agent_cost': self.episode_total_cost, 'baseline_cost': self.episode_baseline_cost, 'baseline_cost_off': self.episode_baseline_cost_off, + 'oracle_cost': self.episode_oracle_cost, + 'oracle_contiguous_cost': self.episode_oracle_contiguous_cost, + 'oracle_contiguous_unscheduled': self.episode_oracle_contiguous_unscheduled, + 'oracle_contiguous_spillover': self.episode_oracle_contiguous_spillover, + 'savings_vs_oracle': savings_vs_oracle, 'agent_power_consumption_mwh': self.episode_total_power_consumption_mwh, 'baseline_power_consumption_mwh': self.episode_baseline_power_consumption_mwh, 'baseline_power_consumption_off_mwh': self.episode_baseline_power_consumption_off_mwh, diff --git a/src/oracle.py b/src/oracle.py new file mode 100644 index 0000000..77d1da0 --- /dev/null +++ b/src/oracle.py @@ -0,0 +1,264 @@ +"""Liquid oracle: theoretical lower bound on electricity cost for a given workload. + +The liquid oracle answers: given perfect knowledge of all arriving jobs and prices +for an episode, and the freedom to run any job at any hour (no arrival-time or +deadline constraints), what is the minimum achievable electricity cost? + +It treats the workload as a divisible fluid -- total core-hours -- and redistributes +it greedily to the cheapest hours first, up to cluster capacity. This is an +optimistic lower bound; real schedulers face arrival times, deadlines, and discrete +job sizes that the oracle ignores. + +Use it as a benchmark: the gap between baseline_off and oracle is the maximum +workload-shifting savings available in principle. The fraction of that gap captured +by the agent is how close to optimal the agent is. +""" + +import numpy as np +from numpy.lib.stride_tricks import sliding_window_view + +from src.config import CORES_PER_NODE, MAX_NODES, COST_USED_MW, MAX_JOB_AGE + +# Carried-over job tuple: (age, duration, nodes, cores_per_node) +CarriedJob = tuple[int, int, int, int] + + +class LiquidOracle: + """ + Accumulates job work (core-hours) and prices during an episode, then solves + for the minimum cost schedule at episode end. + + Algorithm: + 1. Total work W = sum over all arriving jobs of (duration * nodes * cores_per_node). + 2. Sort episode hours by price, cheapest first. + 3. Fill each hour up to MAX_CORES_PER_HOUR until W is exhausted. + 4. Cost = sum_t (core_hours_t / CORES_PER_NODE) * COST_USED_MW * price_t. + + No idle-node overhead is charged: the oracle powers on exactly the nodes it needs. + Negative prices are handled correctly -- those hours are filled first and earn revenue. + """ + + MAX_CORES_PER_HOUR: int = MAX_NODES * CORES_PER_NODE # 335 * 96 = 32,160 + + def __init__(self) -> None: + self._prices: list[float] = [] + self._total_core_hours: float = 0.0 + + def reset(self, carried_jobs: list[CarriedJob] | None = None) -> None: + """Reset at the start of a new episode. + + carried_jobs: jobs left in the queue/backlog at the end of the previous + episode, as (age, duration, nodes, cores_per_node) tuples. Their + work is pre-loaded so the oracle accounts for the same workload the + agent inherits. + """ + self._prices = [] + self._total_core_hours = 0.0 + if carried_jobs: + for _, duration, nodes, cores in carried_jobs: + self._total_core_hours += float(duration * nodes * cores) + + def record( + self, + price: float, + new_jobs_durations: list[int], + new_jobs_nodes: list[int], + new_jobs_cores: list[int], + ) -> None: + """Record one hour's price and the core-hours of work that arrived.""" + if len(new_jobs_durations) != len(new_jobs_nodes) or len(new_jobs_durations) != len(new_jobs_cores): + raise ValueError( + f"Job list lengths must match: durations={len(new_jobs_durations)}, " + f"nodes={len(new_jobs_nodes)}, cores={len(new_jobs_cores)}" + ) + self._prices.append(price) + for d, n, c in zip(new_jobs_durations, new_jobs_nodes, new_jobs_cores): + self._total_core_hours += float(d * n * c) + + def solve(self) -> float: + """ + Compute the minimum cost to execute all recorded work given the episode's + price series and cluster capacity. + + Returns: + Minimum achievable electricity cost (euros) for this episode's workload. + Returns 0.0 if no work or no prices were recorded. + """ + if not self._prices or self._total_core_hours <= 0.0: + return 0.0 + + sorted_prices = np.sort(np.asarray(self._prices, dtype=float)) # cheapest first + + remaining = self._total_core_hours + cost = 0.0 + max_cores = float(self.MAX_CORES_PER_HOUR) + + for price in sorted_prices: + if remaining <= 0.0: + break + core_hours = min(remaining, max_cores) + # No idle overhead: nodes powered = core_hours / CORES_PER_NODE (fractional) + power_mwh = (core_hours / CORES_PER_NODE) * COST_USED_MW + cost += power_mwh * price + remaining -= core_hours + + return cost + + @property + def total_core_hours(self) -> float: + """Total core-hours of work recorded so far this episode.""" + return self._total_core_hours + + +class ContiguousOracle: + """ + Achievable cost benchmark under perfect price foresight, respecting all real + job constraints: + + - Jobs cannot start before they arrive. + - Each job runs continuously for its full duration (no splitting/preemption). + - Fixed node/core requirements must be met simultaneously at every hour. + - Jobs expire if not started within MAX_JOB_AGE hours of arrival. + + Algorithm (least-slack-first greedy): + For each job, sorted by fewest feasible start positions relative to duration: + 1. Find every candidate start time t in [arrival, arrival+MAX_JOB_AGE-duration]. + 2. Check feasibility: max capacity in [t, t+duration) + job_cores <= cluster_capacity. + 3. Pick the cheapest feasible window (window price sum via prefix sums, O(1)/candidate). + 4. Commit that window to the capacity array. + + The greedy does not guarantee the global optimum, so the result is an upper + bound on what a perfect-foresight scheduler could achieve, not a lower bound. + It is always >= LiquidOracle cost (fewer scheduling freedoms). + + Drop-in replacement for LiquidOracle: same record()/reset()/solve() interface. + """ + + MAX_CORES_PER_HOUR: int = MAX_NODES * CORES_PER_NODE # 335 * 96 = 32,160 + + def __init__(self) -> None: + self._prices: list[float] = [] + # (arrival_hour, duration, total_cores) per job. + # arrival_hour is negative for carried-over jobs: -age encodes remaining + # lifetime as MAX_JOB_AGE - age hours from episode start. + self._jobs: list[tuple[int, int, float]] = [] + self.unscheduled_count: int = 0 + self.spillover_count: int = 0 + + def reset(self, carried_jobs: list[CarriedJob] | None = None) -> None: + """Reset at the start of a new episode. + + carried_jobs: jobs left in the queue/backlog at the end of the previous + episode, as (age, duration, nodes, cores_per_node) tuples. Encoded + with arrival = -age so that their deadline (arrival + MAX_JOB_AGE) + equals the remaining lifetime from episode start. + """ + self._prices = [] + self._jobs = [] + self.unscheduled_count = 0 + self.spillover_count = 0 + if carried_jobs: + for age, duration, nodes, cores in carried_jobs: + self._jobs.append((-age, duration, float(nodes * cores))) + + def record( + self, + price: float, + new_jobs_durations: list[int], + new_jobs_nodes: list[int], + new_jobs_cores: list[int], + ) -> None: + """Record one hour's price and all jobs that arrived this step.""" + if len(new_jobs_durations) != len(new_jobs_nodes) or len(new_jobs_durations) != len(new_jobs_cores): + raise ValueError( + f"Job list lengths must match: durations={len(new_jobs_durations)}, " + f"nodes={len(new_jobs_nodes)}, cores={len(new_jobs_cores)}" + ) + arrival = len(self._prices) # 0-based index of this hour in the episode + self._prices.append(price) + for d, n, c in zip(new_jobs_durations, new_jobs_nodes, new_jobs_cores): + self._jobs.append((arrival, d, float(n * c))) + + def solve(self) -> float: + """ + Compute a least-slack-first greedy schedule honoring job continuity, + arrival times, deadlines, and capacity constraints. + + Returns: + Schedule cost (euros) under perfect price foresight. 0.0 if no jobs + recorded. Not guaranteed to be the global minimum. + Jobs that cannot be placed (expired or fully capacity-blocked) are + skipped; check unscheduled_count and spillover_count after calling. + """ + if not self._prices or not self._jobs: + return 0.0 + + T = len(self._prices) + prices = np.asarray(self._prices, dtype=float) + + # Prefix sums: sum(prices[t : t+d]) = price_prefix[t+d] - price_prefix[t] + price_prefix = np.empty(T + 1, dtype=float) + price_prefix[0] = 0.0 + np.cumsum(prices, out=price_prefix[1:]) + + capacity = np.zeros(T, dtype=float) + max_cores = float(self.MAX_CORES_PER_HOUR) + cost_factor = COST_USED_MW / CORES_PER_NODE # cost per core-hour per unit price + + # Least-slack-first: jobs with fewest candidate start positions (relative + # to their duration) are harder to place and get first pick of cheap windows. + # For carried-over jobs arrival is negative; clamp t_min to 0 for slack calc. + def _slack(job: tuple[int, int, float]) -> float: + arrival, duration, _ = job + t_min = max(0, arrival) + t_max = min(arrival + MAX_JOB_AGE, T) - duration + return max(t_max - t_min + 1, 0) / max(duration, 1) + + self.unscheduled_count = 0 + self.spillover_count = 0 + total_cost = 0.0 + + for arrival, duration, total_cores in sorted(self._jobs, key=_slack): + t_min = max(0, arrival) # carried-over jobs have negative arrival + t_max = min(arrival + MAX_JOB_AGE, T) - duration + + if t_max < t_min: + # For non-carried jobs (arrival >= 0): deadline = arrival + MAX_JOB_AGE > T, + # so the job genuinely spills into the next episode. + # For carried-over jobs (arrival < 0): deadline = arrival + MAX_JOB_AGE <= T, + # so the job expired within this episode and cannot be scheduled. + if arrival + MAX_JOB_AGE > T: + self.spillover_count += 1 + else: + self.unscheduled_count += 1 + continue + + n_candidates = t_max - t_min + 1 + + # Sliding max of the capacity array over windows of size `duration`. + # Vectorised across all candidate start times at once. + cap_slice = capacity[t_min : t_max + duration] + if duration == 1: + max_in_window = cap_slice[:n_candidates].copy() + else: + max_in_window = sliding_window_view(cap_slice, duration).max(axis=1) + feasible = max_in_window + total_cores <= max_cores + + if not feasible.any(): + self.unscheduled_count += 1 + continue # fully capacity-blocked; job unschedulable + + # O(1) per candidate: window price sum from prefix array + t_starts = np.arange(t_min, t_max + 1, dtype=np.intp) + window_costs = ( + (price_prefix[t_starts + duration] - price_prefix[t_starts]) + * total_cores + * cost_factor + ) + window_costs[~feasible] = np.inf + + best_idx = int(window_costs.argmin()) + capacity[t_min + best_idx : t_min + best_idx + duration] += total_cores + total_cost += float(window_costs[best_idx]) + + return total_cost diff --git a/src/oracles.md b/src/oracles.md new file mode 100644 index 0000000..c25e56d --- /dev/null +++ b/src/oracles.md @@ -0,0 +1,178 @@ +# Oracle Benchmarks for PowerSched + +## Motivation + +PowerSched trains a reinforcement learning agent to reduce electricity costs by shifting +compute workload towards cheaper price periods. A key open question is: + +> **How much savings are achievable in principle?** + +Without an upper bound, we can observe that the agent saves X euros versus the greedy +baseline, but we cannot say whether that is 10% or 90% of what is theoretically possible. +An *oracle* — a scheduler with perfect foresight that solves for the optimal schedule +offline — provides that upper bound. + +The oracle does not need to run in real time. It receives the same job stream and price +data that the environment sees, accumulates them over the episode, and solves for the +minimum-cost schedule at episode end. + +--- + +## What the Agent Is Up Against + +The agent operates under hard real-world constraints: + +- Jobs arrive online; the agent cannot know future arrivals +- A job, once started, runs continuously until it finishes (no preemption) +- Each job has fixed resource requirements (nodes, cores) that must be met simultaneously +- Jobs expire if they wait longer than `MAX_AGE` (336 hours) +- The cluster has a hard capacity cap (335 nodes, 96 cores/node) + +The core lever available to the agent is **workload shifting**: delay jobs that arrive +during expensive price periods until prices drop, then burst through the backlog. + +--- + +## Oracle Hierarchy + +Four levels of oracle are possible, ordered from most optimistic (lowest cost, easiest +to compute) to most realistic (tightest bound, hardest to compute): + +| Oracle | Arrival times | Deadlines | Job continuity | Node/core requirements | Complexity | +|---|:---:|:---:|:---:|:---:|---| +| **Liquid** (implemented) | ✗ | ✗ | ✗ | ✗ | Trivial — sort + greedy fill | +| **LP relaxation** | ✓ | ✓ | ✗ | ✓ | Easy — standard LP solver | +| **Job-contiguous greedy** | ✓ | ✓ | ✓ | ✓ | Fast heuristic | +| **ILP (exact optimal)** | ✓ | ✓ | ✓ | ✓ | NP-hard, intractable at scale | + +--- + +## Oracle 1 — Liquid Lower Bound (Implemented) + +### Idea + +Treat all arriving work as a divisible fluid. Compute total core-hours demanded by the +episode's workload, then redistribute that fluid freely across all 336 hours, filling the +cheapest hours first up to cluster capacity. + +### Algorithm + +1. Accumulate `W = Σ (duration × nodes × cores_per_node)` over all arriving jobs. +2. Sort the 336 hourly prices cheapest-first. +3. Fill each hour up to `335 × 96 = 32,160` core-hours until `W` is exhausted. +4. Cost = `Σ_t (core_hours_t / 96) × 0.45 kW × price_t` + +### What it ignores + +- Jobs cannot be split across hours; a 100-hour job on 4 nodes is **not** equivalent to + 400 node-hours of fluid. +- Work cannot be done before jobs arrive. +- Jobs have deadlines. + +### When it is useful + +The liquid oracle is a fast sanity check. If the agent's cost is far above the liquid +oracle, there is meaningful room to improve. If the agent's cost is close, the agent is +near-optimal (or the oracle bound is loose for this workload). + +It is particularly loose for workloads with many long-running jobs, where splitting is +physically impossible. It is fairly tight for workloads with many short jobs. + +--- + +## Oracle 2 — Job-Contiguous Greedy + +### Idea + +Honor all real job constraints (arrival times, deadlines, continuous execution, fixed +resource requirements) but give the scheduler **perfect foresight**: it sees all future +prices and arrivals before making any decision. It assigns each job to the cheapest +contiguous time window in which it fits. + +### Algorithm + +**Setup** (run once at episode end): +- `capacity[t]` = cores committed at hour `t`, initialized to 0 for all 336 hours. +- Full price series `price[0..335]`. +- Full job list with `(arrival, duration, nodes, cores)` for every job. + +**For each job** (sorted by decreasing scheduling difficulty — see below): + +1. Determine the feasibility window: start time `t` must satisfy + `arrival ≤ t ≤ arrival + MAX_AGE − duration` +2. For each candidate start `t` in that window, check: + `max(capacity[t : t+duration]) + nodes × cores ≤ 335 × 96` +3. Among all feasible candidates, pick the cheapest: + `t* = argmin Σ_{h=t}^{t+duration−1} price[h]` +4. Commit: `capacity[t* : t*+duration] += nodes × cores` +5. Accumulate cost. + +Both the feasibility check and the cost sum are numpy slice operations, accelerated to +O(1) per candidate with prefix sums. + +**Sort order** (matters for greedy quality): + +The recommended order is **least-slack first**: sort by +`(feasibility window size) / duration` ascending, so jobs with the fewest placement +options get first pick. This is analogous to Earliest Deadline First scheduling and +produces tighter, more realistic assignments than arrival-time order. + +### Complexity + +`O(N_jobs × MAX_AGE)` per episode with prefix-sum acceleration. For a typical episode +(~50,000 jobs, 336-hour window) this is approximately 17 million simple operations — +fast enough to run at episode end with negligible overhead. + +### What it still ignores vs. the real agent + +- **Perfect foresight**: the oracle knows all future prices and arrivals; the agent does + not. +- **No scheduling overhead**: the oracle makes one globally optimal assignment; the agent + takes one action per hour under uncertainty. + +These are the only remaining freedoms the oracle has over the agent. Everything else +(arrival times, deadlines, continuity, resource requirements) is respected. + +--- + +## How the Benchmarks Fit Together + +With both oracles implemented, a single evaluation run produces: + +``` +baseline_cost greedy FIFO, all nodes always on +baseline_cost_off greedy FIFO, idle nodes turned off +agent_cost trained RL agent +oracle_cost liquid oracle (optimistic lower bound) +oracle_contiguous_cost job-contiguous greedy oracle (tight lower bound) +``` + +This gives three meaningful gaps: + +| Gap | Meaning | +|---|---| +| `baseline_off − agent` | Savings the agent actually achieved | +| `baseline_off − oracle_jcg` | Maximum workload-shifting savings available in principle | +| `agent − oracle_jcg` | How far the agent is from the realistic optimum | +| `oracle_jcg − oracle_liq` | Cost of honoring job-continuity constraints | + +The key metric for evaluating agent quality becomes: + +``` +Agent Capture Rate = (baseline_off − agent) / (baseline_off − oracle_jcg) +``` + +A capture rate of 80% means the agent recovers 80% of the theoretically achievable +workload-shifting savings, with the remaining 20% left on the table. + +--- + +## Implementation Status + +| Component | Status | +|---|---| +| Liquid oracle (`src/oracle.py`) | ✅ Implemented | +| Wired into simulation (runs alongside every episode) | ✅ Implemented | +| `--oracle` flag in `train.py` and `train_iter.py` | ✅ Implemented | +| Oracle cost reported in evaluation output | ✅ Implemented | +| Job-contiguous greedy oracle | ✅ Implemented | diff --git a/train.py b/train.py index e6f30c8..cc53579 100644 --- a/train.py +++ b/train.py @@ -72,6 +72,7 @@ def main(): parser.add_argument("--session", default="default", help="Session ID") parser.add_argument("--output-dir", default="sessions", help="Base directory for all output (models, logs, plots). Defaults to 'sessions'.") parser.add_argument("--evaluate-savings", action='store_true', help="Load latest model and evaluate long-term savings (no training)") + parser.add_argument("--oracle", action='store_true', help="Enable both liquid and contiguous oracles alongside simulation to compute theoretical minimum cost lower bounds.") parser.add_argument("--eval-months", type=int, default=12, help="Months to evaluate for savings analysis (default: 12, only used with --evaluate-savings)") add_workloadgen_args(parser) parser.add_argument("--plot-dashboard", action="store_true", help="Generate dashboard plot (per-hour panels + cumulative savings).") @@ -171,7 +172,8 @@ def main(): workload_gen=workload_gen, job_arrival_scale=args.job_arrival_scale, jobs_exact_replay=args.jobs_exact_replay, - output_dir=args.output_dir) + output_dir=args.output_dir, + enable_oracle=args.oracle) env.reset(seed=args.seed) # Check if there are any saved models in models_dir @@ -277,17 +279,24 @@ def main(): baseline_occupancy_cores_pct = mean_occupancy_pct(env.metrics.episode_baseline_used_cores, CORES_PER_NODE * MAX_NODES) agent_occupancy_nodes_pct = mean_occupancy_pct(env.metrics.episode_used_nodes, MAX_NODES) baseline_occupancy_nodes_pct = mean_occupancy_pct(env.metrics.episode_baseline_used_nodes, MAX_NODES) - print( - build_episode_summary_line( - episode_number=episode + 1, - episode_data=episode_data, - timeline_max_queue=env.metrics.max_queue_size_reached, - agent_occupancy_cores_pct=agent_occupancy_cores_pct, - baseline_occupancy_cores_pct=baseline_occupancy_cores_pct, - agent_occupancy_nodes_pct=agent_occupancy_nodes_pct, - baseline_occupancy_nodes_pct=baseline_occupancy_nodes_pct, - ) + summary_line = build_episode_summary_line( + episode_number=episode + 1, + episode_data=episode_data, + timeline_max_queue=env.metrics.max_queue_size_reached, + agent_occupancy_cores_pct=agent_occupancy_cores_pct, + baseline_occupancy_cores_pct=baseline_occupancy_cores_pct, + agent_occupancy_nodes_pct=agent_occupancy_nodes_pct, + baseline_occupancy_nodes_pct=baseline_occupancy_nodes_pct, ) + if args.oracle: + liq = float(episode_data.get('oracle_cost', 0.0)) + con = float(episode_data.get('oracle_contiguous_cost', 0.0)) + if liq != 0.0: + summary_line += f", OracleLiq=€{liq:.0f}" + if con != 0.0: + above = float(episode_data['agent_cost']) - con + summary_line += f", OracleCon=€{con:.0f}, AboveOracleCon=€{above:.0f}" + print(summary_line) print(f"\nEvaluation complete! Generated {num_episodes} episodes of cost data.") @@ -379,6 +388,36 @@ def main(): print("\n=== COST SAVINGS (TOTAL OVER EVALUATION) ===") print(f" Vs Baseline: €{total_savings_vs_baseline:,.0f}, {fmt_optional(safe_ratio(total_savings_vs_baseline * 100.0, total_baseline_cost), 1)}%") print(f" Vs Baseline_off: €{total_savings_vs_baseline_off:,.0f}, {fmt_optional(safe_ratio(total_savings_vs_baseline_off * 100.0, total_baseline_off_cost), 1)}%") + + if args.oracle: + total_oracle_liquid_cost = env.metrics.oracle_cost + total_oracle_contiguous_cost = env.metrics.oracle_contiguous_cost + print("\n=== ORACLE BENCHMARKS (THEORETICAL LOWER BOUNDS) ===") + if total_oracle_liquid_cost != 0.0: + print("\n Liquid Oracle (optimistic — allows job splitting):") + print(f" Total Cost: €{total_oracle_liquid_cost:,.0f}") + liq_window = total_baseline_off_cost - total_oracle_liquid_cost + liq_capture = safe_ratio((total_baseline_off_cost - total_agent_cost) * 100.0, liq_window) + print(f" Max Shifting Window (fluid): €{liq_window:,.0f} (baseline_off - oracle_liq)") + print(f" Agent Capture Rate: {fmt_optional(liq_capture, 1)}%") + if total_oracle_contiguous_cost != 0.0: + total_oracle_contiguous_unscheduled = env.metrics.oracle_contiguous_unscheduled + print("\n Contiguous Oracle (realistic — honors job continuity):") + print(f" Total Cost: €{total_oracle_contiguous_cost:,.0f}") + con_window = total_baseline_off_cost - total_oracle_contiguous_cost + con_capture = safe_ratio((total_baseline_off_cost - total_agent_cost) * 100.0, con_window) + agent_above = total_agent_cost - total_oracle_contiguous_cost + print(f" Max Shifting Window (real jobs): €{con_window:,.0f} (baseline_off - oracle_jcg)") + print(f" Agent Gap to Oracle: €{agent_above:,.0f} (agent - oracle_jcg)") + print(f" Agent Capture Rate: {fmt_optional(con_capture, 1)}%") + total_oracle_contiguous_spillover = env.metrics.oracle_contiguous_spillover + if total_oracle_contiguous_unscheduled > 0: + print(f" Unscheduled Jobs (oracle): {total_oracle_contiguous_unscheduled} (capacity-blocked)") + if total_oracle_contiguous_spillover > 0: + print(f" Cross-Episode Spillover (oracle): {total_oracle_contiguous_spillover} (carried to next episode)") + if total_oracle_liquid_cost != 0.0 and total_oracle_contiguous_cost != 0.0: + continuity_cost = total_oracle_contiguous_cost - total_oracle_liquid_cost + print(f"\n Continuity Constraint Cost: €{continuity_cost:,.0f} (oracle_jcg - oracle_liq)") except Exception as e: print(f"Could not generate cumulative savings plot: {e}") diff --git a/train_iter.py b/train_iter.py index ca601aa..d678655 100644 --- a/train_iter.py +++ b/train_iter.py @@ -120,6 +120,7 @@ def build_command( eval_months=0, workloadgen_args=None, output_dir=None, + oracle=False, ): python_executable = sys.executable command = [ @@ -147,6 +148,8 @@ def build_command( command += ["--seed-sweep"] if evaluate_savings: command += ["--evaluate-savings", "--eval-months", str(eval_months)] + if oracle: + command += ["--oracle"] if workloadgen_args: command += workloadgen_args if output_dir is not None: @@ -367,7 +370,7 @@ def run_all_parallel(combinations, max_parallel, iter_limit_per_step, session, p job_durations, jobs, hourly_jobs, job_arrival_scale, jobs_exact_replay, plot_dashboard, dashboard_hours, seeds, seed_sweep, evaluate_savings, eval_months, workloadgen_args, - no_tui=False, output_dir=None): + no_tui=False, output_dir=None, oracle=False): multi_seed = len(seeds) > 1 current_env = os.environ.copy() log_dir = make_log_dir(session, output_dir or "sessions") @@ -385,6 +388,7 @@ def launch(combo, seed): plot_dashboard, dashboard_hours, seed, seed_sweep, evaluate_savings, eval_months, workloadgen_args, output_dir=output_dir, + oracle=oracle, ) log_path = os.path.join(log_dir, label_to_filename(label)) log_fh = open(log_path, "w") @@ -443,6 +447,7 @@ def main(): parser.add_argument("--parallel", type=int, default=1, metavar="N", help="Number of training runs to execute in parallel (default: 1, sequential)") parser.add_argument("--evaluate-savings", action="store_true", help="Forward to train.py to evaluate savings compared to baseline.") parser.add_argument("--eval-months", type=int, default=6, help="Number of months to evaluate savings over (forwarded to train.py)") + parser.add_argument("--oracle", action="store_true", help="Forward to train.py: enable both liquid and contiguous oracles alongside simulation.") parser.add_argument("--no-tui", action="store_true", help="Disable interactive TUI; print plain progress lines instead (auto-disabled when not a TTY)") add_workloadgen_args(parser) @@ -510,6 +515,7 @@ def main(): workloadgen_args=workloadgen_args, no_tui=args.no_tui, output_dir=args.output_dir, + oracle=args.oracle, ) if failures: print(f"{failures} run(s) failed")