diff --git a/deepmd/dpmodel/utils/env_mat_stat.py b/deepmd/dpmodel/utils/env_mat_stat.py index 37a69ea1b1..238e395104 100644 --- a/deepmd/dpmodel/utils/env_mat_stat.py +++ b/deepmd/dpmodel/utils/env_mat_stat.py @@ -128,11 +128,10 @@ def iter( device=array_api_compat.device(data[0]["coord"]), ) for system in data: - coord, atype, box, natoms = ( + coord, atype, box = ( system["coord"], system["atype"], system["box"], - system["natoms"], ) ( extended_coord, diff --git a/deepmd/dpmodel/utils/stat.py b/deepmd/dpmodel/utils/stat.py index 1cbaad0275..34c500d7c8 100644 --- a/deepmd/dpmodel/utils/stat.py +++ b/deepmd/dpmodel/utils/stat.py @@ -14,6 +14,9 @@ from deepmd.dpmodel.common import ( to_numpy_array, ) +from deepmd.dpmodel.utils.exclude_mask import ( + AtomExcludeMask, +) from deepmd.utils.out_stat import ( compute_stats_do_not_distinguish_types, compute_stats_from_atomic, @@ -245,10 +248,8 @@ def compute_output_stats( system["find_atom_" + kk] > 0.0 ): atomic_sampled_idx[kk].append(idx) - elif (("find_" + kk) in system) and (system["find_" + kk] > 0.0): + if (("find_" + kk) in system) and (system["find_" + kk] > 0.0): global_sampled_idx[kk].append(idx) - else: - continue # use index to gather model predictions for the corresponding systems. model_pred_g = ( @@ -291,7 +292,7 @@ def compute_output_stats( ) # compute stat - bias_atom_g, std_atom_g = compute_output_stats_global( + bias_atom_g, std_atom_g = _compute_output_stats_global( sampled, ntypes, keys, @@ -302,7 +303,7 @@ def compute_output_stats( intensive, model_pred_g, ) - bias_atom_a, std_atom_a = compute_output_stats_atomic( + bias_atom_a, std_atom_a = _compute_output_stats_atomic( sampled, ntypes, keys, @@ -335,7 +336,7 @@ def compute_output_stats( return bias_atom_e, std_atom_e -def compute_output_stats_global( +def _compute_output_stats_global( sampled: list[dict], ntypes: int, keys: list[str], @@ -359,14 +360,21 @@ def compute_output_stats_global( for kk in keys } - natoms_key = "natoms" - input_natoms = { - kk: [ - to_numpy_array(sampled[idx][natoms_key]) - for idx in global_sampled_idx.get(kk, []) - ] - for kk in keys - } + data_mixed_type = "real_natoms_vec" in sampled[0] + natoms_key = "natoms" if not data_mixed_type else "real_natoms_vec" + input_natoms = {} + for kk in keys: + kk_natoms = [] + for idx in global_sampled_idx.get(kk, []): + nn = to_numpy_array(sampled[idx][natoms_key]) + if "atom_exclude_types" in sampled[idx]: + nn = nn.copy() + type_mask = AtomExcludeMask( + ntypes, sampled[idx]["atom_exclude_types"] + ).get_type_mask() + nn[:, 2:] *= type_mask.reshape(1, -1) + kk_natoms.append(nn) + input_natoms[kk] = kk_natoms # shape: (nframes, ndim) merged_output = { @@ -453,7 +461,7 @@ def rmse(x: np.ndarray) -> float: return bias_atom_e, std_atom_e -def compute_output_stats_atomic( +def _compute_output_stats_atomic( sampled: list[dict], ntypes: int, keys: list[str], diff --git a/deepmd/pd/utils/env_mat_stat.py b/deepmd/pd/utils/env_mat_stat.py index aed5259a50..53c41a9edc 100644 --- a/deepmd/pd/utils/env_mat_stat.py +++ b/deepmd/pd/utils/env_mat_stat.py @@ -107,11 +107,10 @@ def iter( "last_dim should be 1 for raial-only or 4 for full descriptor." ) for system in data: - coord, atype, box, natoms = ( + coord, atype, box = ( system["coord"], system["atype"], system["box"], - system["natoms"], ) ( extended_coord, diff --git a/deepmd/pd/utils/stat.py b/deepmd/pd/utils/stat.py index 23c6c508a1..b6d635833c 100644 --- a/deepmd/pd/utils/stat.py +++ b/deepmd/pd/utils/stat.py @@ -167,11 +167,10 @@ def _compute_model_predict( model_predict = {kk: [] for kk in keys} for system in sampled: nframes = system["coord"].shape[0] - coord, atype, box, natoms = ( + coord, atype, box = ( system["coord"], system["atype"], system["box"], - system["natoms"], ) fparam = system.get("fparam", None) aparam = system.get("aparam", None) @@ -324,12 +323,9 @@ def compute_output_stats( system["find_atom_" + kk] > 0.0 ): atomic_sampled_idx[kk].append(idx) - elif (("find_" + kk) in system) and (system["find_" + kk] > 0.0): + if (("find_" + kk) in system) and (system["find_" + kk] > 0.0): global_sampled_idx[kk].append(idx) - else: - continue - # use index to gather model predictions for the corresponding systems. model_pred_g = ( @@ -372,20 +368,22 @@ def compute_output_stats( ) # compute stat - bias_atom_g, std_atom_g = compute_output_stats_global( + bias_atom_g, std_atom_g = _compute_output_stats_global( sampled, ntypes, keys, rcond, preset_bias, - model_pred_g, + global_sampled_idx, stats_distinguish_types, intensive, + model_pred_g, ) - bias_atom_a, std_atom_a = compute_output_stats_atomic( + bias_atom_a, std_atom_a = _compute_output_stats_atomic( sampled, ntypes, keys, + atomic_sampled_idx, model_pred_a, ) @@ -416,58 +414,52 @@ def compute_output_stats( return bias_atom_e, std_atom_e -def compute_output_stats_global( +def _compute_output_stats_global( sampled: list[dict], ntypes: int, keys: list[str], rcond: float | None = None, preset_bias: dict[str, list[paddle.Tensor | None]] | None = None, - model_pred: dict[str, np.ndarray] | None = None, + global_sampled_idx: dict | None = None, stats_distinguish_types: bool = True, intensive: bool = False, + model_pred: dict[str, np.ndarray] | None = None, ) -> tuple[dict[str, np.ndarray], dict[str, np.ndarray]]: """This function only handle stat computation from reduced global labels.""" - # return directly if model predict is empty for global - if model_pred == {}: + # return directly if no global samples + if global_sampled_idx is None or all( + len(v) == 0 for v in global_sampled_idx.values() + ): return {}, {} # get label dict from sample; for each key, only picking the system with global labels. outputs = { - kk: [ - system[kk] - for system in sampled - if kk in system and system.get(f"find_{kk}", 0) > 0 - ] + kk: [to_numpy_array(sampled[idx][kk]) for idx in global_sampled_idx.get(kk, [])] for kk in keys } data_mixed_type = "real_natoms_vec" in sampled[0] natoms_key = "natoms" if not data_mixed_type else "real_natoms_vec" - for system in sampled: - if "atom_exclude_types" in system: - type_mask = AtomExcludeMask( - ntypes, system["atom_exclude_types"] - ).get_type_mask() - system[natoms_key][:, 2:] *= type_mask.unsqueeze(0) - - input_natoms = { - kk: [ - item[natoms_key] - for item in sampled - if kk in item and item.get(f"find_{kk}", 0) > 0 - ] - for kk in keys - } + input_natoms = {} + for kk in keys: + kk_natoms = [] + for idx in global_sampled_idx.get(kk, []): + nn = to_numpy_array(sampled[idx][natoms_key]) + if "atom_exclude_types" in sampled[idx]: + nn = nn.copy() + type_mask = AtomExcludeMask( + ntypes, sampled[idx]["atom_exclude_types"] + ).get_type_mask() + nn[:, 2:] *= to_numpy_array(type_mask).reshape(1, -1) + kk_natoms.append(nn) + input_natoms[kk] = kk_natoms # shape: (nframes, ndim) merged_output = { - kk: to_numpy_array(paddle.concat(outputs[kk])) - for kk in keys - if len(outputs[kk]) > 0 + kk: np.concatenate(outputs[kk]) for kk in keys if len(outputs[kk]) > 0 } # shape: (nframes, ntypes) - merged_natoms = { - kk: to_numpy_array(paddle.concat(input_natoms[kk])[:, 2:]) + kk: np.concatenate(input_natoms[kk])[:, 2:] for kk in keys if len(input_natoms[kk]) > 0 } @@ -550,53 +542,55 @@ def rmse(x: np.ndarray) -> float: return bias_atom_e, std_atom_e -def compute_output_stats_atomic( +def _compute_output_stats_atomic( sampled: list[dict], ntypes: int, keys: list[str], + atomic_sampled_idx: dict | None = None, model_pred: dict[str, np.ndarray] | None = None, ) -> tuple[dict[str, np.ndarray], dict[str, np.ndarray]]: + """Compute output statistics from atomic labels.""" + # return directly if no atomic samples + if atomic_sampled_idx is None or all( + len(v) == 0 for v in atomic_sampled_idx.values() + ): + return {}, {} + # get label dict from sample; for each key, only picking the system with atomic labels. outputs = { kk: [ - system["atom_" + kk] - for system in sampled - if ("atom_" + kk) in system and system.get(f"find_atom_{kk}", 0) > 0 + to_numpy_array(sampled[idx]["atom_" + kk]) + for idx in atomic_sampled_idx.get(kk, []) ] for kk in keys } natoms = { kk: [ - system["atype"] - for system in sampled - if ("atom_" + kk) in system and system.get(f"find_atom_{kk}", 0) > 0 + to_numpy_array(sampled[idx]["atype"]) + for idx in atomic_sampled_idx.get(kk, []) ] for kk in keys } # reshape outputs [nframes, nloc * ndim] --> reshape to [nframes * nloc, 1, ndim] for concatenation # reshape natoms [nframes, nloc] --> reshape to [nframes * nolc, 1] for concatenation - natoms = {k: [sys_v.reshape([-1, 1]) for sys_v in v] for k, v in natoms.items()} + natoms = {k: [sys_v.reshape(-1, 1) for sys_v in v] for k, v in natoms.items()} outputs = { k: [ - sys.reshape([natoms[k][sys_idx].shape[0], 1, -1]) + sys.reshape(natoms[k][sys_idx].shape[0], 1, -1) for sys_idx, sys in enumerate(v) ] for k, v in outputs.items() } merged_output = { - kk: to_numpy_array(paddle.concat(outputs[kk])) - for kk in keys - if len(outputs[kk]) > 0 + kk: np.concatenate(outputs[kk]) for kk in keys if len(outputs[kk]) > 0 } merged_natoms = { - kk: to_numpy_array(paddle.concat(natoms[kk])) - for kk in keys - if len(natoms[kk]) > 0 + kk: np.concatenate(natoms[kk]) for kk in keys if len(natoms[kk]) > 0 } # reshape merged data to [nf, nloc, ndim] merged_output = { - kk: merged_output[kk].reshape([*merged_natoms[kk].shape, -1]) + kk: merged_output[kk].reshape((*merged_natoms[kk].shape, -1)) for kk in merged_output } diff --git a/deepmd/pt/utils/stat.py b/deepmd/pt/utils/stat.py index cf82461a7e..b2824d0ac4 100644 --- a/deepmd/pt/utils/stat.py +++ b/deepmd/pt/utils/stat.py @@ -167,11 +167,10 @@ def _compute_model_predict( model_predict = {kk: [] for kk in keys} for system in sampled: nframes = system["coord"].shape[0] - coord, atype, box, natoms = ( + coord, atype, box = ( system["coord"], system["atype"], system["box"], - system["natoms"], ) fparam = system.get("fparam", None) aparam = system.get("aparam", None) @@ -324,12 +323,9 @@ def compute_output_stats( system["find_atom_" + kk] > 0.0 ): atomic_sampled_idx[kk].append(idx) - elif (("find_" + kk) in system) and (system["find_" + kk] > 0.0): + if (("find_" + kk) in system) and (system["find_" + kk] > 0.0): global_sampled_idx[kk].append(idx) - else: - continue - # use index to gather model predictions for the corresponding systems. model_pred_g = ( @@ -372,20 +368,22 @@ def compute_output_stats( ) # compute stat - bias_atom_g, std_atom_g = compute_output_stats_global( + bias_atom_g, std_atom_g = _compute_output_stats_global( sampled, ntypes, keys, rcond, preset_bias, - model_pred_g, + global_sampled_idx, stats_distinguish_types, intensive, + model_pred_g, ) - bias_atom_a, std_atom_a = compute_output_stats_atomic( + bias_atom_a, std_atom_a = _compute_output_stats_atomic( sampled, ntypes, keys, + atomic_sampled_idx, model_pred_a, ) @@ -416,58 +414,52 @@ def compute_output_stats( return bias_atom_e, std_atom_e -def compute_output_stats_global( +def _compute_output_stats_global( sampled: list[dict], ntypes: int, keys: list[str], rcond: float | None = None, preset_bias: dict[str, list[np.ndarray | None]] | None = None, - model_pred: dict[str, np.ndarray] | None = None, + global_sampled_idx: dict | None = None, stats_distinguish_types: bool = True, intensive: bool = False, + model_pred: dict[str, np.ndarray] | None = None, ) -> tuple[dict[str, np.ndarray], dict[str, np.ndarray]]: """This function only handle stat computation from reduced global labels.""" - # return directly if model predict is empty for global - if model_pred == {}: + # return directly if no global samples + if global_sampled_idx is None or all( + len(v) == 0 for v in global_sampled_idx.values() + ): return {}, {} # get label dict from sample; for each key, only picking the system with global labels. outputs = { - kk: [ - system[kk] - for system in sampled - if kk in system and system.get(f"find_{kk}", 0) > 0 - ] + kk: [to_numpy_array(sampled[idx][kk]) for idx in global_sampled_idx.get(kk, [])] for kk in keys } data_mixed_type = "real_natoms_vec" in sampled[0] natoms_key = "natoms" if not data_mixed_type else "real_natoms_vec" - for system in sampled: - if "atom_exclude_types" in system: - type_mask = AtomExcludeMask( - ntypes, system["atom_exclude_types"] - ).get_type_mask() - system[natoms_key][:, 2:] *= type_mask.unsqueeze(0) - - input_natoms = { - kk: [ - item[natoms_key] - for item in sampled - if kk in item and item.get(f"find_{kk}", 0) > 0 - ] - for kk in keys - } + input_natoms = {} + for kk in keys: + kk_natoms = [] + for idx in global_sampled_idx.get(kk, []): + nn = to_numpy_array(sampled[idx][natoms_key]) + if "atom_exclude_types" in sampled[idx]: + nn = nn.copy() + type_mask = AtomExcludeMask( + ntypes, sampled[idx]["atom_exclude_types"] + ).get_type_mask() + nn[:, 2:] *= to_numpy_array(type_mask).reshape(1, -1) + kk_natoms.append(nn) + input_natoms[kk] = kk_natoms # shape: (nframes, ndim) merged_output = { - kk: to_numpy_array(torch.cat(outputs[kk])) - for kk in keys - if len(outputs[kk]) > 0 + kk: np.concatenate(outputs[kk]) for kk in keys if len(outputs[kk]) > 0 } # shape: (nframes, ntypes) - merged_natoms = { - kk: to_numpy_array(torch.cat(input_natoms[kk])[:, 2:]) + kk: np.concatenate(input_natoms[kk])[:, 2:] for kk in keys if len(input_natoms[kk]) > 0 } @@ -547,26 +539,32 @@ def rmse(x: np.ndarray) -> float: return bias_atom_e, std_atom_e -def compute_output_stats_atomic( +def _compute_output_stats_atomic( sampled: list[dict], ntypes: int, keys: list[str], + atomic_sampled_idx: dict | None = None, model_pred: dict[str, np.ndarray] | None = None, ) -> tuple[dict[str, np.ndarray], dict[str, np.ndarray]]: + """Compute output statistics from atomic labels.""" + # return directly if no atomic samples + if atomic_sampled_idx is None or all( + len(v) == 0 for v in atomic_sampled_idx.values() + ): + return {}, {} + # get label dict from sample; for each key, only picking the system with atomic labels. outputs = { kk: [ - system["atom_" + kk] - for system in sampled - if ("atom_" + kk) in system and system.get(f"find_atom_{kk}", 0) > 0 + to_numpy_array(sampled[idx]["atom_" + kk]) + for idx in atomic_sampled_idx.get(kk, []) ] for kk in keys } natoms = { kk: [ - system["atype"] - for system in sampled - if ("atom_" + kk) in system and system.get(f"find_atom_{kk}", 0) > 0 + to_numpy_array(sampled[idx]["atype"]) + for idx in atomic_sampled_idx.get(kk, []) ] for kk in keys } @@ -582,12 +580,10 @@ def compute_output_stats_atomic( } merged_output = { - kk: to_numpy_array(torch.cat(outputs[kk])) - for kk in keys - if len(outputs[kk]) > 0 + kk: np.concatenate(outputs[kk]) for kk in keys if len(outputs[kk]) > 0 } merged_natoms = { - kk: to_numpy_array(torch.cat(natoms[kk])) for kk in keys if len(natoms[kk]) > 0 + kk: np.concatenate(natoms[kk]) for kk in keys if len(natoms[kk]) > 0 } # reshape merged data to [nf, nloc, ndim] merged_output = { diff --git a/source/tests/consistent/utils/__init__.py b/source/tests/consistent/utils/__init__.py new file mode 100644 index 0000000000..6ceb116d85 --- /dev/null +++ b/source/tests/consistent/utils/__init__.py @@ -0,0 +1 @@ +# SPDX-License-Identifier: LGPL-3.0-or-later diff --git a/source/tests/consistent/utils/test_stat.py b/source/tests/consistent/utils/test_stat.py new file mode 100644 index 0000000000..d2a7a6c1b6 --- /dev/null +++ b/source/tests/consistent/utils/test_stat.py @@ -0,0 +1,523 @@ +# SPDX-License-Identifier: LGPL-3.0-or-later +"""Cross-backend consistency tests for compute_output_stats.""" + +from collections import ( + defaultdict, +) + +import numpy as np +import pytest + +from deepmd.dpmodel.utils.stat import ( + _compute_output_stats_atomic as compute_output_stats_atomic_dp, +) +from deepmd.dpmodel.utils.stat import ( + _compute_output_stats_global as compute_output_stats_global_dp, +) +from deepmd.dpmodel.utils.stat import compute_output_stats as compute_output_stats_dp + +from ..common import ( + INSTALLED_PD, + INSTALLED_PT, +) + +if INSTALLED_PT: + import torch + + from deepmd.pt.utils.stat import ( + _compute_output_stats_atomic as compute_output_stats_atomic_pt, + ) + from deepmd.pt.utils.stat import ( + _compute_output_stats_global as compute_output_stats_global_pt, + ) + from deepmd.pt.utils.stat import compute_output_stats as compute_output_stats_pt + from deepmd.pt.utils.utils import to_numpy_array as to_numpy_array_pt +if INSTALLED_PD: + import paddle + + from deepmd.pd.utils.stat import ( + _compute_output_stats_atomic as compute_output_stats_atomic_pd, + ) + from deepmd.pd.utils.stat import ( + _compute_output_stats_global as compute_output_stats_global_pd, + ) + from deepmd.pd.utils.stat import compute_output_stats as compute_output_stats_pd + from deepmd.pd.utils.utils import to_numpy_array as to_numpy_array_pd + +NTYPES = 2 +NFRAMES = 2 +NLOC = 4 + + +def _make_data( + has_global: bool, + has_atomic: bool, + mixed_type: bool, + exclude_types: list[int], +) -> tuple[list[dict], dict, dict]: + """Build stat data with numpy arrays and precomputed indices. + + Returns + ------- + sampled : list[dict] + Data with numpy arrays. + global_sampled_idx : dict + Precomputed indices for global labels. + atomic_sampled_idx : dict + Precomputed indices for atomic labels. + """ + rng = np.random.default_rng(42) + + # atype: [nframes, nloc] + atype = np.array([[0, 0, 1, 1], [0, 1, 1, 0]], dtype=np.int64) + + # natoms: [nframes, 2+ntypes] = [nloc_total, nloc_real, count_type0, count_type1] + natoms = np.array([[4, 4, 2, 2], [4, 4, 2, 2]], dtype=np.int64) + + if mixed_type: + # For mixed type, atype may have different counts per frame, + # but natoms is padded uniformly. real_natoms_vec has actual counts. + atype = np.array([[0, 0, 1, 1], [0, 1, 1, 1]], dtype=np.int64) + real_natoms_vec = np.array([[4, 4, 2, 2], [4, 4, 1, 3]], dtype=np.int64) + + # Atomic labels: [nframes, nloc, 1] + atom_energy = rng.normal(size=(NFRAMES, NLOC, 1)) + # Global labels: sum of atom_energy per frame -> [nframes, 1] + energy = atom_energy.sum(axis=1) + + keys = ["energy"] + + # Build a single system dict (both frames in one system) + system_np: dict = { + "atype": atype, + "natoms": natoms.copy(), + } + if mixed_type: + system_np["real_natoms_vec"] = real_natoms_vec.copy() + + if has_global: + system_np["energy"] = energy + system_np["find_energy"] = np.float32(1.0) + if has_atomic: + system_np["atom_energy"] = atom_energy + system_np["find_atom_energy"] = np.float32(1.0) + if exclude_types: + system_np["atom_exclude_types"] = exclude_types + + sampled = [system_np] + + # Precompute indices (same logic used by all backends' compute_output_stats) + atomic_sampled_idx: dict = defaultdict(list) + global_sampled_idx: dict = defaultdict(list) + for kk in keys: + for idx, s in enumerate(sampled): + if ("find_atom_" + kk) in s and s["find_atom_" + kk] > 0.0: + atomic_sampled_idx[kk].append(idx) + if ("find_" + kk) in s and s["find_" + kk] > 0.0: + global_sampled_idx[kk].append(idx) + + return sampled, global_sampled_idx, atomic_sampled_idx + + +def _np_to_torch(sampled: list[dict]) -> list[dict]: + """Convert numpy sampled data to torch tensors.""" + result = [] + for d in sampled: + out = {} + for k, v in d.items(): + if isinstance(v, np.ndarray): + out[k] = torch.from_numpy(v.copy()) + elif isinstance(v, np.float32): + out[k] = v + else: + out[k] = v + result.append(out) + return result + + +def _np_to_paddle(sampled: list[dict]) -> list[dict]: + """Convert numpy sampled data to paddle tensors.""" + result = [] + for d in sampled: + out = {} + for k, v in d.items(): + if isinstance(v, np.ndarray): + out[k] = paddle.to_tensor(v.copy()) + elif isinstance(v, np.float32): + out[k] = v + else: + out[k] = v + result.append(out) + return result + + +@pytest.mark.skipif(not INSTALLED_PT, reason="PyTorch is not installed") +class TestComputeOutputStatConsistencyPT: + """Cross-backend consistency tests for compute_output_stats_global/atomic: dp vs pt.""" + + @pytest.mark.parametrize("mixed_type", [False, True]) # mixed_type + @pytest.mark.parametrize("exclude_types", [[], [1]]) # atom_exclude_types + def test_global(self, mixed_type, exclude_types) -> None: + """compute_output_stats_global dp vs pt.""" + sampled, global_idx, _ = _make_data( + has_global=True, + has_atomic=False, + mixed_type=mixed_type, + exclude_types=exclude_types, + ) + sampled_pt = _np_to_torch(sampled) + keys = ["energy"] + + dp_bias, dp_std = compute_output_stats_global_dp( + sampled, NTYPES, keys, global_sampled_idx=global_idx + ) + pt_bias, pt_std = compute_output_stats_global_pt( + sampled_pt, NTYPES, keys, global_sampled_idx=global_idx + ) + + for kk in keys: + assert dp_bias[kk].shape[0] == NTYPES + np.testing.assert_allclose(dp_bias[kk], pt_bias[kk], rtol=1e-10, atol=1e-10) + np.testing.assert_allclose(dp_std[kk], pt_std[kk], rtol=1e-10, atol=1e-10) + + @pytest.mark.parametrize("mixed_type", [False, True]) # mixed_type + @pytest.mark.parametrize("exclude_types", [[], [1]]) # atom_exclude_types + def test_atomic(self, mixed_type, exclude_types) -> None: + """compute_output_stats_atomic dp vs pt.""" + sampled, _, atomic_idx = _make_data( + has_global=False, + has_atomic=True, + mixed_type=mixed_type, + exclude_types=exclude_types, + ) + sampled_pt = _np_to_torch(sampled) + keys = ["energy"] + + dp_bias, dp_std = compute_output_stats_atomic_dp( + sampled, NTYPES, keys, atomic_sampled_idx=atomic_idx + ) + pt_bias, pt_std = compute_output_stats_atomic_pt( + sampled_pt, NTYPES, keys, atomic_sampled_idx=atomic_idx + ) + + for kk in keys: + assert dp_bias[kk].shape[0] == NTYPES + np.testing.assert_allclose(dp_bias[kk], pt_bias[kk], rtol=1e-10, atol=1e-10) + np.testing.assert_allclose(dp_std[kk], pt_std[kk], rtol=1e-10, atol=1e-10) + + +@pytest.mark.skipif(not INSTALLED_PT, reason="PyTorch is not installed") +class TestComputeOutputStatFullConsistencyPT: + """Cross-backend consistency tests for the top-level compute_output_stats: dp vs pt.""" + + @pytest.mark.parametrize("mixed_type", [False, True]) # mixed_type + @pytest.mark.parametrize("exclude_types", [[], [1]]) # atom_exclude_types + def test_global_only(self, mixed_type, exclude_types) -> None: + """Global labels only through full compute_output_stats.""" + sampled, _, _ = _make_data( + has_global=True, + has_atomic=False, + mixed_type=mixed_type, + exclude_types=exclude_types, + ) + sampled_pt = _np_to_torch(sampled) + keys = ["energy"] + + dp_bias, dp_std = compute_output_stats_dp(sampled, NTYPES, keys) + pt_bias, pt_std = compute_output_stats_pt(sampled_pt, NTYPES, keys) + + for kk in keys: + pt_bias_np = to_numpy_array_pt(pt_bias[kk]) + pt_std_np = to_numpy_array_pt(pt_std[kk]) + assert dp_bias[kk].shape[0] == NTYPES + np.testing.assert_allclose(dp_bias[kk], pt_bias_np, rtol=1e-10, atol=1e-10) + np.testing.assert_allclose(dp_std[kk], pt_std_np, rtol=1e-10, atol=1e-10) + + @pytest.mark.parametrize("mixed_type", [False, True]) # mixed_type + @pytest.mark.parametrize("exclude_types", [[], [1]]) # atom_exclude_types + def test_atomic_only(self, mixed_type, exclude_types) -> None: + """Atomic labels only through full compute_output_stats.""" + sampled, _, _ = _make_data( + has_global=False, + has_atomic=True, + mixed_type=mixed_type, + exclude_types=exclude_types, + ) + sampled_pt = _np_to_torch(sampled) + keys = ["energy"] + + dp_bias, dp_std = compute_output_stats_dp(sampled, NTYPES, keys) + pt_bias, pt_std = compute_output_stats_pt(sampled_pt, NTYPES, keys) + + for kk in keys: + pt_bias_np = to_numpy_array_pt(pt_bias[kk]) + pt_std_np = to_numpy_array_pt(pt_std[kk]) + assert dp_bias[kk].shape[0] == NTYPES + np.testing.assert_allclose(dp_bias[kk], pt_bias_np, rtol=1e-10, atol=1e-10) + np.testing.assert_allclose(dp_std[kk], pt_std_np, rtol=1e-10, atol=1e-10) + + @pytest.mark.parametrize("mixed_type", [False, True]) # mixed_type + @pytest.mark.parametrize("exclude_types", [[], [1]]) # atom_exclude_types + def test_both_global_and_atomic(self, mixed_type, exclude_types) -> None: + """Both global and atomic labels through full compute_output_stats.""" + sampled, _, _ = _make_data( + has_global=True, + has_atomic=True, + mixed_type=mixed_type, + exclude_types=exclude_types, + ) + sampled_pt = _np_to_torch(sampled) + keys = ["energy"] + + dp_bias, dp_std = compute_output_stats_dp(sampled, NTYPES, keys) + pt_bias, pt_std = compute_output_stats_pt(sampled_pt, NTYPES, keys) + + for kk in keys: + pt_bias_np = to_numpy_array_pt(pt_bias[kk]) + pt_std_np = to_numpy_array_pt(pt_std[kk]) + assert dp_bias[kk].shape[0] == NTYPES + np.testing.assert_allclose(dp_bias[kk], pt_bias_np, rtol=1e-10, atol=1e-10) + np.testing.assert_allclose(dp_std[kk], pt_std_np, rtol=1e-10, atol=1e-10) + + +@pytest.mark.skipif(not INSTALLED_PD, reason="Paddle is not installed") +class TestComputeOutputStatConsistencyPD: + """Cross-backend consistency tests for compute_output_stats_global/atomic: dp vs pd.""" + + @pytest.mark.parametrize("mixed_type", [False, True]) # mixed_type + @pytest.mark.parametrize("exclude_types", [[], [1]]) # atom_exclude_types + def test_global(self, mixed_type, exclude_types) -> None: + """compute_output_stats_global dp vs pd.""" + sampled, global_idx, _ = _make_data( + has_global=True, + has_atomic=False, + mixed_type=mixed_type, + exclude_types=exclude_types, + ) + sampled_pd = _np_to_paddle(sampled) + keys = ["energy"] + + dp_bias, dp_std = compute_output_stats_global_dp( + sampled, NTYPES, keys, global_sampled_idx=global_idx + ) + pd_bias, pd_std = compute_output_stats_global_pd( + sampled_pd, NTYPES, keys, global_sampled_idx=global_idx + ) + + for kk in keys: + assert dp_bias[kk].shape[0] == NTYPES + np.testing.assert_allclose(dp_bias[kk], pd_bias[kk], rtol=1e-10, atol=1e-10) + np.testing.assert_allclose(dp_std[kk], pd_std[kk], rtol=1e-10, atol=1e-10) + + @pytest.mark.parametrize("mixed_type", [False, True]) # mixed_type + @pytest.mark.parametrize("exclude_types", [[], [1]]) # atom_exclude_types + def test_atomic(self, mixed_type, exclude_types) -> None: + """compute_output_stats_atomic dp vs pd.""" + sampled, _, atomic_idx = _make_data( + has_global=False, + has_atomic=True, + mixed_type=mixed_type, + exclude_types=exclude_types, + ) + sampled_pd = _np_to_paddle(sampled) + keys = ["energy"] + + dp_bias, dp_std = compute_output_stats_atomic_dp( + sampled, NTYPES, keys, atomic_sampled_idx=atomic_idx + ) + pd_bias, pd_std = compute_output_stats_atomic_pd( + sampled_pd, NTYPES, keys, atomic_sampled_idx=atomic_idx + ) + + for kk in keys: + assert dp_bias[kk].shape[0] == NTYPES + np.testing.assert_allclose(dp_bias[kk], pd_bias[kk], rtol=1e-10, atol=1e-10) + np.testing.assert_allclose(dp_std[kk], pd_std[kk], rtol=1e-10, atol=1e-10) + + +@pytest.mark.skipif(not INSTALLED_PD, reason="Paddle is not installed") +class TestComputeOutputStatFullConsistencyPD: + """Cross-backend consistency tests for the top-level compute_output_stats: dp vs pd.""" + + @pytest.mark.parametrize("mixed_type", [False, True]) # mixed_type + @pytest.mark.parametrize("exclude_types", [[], [1]]) # atom_exclude_types + def test_global_only(self, mixed_type, exclude_types) -> None: + """Global labels only through full compute_output_stats.""" + sampled, _, _ = _make_data( + has_global=True, + has_atomic=False, + mixed_type=mixed_type, + exclude_types=exclude_types, + ) + sampled_pd = _np_to_paddle(sampled) + keys = ["energy"] + + dp_bias, dp_std = compute_output_stats_dp(sampled, NTYPES, keys) + pd_bias, pd_std = compute_output_stats_pd(sampled_pd, NTYPES, keys) + + for kk in keys: + pd_bias_np = to_numpy_array_pd(pd_bias[kk]) + pd_std_np = to_numpy_array_pd(pd_std[kk]) + assert dp_bias[kk].shape[0] == NTYPES + np.testing.assert_allclose(dp_bias[kk], pd_bias_np, rtol=1e-10, atol=1e-10) + np.testing.assert_allclose(dp_std[kk], pd_std_np, rtol=1e-10, atol=1e-10) + + @pytest.mark.parametrize("mixed_type", [False, True]) # mixed_type + @pytest.mark.parametrize("exclude_types", [[], [1]]) # atom_exclude_types + def test_atomic_only(self, mixed_type, exclude_types) -> None: + """Atomic labels only through full compute_output_stats.""" + sampled, _, _ = _make_data( + has_global=False, + has_atomic=True, + mixed_type=mixed_type, + exclude_types=exclude_types, + ) + sampled_pd = _np_to_paddle(sampled) + keys = ["energy"] + + dp_bias, dp_std = compute_output_stats_dp(sampled, NTYPES, keys) + pd_bias, pd_std = compute_output_stats_pd(sampled_pd, NTYPES, keys) + + for kk in keys: + pd_bias_np = to_numpy_array_pd(pd_bias[kk]) + pd_std_np = to_numpy_array_pd(pd_std[kk]) + assert dp_bias[kk].shape[0] == NTYPES + np.testing.assert_allclose(dp_bias[kk], pd_bias_np, rtol=1e-10, atol=1e-10) + np.testing.assert_allclose(dp_std[kk], pd_std_np, rtol=1e-10, atol=1e-10) + + @pytest.mark.parametrize("mixed_type", [False, True]) # mixed_type + @pytest.mark.parametrize("exclude_types", [[], [1]]) # atom_exclude_types + def test_both_global_and_atomic(self, mixed_type, exclude_types) -> None: + """Both global and atomic labels through full compute_output_stats.""" + sampled, _, _ = _make_data( + has_global=True, + has_atomic=True, + mixed_type=mixed_type, + exclude_types=exclude_types, + ) + sampled_pd = _np_to_paddle(sampled) + keys = ["energy"] + + dp_bias, dp_std = compute_output_stats_dp(sampled, NTYPES, keys) + pd_bias, pd_std = compute_output_stats_pd(sampled_pd, NTYPES, keys) + + for kk in keys: + pd_bias_np = to_numpy_array_pd(pd_bias[kk]) + pd_std_np = to_numpy_array_pd(pd_std[kk]) + assert dp_bias[kk].shape[0] == NTYPES + np.testing.assert_allclose(dp_bias[kk], pd_bias_np, rtol=1e-10, atol=1e-10) + np.testing.assert_allclose(dp_std[kk], pd_std_np, rtol=1e-10, atol=1e-10) + + +@pytest.mark.skipif(not INSTALLED_PT, reason="PyTorch is not installed") +class TestComputeOutputStatNoMutationPT: + """Verify that stat functions do not mutate input sampled data (pt).""" + + @pytest.mark.parametrize("mixed_type", [False, True]) # mixed_type + def test_global_no_mutation(self, mixed_type) -> None: + """compute_output_stats_global must not mutate input with exclude_types.""" + sampled, global_idx, _ = _make_data( + has_global=True, + has_atomic=False, + mixed_type=mixed_type, + exclude_types=[1], + ) + sampled_pt = _np_to_torch(sampled) + keys = ["energy"] + natoms_key = "real_natoms_vec" if mixed_type else "natoms" + + dp_natoms_before = sampled[0][natoms_key].copy() + pt_natoms_before = sampled_pt[0][natoms_key].clone() + + compute_output_stats_global_dp( + sampled, NTYPES, keys, global_sampled_idx=global_idx + ) + compute_output_stats_global_pt( + sampled_pt, NTYPES, keys, global_sampled_idx=global_idx + ) + + np.testing.assert_array_equal(sampled[0][natoms_key], dp_natoms_before) + np.testing.assert_array_equal( + sampled_pt[0][natoms_key].numpy(), pt_natoms_before.numpy() + ) + + @pytest.mark.parametrize("mixed_type", [False, True]) # mixed_type + def test_full_no_mutation(self, mixed_type) -> None: + """compute_output_stats must not mutate input with exclude_types.""" + sampled, _, _ = _make_data( + has_global=True, + has_atomic=True, + mixed_type=mixed_type, + exclude_types=[1], + ) + sampled_pt = _np_to_torch(sampled) + keys = ["energy"] + natoms_key = "real_natoms_vec" if mixed_type else "natoms" + + dp_natoms_before = sampled[0][natoms_key].copy() + pt_natoms_before = sampled_pt[0][natoms_key].clone() + + compute_output_stats_dp(sampled, NTYPES, keys) + compute_output_stats_pt(sampled_pt, NTYPES, keys) + + np.testing.assert_array_equal(sampled[0][natoms_key], dp_natoms_before) + np.testing.assert_array_equal( + sampled_pt[0][natoms_key].numpy(), pt_natoms_before.numpy() + ) + + +@pytest.mark.skipif(not INSTALLED_PD, reason="Paddle is not installed") +class TestComputeOutputStatNoMutationPD: + """Verify that stat functions do not mutate input sampled data (pd).""" + + @pytest.mark.parametrize("mixed_type", [False, True]) # mixed_type + def test_global_no_mutation(self, mixed_type) -> None: + """compute_output_stats_global must not mutate input with exclude_types.""" + sampled, global_idx, _ = _make_data( + has_global=True, + has_atomic=False, + mixed_type=mixed_type, + exclude_types=[1], + ) + sampled_pd = _np_to_paddle(sampled) + keys = ["energy"] + natoms_key = "real_natoms_vec" if mixed_type else "natoms" + + dp_natoms_before = sampled[0][natoms_key].copy() + pd_natoms_before = sampled_pd[0][natoms_key].numpy().copy() + + compute_output_stats_global_dp( + sampled, NTYPES, keys, global_sampled_idx=global_idx + ) + compute_output_stats_global_pd( + sampled_pd, NTYPES, keys, global_sampled_idx=global_idx + ) + + np.testing.assert_array_equal(sampled[0][natoms_key], dp_natoms_before) + np.testing.assert_array_equal( + sampled_pd[0][natoms_key].numpy(), pd_natoms_before + ) + + @pytest.mark.parametrize("mixed_type", [False, True]) # mixed_type + def test_full_no_mutation(self, mixed_type) -> None: + """compute_output_stats must not mutate input with exclude_types.""" + sampled, _, _ = _make_data( + has_global=True, + has_atomic=True, + mixed_type=mixed_type, + exclude_types=[1], + ) + sampled_pd = _np_to_paddle(sampled) + keys = ["energy"] + natoms_key = "real_natoms_vec" if mixed_type else "natoms" + + dp_natoms_before = sampled[0][natoms_key].copy() + pd_natoms_before = sampled_pd[0][natoms_key].numpy().copy() + + compute_output_stats_dp(sampled, NTYPES, keys) + compute_output_stats_pd(sampled_pd, NTYPES, keys) + + np.testing.assert_array_equal(sampled[0][natoms_key], dp_natoms_before) + np.testing.assert_array_equal( + sampled_pd[0][natoms_key].numpy(), pd_natoms_before + )