Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions src/ropeway/multi_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,30 @@ def __init__(
clearance_profile: ClearanceProfile | None,
no_tower_zones: list | None = None,
intermediate_stations: Sequence[float] | None = None,
surface_fn: Callable[[np.ndarray, np.ndarray], np.ndarray] | None = None,
forced_flyover_zones: list | None = None,
):
n_var = HEADER_GENES + GENES_PER_SLOT * max_slots
# Phase 12e: track the per-slot gene count the GA-side decoder uses so
# NSGA-II's pymoo Problem has the matching n_var when the Phase 12c
# corridor_half_width_m > 0 lateral-offset gene is in play.
from .optimizer import _genes_per_slot
gps = _genes_per_slot(cfg)
n_var = HEADER_GENES + gps * max_slots
h_lo, h_hi = cfg.min_tower_height_m, cfg.max_tower_height_m
xl = np.zeros(n_var)
xu = np.ones(n_var)
# First two genes are start_h, end_h (height range).
xl[0] = xl[1] = h_lo
xu[0] = xu[1] = h_hi
for i in range(max_slots):
j = HEADER_GENES + i * GENES_PER_SLOT
j = HEADER_GENES + i * gps
# x_frac in [0, 1]; height in [h_lo, h_hi]; active in [0, 1].
xl[j], xu[j] = 0.0, 1.0
xl[j + 1], xu[j + 1] = h_lo, h_hi
xl[j + 2], xu[j + 2] = 0.0, 1.0
if gps == 4:
# Phase 12e lateral-offset gene (offset_frac in [0, 1]).
xl[j + 3], xu[j + 3] = 0.0, 1.0
super().__init__(n_var=n_var, n_obj=3, n_constr=1, xl=xl, xu=xu)
self.profile_fn = profile_fn
self.corridor_length = corridor_length
Expand All @@ -96,19 +106,24 @@ def __init__(
self.clearance_profile = clearance_profile
self.no_tower_zones = no_tower_zones or []
self.intermediate_stations = list(intermediate_stations or [])
self.surface_fn = surface_fn
self.forced_flyover_zones = forced_flyover_zones or []

def _evaluate(self, X: np.ndarray, out: dict, *args, **kwargs) -> None:
F = np.zeros((X.shape[0], 3))
G = np.zeros((X.shape[0], 1))
for k, genome in enumerate(X):
towers = _decode(list(genome), self.corridor_length, self.cfg, self.max_slots,
intermediate_stations=self.intermediate_stations)
intermediate_stations=self.intermediate_stations,
no_tower_zones=self.no_tower_zones)
alignment = Alignment(
towers=towers,
profile_fn=self.profile_fn,
cfg=self.cfg,
clearance_profile=self.clearance_profile,
no_tower_zones=self.no_tower_zones,
surface_fn=self.surface_fn,
forced_flyover_zones=self.forced_flyover_zones,
)
res = evaluate_alignment(alignment)
n_towers = len(towers)
Expand All @@ -134,20 +149,26 @@ def optimize_pareto(
clearance_profile: ClearanceProfile | None = None,
no_tower_zones: list | None = None,
intermediate_stations: Sequence[float] | None = None,
surface_fn: Callable[[np.ndarray, np.ndarray], np.ndarray] | None = None,
forced_flyover_zones: list | None = None,
verbose: bool = False,
) -> ParetoFront:
"""Run NSGA-II and return the feasible non-dominated solutions.

``no_tower_zones`` / ``intermediate_stations`` behave exactly as in
:func:`ropeway.optimizer.optimize`.
``no_tower_zones`` / ``intermediate_stations`` / ``surface_fn`` /
``forced_flyover_zones`` behave exactly as in
:func:`ropeway.optimizer.optimize` — NSGA-II is now 12c/12d-aware.
"""
cfg = cfg or ConstraintConfig()
nsga = nsga or NSGAConfig()
no_tower_zones = no_tower_zones or []
intermediate_stations = list(intermediate_stations or [])
forced_flyover_zones = forced_flyover_zones or []
problem = _RopewayProblem(
profile_fn, corridor_length, cfg, nsga.max_intermediate_towers,
clearance_profile, no_tower_zones, intermediate_stations,
surface_fn=surface_fn,
forced_flyover_zones=forced_flyover_zones,
)
algorithm = NSGA2(
pop_size=nsga.population_size,
Expand All @@ -170,13 +191,16 @@ def optimize_pareto(
if g_row[0] > 1e-6:
continue # skip any infeasibles that slipped through
towers = _decode(list(genome), corridor_length, cfg, nsga.max_intermediate_towers,
intermediate_stations=intermediate_stations)
intermediate_stations=intermediate_stations,
no_tower_zones=no_tower_zones)
alignment = Alignment(
towers=towers,
profile_fn=profile_fn,
cfg=cfg,
clearance_profile=clearance_profile,
no_tower_zones=no_tower_zones,
surface_fn=surface_fn,
forced_flyover_zones=forced_flyover_zones,
)
eval_res = evaluate_alignment(alignment)
front.solutions.append(
Expand Down
135 changes: 135 additions & 0 deletions tests/test_nsga_phase12.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
"""Phase 12e — NSGA-II is now 12c (surface_fn) and 12d (forced flyover) aware."""

from __future__ import annotations

import numpy as np
import pytest

from ropeway.dem import synthetic_profile
from ropeway.multi_optimizer import NSGAConfig, optimize_pareto
from ropeway.obstacles import ForcedFlyOverZone, NoTowerZone, ZoneKind
from ropeway.safety import ConstraintConfig


def _flat_profile(level: float = 50.0):
def fn(x):
x = np.asarray(x, dtype=float)
return np.full_like(x, level)
return fn


def _wall_surface(level: float, wall_h: float, d0: float, d1: float,
half_width: float = 30.0):
"""Flat ground with a wall straddling the centreline; offset clears it."""
def surf(distance, offset):
d = np.asarray(distance, dtype=float)
o = np.asarray(offset, dtype=float)
z = np.full(np.broadcast(d, o).shape, level, dtype=float)
on_wall = (d >= d0) & (d <= d1) & (np.abs(o) <= half_width)
return np.where(on_wall, level + wall_h, z)
return surf


# ---------------------------------------------------------------------------
# Backward compatibility — pure-vertical NSGA-II unchanged
# ---------------------------------------------------------------------------


def test_nsga_pure_vertical_still_produces_feasible_front():
"""Without 12c/12d kwargs the NSGA-II behaviour is unchanged."""
profile = synthetic_profile(length_m=2000.0, seed=7)
cfg = ConstraintConfig(
horizontal_tension_n=250_000.0, cable_weight_n_per_m=50.0,
seat_spacing_m=50.0, passengers_per_seat=8, max_span_m=1200.0,
)
nsga = NSGAConfig(max_intermediate_towers=6, population_size=40,
generations=15, seed=3)
front = optimize_pareto(profile.as_function(), profile.total_length,
cfg=cfg, nsga=nsga, verbose=False)
assert len(front.solutions) > 0
for s in front.solutions:
assert all(t.offset == 0.0 for t in s.alignment.towers)


# ---------------------------------------------------------------------------
# Phase 12c — NSGA-II accepts surface_fn and uses the offset gene
# ---------------------------------------------------------------------------


def test_nsga_accepts_surface_fn_and_corridor_half_width():
"""With ``corridor_half_width_m > 0`` and a ``surface_fn`` provided,
NSGA-II returns feasible solutions whose intermediate towers may
have non-zero lateral offsets."""
cfg = ConstraintConfig(
horizontal_tension_n=250_000.0, cable_weight_n_per_m=50.0,
seat_spacing_m=50.0, passengers_per_seat=8,
max_span_m=900.0, min_tower_height_m=5.0, max_tower_height_m=60.0,
corridor_half_width_m=100.0,
)
nsga = NSGAConfig(max_intermediate_towers=4, population_size=40,
generations=20, seed=11)
surf = _wall_surface(level=50.0, wall_h=400.0, d0=350.0, d1=450.0)
front = optimize_pareto(
_flat_profile(level=50.0), 800.0,
cfg=cfg, nsga=nsga,
surface_fn=surf,
verbose=False,
)
assert len(front.solutions) > 0
best = front.best_by(0)
# At least one intermediate tower had to step off the centreline to
# clear the wall; the GA-side test asserts the same on the GA path.
inter_offsets = [t.offset for t in best.alignment.towers[1:-1]]
assert any(abs(o) > 1.0 for o in inter_offsets)


# ---------------------------------------------------------------------------
# Phase 12d — NSGA-II honours forced flyover zones
# ---------------------------------------------------------------------------


def test_nsga_honours_forced_flyover_zone():
"""A tall fly-over requirement forces NSGA-II to lift the cable."""
cfg = ConstraintConfig(
horizontal_tension_n=400_000.0, cable_weight_n_per_m=50.0,
seat_spacing_m=50.0, passengers_per_seat=8,
max_span_m=500.0, min_tower_height_m=5.0, max_tower_height_m=100.0,
)
nsga = NSGAConfig(max_intermediate_towers=4, population_size=40,
generations=25, seed=42)
fz = ForcedFlyOverZone(200.0, 400.0, min_cable_elev_m=130.0,
name="freeway deck")
front = optimize_pareto(
_flat_profile(level=50.0), 600.0,
cfg=cfg, nsga=nsga,
forced_flyover_zones=[fz],
verbose=False,
)
assert len(front.solutions) > 0
best = front.best_by(0)
# No "fly-over" deficit in the violations list (best is feasible).
assert not any("fly-over" in v for v in best.result.report.violations)


def test_nsga_honours_no_tower_zone():
"""The decoder zone-aware behaviour must apply to NSGA-II too."""
cfg = ConstraintConfig(
horizontal_tension_n=250_000.0, cable_weight_n_per_m=50.0,
seat_spacing_m=50.0, passengers_per_seat=8, max_span_m=1200.0,
)
nsga = NSGAConfig(max_intermediate_towers=6, population_size=40,
generations=15, seed=5)
zone = NoTowerZone(400.0, 800.0, kind=ZoneKind.WATER, name="river")
front = optimize_pareto(
_flat_profile(level=50.0), 1500.0,
cfg=cfg, nsga=nsga,
no_tower_zones=[zone],
verbose=False,
)
assert len(front.solutions) > 0
for sol in front.solutions:
for t in sol.alignment.towers:
if t.is_station:
continue
assert not zone.contains(t.distance), \
f"tower at {t.distance:.0f} fell inside the forbidden zone"
Loading