diff --git a/simpeg_drivers-assets/uijson/plate_simulation.ui.json b/simpeg_drivers-assets/uijson/plate_simulation.ui.json index c07ac68f..dbbf357b 100644 --- a/simpeg_drivers-assets/uijson/plate_simulation.ui.json +++ b/simpeg_drivers-assets/uijson/plate_simulation.ui.json @@ -18,6 +18,12 @@ "enabled": true, "tooltip": "Forward modelling SimPEG Group with at least the topography and survey set" }, + "use_leroi": { + "main": true, + "label": "Use LeroiAir", + "value": false, + "tooltip": "If checked, plate simulation will use LeroiAir to simulate the plate response." + }, "name": { "main": true, "label": "Label", diff --git a/simpeg_drivers/plate_simulation/driver.py b/simpeg_drivers/plate_simulation/driver.py index afbddfe4..a773aff2 100644 --- a/simpeg_drivers/plate_simulation/driver.py +++ b/simpeg_drivers/plate_simulation/driver.py @@ -11,6 +11,7 @@ from __future__ import annotations import sys +from copy import deepcopy from pathlib import Path import numpy as np @@ -21,6 +22,7 @@ from geoh5py.data import FloatData, ReferencedData from geoh5py.objects import Octree, Points, Surface from geoh5py.shared.utils import fetch_active_workspace +from geoh5py.ui_json.input_file import InputFile from simpeg_drivers.driver import ( InversionDriver, @@ -28,12 +30,14 @@ validate_workers, ) from simpeg_drivers.options import BaseForwardOptions, ModelTypeEnum +from simpeg_drivers.plate_simulation.leroi_air.driver import LeroiAirDriver +from simpeg_drivers.plate_simulation.leroi_air.options import LeroiAirOptions from simpeg_drivers.plate_simulation.models.events import Anomaly, Erosion, Overburden from simpeg_drivers.plate_simulation.models.series import DikeSwarm, Geology from simpeg_drivers.plate_simulation.options import PlateSimulationOptions from simpeg_drivers.utils.synthetics.meshes import get_octree_mesh from simpeg_drivers.utils.utils import ( - driver_class_from_name, + driver_class_from_dict, start_dask_run, validate_out_group, ) @@ -67,17 +71,17 @@ def __init__( self._survey: Points | None = None self._mesh: Octree | None = None self._model: FloatData | None = None - self._simulation_parameters: BaseForwardOptions | None = None - self._simulation_driver: InversionDriver | None = None self._client: Client | bool = validate_client(client) self._workers: list[tuple[str]] = validate_workers(self._client, workers) + self._simulation_driver: InversionDriver | None = None + self.simulation_parameters: BaseForwardOptions = self._initialize_forward_opts() def run(self) -> InversionDriver: """Create octree mesh, fill model, and simulate.""" - with fetch_active_workspace(self.params.geoh5, mode="r+"): - self.simulation_driver.run() - self.update_monitoring_directory(self._out_group) + self.simulation_driver.run() + self.simulation_parameters.update_out_group_options() + self.update_monitoring_directory(self._out_group) logger.info("done.") logger.handlers.clear() @@ -87,47 +91,17 @@ def run(self) -> InversionDriver: @property def simulation_driver(self) -> InversionDriver: if self._simulation_driver is None: - with fetch_active_workspace(self.params.geoh5, mode="r+"): - self.simulation_parameters.mesh = self.mesh - self.simulation_parameters.models.starting_model = self.model - - if not isinstance( - self.simulation_parameters.active_cells.topography_object, - Surface | Points, - ): - raise ValueError( - "The topography object of the forward simulation must be a 'Surface'." - ) - - self.simulation_parameters.out_group = None - driver_class = driver_class_from_name( - self.simulation_parameters.inversion_type, forward_only=True - ) - self._simulation_driver = driver_class( - self.simulation_parameters, - client=self._client, - workers=self._workers, - ) - self._simulation_driver.out_group.parent = self._out_group + if self.params.use_leroi: + _ = self.plates # Saves MaxwellPlate(s) when no octree/model + self._simulation_driver = self._get_leroi_driver() + else: + self._simulation_driver = self._get_simpeg_driver() return self._simulation_driver - @property - def simulation_parameters(self) -> BaseForwardOptions: - if self._simulation_parameters is None: - self._simulation_parameters = self.params.simulation_parameters() - if self._simulation_parameters.physical_property == "conductivity": - self._simulation_parameters.models.model_type = ( - ModelTypeEnum.resistivity - ) - return self._simulation_parameters - @property def survey(self): - if self._survey is None: - self._survey = self.simulation_parameters.data_object - - return self._survey + return self.simulation_parameters.data_object @property def topography(self) -> Surface | Points: @@ -156,23 +130,10 @@ def plates(self) -> list[Plate]: self.params.model.plate_options.spacing, self.params.model.plate_options.geometry.direction, ) - return self._plates - - @property - def mesh(self) -> Octree: - """Returns an octree mesh built from mesh parameters.""" - if self._mesh is None: - self._mesh = self.make_mesh() - - return self._mesh - - @property - def model(self) -> FloatData: - """Returns the model built from model parameters.""" - if self._model is None: - self._model = self.make_model() + for plate in self._plates: + plate.to_maxwell_plate(self.params.geoh5, parent=self._out_group) - return self._model + return self._plates def make_mesh(self) -> Octree: """ @@ -182,19 +143,17 @@ def make_mesh(self) -> Octree: """ logger.info("making the mesh...") - with fetch_active_workspace(self.params.geoh5, mode="r+") as geoh5: - surfaces = [p.surface(geoh5) for p in self.plates] - mesh = get_octree_mesh( - opts=self.params.mesh, - survey=self.survey, - topography=self.simulation_parameters.active_cells.topography_object, - plates=surfaces, - name=self.params.mesh.name, - ) - - mesh.parent = self._out_group + surfaces = [p.surface(self.params.geoh5) for p in self.plates] + self._mesh = get_octree_mesh( + opts=self.params.mesh, + survey=self.survey, + topography=self.simulation_parameters.active_cells.topography_object, + plates=surfaces, + name="Octree", + ) + self._mesh.parent = self._out_group - return mesh + return self._mesh def make_model(self) -> FloatData: """Create background + plate and overburden model from parameters.""" @@ -221,7 +180,7 @@ def make_model(self) -> FloatData: scenario = Geology( workspace=self.params.geoh5, - mesh=self.mesh, + mesh=self.simulation_parameters.mesh, background=self.params.model.background, history=[dikes, overburden, erosion], ) @@ -234,7 +193,7 @@ def make_model(self) -> FloatData: if physical_property == "conductivity": physical_property = "resistivity" - model = self.mesh.add_data( + model = self._mesh.add_data( { "geology": { "type": "referenced", @@ -250,7 +209,7 @@ def make_model(self) -> FloatData: for k, v in physical_property_map.items(): starting_model_values[geology == k] = v - starting_model = self.mesh.add_data( + starting_model = self.simulation_parameters.mesh.add_data( {"starting_model": {"values": starting_model_values}} ) @@ -309,6 +268,73 @@ def start_dask_run( """ start_dask_run(cls, json_path, n_workers=n_workers, n_threads=n_threads) + def _get_simpeg_driver(self): + + if not isinstance( + self.simulation_parameters.active_cells.topography_object, + Surface | Points, + ): + raise ValueError( + "The topography object of the forward simulation must be a 'Surface'." + ) + + driver_class = driver_class_from_dict(self.simulation_parameters.__dict__) + self.simulation_parameters.mesh = self.make_mesh() + self.simulation_parameters.models.starting_model = self.make_model() + self._simulation_driver = driver_class( + self.simulation_parameters, + client=self._client, + workers=self._workers, + ) + + return self._simulation_driver + + def _get_leroi_driver(self): + leroi_opts = LeroiAirOptions.from_plate_simulation_options( + self.params.model, self.simulation_parameters + ) + driver = LeroiAirDriver(leroi_opts) + return driver + + def _collect_simulation_opts(self) -> BaseForwardOptions: + """Collect template simulation options.""" + + simulation_options = deepcopy(self.params.simulation.options) + simulation_options["geoh5"] = self.params.geoh5 + + # TODO replace InputFile.data with UIJson.to_params + input_file = InputFile(ui_json=simulation_options, validate=False) + driver = driver_class_from_dict(input_file.data) + + return driver._params_class.build(input_file.data) # pylint: disable=protected-access + + def _initialize_forward_opts(self) -> BaseForwardOptions: + """Initialize the forward simulation options with mesh and model.""" + + opts = self._collect_simulation_opts() + + update = {} + models_update = {} + if opts.physical_property == "conductivity": + # TODO: validate this logic + models_update["model_type"] = ModelTypeEnum.resistivity + if not self.params.use_leroi: + update["mesh"] = None + models_update["starting_model"] = None + update["models"] = opts.models.model_copy(update=models_update) + + out_group = validate_out_group(opts) + out_group = out_group.copy( + parent=self.out_group, + copy_children=False, + copy_relatives=False, + ) + update["out_group"] = out_group + forward_opts = opts.model_copy(update=update) + forward_opts.update_out_group_options() + + return forward_opts + if __name__ == "__main__": file = Path(sys.argv[1]) diff --git a/simpeg_drivers/plate_simulation/leroi_air/__init__.py b/simpeg_drivers/plate_simulation/leroi_air/__init__.py new file mode 100644 index 00000000..1b788624 --- /dev/null +++ b/simpeg_drivers/plate_simulation/leroi_air/__init__.py @@ -0,0 +1,9 @@ +# ''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' +# Copyright (c) 2023-2026 Mira Geoscience Ltd. ' +# ' +# This file is part of simpeg-drivers package. ' +# ' +# simpeg-drivers is distributed under the terms and conditions of the MIT License ' +# (see LICENSE file at the root of this source code package). ' +# ' +# ''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' diff --git a/simpeg_drivers/plate_simulation/leroi_air/driver.py b/simpeg_drivers/plate_simulation/leroi_air/driver.py new file mode 100644 index 00000000..143dd2ef --- /dev/null +++ b/simpeg_drivers/plate_simulation/leroi_air/driver.py @@ -0,0 +1,59 @@ +# ''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' +# Copyright (c) 2023-2026 Mira Geoscience Ltd. ' +# ' +# This file is part of simpeg-drivers package. ' +# ' +# simpeg-drivers is distributed under the terms and conditions of the MIT License ' +# (see LICENSE file at the root of this source code package). ' +# ' +# ''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' + +import subprocess +from pathlib import Path + +from .interface import LeroiAirInterface +from .options import LeroiAirOptions + + +class LeroiAirDriver: + """Orchestrates a LeroiAir forward simulation from input preparation to geoh5 output.""" + + def __init__( + self, + options: LeroiAirOptions, + ) -> None: + """Initialize with simulation options.""" + self.options = options + self.interface = LeroiAirInterface(options) + + @property + def project_path(self) -> Path: + """Directory containing the geoh5 workspace file.""" + return self.options.survey.entity.workspace.h5file.parent + + def run_leroi(self) -> subprocess.CompletedProcess: + """Run the LeroiAir executable and raise on non-zero exit.""" + result = subprocess.run( + ["LeroiAir550_JR", "LeroiAir"], + cwd=self.project_path, + capture_output=True, + text=True, + check=False, + ) + if result.returncode != 0: + raise RuntimeError( + f"LeroiAir failed with return code {result.returncode}.\n" + f"stderr:\n{result.stderr}\n" + f"stdout:\n{result.stdout}" + ) + return result + + def run(self) -> None: + """Write input, run LeroiAir, and save simulated data to geoh5.""" + self.interface.input.write_cfl_file(self.project_path / "LeroiAir.cfl") + self.run_leroi() + self.interface.output.save_to_geoh5( + outfile=self.project_path / "LeroiAir.out", + out_group=self.options.out_group, + normalization=1e-9, + ) diff --git a/simpeg_drivers/plate_simulation/leroi_air/interface.py b/simpeg_drivers/plate_simulation/leroi_air/interface.py new file mode 100644 index 00000000..4357164f --- /dev/null +++ b/simpeg_drivers/plate_simulation/leroi_air/interface.py @@ -0,0 +1,348 @@ +# ''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' +# Copyright (c) 2023-2026 Mira Geoscience Ltd. ' +# ' +# This file is part of simpeg-drivers package. ' +# ' +# simpeg-drivers is distributed under the terms and conditions of the MIT License ' +# (see LICENSE file at the root of this source code package). ' +# ' +# ''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' +from __future__ import annotations + +from functools import cached_property +from pathlib import Path +from typing import Any, Literal + +import numpy as np +from geoh5py.groups import UIJsonGroup +from geoh5py.shared.utils import fetch_active_workspace + +from .options import LeroiAirOptions, SurveyOptions + + +class LeroiAirInterface: + def __init__(self, opts: LeroiAirOptions): + self.input = LeroiAirInput(opts) + self.output = LeroiAirOutput(opts.survey) + + +class LeroiAirInput: + """LeroiAir control file formatting from geoh5py objects and options.""" + + version: str = "8.0" + + def __init__(self, opts: LeroiAirOptions): + self.opts = opts + + @cached_property + def aliased_values(self) -> dict[str, Any]: + """Serves .cfl input file aliases and corresponding data to line formatter.""" + return { + "TDFD": 1 if self.opts.domain == "time" else 2, + "DO3D": 0 if self.opts.layered_earth_only else 1, + "PRFL": 1, + "ISTOP": 0, + "ISW": 1, + "NSX": len(self.opts.survey.ontime_waveform), + "STEP": int(self.opts.step), + "UNITS": 1, + "NCHNL": len(self.opts.survey.channels), + "KRXW": 2, + "REFTYM": self.opts.survey.timing_mark, + "OFFTIME": self.opts.survey.offtime, + "TXON": self.opts.survey.ontime_waveform[:, 0], + "TXAMP": self.opts.survey.ontime_waveform[:, 1], + "TMS": self.opts.survey.timing_mark + np.array(self.opts.survey.channels), + "WIDTH": self.opts.survey.channel_widths, + "TXCLN": 0.0, + "CMP": 3, + "KPPM": 0, + "NPPF": 3, + "TXAREA": 1.0, + "NTRN": 1, + "ZRX0": 0.0, + "XRX0": 0.0, + "YRX0": 0.0, + "NSTAT": self.opts.survey.n_stations, + "SURVEY": 2, + "BAROMTRC": 1, + "LINE_TAG": 0, + "EAST": self.opts.survey.entity.locations[:, 0], + "NORTH": self.opts.survey.entity.locations[:, 1], + "ALT": self.opts.survey.drape_height(self.opts.topo), + "NLAYER": self.opts.n_layers, + "NPLATE": self.opts.n_plates, + "NLITH": self.opts.n_layers + self.opts.n_plates, + "GND_LVL": 0.0, + "RES": self.opts.resistivities, + "SIG_T": self.opts.conductivity_thicknesses, + "RMU": np.ones_like(self.opts.resistivities), + "REPS": np.ones_like(self.opts.resistivities), + "CHRG": np.zeros_like(self.opts.resistivities), + "CTAU": np.zeros_like(self.opts.resistivities), + "CFREQ": np.ones_like(self.opts.resistivities), + "LITH": 1 + np.arange(self.opts.n_layers, dtype=int), + "LITHP": 1 + np.arange(self.opts.n_plates, dtype=int) + self.opts.n_layers, + "THICK": self.opts.layer_thicknesses, + "CELLW": self.opts.cell_size, + "IPLATE": 1, + "CNTR_East": [g.easting for g in self.opts.plate_geometries], + "CNTR_North": [g.northing for g in self.opts.plate_geometries], + "PLTOP": [-1 * g.elevation for g in self.opts.plate_geometries], + "PLNGTH": [g.strike_length for g in self.opts.plate_geometries], + "DPWDTH": [g.dip_length for g in self.opts.plate_geometries], + "DZM": [g.direction for g in self.opts.plate_geometries], + "DIP": [g.dip for g in self.opts.plate_geometries], + } + + def _format_value(self, value: int | float) -> str: + """ + Format a scalar value as an integer of bounded precision float. + + :param value: Value to format. + """ + match value: + case int() | np.integer(): + return str(int(value)) + case float() | np.floating(): + return self._format_float(value) + case _: + return str(value) + + def _format_float(self, value: float) -> str: + """ + Format a float, truncating to float_precision only when needed. + + :param value: Value to format as a bounded precision float. + """ + _, _, decimals = str(value).partition(".") + if len(decimals) > self.opts.float_precision: + return f"{value:.{self.opts.float_precision}f}" + return str(value) + + def _format_scalar_params(self, params: list[str]) -> str: + """ + Format one scalar value per param onto a single line. + + :param params: Parameter names to format a single line of scalar data. + """ + values = [self._format_value(self.aliased_values[k]) for k in params] + return f"{' '.join(values)} \t ! {', '.join(params)}" + + def _format_vector_param(self, param: str) -> str: + """ + Format all elements of a single vector param onto a single line. + + :param param: Parameter name to format as a single line of vector data. + """ + values = [self._format_value(v) for v in self.aliased_values[param]] + return f"{' '.join(values)} \t ! {param}" + + def format_line(self, params: str | list[str]) -> str: + """ + Format one or more param values on a single line. + + :param params: Parameter names to format as a single line. + """ + if isinstance(params, str): + return self._format_vector_param(params) + return self._format_scalar_params(params) + + def format_multi_line(self, params: list[str]) -> str: + """ + Format one or more vector param values as a row-per-entry table. + + :param params: Parameter names to format as a multi-line. + """ + columns = [self.aliased_values[k] for k in params] + rows = [ + " ".join(self._format_value(v) for v in row) + for row in zip(*columns, strict=True) + ] + return "\n".join(rows) + "\t ! " + ", ".join(params) + + @property + def record_2(self) -> str: + return self.format_line(["TDFD", "DO3D", "PRFL", "ISTOP"]) + + @property + def record_3(self) -> str: + return self.format_line( + ["ISW", "NSX", "STEP", "UNITS", "NCHNL", "KRXW", "OFFTIME"] + ) + + @property + def record_4(self) -> str: + return self.format_multi_line(["TXON", "TXAMP"]) + + @property + def record_5(self) -> str: + return self.format_line("TMS") + + @property + def record_6(self) -> str: + return self.format_line("WIDTH") + + @property + def record_7(self) -> str: + return self.format_line(["TXCLN", "CMP", "KPPM"]) + + @property + def record_7p1(self) -> str: + return self.format_line(["NPPF"]) + + @property + def record_7p2(self) -> str: + return self.format_line(["TXAREA", "NTRN"]) + + @property + def record_8(self) -> str: + return self.format_line(["ZRX0", "XRX0", "YRX0"]) + + @property + def record_9(self) -> str: + return self.format_line(["NSTAT", "SURVEY", "BAROMTRC", "LINE_TAG"]) + + @property + def record_9p1(self) -> str: + return self.format_multi_line(["EAST", "NORTH", "ALT"]) + + @property + def record_10(self) -> str: + return self.format_line(["NLAYER", "NPLATE", "NLITH", "GND_LVL"]) + + @property + def record_11(self) -> str: + return self.format_multi_line( + ["RES", "SIG_T", "RMU", "REPS", "CHRG", "CTAU", "CFREQ"] + ) + + @property + def record_12(self) -> str: + return self.format_multi_line(["LITH", "THICK"]) + + @property + def record_13(self) -> str: + return self.format_line(["CELLW"]) + + @property + def record_14(self) -> str: + return self.format_multi_line(["LITHP", "CNTR_East", "CNTR_North", "PLTOP"]) + + @property + def record_15(self) -> str: + return self.format_multi_line(["PLNGTH", "DPWDTH", "DZM", "DIP"]) + + def format_cfl_file(self) -> str: + """ + Generates lines of text for an .cfl input file to run LeroiAir. + + Collects appropriate 'Records' and adds lines to the input file one by one. + """ + lines = [] + lines.append(self.opts.title) + lines.append(self.record_2) + lines.append(self.record_3) + lines.append(self.record_4) + lines.append(self.record_5) + lines.append(self.record_6) + lines.append(self.record_7) + if self.aliased_values["KPPM"] > 0: + lines.append(self.record_7p1) + lines.append(self.record_7p2) + lines.append(self.record_8) + lines.append(self.record_9) + lines.append(self.record_9p1) + lines.append(self.record_10) + lines.append(self.record_11) + lines.append(self.record_12) + lines.append(self.record_13) + lines.append(self.record_14) + lines.append(self.record_15) + + return "\n".join(lines) + "\n" + + def write_cfl_file(self, filepath: Path) -> None: + """ + Write the formatted .cfl input file to disk. + + :param filepath: Path where the .cfl file will be written. + """ + with open(filepath, mode="w", encoding="utf-8") as f: + f.write(self.format_cfl_file()) + + +class LeroiAirOutput: + """LeroiAir output file parsing and saving to geoh5py objects/data.""" + + _COMPONENT_ANCHORS: dict[str, str] = { + "crossline": "TRANSVERSE COMPONENT", + "inline": "IN-LINE COMPONENT", + "vertical": "VERTICAL COMPONENT", + } + + def __init__(self, opts: SurveyOptions): + self.opts = opts + + def _find_data_start(self, chunk: list[str]) -> int: + """ + Return the index of the first station data row within a section chunk. + + :param chunk: Chunk of data containing a header to index lines from. + """ + header_idx = next( + i + for i, line in enumerate(chunk) + if all(k in line for k in ["EAST", "NORTH", "ALT"]) + ) + return header_idx + 2 + + def _slice_data_lines(self, lines: list[str], anchor: str) -> list[str]: + """ + Slice the station data rows that follow the given section header. + + :param lines: Lines to slice. + :param anchor: String marking the start of a chunk of output data. + """ + anchor_idx = next(i for i, line in enumerate(lines) if anchor in line) + chunk = lines[anchor_idx:] + data_start = self._find_data_start(chunk) + return chunk[data_start : data_start + self.opts.n_stations] + + def _extract_data( + self, outfile: str | Path, component: Literal["inline", "crossline", "vertical"] + ) -> np.ndarray: + """ + Extract channel data for a single component from a LeroiAir .out file. + + :param outfile: Path to the output file from a LeroiAir run. + :param component: Component to extract. + """ + lines = Path(outfile).read_text(encoding="utf-8", errors="replace").splitlines() + data_lines = self._slice_data_lines(lines, self._COMPONENT_ANCHORS[component]) + return np.array([line.split() for line in data_lines], dtype=float)[:, 4:] + + def save_to_geoh5( + self, outfile: str | Path, out_group: UIJsonGroup, normalization: float = 1 + ): + """ + Save LeroiAir simulated data on the provided survey to geoh5. + + :param outfile: Path to output file from a LeroiAir run. + :param out_group: Group where a copy of the survey will be saved + along with all the data computed by LeroiAir. + :param normalization: Normalization multiplied against the data + before saving. + """ + + survey = self.opts.entity.copy(parent=out_group, copy_children=False) + for component in "inline", "crossline", "vertical": + data = self._extract_data(outfile=outfile, component=component) + entities = survey.add_data( + { + f"fwd {component} [{i}]": {"values": data[:, i] * normalization} + for i in range(len(self.opts.channels)) + } + ) + + survey.create_property_group(name=component, properties=entities) diff --git a/simpeg_drivers/plate_simulation/leroi_air/options.py b/simpeg_drivers/plate_simulation/leroi_air/options.py new file mode 100644 index 00000000..b8486bc8 --- /dev/null +++ b/simpeg_drivers/plate_simulation/leroi_air/options.py @@ -0,0 +1,233 @@ +# ''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' +# Copyright (c) 2023-2026 Mira Geoscience Ltd. ' +# ' +# This file is part of simpeg-drivers package. ' +# ' +# simpeg-drivers is distributed under the terms and conditions of the MIT License ' +# (see LICENSE file at the root of this source code package). ' +# ' +# ''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' + +from __future__ import annotations + +from typing import Literal, Self + +import numpy as np +from geoapps_utils.modelling.plates import PlateModel +from geoh5py.groups import SimPEGGroup +from geoh5py.objects.surveys.electromagnetics.airborne_tem import AirborneTEMReceivers +from pydantic import BaseModel, ConfigDict, model_validator +from scipy.interpolate import LinearNDInterpolator + +from simpeg_drivers.components.topography import InversionTopography +from simpeg_drivers.electromagnetics.time_domain.options import TDEMForwardOptions +from simpeg_drivers.plate_simulation.options import ModelOptions + + +TIME_UNIT_CONVERSION = { + "Seconds (s)": 1e3, + "Milliseconds (ms)": 1, + "Microseconds (us)": 1e-3, + "Nanoseconds (ns)": 1e-6, +} + + +class SurveyOptions(BaseModel): + model_config = ConfigDict(arbitrary_types_allowed=True) + + entity: AirborneTEMReceivers + + @property + def waveform(self) -> np.ndarray: + """Waveform in expected units of milliseconds.""" + waveform = self.entity.waveform.copy() + waveform[:, 0] *= TIME_UNIT_CONVERSION[self.entity.unit] + return waveform + + @property + def timing_mark(self) -> float: + """Timing mark in expected units of milliseconds.""" + timing_mark = self.entity.timing_mark + timing_mark *= TIME_UNIT_CONVERSION[self.entity.unit] + return timing_mark + + @property + def channels(self) -> np.ndarray: + """Channels in expected units of milliseconds.""" + channels = self.entity.channels.copy() + channels *= TIME_UNIT_CONVERSION[self.entity.unit] + return channels + + @property + def frequency(self) -> float: + """ + Transmitter frequency. + + Can be set by the user in the metadata. If not set, it will be assumed + that the waveform property contains a full halfcycle and the frequency + will be calculated as the reciprocal of the provided waveform time span. + """ + frequency = self.entity.metadata.get("frequency", None) + if frequency is None: + half_cycle_seconds = (self.waveform[-1, 0] - self.waveform[0, 0]) / 1e3 + frequency = 1 / (2 * half_cycle_seconds) + + return frequency + + @property + def channel_widths(self) -> np.ndarray: + """Time channel widths.""" + channel_widths = self.entity.metadata.get("channel_widths", None) + if channel_widths is None: + channel_widths = np.diff(np.r_[0, self.channels]) + else: + channel_widths *= TIME_UNIT_CONVERSION[self.entity.unit] + return channel_widths + + @property + def _ontime(self) -> float: + """Time at which the transmitter current turns off.""" + return float(self.waveform[self._offtime_mask(), 0][0]) + + @property + def offtime(self) -> float: + """ + Time at which the transmitter current is zero. + + This offtime is based on system frequency and may include times that + are not accounted for by the waveform. + """ + half_cycle = 1000 / (2 * self.frequency) + zero_current_ind = np.where(self.waveform[:, 1] == 0)[0] + first_zero = 1 if self.waveform[0, 1] == 0.0 else 0 + ontime = self.waveform[zero_current_ind][first_zero, 0] + + return half_cycle - ontime + + @property + def ontime_waveform(self) -> np.ndarray: + """On-time waveform including leading and trailing 0 current times.""" + ontime_waveform = self.waveform[~self._offtime_mask(), :] + endpoint = self.waveform[self._offtime_mask()][0, :] + return np.vstack([ontime_waveform, endpoint]) + + @property + def n_stations(self) -> int: + """Number of survey stations at which time channel data will be simulated.""" + return len(self.entity.locations) + + def drape_height(self, topo: np.ndarray) -> np.ndarray: + """ + Survey height over topography. + + :param topo: Topography array of x, y, elevation. + + :returns Array of survey height over topography. + """ + + survey_locs = self.entity.locations.copy() + topo_interp = LinearNDInterpolator(topo[:, :2], topo[:, 2]) + topo_at_survey_locations = topo_interp(survey_locs[:, 0], survey_locs[:, 1]) + + return survey_locs[:, 2] - topo_at_survey_locations + + def _offtime_mask(self) -> np.ndarray: + """Mask selecting off-time rows from the waveform array.""" + mask = np.isclose(self.waveform[:, 1], 0) + mask[0] = False + return mask + + +class LeroiAirOptions(BaseModel): + """Configuration for a LeroiAir airborne TEM forward simulation.""" + + model_config = ConfigDict(arbitrary_types_allowed=True) + + title: str = "LeroiAir modelling for plate-simulation package." + out_group: SimPEGGroup + survey: SurveyOptions + topo: np.ndarray + layer_resistivities: list[float] + layer_thicknesses: list[float] + plate_resistivities: list[float] + plate_geometries: list[PlateModel] + cell_size: float = 10.0 + step: bool = True + domain: Literal["time", "frequency"] = "time" + layered_earth_only: bool = False + float_precision: int = 4 + + @model_validator(mode="after") + def validate_layer_lengths_match(self) -> Self: + """Ensure layer resistivities and thicknesses have equal length.""" + if len(self.layer_resistivities) != len(self.layer_thicknesses): + raise ValueError( + "layer_resistivities and layer_thicknesses must have the same length." + ) + return self + + @model_validator(mode="after") + def validate_plate_lengths_match(self) -> Self: + """Ensure plate resistivities and geometries have equal length.""" + if len(self.plate_resistivities) != len(self.plate_geometries): + raise ValueError( + "plate_resistivities and plate_geometries must have the same length." + ) + return self + + @classmethod + def from_plate_simulation_options( + cls, model: ModelOptions, simulation: TDEMForwardOptions + ) -> Self: + """Construct from a :class:`PlateSimulationOptions` instance.""" + + survey = simulation.data_object + + if simulation.active_cells.topography_object is None: + raise NotImplementedError( + "Passing active cells directly does not currently work for simulating " + "plates using the LeroiAir option. Simulation options must contain a " + "topography object." + ) + + topo_xyz = InversionTopography(survey.workspace, simulation).locations + + return cls( + survey=SurveyOptions(entity=survey), + topo=topo_xyz, + layer_resistivities=[ + model.overburden_options.overburden_property, + model.background, + ], + layer_thicknesses=[model.overburden_options.thickness, 9999], + plate_resistivities=[model.plate_options.plate_property], + plate_geometries=[model.plate_options.geometry], + step="dBdt" in simulation.get("data_units", "dBdt"), + out_group=simulation.out_group, + ) + + @property + def n_layers(self) -> int: + """Number of background layers.""" + return len(self.layer_resistivities) + + @property + def n_plates(self) -> int: + """Number of plates.""" + return len(self.plate_geometries) + + @property + def resistivities(self) -> np.ndarray: + """All resistivities.""" + return np.hstack([self.layer_resistivities, self.plate_resistivities]) + + @property + def conductivity_thicknesses(self) -> np.ndarray: + """All conductivity thicknesses.""" + sigma = [] + layer_conductivities = 1 / np.array(self.layer_resistivities) + sigma.append(self.layer_thicknesses * layer_conductivities) + plate_conductivities = 1 / np.array(self.plate_resistivities) + sigma.append([g.width for g in self.plate_geometries] * plate_conductivities) + + return np.hstack(sigma) diff --git a/simpeg_drivers/plate_simulation/options.py b/simpeg_drivers/plate_simulation/options.py index 0db21ce7..693e56af 100644 --- a/simpeg_drivers/plate_simulation/options.py +++ b/simpeg_drivers/plate_simulation/options.py @@ -8,18 +8,20 @@ # ' # ''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' -from copy import deepcopy from pathlib import Path from typing import ClassVar from geoapps_utils.base import Options from geoh5py.groups import SimPEGGroup, UIJsonGroup from geoh5py.ui_json import InputFile +from pydantic import model_validator from simpeg_drivers import assets_path -from simpeg_drivers.options import BaseForwardOptions +from simpeg_drivers.potential_fields.gravity.options import GravityForwardOptions +from simpeg_drivers.potential_fields.magnetic_vector import ( + MagneticVectorForwardOptions, +) from simpeg_drivers.utils.synthetics.meshes import MeshOptions -from simpeg_drivers.utils.utils import driver_class_from_dict from .models.options import ModelOptions @@ -47,26 +49,13 @@ class PlateSimulationOptions(Options): mesh: MeshOptions model: ModelOptions simulation: SimPEGGroup | UIJsonGroup - - def simulation_parameters(self) -> BaseForwardOptions: - """ - Create SimPEG parameters from the simulation options. - - A new SimPEGGroup is created inside the out_group to store the - result of the forward simulation. - """ - simulation_options = deepcopy(self.simulation.options) - simulation_options["geoh5"] = self.geoh5 - - input_file = InputFile(ui_json=simulation_options, validate=False) - if input_file.ui_json is None: - raise ValueError("Input file must have ui_json set.") - - input_file.ui_json["mesh"]["value"] = None - - if input_file.data is None: - raise ValueError("Input file data must be set.") - - driver = driver_class_from_dict(input_file.data) - - return driver._params_class.build(input_file.data) # pylint: disable=protected-access + use_leroi: bool = False + + @model_validator(mode="before") + @classmethod + def use_leroi_em_only(cls, data) -> dict: + run_command = data["simulation"].options["run_command"] + is_tem = "time_domain.forward" in run_command + use_leroi = data.get("use_leroi", False) and is_tem + data["use_leroi"] = use_leroi + return data diff --git a/simpeg_drivers/utils/utils.py b/simpeg_drivers/utils/utils.py index 7b8810d5..816db37c 100644 --- a/simpeg_drivers/utils/utils.py +++ b/simpeg_drivers/utils/utils.py @@ -50,6 +50,7 @@ if TYPE_CHECKING: from simpeg_drivers.components.data import InversionData + from simpeg_drivers.driver import InversionDriver def mask_vertices_and_cells( diff --git a/tests/plate_simulation/leroi_air/__init__.py b/tests/plate_simulation/leroi_air/__init__.py new file mode 100644 index 00000000..b938fdce --- /dev/null +++ b/tests/plate_simulation/leroi_air/__init__.py @@ -0,0 +1,53 @@ +# ''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' +# Copyright (c) 2026 Mira Geoscience Ltd. ' +# ' +# This file is part of simpeg-drivers package. ' +# ' +# simpeg-drivers is distributed under the terms and conditions of the MIT License ' +# (see LICENSE file at the root of this source code package). ' +# ' +# ''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' + +import numpy as np +from geoapps_utils.modelling.plates import PlateModel +from geoh5py.shared.utils import fetch_active_workspace + +from simpeg_drivers.plate_simulation.leroi_air.options import ( + LeroiAirOptions, + SurveyOptions, +) +from simpeg_drivers.utils.synthetics.surveys.time_domain.airborne_tdem import ( + generate_airborne_tdem_survey, +) + + +def generate_plate_options(workspace): + x = np.linspace(-1000, 1000, 81) + y = np.linspace(-1000, 1000, 81) + X, Y = np.meshgrid(x, y) + Z = np.full_like(X, 20.0) + + with fetch_active_workspace(workspace) as geoh5: + survey = generate_airborne_tdem_survey(geoh5, X=X, Y=Y, Z=Z) + layer_resistivities = [1500.0, 2000.0, 5000.0] + layer_thicknesses = [50.0, 1000.0, 2000.0] + plate_resistivities = [100.0] + plate_geometries = [ + PlateModel( + strike_length=200, + dip_length=100, + width=10, + direction=90, + dip=45, + ) + ] + topo = np.column_stack([X.flatten(), Y.flatten(), np.zeros(X.size)]) + opts = LeroiAirOptions( + survey=SurveyOptions(entity=survey), + topo=topo, + layer_resistivities=layer_resistivities, + layer_thicknesses=layer_thicknesses, + plate_resistivities=plate_resistivities, + plate_geometries=plate_geometries, + ) + return opts diff --git a/tests/plate_simulation/leroi_air/conftest.py b/tests/plate_simulation/leroi_air/conftest.py new file mode 100644 index 00000000..8ed2318f --- /dev/null +++ b/tests/plate_simulation/leroi_air/conftest.py @@ -0,0 +1,86 @@ +# ''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' +# Copyright (c) 2026 Mira Geoscience Ltd. ' +# ' +# This file is part of simpeg-drivers package. ' +# ' +# simpeg-drivers is distributed under the terms and conditions of the MIT License ' +# (see LICENSE file at the root of this source code package). ' +# ' +# ''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' + +from unittest.mock import MagicMock + +import pytest +from geoh5py import Workspace + +from simpeg_drivers.plate_simulation.leroi_air.interface import ( + LeroiAirInput, + LeroiAirOutput, +) + +from . import generate_plate_options + + +N_CHANNELS = 3 +N_STATIONS = 2 + +FAKE_OUT = """\ + VERTICAL COMPONENT - nT/s + + TRANSMITTER POSITION CHNL 1 CHNL 2 CHNL 3 + EAST NORTH ALT 2.300 2.600 3.200 + + 1 -100 0 13 13.3 14.4 15.5 + 2 -87 0 13 16.6 17.7 18.8 +------------------------------------------------------------------------------------- + + + IN-LINE COMPONENT - nT/s + + TRANSMITTER POSITION CHNL 1 CHNL 2 CHNL 3 + EAST NORTH ALT 2.300 2.600 3.200 + + 1 -100 0 13 7.7 8.8 9.9 + 2 -87 0 13 10.0 11.1 12.2 +------------------------------------------------------------------------------------- + + + TRANSVERSE COMPONENT - nT/s + + TRANSMITTER POSITION CHNL 1 CHNL 2 CHNL 3 + EAST NORTH ALT 2.300 2.600 3.200 + + 1 -100 0 13 1.1 2.2 3.3 + 2 -87 0 13 4.4 5.5 6.6 +------------------------------------------------------------------------------------- +""" + + +@pytest.fixture +def mock_input(): + opts = MagicMock() + opts.float_precision = 4 + return LeroiAirInput(opts) + + +@pytest.fixture +def mock_output(): + opts = MagicMock() + opts.n_stations = N_STATIONS + opts.float_precision = 4 + return LeroiAirOutput(opts=opts) + + +@pytest.fixture(scope="module") +def plate_options(tmp_path_factory): + tmp_path = tmp_path_factory.mktemp("leroi_air_options") + with Workspace(tmp_path / "test.geoh5") as geoh5: + opts = generate_plate_options(workspace=geoh5) + return opts + + +@pytest.fixture +def fake_outfile(tmp_path): + path = tmp_path / "leroi.out" + path.write_text(FAKE_OUT, encoding="utf-8") + return path diff --git a/tests/plate_simulation/leroi_air/driver_test.py b/tests/plate_simulation/leroi_air/driver_test.py new file mode 100644 index 00000000..106b4b92 --- /dev/null +++ b/tests/plate_simulation/leroi_air/driver_test.py @@ -0,0 +1,44 @@ +# ''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' +# Copyright (c) 2026 Mira Geoscience Ltd. ' +# ' +# This file is part of simpeg-drivers package. ' +# ' +# simpeg-drivers is distributed under the terms and conditions of the MIT License ' +# (see LICENSE file at the root of this source code package). ' +# ' +# ''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' + +import subprocess +from unittest.mock import MagicMock, patch + +import pytest +from geoh5py import Workspace + +from simpeg_drivers.plate_simulation.leroi_air.driver import LeroiAirDriver + +from . import generate_plate_options + + +# pylint: disable=protected-access + + +def test_leroi_air_driver_run_raises_on_subprocess_failure(tmp_path): + with Workspace(tmp_path / "test.geoh5") as geoh5: + opts = generate_plate_options(workspace=geoh5) + + driver = LeroiAirDriver(opts) + driver._interface = MagicMock() + failed_result = MagicMock( + spec=subprocess.CompletedProcess, returncode=1, stderr="err", stdout="out" + ) + + with patch( + "simpeg_drivers.plate_simulation.leroi_air.driver.subprocess.run", + return_value=failed_result, + ): + with pytest.raises(RuntimeError) as exc_info: + driver.run() + + assert "return code 1" in str(exc_info.value) + assert "err" in str(exc_info.value) + assert "out" in str(exc_info.value) diff --git a/tests/plate_simulation/leroi_air/interface_test.py b/tests/plate_simulation/leroi_air/interface_test.py new file mode 100644 index 00000000..124f81b5 --- /dev/null +++ b/tests/plate_simulation/leroi_air/interface_test.py @@ -0,0 +1,158 @@ +# ''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' +# Copyright (c) 2026 Mira Geoscience Ltd. ' +# ' +# This file is part of simpeg-drivers package. ' +# ' +# simpeg-drivers is distributed under the terms and conditions of the MIT License ' +# (see LICENSE file at the root of this source code package). ' +# ' +# ''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' + +import numpy as np +from geoh5py import Workspace + +from simpeg_drivers.plate_simulation.leroi_air.interface import ( + LeroiAirInput, + LeroiAirOutput, +) + +from . import generate_plate_options +from .conftest import FAKE_OUT, N_CHANNELS, N_STATIONS + + +# pylint: disable=protected-access + + +def test_line_formatting(tmp_path): + with Workspace(tmp_path / "test.geoh5") as geoh5: + opts = generate_plate_options(geoh5) + + leroi_input = LeroiAirInput(opts=opts) + + line = leroi_input.format_line(["TXCLN", "CMP", "KPPM"]) + assert line == "0.0 3 0 \t ! TXCLN, CMP, KPPM" + + line = leroi_input.format_line("TMS") + assert line == "2.3 2.6 3.2 \t ! TMS" + + multi_line = leroi_input.format_multi_line(["TMS", "WIDTH"]) + assert multi_line == "2.3 0.3\n2.6 0.3\n3.2 0.6\t ! TMS, WIDTH" + + +def test_format_cfl_file(tmp_path): + with Workspace(tmp_path / "test.geoh5") as geoh5: + opts = generate_plate_options(geoh5) + + leroi_input = LeroiAirInput(opts=opts) + cfl_str = leroi_input.format_cfl_file() + + # title + assert opts.title in cfl_str + + # record_2: TDFD, DO3D, PRFL, ISTOP + assert "1 1 1 0 \t ! TDFD, DO3D, PRFL, ISTOP" in cfl_str + + # record_3: ISW, NSX, STEP, UNITS, NCHNL, KRXW, OFFTIME + assert "1 8 1 1 3 2 1.6 \t ! ISW, NSX, STEP, UNITS, NCHNL, KRXW, OFFTIME" in cfl_str + + # record_4: TXON, TXAMP (multi-line waveform table) + assert ( + "0.0 0.0\n" + "0.5 0.3333\n" + "1.0 0.6667\n" + "1.5 1.0\n" + "1.6 0.9\n" + "1.7 0.6000\n" + "1.8 0.3000\n" + "1.9 0.0\t ! TXON, TXAMP" + ) in cfl_str + + # record_5: TMS (time channel midpoints) + assert "2.3 2.6 3.2 \t ! TMS" in cfl_str + + # record_6: WIDTH (time channel widths) + assert "0.3 0.3 0.6 \t ! WIDTH" in cfl_str + + # record_7: TXCLN, CMP, KPPM + assert "0.0 3 0 \t ! TXCLN, CMP, KPPM" in cfl_str + + # record_7p1 (NPPF) is only written when KPPM > 0; default KPPM=0 so absent + assert "3 \t ! NPPF" not in cfl_str + + # record_7p2: TXAREA, NTRN + assert "1.0 1 \t ! TXAREA, NTRN" in cfl_str + + # record_8: ZRX0, XRX0, YRX0 + assert "0.0 0.0 0.0 \t ! ZRX0, XRX0, YRX0" in cfl_str + + # record_9: NSTAT, SURVEY, BAROMTRC, LINE_TAG + assert "6561 2 1 0 \t ! NSTAT, SURVEY, BAROMTRC, LINE_TAG" in cfl_str + + # record_9p1: EAST, NORTH, ALT — too long to assert explicitly + + # record_10: NLAYER, NPLATE, NLITH, GND_LVL + assert "3 1 4 0.0 \t ! NLAYER, NPLATE, NLITH, GND_LVL" in cfl_str + + # record_11: RES, SIG_T, RMU, REPS, CHRG, CTAU, CFREQ (multi-line lithology table) + assert ( + "1500.0 0.0333 1.0 1.0 0.0 0.0 1.0\n" + "2000.0 0.5 1.0 1.0 0.0 0.0 1.0\n" + "5000.0 0.4 1.0 1.0 0.0 0.0 1.0\n" + "100.0 0.1 1.0 1.0 0.0 0.0 1.0\t ! RES, SIG_T, RMU, REPS, CHRG, CTAU, CFREQ" + ) in cfl_str + + # record_12: LITH, THICK (multi-line layer table) + assert ("1 50.0\n2 1000.0\n3 2000.0\t ! LITH, THICK") in cfl_str + + # record_13: CELLW + assert "10.0 \t ! CELLW" in cfl_str + + # record_14: LITHP, CNTR_East, CNTR_North, PLTOP + assert "4 0.0 0.0 -0.0\t ! LITHP, CNTR_East, CNTR_North, PLTOP" in cfl_str + + # record_15: PLNGTH, DPWDTH, DZM, DIP + assert "200.0 100.0 90.0 45.0\t ! PLNGTH, DPWDTH, DZM, DIP" in cfl_str + + +def test_slice_data_lines_returns_correct_station_rows(mock_output): + lines = FAKE_OUT.splitlines() + anchor = LeroiAirOutput._COMPONENT_ANCHORS["crossline"] + rows = mock_output._slice_data_lines(lines, anchor) + assert len(rows) == N_STATIONS + assert rows[0].split()[4] == "1.1" + assert rows[1].split()[4] == "4.4" + + +def test_extract_data_returns_channel_columns_only(mock_output, fake_outfile): + data = mock_output._extract_data(fake_outfile, component="crossline") + assert data.shape == (N_STATIONS, N_CHANNELS) + + +def test_extract_data_parses_transverse_component_values(mock_output, fake_outfile): + data = mock_output._extract_data(fake_outfile, component="crossline") + np.testing.assert_array_almost_equal(data[0], [1.1, 2.2, 3.3]) + np.testing.assert_array_almost_equal(data[1], [4.4, 5.5, 6.6]) + + +def test_extract_data_parses_inline_component_values(mock_output, fake_outfile): + data = mock_output._extract_data(fake_outfile, component="inline") + np.testing.assert_array_almost_equal(data[0], [7.7, 8.8, 9.9]) + np.testing.assert_array_almost_equal(data[1], [10.0, 11.1, 12.2]) + + +def test_extract_data_parses_vertical_component_values(mock_output, fake_outfile): + data = mock_output._extract_data(fake_outfile, component="vertical") + np.testing.assert_array_almost_equal(data[0], [13.3, 14.4, 15.5]) + np.testing.assert_array_almost_equal(data[1], [16.6, 17.7, 18.8]) + + +def test_format_value_int(mock_input): + assert mock_input._format_value(3) == "3" + assert mock_input._format_value(np.int64(7)) == "7" + + +def test_format_value_float(mock_input): + assert mock_input._format_value(1.5) == "1.5" + assert mock_input._format_value(1.123456789) == "1.1235" + mock_input.opts.float_precision = 2 + assert mock_input._format_value(3.141592653) == "3.14" diff --git a/tests/plate_simulation/leroi_air/options_test.py b/tests/plate_simulation/leroi_air/options_test.py new file mode 100644 index 00000000..93c3e380 --- /dev/null +++ b/tests/plate_simulation/leroi_air/options_test.py @@ -0,0 +1,95 @@ +# ''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' +# Copyright (c) 2026 Mira Geoscience Ltd. ' +# ' +# This file is part of simpeg-drivers package. ' +# ' +# simpeg-drivers is distributed under the terms and conditions of the MIT License ' +# (see LICENSE file at the root of this source code package). ' +# ' +# ''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' +import numpy as np +import pytest +from geoh5py import Workspace +from pydantic import ValidationError + +from simpeg_drivers.plate_simulation.leroi_air.options import LeroiAirOptions + +from . import generate_plate_options + + +def test_options_channel_widths(plate_options): + assert np.allclose(plate_options.survey.channel_widths, [0.3, 0.3, 0.6]) + + +def test_options_channel_widths_from_metadata(tmp_path): + explicit_widths = [0.1, 0.2, 0.3] + with Workspace(tmp_path / "test.geoh5") as geoh5: + opts = generate_plate_options(workspace=geoh5) + meta = opts.survey.entity.metadata + meta["channel_widths"] = explicit_widths + opts.survey.entity.metadata = meta + + assert np.allclose(opts.survey.channel_widths, explicit_widths) + + +def test_options_frequency(plate_options): + assert np.isclose(plate_options.survey.frequency, 142.857143) + + +def test_options_frequency_from_metadata(tmp_path): + with Workspace(tmp_path / "test.geoh5") as geoh5: + opts = generate_plate_options(workspace=geoh5) + meta = opts.survey.entity.metadata + meta["frequency"] = 0.15 + opts.survey.entity.metadata = meta + + assert np.isclose(opts.survey.frequency, 0.15) + + +def test_options_offtime(plate_options): + assert np.isclose(plate_options.survey.offtime, 1.6) + + +def test_options_ontime_waveform(plate_options): + assert np.allclose( + plate_options.survey.ontime_waveform, + plate_options.survey.entity.waveform[0:8, :], + ) + + +def test_options_conductivity_thicknesses(plate_options): + assert np.allclose( + plate_options.conductivity_thicknesses, np.array([0.0333333, 0.5, 0.4, 0.1]) + ) + + +def test_options_resistivities(plate_options): + assert np.allclose(plate_options.resistivities, [1500.0, 2000.0, 5000.0, 100.0]) + + +def test_options_drape_height(plate_options): + assert np.allclose(plate_options.survey.drape_height(plate_options.topo), 20.0) + + +def test_options_validate_layer_lengths_mismatch(plate_options): + with pytest.raises(ValidationError, match="layer_resistivities"): + LeroiAirOptions( + survey=plate_options.survey, + topo=plate_options.topo, + layer_resistivities=[1500.0], + layer_thicknesses=[50.0, 1000.0], + plate_resistivities=plate_options.plate_resistivities, + plate_geometries=plate_options.plate_geometries, + ) + + +def test_options_validate_plate_lengths_mismatch(plate_options): + with pytest.raises(ValidationError, match="plate_resistivities"): + LeroiAirOptions( + survey=plate_options.survey, + topo=plate_options.topo, + layer_resistivities=plate_options.layer_resistivities, + layer_thicknesses=plate_options.layer_thicknesses, + plate_resistivities=[100.0, 200.0], + plate_geometries=plate_options.plate_geometries, + ) diff --git a/tests/plate_simulation/models/plates_test.py b/tests/plate_simulation/models/plates_test.py index 8ed4d3a1..74ed84d3 100644 --- a/tests/plate_simulation/models/plates_test.py +++ b/tests/plate_simulation/models/plates_test.py @@ -58,18 +58,9 @@ def test_vertical_east_striking_plate(tmp_path): vertical_east_striking.extent[1, 2] - vertical_east_striking.extent[0, 2], 500.0, ) - assert np.isclose( - vertical_east_striking.vertices[:, 0].mean(), - 0.0, # pylint: disable=no-member - ) - assert np.isclose( - vertical_east_striking.vertices[:, 1].mean(), - 0.0, # pylint: disable=no-member - ) - assert np.isclose( - vertical_east_striking.vertices[:, 2].mean(), - -250.0, # pylint: disable=no-member - ) + assert np.isclose(vertical_east_striking.vertices[:, 0].mean(), 0.0) + assert np.isclose(vertical_east_striking.vertices[:, 1].mean(), 0.0) + assert np.isclose(vertical_east_striking.vertices[:, 2].mean(), -250.0) def test_dipping_plates_all_quadrants(tmp_path): @@ -102,13 +93,13 @@ def test_dipping_plates_all_quadrants(tmp_path): def test_replicate_even(tmp_path): with Workspace(tmp_path / "test.geoh5") as workspace: options = PlateModel( - strike_length=2.0, - dip_length=2.0, - width=2.0, + strike_length=1.0, + dip_length=1.0, + width=1.0, direction=0.0, dip=0.0, easting=0.0, - northing=-1.0, + northing=0.0, elevation=0.0, ) plate = Plate(options) @@ -117,24 +108,24 @@ def test_replicate_even(tmp_path): assert plates[1].surface(workspace).vertices is not None assert np.allclose( plates[0].surface(workspace).vertices.mean(axis=0), - np.array([-5.0, 0.0, 0.0]), + np.array([-5.0, 0.5, 0.0]), ) assert np.allclose( plates[1].surface(workspace).vertices.mean(axis=0), - np.array([5.0, 0.0, 0.0]), + np.array([5.0, 0.5, 0.0]), ) def test_replicate_odd(tmp_path): with Workspace(tmp_path / "test.geoh5") as workspace: options = PlateModel( - strike_length=2.0, - dip_length=2.0, - width=2.0, + strike_length=1.0, + dip_length=1.0, + width=1.0, direction=0.0, dip=0.0, easting=0.0, - northing=-1.0, + northing=0.0, elevation=0.0, ) plate = Plate(options) @@ -144,13 +135,13 @@ def test_replicate_odd(tmp_path): assert plates[2].surface(workspace).vertices is not None assert np.allclose( plates[0].surface(workspace).vertices.mean(axis=0), - np.array([0.0, -5.0, 0.0]), + np.array([0.0, -4.5, 0.0]), ) assert np.allclose( plates[1].surface(workspace).vertices.mean(axis=0), - np.array([0.0, 0.0, 0.0]), + np.array([0.0, 0.5, 0.0]), ) assert np.allclose( plates[2].surface(workspace).vertices.mean(axis=0), - np.array([0.0, 5.0, 0.0]), + np.array([0.0, 5.5, 0.0]), ) diff --git a/tests/plate_simulation/runtest/conftest.py b/tests/plate_simulation/runtest/conftest.py new file mode 100644 index 00000000..dddedfa3 --- /dev/null +++ b/tests/plate_simulation/runtest/conftest.py @@ -0,0 +1,33 @@ +# ''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' +# Copyright (c) 2026 Mira Geoscience Ltd. ' +# ' +# This file is part of simpeg-drivers package. ' +# ' +# simpeg-drivers is distributed under the terms and conditions of the MIT License ' +# (see LICENSE file at the root of this source code package). ' +# ' +# ''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' +from __future__ import annotations + +import os +import sys +from pathlib import Path + +import pytest + + +@pytest.fixture(autouse=True, scope="session") +def conda_scripts_on_path(): + """Ensure the conda env Scripts directory is on PATH. + PyCharm (and other IDEs) launch pytest with the configured Python interpreter + but do not fully activate the conda environment, so the interpreter's sibling + Scripts/ directory is absent from PATH. This fixture adds it once per session + so that subprocess calls to executables installed there (e.g. LeroiAir550_JR) + resolve correctly without hard-coding any paths in the tests themselves. + """ + scripts_dir = str(Path(sys.executable).parent / "Scripts") + original_path = os.environ.get("PATH", "") + if scripts_dir not in original_path.split(os.pathsep): + os.environ["PATH"] = scripts_dir + os.pathsep + original_path + yield + os.environ["PATH"] = original_path diff --git a/tests/plate_simulation/runtest/driver_test.py b/tests/plate_simulation/runtest/driver_test.py index 297237c4..026c1746 100644 --- a/tests/plate_simulation/runtest/driver_test.py +++ b/tests/plate_simulation/runtest/driver_test.py @@ -94,17 +94,6 @@ def test_plate_simulation_params_from_input_file(tmp_path, caplog): assert "Overburden thickness exceeds the plate depth" in caplog.text assert isinstance(params.simulation, SimPEGGroup) - simulation_parameters = params.simulation_parameters() - - assert simulation_parameters.inversion_type == "gravity" - assert simulation_parameters.forward_only - assert simulation_parameters.geoh5.h5file == geoh5.h5file - assert ( - simulation_parameters.active_cells.topography_object.uid - == components.topography.uid - ) - assert simulation_parameters.data_object.uid == components.survey.uid - assert isinstance(params.mesh, MeshOptions) assert params.mesh.u_cell_size == 10.0 assert params.mesh.v_cell_size == 10.0 diff --git a/tests/plate_simulation/runtest/gravity_test.py b/tests/plate_simulation/runtest/gravity_test.py index 4f7d618d..25401f6c 100644 --- a/tests/plate_simulation/runtest/gravity_test.py +++ b/tests/plate_simulation/runtest/gravity_test.py @@ -12,7 +12,6 @@ from geoapps_utils.modelling.plates import PlateModel from geoh5py.groups import SimPEGGroup -from simpeg_drivers.plate_simulation.driver import PlateSimulationDriver from simpeg_drivers.plate_simulation.models.options import ( ModelOptions, OverburdenOptions, @@ -31,71 +30,71 @@ SurveyOptions, SyntheticsComponentsOptions, ) +from tests.utils.runtests import run_driver_from_ui_json from tests.utils.targets import get_workspace def test_gravity_plate_simulation(tmp_path): - opts = SyntheticsComponentsOptions( - method="gravity", - survey=SurveyOptions(n_stations=8, n_lines=8, drape=5.0), - mesh=SyntheticsMeshOptions(), - model=SyntheticsModelOptions(anomaly=0.0), - ) - with get_workspace(tmp_path / "inversion_test.ui.geoh5") as geoh5: - components = SyntheticsComponents(geoh5, options=opts) - mesh_params = MeshOptions( - u_cell_size=10.0, - v_cell_size=10.0, - w_cell_size=10.0, - padding_distance=1500.0, - depth_core=600.0, - max_distance=200.0, - survey_refinement=[4, 6], - topography_refinement=[0, 1], - plate_refinement=[4, 2], - ) - overburden_params = OverburdenOptions(thickness=50.0, overburden_property=0.2) - - plate_params = PlateOptions( - name="plate", - geometry=PlateModel( - elevation=100.0, - width=100.0, - strike_length=100.0, - dip_length=100.0, - dip=0.0, - direction=0.0, - ), - plate_property=0.5, + with get_workspace(tmp_path / "inversion_test.ui.geoh5") as geoh5: + geometry = PlateModel( + elevation=100.0, + width=100.0, + strike_length=100.0, + dip_length=100.0, + dip=0.0, + direction=0.0, ) - model_params = ModelOptions( - name="density", - background=0.0, - overburden_options=overburden_params, - plate_options=plate_params, + opts = SyntheticsComponentsOptions( + method="gravity", + survey=SurveyOptions(n_stations=8, n_lines=8, drape=5.0), + mesh=SyntheticsMeshOptions(), + model=SyntheticsModelOptions(anomaly=0.0), ) + components = SyntheticsComponents(geoh5, options=opts) + out_group = SimPEGGroup.create(geoh5, name="Gravity forward") options = GravityForwardOptions.build( topography_object=components.topography, data_object=components.survey, geoh5=geoh5, starting_model=0.1, + out_group=out_group, ) - - gravity_inversion = SimPEGGroup.create(geoh5) - gravity_inversion.options = options.serialize() + options.update_out_group_options() params = PlateSimulationOptions( - title="test", - run_command="run", geoh5=geoh5, - mesh=mesh_params, - model=model_params, - simulation=gravity_inversion, + mesh=MeshOptions( + u_cell_size=10.0, + v_cell_size=10.0, + w_cell_size=10.0, + padding_distance=1500.0, + depth_core=600.0, + max_distance=200.0, + survey_refinement=[4, 6], + topography_refinement=[0, 1], + plate_refinement=[4, 2], + name="mesh", + ), + model=ModelOptions( + name="density", + background=0.0, + overburden_options=OverburdenOptions( + thickness=50.0, overburden_property=0.2 + ), + plate_options=PlateOptions( + name="plate", + geometry=geometry, + plate_property=0.5, + ), + ), + simulation=options.out_group, ) - driver = PlateSimulationDriver(params) - driver.run() - assert np.nanmax(driver.model.values) == 0.5 + driver = run_driver_from_ui_json(params) + + assert ( + np.nanmax(driver.simulation_parameters.models.starting_model.values) == 0.5 + ) diff --git a/tests/plate_simulation/runtest/leroi_test.py b/tests/plate_simulation/runtest/leroi_test.py new file mode 100644 index 00000000..3da33648 --- /dev/null +++ b/tests/plate_simulation/runtest/leroi_test.py @@ -0,0 +1,139 @@ +# ''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' +# Copyright (c) 2026 Mira Geoscience Ltd. ' +# ' +# This file is part of simpeg-drivers package. ' +# ' +# simpeg-drivers is distributed under the terms and conditions of the MIT License ' +# (see LICENSE file at the root of this source code package). ' +# ' +# ''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' + +import os + +import pytest +from geoapps_utils.modelling.plates import PlateModel +from geoh5py import Workspace +from geoh5py.groups import SimPEGGroup +from geoh5py.objects import AirborneTEMReceivers, MaxwellPlate, Octree + +from simpeg_drivers.electromagnetics.time_domain import ( + TDEMForwardOptions, +) +from simpeg_drivers.plate_simulation.models.options import ( + ModelOptions, + OverburdenOptions, + PlateOptions, +) +from simpeg_drivers.plate_simulation.options import MeshOptions, PlateSimulationOptions +from simpeg_drivers.utils.synthetics.driver import SyntheticsComponents +from simpeg_drivers.utils.synthetics.options import ( + MeshOptions as SyntheticsMeshOptions, +) +from simpeg_drivers.utils.synthetics.options import ( + ModelOptions as SyntheticsModelOptions, +) +from simpeg_drivers.utils.synthetics.options import ( + SurveyOptions, + SyntheticsComponentsOptions, +) +from tests.utils.runtests import run_driver_from_ui_json +from tests.utils.targets import get_workspace + + +IN_GITHUB_ACTIONS = os.getenv("GITHUB_ACTIONS") == "true" + + +@pytest.mark.skipif( + IN_GITHUB_ACTIONS, + reason="Test requires a LeroiAir550_JR.exe in the environments Scripts directory that is not stored in GitHub.", +) +def test_leroi_run(tmp_path): + + with get_workspace(tmp_path / "leroi_test.geoh5") as geoh5: + geometry = PlateModel( + easting=0.0, + northing=0.0, + elevation=100.0, + width=10.0, + strike_length=200.0, + dip_length=100.0, + dip=80.0, + direction=90.0, + ) + + opts = SyntheticsComponentsOptions( + method="airborne tdem", + survey=SurveyOptions(width=800.0, n_stations=32, n_lines=4, drape=40.0), + mesh=SyntheticsMeshOptions(), + model=SyntheticsModelOptions( + anomaly=1 / 10, + background=1 / 5000, + plate=geometry, + ), + ) + components = SyntheticsComponents(geoh5, options=opts) + out_group = SimPEGGroup.create(geoh5, name="TEM forward") + options = TDEMForwardOptions.build( + topography_object=components.topography, + data_object=components.survey, + geoh5=geoh5, + starting_model=components.model, + z_channel_bool=True, + out_group=out_group, + ) + options.update_out_group_options() + + params = PlateSimulationOptions( + geoh5=geoh5, + mesh=MeshOptions( + u_cell_size=25.0, + v_cell_size=25.0, + w_cell_size=25.0, + padding_distance=1000.0, + depth_core=600.0, + max_distance=200.0, + survey_refinement=[2, 4], + topography_refinement=[0, 1], + plate_refinement=[2, 2], + ), + model=ModelOptions( + background=2000.0, + overburden_options=OverburdenOptions( + thickness=50.0, + overburden_property=1500.0, + ), + plate_options=PlateOptions( + name="plate", + geometry=geometry, + plate_property=1.0, + ), + ), + simulation=options.out_group, + use_leroi=True, + ) + + run_driver_from_ui_json(params) + + with Workspace(tmp_path / "leroi_test.geoh5") as geoh5: + plate_simulation_group = geoh5.get_entity("Plate Simulation")[0] + forward_group = plate_simulation_group.get_entity("TEM forward")[0] + + survey = forward_group.get_entity("survey")[0] + assert isinstance(survey, AirborneTEMReceivers) + + maxwell_plate = plate_simulation_group.get_entity("Maxwell Plate")[0] + assert isinstance(maxwell_plate, MaxwellPlate) + + expected_channels = [ + "fwd inline [0]", + "fwd inline [1]", + "fwd inline [2]", + "fwd crossline [0]", + "fwd crossline [1]", + "fwd crossline [2]", + "fwd vertical [0]", + "fwd vertical [1]", + "fwd vertical [2]", + ] + for channel in expected_channels: + assert survey.get_entity(channel)[0] is not None diff --git a/tests/utils/runtests.py b/tests/utils/runtests.py new file mode 100644 index 00000000..cab10791 --- /dev/null +++ b/tests/utils/runtests.py @@ -0,0 +1,24 @@ +# ''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' +# Copyright (c) 2026 Mira Geoscience Ltd. ' +# ' +# This file is part of simpeg-drivers package. ' +# ' +# simpeg-drivers is distributed under the terms and conditions of the MIT License ' +# (see LICENSE file at the root of this source code package). ' +# ' +# ''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' + +from geoapps_utils.base import Driver, Options +from geoapps_utils.run import run_from_uijson + + +def run_driver_from_ui_json(params: Options, name: str = "runtest.ui.json") -> Driver: + """ + Serialize params to a ui.json file and run. + + :param params: Options instance to serialize. + :param name: Name of the ui.json file to write. + """ + path = params.geoh5.h5file.parent / name + params.write_ui_json(path) + return run_from_uijson(path)