diff --git a/bluemath_tk/distributions/extreme_correction.py b/bluemath_tk/distributions/extreme_correction.py index e20cf2e..87f9006 100644 --- a/bluemath_tk/distributions/extreme_correction.py +++ b/bluemath_tk/distributions/extreme_correction.py @@ -4,6 +4,7 @@ import numpy as np import scipy.stats as stats import xarray as xr +from statsmodels.distributions.empirical_distribution import ECDF from ..core.io import BlueMathModel from ..distributions.gev import GEV @@ -24,18 +25,18 @@ class ExtremeCorrection(BlueMathModel): def __init__( self, corr_config: dict, - pot_config: dict, + pot_config: dict = {}, method: str = "pot", conf_level: float = 0.95, debug: bool = False, ): """ - Extreme value correction for sampled datasets using + upper-tail correction technique for synthetic datasets using Generalized Extreme Value (GEV) or Peaks Over Threshold (POT) approaches. This class applies upper-tail corrections to sampled datasets by fitting extreme value distributions to historical observations - and adjusting the sampled extremes accordingly. See V. Collado (2025) [1]. + and adjusting the sampled extremes accordingly. See Collado et al. (2026) [1]. Parameters ---------- @@ -78,6 +79,13 @@ def __init__( - "pot" : Peaks Over Threshold using GPD distribution. conf_level : float, default=0.95 Confidence level for return period confidence intervals. + + References + ---------- + [1] Collado, V., Méndez, F.J. & Mínguez, R. Upper-tail correction of + multivariate synthetic environmental series using annual maxima. + Stoch Environ Res Risk Assess 40, 92 (2026). + https://doi.org/10.1007/s00477-026-03215-0 """ super().__init__() @@ -101,6 +109,7 @@ def __init__( # If GEV (loc, scale, shape) # If GPD (threshold, scale, shape) self.parameters = np.empty(3) + self.threshold = 1e10 # Fix threshold in case "am" method is used # Confidence level self.conf = conf_level @@ -238,6 +247,7 @@ def fit( def transform( self, data_sim: xr.Dataset, + quantile: float = 0.9, prob: str = "unif", random_state: int = 0, siglevel: float = 0.05, @@ -249,6 +259,10 @@ def transform( ---------- data_sim : xr.Dataset Dataset with synthetic data + quantile : float, default=0.9 + Quantile to apply the correction. Only values above this quantile will be + corrected. + By default, the correction is applied to the upper 10% of the data prob : str, default="unif" Type of probabilities consider to random correct the AM If "unif", a sorted random uniform is considered @@ -274,51 +288,60 @@ def transform( self.sim_am_data_sorted = np.sort(self.sim_am_data) self.n_year_sim = self.sim_am_data.shape[0] - # Avoid correct when AM is 0 - self.am_idx_0 = 0 - for idx, value in enumerate(np.sort(self.am_data)): - if value == 0: - self.am_index_0 += 1 - else: - break + # Avoid correct when AM is 0 or lower than the threshold or the quantile given + self.am_idx_0 = np.argmin( + (self.sim_am_data_sorted == 0) + | ( + ECDF(self.sim_pit_data)(self.sim_am_data_sorted) + < ECDF(self.pit_data)(self.threshold) + ) + ) # Test if the correction has to be applied test_result = self.test() self.p_value = test_result.get("P-value") if self.p_value > siglevel: self.logger.info( - f"Synthetic data comes from fitted distribution (P-value: {self.p_value:.4%})" + f"Synthetic data comes from fitted distribution (P-value: {self.p_value:.6f})" ) self.sim_am_data_corr = self.sim_am_data self.sim_pit_data_corrected = self.sim_pit_data - return + else: - self.sim_am_data_corr = np.zeros(self.n_year_sim) + self.logger.info( + f"Synthetic data does not come from fitted distribution (P-value: {self.p_value:.6f})" + ) + # Initialize sim_am_data_corr + self.sim_am_data_corr = np.zeros(self.n_year_sim - self.am_idx_0) - # Define probs + # Define probs using uniform or ECDF if prob == "unif": self.rprob_sim = np.sort( - np.random.uniform(low=0, high=1, size=self.n_year_sim) + np.random.uniform( + low=0, + high=1, + size=self.n_year_sim - self.am_idx_0, + ) ) else: - self.rprob_sim = np.arange(1, self.n_year_sim + 1) / ( - self.n_year_sim + 1 + self.rprob_sim = np.arange(1, self.n_year_sim + 1 - self.am_idx_0) / ( + self.n_year_sim + 1 - self.am_idx_0 ) # ECDF # Apply correction on AM if self.method == "pot": # TODO: Añadir funciones de POT - self.sim_am_data_corr[self.am_idx_0 :] = GPDPoiss.qf( - self.rprob_sim[self.am_idx_0 :], + self.sim_am_data_corr = GPDPoiss.qf( + self.rprob_sim, threshold=self.parameters[0], scale=self.parameters[1], shape=self.parameters[2], poisson=self.poiss_parameter, ) elif self.method == "am": - self.sim_am_data_corr[self.am_idx_0 :] = GEV.qf( - self.rprob_sim[self.am_idx_0 :], + self.sim_am_data_corr = GEV.qf( + self.rprob_sim, loc=self.parameters[0], scale=self.parameters[1], shape=self.parameters[2], @@ -326,16 +349,23 @@ def transform( # Apply correction in pit data if self.n_year_sim > 1: + q_threshold = np.quantile(self.sim_pit_data, quantile) + self.sim_pit_data_corrected = np.interp( self.sim_pit_data, # x-coords to interpolate np.append( - min(self.sim_pit_data), self.sim_am_data_sorted + [min(self.sim_pit_data), q_threshold], + self.sim_am_data_sorted[self.am_idx_0 :], ), # x-coords of data points np.append( - min(self.sim_pit_data), self.sim_am_data_corr + [min(self.sim_pit_data), q_threshold], self.sim_am_data_corr ), # y-coords of data points ) + self.sim_am_data_corr_aux = self.sim_am_data.copy() + self.sim_am_data_corr_aux[self.am_idx_0 :] = self.sim_am_data_corr + self.sim_am_data_corr = self.sim_am_data_corr_aux + output = self._preprocess_output(data=data_sim) return output @@ -344,6 +374,7 @@ def fit_transform( self, data_hist: xr.Dataset, data_sim: xr.Dataset, + quantile: float = 0.9, bmus: list[bool, str] = [False, ""], prob: str = "unif", plot_diagnostic: bool = False, @@ -361,7 +392,8 @@ def fit_transform( data_sim : xr.Dataset Dataset with synthetic data bmus : list[bool, str], default=[False, ""] - Whether to apply the correction by BMUS, if given the name of bmus variable should be given + Whether to apply the correction by BMUS, if given the name of bmus variable + should be given prob : str, default="unif" Type of probabilities consider to random correct the AM If "unif", a sorted random uniform is considered @@ -378,7 +410,9 @@ def fit_transform( """ self.fit(data_hist=data_hist, plot_diagnostic=plot_diagnostic) - return self.transform(data_sim=data_sim, prob=prob, random_state=random_state) + return self.transform( + data_sim=data_sim, quantile=quantile, prob=prob, random_state=random_state + ) def _preprocess_data( self, @@ -396,9 +430,11 @@ def _preprocess_data( data : xr.Dataset Data to apply correction var : list[str] - List of variables to apply the correction technique. FUTURE WORK: INCLUDE MORE THAN ONE + List of variables to apply the correction technique. + TODO: FUTURE WORK: INCLUDE MORE THAN ONE bmus : list[bool, str], default=[False, ""] - List to decide if the correction must be applied by WT and if so name of the variable + List to decide if the correction must be applied by WT and if so name of the + variable join_sims : bool, default=True Whether to joint all the simulations in one array @@ -417,20 +453,14 @@ def _preprocess_data( if join_sims and sim: n_sims = data.get("n_sim").values - pit_data = np.array([]) - am_data = np.array([]) - for sim in n_sims: - pit_data = np.append(pit_data, data.get(f"{var}").sel(n_sim=sim).values) - am_data = np.append( - am_data, - data.get(f"{var}").sel(n_sim=sim).groupby("time.year").max().values, - ) + pit_data = data.get(f"{var}").values.flatten() + am_data = data.get(f"{var}").groupby("time.year").max().values.flatten() else: - pit_data = data.get(f"{var}").values.T - am_data = data.get(f"{var}").groupby("time.year").max().values.T + pit_data = data.get(f"{var}").values.flatten() + am_data = data.get(f"{var}").groupby("time.year").max().values.flatten() return pit_data, am_data - + def _preprocess_output(self, data: xr.Dataset) -> xr.Dataset: """ Preprocess the output dataset @@ -447,18 +477,23 @@ def _preprocess_output(self, data: xr.Dataset) -> xr.Dataset: """ n_sim = data.get("n_sim").values.shape[0] n_time = data.get("time").values.shape[0] - sim_pit_data_corrected_reshaped = self.sim_pit_data_corrected.reshape(n_sim, n_time) + sim_pit_data_corrected_reshaped = self.sim_pit_data_corrected.reshape( + n_sim, n_time + ) - data[f"{self.var}_corr"] = (data[f"{self.var}"].dims, sim_pit_data_corrected_reshaped) + data[f"{self.var}_corr"] = ( + data[f"{self.var}"].dims, + sim_pit_data_corrected_reshaped, + ) return data - def test(self) -> dict: """ - Cramer Von-Mises test to check the GOF of fitted distribution + Bootstrap Cramer Von-Mises test to check the GOF of fitted distribution - Test to check the Goodness-of-Fit of the historical fitted distribution with the synthetic data. + Test to check the Goodness-of-Fit of the historical fitted distribution with the + synthetic data. Null Hypothesis: sampled AM comes from the fitted extreme distribution. Returns @@ -470,39 +505,76 @@ def test(self) -> dict: ----- The test is applied in the AM since the correction procedure is applied in the AM """ + Bootstrap_test = 500 + # Historical parameters estimated + u, sigma, xi = self.parameters + mu = u + sigma/xi * (self.poiss_parameter ** xi - 1) + psi = sigma * self.poiss_parameter ** xi + + CvM_statistic = ( + 1/(12*self.n_year_sim) + + np.sum( + (np.sort( + GEV.cdf(self.sim_am_data, loc=mu, scale=psi, shape=xi) + ) - + (2 * np.arange(1, self.n_year_sim +1) - 1) / + (2 * self.n_year_sim))**2 + ) + ) - if self.method == "pot": - gev_location = ( - self.parameters[0] - + ( - self.parameters[1] - * (1 - self.poiss_parameter ** self.parameters[2]) - ) - / self.parameters[2] + # Bootstrap calibration + boot_statistic = np.zeros(Bootstrap_test) + + for b in range(Bootstrap_test): + # 1. Simulate AM of size n_hist from GEV with historical parameters + uniform_random_hist = np.random.uniform(size=self.n_year) + simulated_am_boot_hist = GEV.qf( + uniform_random_hist, + loc=mu, + scale=psi, + shape=xi + ) + + # 2. Fit GEV to simulated AM + [mu_bootfit, psi_bootfit, xi_bootfit] = GEV.fit( + simulated_am_boot_hist + ).params + + # 3. Simulate AM of size n_sim from GEV with historical parameters + uniform_random_sim = np.random.uniform(size=self.n_year_sim) + simulated_am_boot_sim = GEV.qf( + uniform_random_sim, + loc=mu_bootfit, + scale=psi_bootfit, + shape=xi_bootfit ) - gev_scale = self.parameters[1] * self.poiss_parameter ** self.parameters[2] - - # POT test - # res_test = stats.cramervonmises(self.sim_pot_data, - # cdf=stats.genpareto.cdf, - # args=(self.parameters[2], self.parameters[0], self.parameters[1]) - # ) - - # AM test to derived GEV from GPD-Poisson - res_test = stats.cramervonmises( - self.sim_am_data, - cdf=stats.genextreme.cdf, - args=(self.parameters[2], gev_location, gev_scale), + + # 4. Compute bootstrap synthetic AM values using the bootfit + u_bootfit = GEV.cdf( + simulated_am_boot_sim, + loc=mu_bootfit, + scale=psi_bootfit, + shape=xi_bootfit, ) - return {"Statistic": res_test.statistic, "P-value": res_test.pvalue} - elif self.method == "am": - res_test = stats.cramervonmises( - self.sim_am_data, - cdf=stats.genextreme.cdf, - args=(self.parameters[2], self.parameters[0], self.parameters[1]), + # 5. Compute CvM test for the bootstrap sample + boot_statistic[b] = ( + 1/(12*self.n_year_sim) + + np.sum( + (np.sort( + u_bootfit + ) - + (2*np.arange(1, self.n_year_sim + 1) - 1) / + (2*self.n_year_sim))**2 + ) ) - return {"Statistic": res_test.statistic, "P-value": res_test.pvalue} + + bootstrap_p_value = ( + (1 + np.sum(boot_statistic >= CvM_statistic)) / + (1 + Bootstrap_test) + ) + + return {"Statistic": CvM_statistic, "P-value": bootstrap_p_value} def plot(self) -> tuple[list[plt.Figure], list[plt.Axes]]: """ @@ -520,6 +592,10 @@ def plot(self) -> tuple[list[plt.Figure], list[plt.Axes]]: figs.append(fig2) axes.append(ax2) + fig3, ax3 = self.ecdf_plot() + figs.append(fig3) + axes.append(ax3) + return figs, axes def hist_retper_plot(self) -> tuple[plt.Figure, plt.Axes]: @@ -744,6 +820,47 @@ def sim_retper_plot(self) -> tuple[plt.Figure, plt.Axes]: return fig, ax + def ecdf_plot(self) -> tuple[plt.Figure, plt.Axes]: + """ + Empirical Cumulative Distribution Function plot for historical and + synthetic before and after the correction. + + Returns + ------- + fig + plt.Figure + ax + plt.Axes + """ + fig, ax = plt.subplots(1, 1, figsize=(7, 7)) + # Historical ECDF + ax.ecdf( + self.pit_data, label="Historical", alpha=0.9, linewidth=2.5, color="gray" + ) + # Synthetic ECDF + ax.ecdf( + self.sim_pit_data, label="Before", alpha=0.9, linewidth=2.5, color="black" + ) + ax.ecdf( + self.sim_pit_data_corrected, + label="After", + alpha=0.9, + linewidth=2.5, + color="black", + linestyle="dashed", + ) + + # ax.set_xlim(self.threshold, 12) + ax.set_ylim(0.0, 1.0) + ax.set_xlabel(f"{self.var}") + ax.set_ylabel("Probability") + ax.grid() + ax.legend(loc="lower right") + ax.tick_params(axis="both", which="major") + ax.tick_params(axis="both", which="minor") + + return fig, ax + def correlations(self) -> dict: """ Rank based correlations between sampled and corrected sampled data diff --git a/bluemath_tk/distributions/gev.py b/bluemath_tk/distributions/gev.py index cceaa7b..bde79cd 100644 --- a/bluemath_tk/distributions/gev.py +++ b/bluemath_tk/distributions/gev.py @@ -73,9 +73,9 @@ def name() -> str: @staticmethod def nparams() -> int: """ - Number of parameters of GEV + Return Number of parameters of GEV """ - return int(3) + return 3 @staticmethod def param_names() -> List[str]: @@ -139,7 +139,7 @@ def cdf( Shape parameter. Returns - ---------- + ------- p : np.ndarray Probability @@ -279,7 +279,7 @@ def fit(data: np.ndarray, **kwargs) -> FitResult: If not provided, default fitting options will be used. Returns - ---------- + ------- FitResult Result of the fit containing the parameters loc, scale, shape, success status, and negative log-likelihood value. @@ -296,7 +296,7 @@ def random( random_state: int = None, ) -> np.ndarray: """ - Generates random values from GEV distribution + Generate random values from GEV distribution Parameters ---------- @@ -314,7 +314,7 @@ def random( If None, do not use random stat. Returns - ---------- + ------- x : np.ndarray Random values from GEV distribution @@ -427,7 +427,7 @@ def variance(loc: float = 0.0, scale: float = 1.0, shape: float = 0.0) -> float: @staticmethod def std(loc: float = 0.0, scale: float = 1.0, shape: float = 0.0) -> float: """ - Standard deviation + Compute Standard deviation Parameters ---------- @@ -440,7 +440,7 @@ def std(loc: float = 0.0, scale: float = 1.0, shape: float = 0.0) -> float: Shape parameter. Returns - ---------- + ------- std : np.ndarray Standard Deviation of GEV with the given parameters diff --git a/tests/distributions/test_extr_corr.py b/tests/distributions/test_extr_corr.py index aa6962a..c13b094 100644 --- a/tests/distributions/test_extr_corr.py +++ b/tests/distributions/test_extr_corr.py @@ -279,8 +279,8 @@ def test_plot_methods(self): # Test plot method figs, axes = ec.plot() - self.assertEqual(len(figs), 2) - self.assertEqual(len(axes), 2) + self.assertEqual(len(figs), 3) + self.assertEqual(len(axes), 3) def test_preprocess_data_single_dataset(self): """Test data preprocessing with single dataset"""