From 52128afa2bacf12ef6378606c898b00bfa741580 Mon Sep 17 00:00:00 2001 From: Harsh Pandhe Date: Wed, 20 May 2026 13:12:25 +0530 Subject: [PATCH] feat(phase-12e): NSGA-II is now Phase 12c (surface_fn) and 12d (forced flyover) aware MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Until now the GA accepted surface_fn (Phase 12c joint H+V) and forced_flyover_zones (Phase 12d) but NSGA-II quietly ignored them. Anyone wanting Pareto + horizontal optimization had no path. Same for fly-over obstacles on the multi-objective track. multi_optimizer changes: - _RopewayProblem takes surface_fn + forced_flyover_zones kwargs - When cfg.corridor_half_width_m > 0 the pymoo n_var picks up the 4th per-slot gene (lateral offset) via the shared _genes_per_slot helper from optimizer.py — keeps the GA and NSGA-II genome layouts identical - Decoder is now zone-aware (no_tower_zones passed in), matching the GA's behaviour so NSGA-II never wastes evaluations on tower-in-zone individuals - Final alignment construction carries surface_fn and forced_flyover_zones forward so the evaluator sees them - optimize_pareto signature extended with both kwargs; defaults None / [] keep all existing callers behaviourally identical Scope note: RL env (Gymnasium RopewayRoutingEnv) is a follow-up — its action space would need new offset gene; not in this PR. The NSGA-II/GA parity is the high-value win. Tests: tests/test_nsga_phase12.py — 4 new - pure-vertical NSGA-II still produces a feasible front (no offsets) - NSGA-II accepts surface_fn + corridor_half_width and uses the lateral-offset gene to route around a centreline wall - NSGA-II honours forced_flyover_zones (no fly-over violations in the best feasible solution) - NSGA-II honours no_tower_zones (decoder zone-awareness reaches pymoo path) Full suite 195 -> 203, zero regressions. --- src/ropeway/multi_optimizer.py | 36 +++++++-- tests/test_nsga_phase12.py | 135 +++++++++++++++++++++++++++++++++ 2 files changed, 165 insertions(+), 6 deletions(-) create mode 100644 tests/test_nsga_phase12.py diff --git a/src/ropeway/multi_optimizer.py b/src/ropeway/multi_optimizer.py index f6c0c76..db392e5 100644 --- a/src/ropeway/multi_optimizer.py +++ b/src/ropeway/multi_optimizer.py @@ -74,8 +74,15 @@ def __init__( clearance_profile: ClearanceProfile | None, no_tower_zones: list | None = None, intermediate_stations: Sequence[float] | None = None, + surface_fn: Callable[[np.ndarray, np.ndarray], np.ndarray] | None = None, + forced_flyover_zones: list | None = None, ): - n_var = HEADER_GENES + GENES_PER_SLOT * max_slots + # Phase 12e: track the per-slot gene count the GA-side decoder uses so + # NSGA-II's pymoo Problem has the matching n_var when the Phase 12c + # corridor_half_width_m > 0 lateral-offset gene is in play. + from .optimizer import _genes_per_slot + gps = _genes_per_slot(cfg) + n_var = HEADER_GENES + gps * max_slots h_lo, h_hi = cfg.min_tower_height_m, cfg.max_tower_height_m xl = np.zeros(n_var) xu = np.ones(n_var) @@ -83,11 +90,14 @@ def __init__( xl[0] = xl[1] = h_lo xu[0] = xu[1] = h_hi for i in range(max_slots): - j = HEADER_GENES + i * GENES_PER_SLOT + j = HEADER_GENES + i * gps # x_frac in [0, 1]; height in [h_lo, h_hi]; active in [0, 1]. xl[j], xu[j] = 0.0, 1.0 xl[j + 1], xu[j + 1] = h_lo, h_hi xl[j + 2], xu[j + 2] = 0.0, 1.0 + if gps == 4: + # Phase 12e lateral-offset gene (offset_frac in [0, 1]). + xl[j + 3], xu[j + 3] = 0.0, 1.0 super().__init__(n_var=n_var, n_obj=3, n_constr=1, xl=xl, xu=xu) self.profile_fn = profile_fn self.corridor_length = corridor_length @@ -96,19 +106,24 @@ def __init__( self.clearance_profile = clearance_profile self.no_tower_zones = no_tower_zones or [] self.intermediate_stations = list(intermediate_stations or []) + self.surface_fn = surface_fn + self.forced_flyover_zones = forced_flyover_zones or [] def _evaluate(self, X: np.ndarray, out: dict, *args, **kwargs) -> None: F = np.zeros((X.shape[0], 3)) G = np.zeros((X.shape[0], 1)) for k, genome in enumerate(X): towers = _decode(list(genome), self.corridor_length, self.cfg, self.max_slots, - intermediate_stations=self.intermediate_stations) + intermediate_stations=self.intermediate_stations, + no_tower_zones=self.no_tower_zones) alignment = Alignment( towers=towers, profile_fn=self.profile_fn, cfg=self.cfg, clearance_profile=self.clearance_profile, no_tower_zones=self.no_tower_zones, + surface_fn=self.surface_fn, + forced_flyover_zones=self.forced_flyover_zones, ) res = evaluate_alignment(alignment) n_towers = len(towers) @@ -134,20 +149,26 @@ def optimize_pareto( clearance_profile: ClearanceProfile | None = None, no_tower_zones: list | None = None, intermediate_stations: Sequence[float] | None = None, + surface_fn: Callable[[np.ndarray, np.ndarray], np.ndarray] | None = None, + forced_flyover_zones: list | None = None, verbose: bool = False, ) -> ParetoFront: """Run NSGA-II and return the feasible non-dominated solutions. - ``no_tower_zones`` / ``intermediate_stations`` behave exactly as in - :func:`ropeway.optimizer.optimize`. + ``no_tower_zones`` / ``intermediate_stations`` / ``surface_fn`` / + ``forced_flyover_zones`` behave exactly as in + :func:`ropeway.optimizer.optimize` — NSGA-II is now 12c/12d-aware. """ cfg = cfg or ConstraintConfig() nsga = nsga or NSGAConfig() no_tower_zones = no_tower_zones or [] intermediate_stations = list(intermediate_stations or []) + forced_flyover_zones = forced_flyover_zones or [] problem = _RopewayProblem( profile_fn, corridor_length, cfg, nsga.max_intermediate_towers, clearance_profile, no_tower_zones, intermediate_stations, + surface_fn=surface_fn, + forced_flyover_zones=forced_flyover_zones, ) algorithm = NSGA2( pop_size=nsga.population_size, @@ -170,13 +191,16 @@ def optimize_pareto( if g_row[0] > 1e-6: continue # skip any infeasibles that slipped through towers = _decode(list(genome), corridor_length, cfg, nsga.max_intermediate_towers, - intermediate_stations=intermediate_stations) + intermediate_stations=intermediate_stations, + no_tower_zones=no_tower_zones) alignment = Alignment( towers=towers, profile_fn=profile_fn, cfg=cfg, clearance_profile=clearance_profile, no_tower_zones=no_tower_zones, + surface_fn=surface_fn, + forced_flyover_zones=forced_flyover_zones, ) eval_res = evaluate_alignment(alignment) front.solutions.append( diff --git a/tests/test_nsga_phase12.py b/tests/test_nsga_phase12.py new file mode 100644 index 0000000..527ca6e --- /dev/null +++ b/tests/test_nsga_phase12.py @@ -0,0 +1,135 @@ +"""Phase 12e — NSGA-II is now 12c (surface_fn) and 12d (forced flyover) aware.""" + +from __future__ import annotations + +import numpy as np +import pytest + +from ropeway.dem import synthetic_profile +from ropeway.multi_optimizer import NSGAConfig, optimize_pareto +from ropeway.obstacles import ForcedFlyOverZone, NoTowerZone, ZoneKind +from ropeway.safety import ConstraintConfig + + +def _flat_profile(level: float = 50.0): + def fn(x): + x = np.asarray(x, dtype=float) + return np.full_like(x, level) + return fn + + +def _wall_surface(level: float, wall_h: float, d0: float, d1: float, + half_width: float = 30.0): + """Flat ground with a wall straddling the centreline; offset clears it.""" + def surf(distance, offset): + d = np.asarray(distance, dtype=float) + o = np.asarray(offset, dtype=float) + z = np.full(np.broadcast(d, o).shape, level, dtype=float) + on_wall = (d >= d0) & (d <= d1) & (np.abs(o) <= half_width) + return np.where(on_wall, level + wall_h, z) + return surf + + +# --------------------------------------------------------------------------- +# Backward compatibility — pure-vertical NSGA-II unchanged +# --------------------------------------------------------------------------- + + +def test_nsga_pure_vertical_still_produces_feasible_front(): + """Without 12c/12d kwargs the NSGA-II behaviour is unchanged.""" + profile = synthetic_profile(length_m=2000.0, seed=7) + cfg = ConstraintConfig( + horizontal_tension_n=250_000.0, cable_weight_n_per_m=50.0, + seat_spacing_m=50.0, passengers_per_seat=8, max_span_m=1200.0, + ) + nsga = NSGAConfig(max_intermediate_towers=6, population_size=40, + generations=15, seed=3) + front = optimize_pareto(profile.as_function(), profile.total_length, + cfg=cfg, nsga=nsga, verbose=False) + assert len(front.solutions) > 0 + for s in front.solutions: + assert all(t.offset == 0.0 for t in s.alignment.towers) + + +# --------------------------------------------------------------------------- +# Phase 12c — NSGA-II accepts surface_fn and uses the offset gene +# --------------------------------------------------------------------------- + + +def test_nsga_accepts_surface_fn_and_corridor_half_width(): + """With ``corridor_half_width_m > 0`` and a ``surface_fn`` provided, + NSGA-II returns feasible solutions whose intermediate towers may + have non-zero lateral offsets.""" + cfg = ConstraintConfig( + horizontal_tension_n=250_000.0, cable_weight_n_per_m=50.0, + seat_spacing_m=50.0, passengers_per_seat=8, + max_span_m=900.0, min_tower_height_m=5.0, max_tower_height_m=60.0, + corridor_half_width_m=100.0, + ) + nsga = NSGAConfig(max_intermediate_towers=4, population_size=40, + generations=20, seed=11) + surf = _wall_surface(level=50.0, wall_h=400.0, d0=350.0, d1=450.0) + front = optimize_pareto( + _flat_profile(level=50.0), 800.0, + cfg=cfg, nsga=nsga, + surface_fn=surf, + verbose=False, + ) + assert len(front.solutions) > 0 + best = front.best_by(0) + # At least one intermediate tower had to step off the centreline to + # clear the wall; the GA-side test asserts the same on the GA path. + inter_offsets = [t.offset for t in best.alignment.towers[1:-1]] + assert any(abs(o) > 1.0 for o in inter_offsets) + + +# --------------------------------------------------------------------------- +# Phase 12d — NSGA-II honours forced flyover zones +# --------------------------------------------------------------------------- + + +def test_nsga_honours_forced_flyover_zone(): + """A tall fly-over requirement forces NSGA-II to lift the cable.""" + cfg = ConstraintConfig( + horizontal_tension_n=400_000.0, cable_weight_n_per_m=50.0, + seat_spacing_m=50.0, passengers_per_seat=8, + max_span_m=500.0, min_tower_height_m=5.0, max_tower_height_m=100.0, + ) + nsga = NSGAConfig(max_intermediate_towers=4, population_size=40, + generations=25, seed=42) + fz = ForcedFlyOverZone(200.0, 400.0, min_cable_elev_m=130.0, + name="freeway deck") + front = optimize_pareto( + _flat_profile(level=50.0), 600.0, + cfg=cfg, nsga=nsga, + forced_flyover_zones=[fz], + verbose=False, + ) + assert len(front.solutions) > 0 + best = front.best_by(0) + # No "fly-over" deficit in the violations list (best is feasible). + assert not any("fly-over" in v for v in best.result.report.violations) + + +def test_nsga_honours_no_tower_zone(): + """The decoder zone-aware behaviour must apply to NSGA-II too.""" + cfg = ConstraintConfig( + horizontal_tension_n=250_000.0, cable_weight_n_per_m=50.0, + seat_spacing_m=50.0, passengers_per_seat=8, max_span_m=1200.0, + ) + nsga = NSGAConfig(max_intermediate_towers=6, population_size=40, + generations=15, seed=5) + zone = NoTowerZone(400.0, 800.0, kind=ZoneKind.WATER, name="river") + front = optimize_pareto( + _flat_profile(level=50.0), 1500.0, + cfg=cfg, nsga=nsga, + no_tower_zones=[zone], + verbose=False, + ) + assert len(front.solutions) > 0 + for sol in front.solutions: + for t in sol.alignment.towers: + if t.is_station: + continue + assert not zone.contains(t.distance), \ + f"tower at {t.distance:.0f} fell inside the forbidden zone"