From 953776b9c884e8da1aed11d1646015460309f8a6 Mon Sep 17 00:00:00 2001
From: Jonah Gilbert
Date: Mon, 28 Oct 2024 15:50:38 -0500
Subject: [PATCH 01/10] Add parallelization

---
 src/dscim/menu/main_recipe.py    |  59 ++++++++---
 src/dscim/menu/risk_aversion.py  |  42 +++++++-
 src/dscim/menu/simple_storage.py |  47 +++++++++
 src/dscim/utils/utils.py         | 161 ++++++++++++++++----------
 4 files changed, 218 insertions(+), 91 deletions(-)

diff --git a/src/dscim/menu/main_recipe.py b/src/dscim/menu/main_recipe.py
index 0ab531d7..61cdd22d 100644
--- a/src/dscim/menu/main_recipe.py
+++ b/src/dscim/menu/main_recipe.py
@@ -106,6 +106,7 @@ def __init__(
         extrap_formula=None,
         fair_dims=None,
         save_files=None,
+        geography=None,
         **kwargs,
     ):
         if scc_quantiles is None:
@@ -193,6 +194,10 @@ def __init__(
             "global_consumption",
             "global_consumption_no_pulse",
         ]
+
+        if "country_ISOs" in kwargs:
+            self.countries_mapping = pd.read_csv(kwargs["country_ISOs"])
+            self.countries = self.countries_mapping.MatchedISO.dropna().unique()

         super().__init__(
             sector_path=sector_path,
@@ -203,8 +208,11 @@ def __init__(
             eta=eta,
             subset_dict=subset_dict,
             ce_path=ce_path,
+            geography=geography,
+            kwargs=kwargs,
         )

+
         self.rho = rho
         self.eta = eta
         self.fit_type = fit_type
@@ -231,6 +239,7 @@ def __init__(
         self.extrap_formula = extrap_formula
         self.fair_dims = fair_dims
         self.save_files = save_files
+        self.geography = geography
         self.__dict__.update(**kwargs)
         self.kwargs = kwargs
@@ -439,7 +448,7 @@ def output_attrs(self):
         return meta

     @cachedproperty
-    def collapsed_pop(self):
+    def collapsed_pop(self, geography = None):
         """Collapse population according to discount type."""
         if (self.discounting_type == "constant") or ("ramsey" in self.discounting_type):
             pop = self.pop
@@ -447,6 +456,27 @@ def collapsed_pop(self):
             pop = self.pop.mean("model")
         elif "gwr" in self.discounting_type:
             pop = self.pop.mean(["model", "ssp"])
+
+        if geography == "ir":
+            pass
+        elif geography == "country":
+            territories = []
+            mapping_dict = {}
+            for ii, row in self.countries_mapping.iterrows():
+                mapping_dict[row["ISO"]] = row["MatchedISO"]
+                if row["MatchedISO"] == "nan":
+                    mapping_dict[row["ISO"]] = "nopop"
+
+            for region in pop.region.values:
+                territories.append(mapping_dict[region[:3]])
+
+            pop = (pop
+                   .assign_coords({'region':territories})
+                   .groupby('region')
+                   .sum())
+        elif geography == "global":
+            pop = pop.sum(dim="region").assign_coords({'region':'globe'}).expand_dims('region')
+
         return pop

     @abstractmethod
@@ -484,27 +514,27 @@ def damage_function_points(self) -> pd.DataFrame:
         --------
         pd.DataFrame
         """
-        df = self.global_damages_calculation()
+        df = self.damages_calculation(geography = self.geography)

-        if "slr" in df.columns:
-            df = df.merge(self.climate.gmsl, on=["year", "slr"])
-        if "gcm" in df.columns:
-            df = df.merge(self.climate.gmst, on=["year", "gcm", "rcp"])
+        climate_vars = []
+        if "slr" in df.coords:
+            climate_vars.append(self.climate.gmsl.set_index(['slr','year']).to_xarray())
+        if "gcm" in df.coords:
+            climate_vars.append(self.climate.gmst.set_index(['gcm','rcp','year']).to_xarray())
+        climate_ds = xr.merge(climate_vars)

         # removing illegal combinations from estimation
-        if any([i in df.ssp.unique() for i in ["SSP1", "SSP5"]]):
+        if any([i in df.ssp.values for i in ["SSP1", "SSP5"]]):
            self.logger.info("Dropping illegal model combinations.")
-            for var in [i for i in df.columns if i in ["anomaly", "gmsl"]]:
-                df.loc[
-                    ((df.ssp == "SSP1") & (df.rcp == "rcp85"))
-                    | ((df.ssp == "SSP5") & (df.rcp == "rcp45")),
-                    var,
-                ] = np.nan
+            df = xr.where(((df.ssp == "SSP1") & (df.rcp == "rcp85"))
+                | ((df.ssp == "SSP5") & (df.rcp == "rcp45")), np.nan, df)
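
# A quick way to see what the masking above does: on a toy dataset (made-up
# values, shown only to illustrate the xr.where pattern), the two illegal
# SSP/RCP pairs come back as NaN and everything else is untouched. Note that
# xr.where returns a new object, so its result has to be assigned back.
#
#     import numpy as np
#     import xarray as xr
#
#     toy = xr.Dataset(
#         {"damages": (("ssp", "rcp"), np.ones((2, 2)))},
#         coords={"ssp": ["SSP1", "SSP5"], "rcp": ["rcp45", "rcp85"]},
#     )
#     masked = xr.where(
#         ((toy.ssp == "SSP1") & (toy.rcp == "rcp85"))
#         | ((toy.ssp == "SSP5") & (toy.rcp == "rcp45")),
#         np.nan,
#         toy,
#     )
#     # masked.damages is NaN at (SSP1, rcp85) and (SSP5, rcp45), 1.0 elsewhere
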

         # agriculture lacks ACCESS1-0/rcp85 combo
         if "agriculture" in self.sector:
             self.logger.info("Dropping illegal model combinations for agriculture.")
-            df.loc[(df.gcm == "ACCESS1-0") & (df.rcp == "rcp85"), "anomaly"] = np.nan
+            climate_ds = xr.where((climate_ds.gcm == "ACCESS1-0") & (climate_ds.rcp == "rcp85"), np.nan, climate_ds)
+
+        df = xr.merge([df, climate_ds]).sel(year = df.year)

         return df
@@ -520,6 +550,7 @@ def damage_function_calculation(self, damage_function_points, global_consumption
         """
         yrs = range(self.climate.pulse_year, self.ext_subset_end_year + 1)

+
         params_list, preds_list = [], []

diff --git a/src/dscim/menu/risk_aversion.py b/src/dscim/menu/risk_aversion.py
index a06f9030..a27cbadf 100644
--- a/src/dscim/menu/risk_aversion.py
+++ b/src/dscim/menu/risk_aversion.py
@@ -43,6 +43,46 @@
         """Calculate damages (difference between CEs)"""
         return self.ce_no_cc - self.ce_cc

+    def damages_calculation(self, geography) -> xr.Dataset:
+        """Aggregate damages to country level
+
+        Returns
+        --------
+        xr.Dataset
+        """
+
+        self.logger.info("Calculating damages")
+
+        dams_collapse = self.calculated_damages * self.collapsed_pop
+
+        if geography == "ir":
+            pass
+        elif geography == "country":
+            territories = []
+            mapping_dict = {}
+            for ii, row in self.countries_mapping.iterrows():
+                mapping_dict[row["ISO"]] = row["MatchedISO"]
+                if row["MatchedISO"] == "nan":
+                    mapping_dict[row["ISO"]] = "nopop"
+
+            for region in dams_collapse.region.values:
+                territories.append(mapping_dict[region[:3]])
+
+            dams_collapse = (dams_collapse
+                             .assign_coords({'region':territories})
+                             .groupby('region')
+                             .sum())
+        elif geography == "global":
+            dams_collapse = dams_collapse.sum(dim="region").assign_coords({'region':'globe'}).expand_dims('region')
+
+        if "gwr" in self.discounting_type:
+            dams_collapse = dams_collapse.assign(
+                ssp=str(list(self.gdp.ssp.values)),
+                model=str(list(self.gdp.model.values)),
+            )
+
+        return dams_collapse.to_dataset(name = 'damages')
+
     def global_damages_calculation(self) -> pd.DataFrame:
         """Aggregate damages to global level

@@ -61,7 +101,7 @@
             )

         return df
-
+
     def global_consumption_calculation(self, disc_type):
         """Calculate global consumption

diff --git a/src/dscim/menu/simple_storage.py b/src/dscim/menu/simple_storage.py
index 37dd7ce3..27333c53 100644
--- a/src/dscim/menu/simple_storage.py
+++ b/src/dscim/menu/simple_storage.py
@@ -306,7 +306,12 @@
         histclim=None,
         ce_path=None,
         subset_dict=None,
+        geography=None,
+        **kwargs,
     ):
+        if geography is None:
+            geography = "global"
+
         self.sector_path = sector_path
         self.save_path = save_path
         self.gdppc_bottom_code = gdppc_bottom_code
@@ -317,7 +322,15 @@
         self.histclim = histclim
         self.ce_path = ce_path
         self.eta = eta
+        self.__dict__.update(**kwargs)
+        self.geography = geography
+        self.kwargs = kwargs
+
+        if "country_ISOs" in kwargs:
+            self.countries_mapping = pd.read_csv(kwargs["country_ISOs"])
+            self.countries = self.countries_mapping.MatchedISO.dropna().unique()
+
         self.logger = logging.getLogger(__name__)

     def cut(self, xr_array, end_year=2099):

@@ -351,7 +364,41 @@
             )

         return xr_data
+
+    @property
+    def geography_collapsed_econ_vars(self):
+
+        pop_collapse = self.pop
+
+        if self.geography == "ir":
+            pass
+        elif self.geography == "country":
+            territories = []
+            mapping_dict = {}
+            for ii, row in self.countries_mapping.iterrows():
+                mapping_dict[row["ISO"]] = row["MatchedISO"]
+                if row["MatchedISO"] == "nan":
+                    mapping_dict[row["ISO"]] = "nopop"
+
+            for region in pop_collapse.region.values:
+                territories.append(mapping_dict[region[:3]])
+
+            pop_collapse = (pop_collapse
+                            .assign_coords({'region':territories})
+                            .groupby('region')
+                            .sum())
+        elif self.geography == "global":
+            pop_collapse = pop_collapse.sum(dim="region").assign_coords({'region':'globe'}).expand_dims('region')
+
+        if "gwr" in self.discounting_type:
+            pop_collapse = pop_collapse.assign_coords(
+                ssp=str(list(self.gdp.ssp.values)),
+                model=str(list(self.gdp.model.values)),
+            )
+
+        return pop_collapse.to_dataset(name = 'pop')
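
# The ISO3-to-country collapse above repeats the pattern used in collapsed_pop
# and damages_calculation: relabel each impact region with the country matched
# to its first three characters, then group and sum. A minimal sketch with toy
# regions (hypothetical codes, assuming region identifiers start with an ISO3
# prefix):
#
#     import xarray as xr
#
#     pop = xr.DataArray(
#         [1.0, 2.0, 3.0],
#         dims="region",
#         coords={"region": ["USA.1.1", "USA.1.2", "CAN.1.1"]},
#     )
#     iso_to_country = {"USA": "USA", "CAN": "CAN"}
#     countries = [iso_to_country[r[:3]] for r in pop.region.values]
#     collapsed = pop.assign_coords(region=countries).groupby("region").sum()
#     # collapsed has region ["CAN", "USA"] with values [3.0, 3.0]
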
+
     @property
     def cut_econ_vars(self):
         """Economic variables from SSP object"""

diff --git a/src/dscim/utils/utils.py b/src/dscim/utils/utils.py
index eff80d9a..e77b32c1 100644
--- a/src/dscim/utils/utils.py
+++ b/src/dscim/utils/utils.py
@@ -10,7 +10,38 @@
 logger = logging.getLogger(__name__)


-def modeler(df, formula, type_estimation, exog, quantiles=None):
+def fit_project(chunk_reg, formula, type_estimation, quantiles, index_dims):
+    # individual block's mapping function
+    df_reg = chunk_reg.to_dataframe().reset_index()
+
+    year_range = df_reg.year.values
+
+    df_tot = []
+    for year in year_range:
+        time_window = range(year - 2, year + 3)
+        df = df_reg[df_reg.year.isin(time_window)]
+        params = modeler(
+            df=df,
+            formula=formula,
+            type_estimation=type_estimation,
+            quantiles=quantiles,
+        )
+        params = params.assign(year = year)
+        df_tot.append(params)
+
+    params = pd.concat(df_tot)
+    for dim in index_dims:
+        if chunk_reg[dim].size == 1:
+            params[dim] = chunk_reg[dim].values
+        else:
+            params[dim] = str(list(chunk_reg[dim].values))
+
+    params = params.set_index(index_dims + ['year',])
+    return(params.to_xarray())
+
+def modeler(df, formula, type_estimation, quantiles=None):
     """Wrapper function for statsmodels functions (OLS and quantiles)

     For the quantiles, the function will fit for all quantiles between 0 and 1
@@ -43,11 +74,11 @@
     pd.DataFrame with predictions

     """
+    import numpy as np
+
     if type_estimation == "ols":
         mod = smf.ols(formula=formula, data=df).fit()
         params = pd.DataFrame(mod.params).T
-        y_hat = pd.DataFrame(dict(exog, y_hat=mod.predict(exog=exog)))
-
     elif type_estimation == "quantreg":
         # silence the 'max recursions' warning
         with warnings.catch_warnings():
@@ -55,7 +86,7 @@

             # set up the model
             mod = smf.quantreg(formula=formula, data=df)
-            q_params, q_y_hat = [], []
+            q_params = []

             # calculate each quantile regression
             # save parameters and predictions
@@ -64,17 +95,12 @@
                 params_quantile = pd.DataFrame(quant_mod.params).T
                 params_quantile = params_quantile.assign(q=quant)
                 q_params.append(params_quantile)
-
-                fitted = pd.DataFrame(dict(exog, y_hat=quant_mod.predict(exog=exog)))
-                q_y_hat.append(fitted.assign(q=quant))
-
             params = pd.concat(q_params)
-            y_hat = pd.DataFrame(pd.concat(q_y_hat))

     else:
         raise NotImplementedError(f"{type_estimation} is not valid option!")

-    return params, y_hat
+    return params


 def get_weights(quantiles):
@@ -330,26 +356,26 @@ def c_equivalence(array, dims, eta, weights=None, func_args=None, func=None):


 def model_outputs(
-    damage_function,
+    damage_function_points,
     extrapolation_type,
     formula,
+    map_dims,
     year_range,
     year_start_pred,
     quantiles,
global_c=None, - min_anomaly=0, - max_anomaly=20, - step_anomaly=0.2, - min_gmsl=0, - max_gmsl=300, - step_gmsl=3, type_estimation="ols", ): """Estimate damage function coefficients and predictions using passed formula. This model is estimated for each year in our timeframe (2010 to 2099) and - using a rolling window across time (5-yr by default). + using a rolling window across time (5-yr by default). Dims are placed into + three categories: + - map_dims: these dimensions index the final coefficients + - years: as described above + - everything else: these dimensions populate the full number of points in + the regression (i.e. gcm places all 33 gcms into the regression) Damage functions are extrapolated for each year between 2099 and 2300. @@ -357,6 +383,10 @@ def model_outputs( ---------- damage_function: pandas.DataFrame A global damage function. + formula: str + Regression formula using R regression syntax. + map_dims: + Dims to index the final coefficients. type_estimation: str Type of model use for damage function fitting: `ols`, `quantreg` extrapolation_type : str @@ -377,68 +407,54 @@ def model_outputs( DataFrame with yearly damage functions (coefficients and predictions respectively). """ - + + map_chunks = {'year': -1} + out_chunks = {'year': -1} + + for dim in map_dims: + map_chunks[dim] = 1 + out_chunks[dim] = 1 + + for dim in set(damage_function_points.coords) - set(map_chunks.keys()): + map_chunks[dim] = -1 + # set year of prediction for global C extrapolation fix_global_c = year_start_pred - 1 - - # set exogenous variables for predictions - gmsl = np.arange(min_gmsl, max_gmsl, step_gmsl) - temps = np.arange(min_anomaly, max_anomaly, step_anomaly) - - if ("anomaly" in damage_function.columns) and ("gmsl" in damage_function.columns): - exog_X = pd.DataFrame(product(temps, gmsl)) - exog = dict(anomaly=exog_X.values[:, 0], gmsl=exog_X.values[:, 1]) - elif "anomaly" in damage_function.columns: - exog = dict(anomaly=temps) - elif "gmsl" in damage_function.columns: - exog = dict(gmsl=gmsl) - else: - print("Independent variables not found.") - - # Rolling window estimation (5-yr) - list_params, list_y_hats = [], [] - - for year in year_range: - time_window = range(year - 2, year + 3) - df = damage_function[damage_function.year.isin(time_window)] - params, y_hat = modeler( - df=df, - formula=formula, - type_estimation=type_estimation, - exog=exog, - quantiles=quantiles, - ) - - params, y_hat = params.assign(year=year), y_hat.assign(year=year) - list_params.append(params) - list_y_hats.append(y_hat) - - # Concatenate results - param_df = pd.concat(list_params) - y_hat_df = pd.concat(list_y_hats) - + + ds_template = xr.Dataset( + data_vars={ + 'anomaly':(map_dims + ['year',], np.empty([damage_function_points[dim].size for dim in map_dims + ['year',]])), + 'np.power(anomaly, 2)':(map_dims + ['year',], np.empty([damage_function_points[dim].size for dim in map_dims + ['year',]])), + }, + coords={dim: damage_function_points[dim].values for dim in map_dims + ['year',]}, + ).chunk(out_chunks) + # print(1) + ds_points = (damage_function_points + .chunk(map_chunks)) + # print(1) + param_df = xr.map_blocks(fit_project, + ds_points, + template = ds_template, + kwargs = dict( + formula = menu_item.formula, + type_estimation = menu_item.fit_type, + quantiles = menu_item.quantreg_quantiles, + index_dims = map_dims,)) + + extrapolation_type = "global_c_ratio" if extrapolation_type == "global_c_ratio": - # convert to xarray immediately - index = ["year", "q"] if type_estimation == "quantreg" else 
["year"] - y_hat_df = y_hat_df.set_index( - [i for i in y_hat_df.columns if "y_hat" not in i] - ).to_xarray() - param_df = param_df.set_index(index).to_xarray() - # Calculate global consumption ratios to fixed year global_c_factors = ( global_c.sel(year=slice(fix_global_c + 1, None)) / global_c.sel(year=fix_global_c) ).squeeze() - + # Extrapolate by multiplying fixed params by ratios - extrap_preds = y_hat_df.sel(year=fix_global_c) * global_c_factors extrap_params = param_df.sel(year=fix_global_c) * global_c_factors - + # concatenate extrapolation and pre-2100 - preds = xr.concat([y_hat_df, extrap_preds], dim="year") parameters = xr.concat([param_df, extrap_params], dim="year") - + # For the local case we don't care about the time dimension, the # extrapolation will index on the last year available of damages (2099) # by design (changed using the fix_global_c parameter) and will use @@ -447,14 +463,7 @@ def model_outputs( if global_c_factors.year.dims == (): raise NotImplementedError("WATCH OUT! Local is scrapped.") - # Return all results - res = { - "parameters": parameters, - "preds": preds, - } - - return res - + return(parameters) def compute_damages(anomaly, betas, formula): """Calculate damages using FAIR anomalies (either control or pulse). From a7bbc1cfeb640bf05d955c3a4df3e34838b2acea Mon Sep 17 00:00:00 2001 From: Jonah Gilbert Date: Tue, 12 Nov 2024 17:40:43 -0600 Subject: [PATCH 02/10] Fix ir-level extrapolation --- src/dscim/menu/main_recipe.py | 50 +++++++++++++++++------- src/dscim/menu/risk_aversion.py | 10 ++++- src/dscim/preprocessing/input_damages.py | 2 + 3 files changed, 47 insertions(+), 15 deletions(-) diff --git a/src/dscim/menu/main_recipe.py b/src/dscim/menu/main_recipe.py index 61cdd22d..87e05d18 100644 --- a/src/dscim/menu/main_recipe.py +++ b/src/dscim/menu/main_recipe.py @@ -107,6 +107,7 @@ def __init__( fair_dims=None, save_files=None, geography=None, + individual_region=None, **kwargs, ): if scc_quantiles is None: @@ -240,6 +241,7 @@ def __init__( self.fair_dims = fair_dims self.save_files = save_files self.geography = geography + self.individual_region = individual_region self.__dict__.update(**kwargs) self.kwargs = kwargs @@ -828,9 +830,14 @@ def global_consumption_per_capita(self, disc_type): """ # Calculate global consumption per capita - array_pc = self.global_consumption_calculation( - disc_type - ) / self.collapsed_pop.sum("region") + if self.individual_region is not None: + array_pc = self.global_consumption_calculation( + disc_type + ) / self.collapsed_pop.sel(region = self.individual_region) + else: + array_pc = self.global_consumption_calculation( + disc_type + ) / self.collapsed_pop.sum("region") if self.NAME == "equity": # equity recipe's growth is capped to @@ -863,15 +870,23 @@ def global_consumption(self): """Global consumption without climate change""" # rff simulation means that GDP already exists out to 2300 + individual_region = self.individual_region + if 2300 in self.gdp.year: self.logger.debug("Global consumption found up to 2300.") - global_cons = self.gdp.sum("region").rename("global_consumption") + if individual_region is not None: + global_cons = self.gdp.sel(region = individual_region).rename("global_consumption") + else: + global_cons = self.gdp.sum("region").rename("global_consumption") else: self.logger.info("Extrapolating global consumption.") # holding population constant # from 2100 to 2300 with 2099 values - pop = self.collapsed_pop.sum("region") + if individual_region is None: + pop = self.collapsed_pop.sum("region") + 
else: + pop = self.collapsed_pop.sel(region = individual_region) pop = pop.reindex( year=range(pop.year.min().values, self.ext_end_year + 1), method="ffill", @@ -988,18 +1003,25 @@ def gmsl_max(self): @save(name="global_consumption_no_pulse") def global_consumption_no_pulse(self): """Global consumption under FAIR control scenario.""" - + individual_region = self.individual_region fair_control = self.climate.fair_control if self.clip_gmsl: fair_control["gmsl"] = np.minimum(fair_control["gmsl"], self.gmsl_max) - damages = compute_damages( - fair_control, - betas=self.damage_function_coefficients, - formula=self.formula, - ) - + if individual_region is not None: + damages = compute_damages( + fair_control, + betas=self.damage_function_coefficients.sel(region = individual_region), + formula=self.formula, + ) + else: + damages = compute_damages( + fair_control, + betas=self.damage_function_coefficients, + formula=self.formula, + ) + cc_cons = self.global_consumption - damages gc_no_pulse = [] @@ -1019,7 +1041,7 @@ def global_consumption_no_pulse(self): @save(name="global_consumption_pulse") def global_consumption_pulse(self): """Global consumption under FAIR pulse scenario.""" - + individual_region = self.individual_region fair_pulse = self.climate.fair_pulse if self.clip_gmsl: @@ -1027,7 +1049,7 @@ def global_consumption_pulse(self): damages = compute_damages( fair_pulse, - betas=self.damage_function_coefficients, + betas=self.damage_function_coefficients.sel(region = individual_region), formula=self.formula, ) diff --git a/src/dscim/menu/risk_aversion.py b/src/dscim/menu/risk_aversion.py index a27cbadf..1e84b53b 100644 --- a/src/dscim/menu/risk_aversion.py +++ b/src/dscim/menu/risk_aversion.py @@ -110,8 +110,16 @@ def global_consumption_calculation(self, disc_type): xr.DataArray """ + if self.geography == 'ir': + gdp = self.gdp + elif self.geography == 'country': + #group gdp by some grouping and collapse + gdp = self.gdp.sum(dim=["region"]) + else: + gdp = self.gdp.sum(dim=["region"]) + if (disc_type == "constant") or ("ramsey" in disc_type): - global_cons_no_cc = self.gdp.sum(dim=["region"]) + global_cons_no_cc = gdp elif disc_type == "constant_model_collapsed": global_cons_no_cc = self.gdp.sum(dim=["region"]).mean(dim=["model"]) diff --git a/src/dscim/preprocessing/input_damages.py b/src/dscim/preprocessing/input_damages.py index e55f6bb0..b84f5808 100644 --- a/src/dscim/preprocessing/input_damages.py +++ b/src/dscim/preprocessing/input_damages.py @@ -808,6 +808,8 @@ def prep( if damages[v].dtype == object: damages[v] = damages[v].astype("unicode") + damages.coords['gcm'] = damages.coords['gcm'].astype('object') + if i == 0: damages.to_zarr( f"{outpath}/impacts-darwin-montecarlo-damages-v{mortality_version}.zarr", From 10a9393f0b4fe14cc32c2c68fcf13eaaf09b5953 Mon Sep 17 00:00:00 2001 From: Jonah Gilbert Date: Wed, 20 Nov 2024 11:31:59 -0600 Subject: [PATCH 03/10] Fix extrapolation --- src/dscim/menu/main_recipe.py | 85 +++++++++++++++++++++++---------- src/dscim/menu/risk_aversion.py | 2 +- 2 files changed, 60 insertions(+), 27 deletions(-) diff --git a/src/dscim/menu/main_recipe.py b/src/dscim/menu/main_recipe.py index 87e05d18..513a9c32 100644 --- a/src/dscim/menu/main_recipe.py +++ b/src/dscim/menu/main_recipe.py @@ -476,7 +476,7 @@ def collapsed_pop(self, geography = None): .assign_coords({'region':territories}) .groupby('region') .sum()) - elif geography == "global": + elif geography == "globe": pop = pop.sum(dim="region").assign_coords({'region':'globe'}).expand_dims('region') return 
pop @@ -823,22 +823,37 @@ def global_consumption_calculation(self, disc_type): def global_consumption_per_capita(self, disc_type): """Global consumption per capita - + Returns ------- xr.DataArray """ - + # Calculate global consumption per capita - if self.individual_region is not None: - array_pc = self.global_consumption_calculation( - disc_type - ) / self.collapsed_pop.sel(region = self.individual_region) - else: - array_pc = self.global_consumption_calculation( - disc_type - ) / self.collapsed_pop.sum("region") - + array_pc = self.global_consumption_calculation( + disc_type + ) / self.collapsed_pop#.sum("region") + + if self.geography == "ir": + pass + elif self.geography == "country": + territories = [] + mapping_dict = {} + for ii, row in self.countries_mapping.iterrows(): + mapping_dict[row["ISO"]] = row["MatchedISO"] + if row["MatchedISO"] == "nan": + mapping_dict[row["ISO"]] = "nopop" + + for region in array_pc.region.values: + territories.append(mapping_dict[region[:3]]) + + array_pc = (array_pc + .assign_coords({'region':territories}) + .groupby('region') + .sum()) + elif self.geography == "globe": + array_pc = array_pc.sum(dim="region").assign_coords({'region':'globe'}).expand_dims('region') + if self.NAME == "equity": # equity recipe's growth is capped to # risk aversion recipe's growth rates @@ -850,7 +865,7 @@ def global_consumption_per_capita(self, disc_type): method="growth_constant", cap=self.risk_aversion_growth_rates(), ) - + else: extrapolated = extrapolate( xr_array=array_pc, @@ -859,9 +874,9 @@ def global_consumption_per_capita(self, disc_type): interp_year=self.ext_end_year, method="growth_constant", ) - + complete_array = xr.concat([array_pc, extrapolated], dim="year") - + return complete_array @cachedproperty @@ -869,9 +884,9 @@ def global_consumption_per_capita(self, disc_type): def global_consumption(self): """Global consumption without climate change""" - # rff simulation means that GDP already exists out to 2300 individual_region = self.individual_region + # rff simulation means that GDP already exists out to 2300 if 2300 in self.gdp.year: self.logger.debug("Global consumption found up to 2300.") if individual_region is not None: @@ -884,7 +899,13 @@ def global_consumption(self): # holding population constant # from 2100 to 2300 with 2099 values if individual_region is None: - pop = self.collapsed_pop.sum("region") + pop = self.collapsed_pop + if self.geography == "ir": + pass + elif self.geography == "country": + pass + elif self.geography == "globe": + pop = self.collapsed_pop.sum("region") else: pop = self.collapsed_pop.sel(region = individual_region) pop = pop.reindex( @@ -893,9 +914,14 @@ def global_consumption(self): ) # Calculate global consumption back by - global_cons = ( - self.global_consumption_per_capita(self.discounting_type) * pop - ) + if individual_region is None: + global_cons = ( + self.global_consumption_per_capita(self.discounting_type) * pop + ) + else: + global_cons = ( + self.global_consumption_per_capita(self.discounting_type).sel(region = individual_region) * pop + ) # Add dimension # @TODO: remove this line altogether @@ -1047,12 +1073,19 @@ def global_consumption_pulse(self): if self.clip_gmsl: fair_pulse["gmsl"] = np.minimum(fair_pulse["gmsl"], self.gmsl_max) - damages = compute_damages( - fair_pulse, - betas=self.damage_function_coefficients.sel(region = individual_region), - formula=self.formula, - ) - + if individual_region is not None: + damages = compute_damages( + fair_pulse, + 
betas=self.damage_function_coefficients.sel(region = individual_region), + formula=self.formula, + ) + else: + damages = compute_damages( + fair_pulse, + betas=self.damage_function_coefficients, + formula=self.formula, + ) + cc_cons = self.global_consumption - damages gc_no_pulse = [] for wp in self.weitzman_parameter: diff --git a/src/dscim/menu/risk_aversion.py b/src/dscim/menu/risk_aversion.py index 1e84b53b..bbc92ed7 100644 --- a/src/dscim/menu/risk_aversion.py +++ b/src/dscim/menu/risk_aversion.py @@ -72,7 +72,7 @@ def damages_calculation(self, geography) -> xr.Dataset: .assign_coords({'region':territories}) .groupby('region') .sum()) - elif geography == "global": + elif geography == "globe": dams_collapse = dams_collapse.sum(dim="region").assign_coords({'region':'globe'}).expand_dims('region') if "gwr" in self.discounting_type: From c83829ef72450c9ba4abcddb795ae7af16413a99 Mon Sep 17 00:00:00 2001 From: Jonah Gilbert Date: Mon, 13 Jan 2025 14:43:11 -0600 Subject: [PATCH 04/10] Fix regional scc calculation --- src/dscim/menu/main_recipe.py | 18 ++++++++++-------- src/dscim/menu/risk_aversion.py | 2 +- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/src/dscim/menu/main_recipe.py b/src/dscim/menu/main_recipe.py index 513a9c32..102a6500 100644 --- a/src/dscim/menu/main_recipe.py +++ b/src/dscim/menu/main_recipe.py @@ -832,10 +832,10 @@ def global_consumption_per_capita(self, disc_type): # Calculate global consumption per capita array_pc = self.global_consumption_calculation( disc_type - ) / self.collapsed_pop#.sum("region") + ) if self.geography == "ir": - pass + array_pc = array_pc / self.collapsed_pop elif self.geography == "country": territories = [] mapping_dict = {} @@ -847,12 +847,14 @@ def global_consumption_per_capita(self, disc_type): for region in array_pc.region.values: territories.append(mapping_dict[region[:3]]) - array_pc = (array_pc - .assign_coords({'region':territories}) - .groupby('region') - .sum()) + pop = (self.collapsed_pop + .assign_coords({'region':territories}) + .groupby('region') + .sum()) + + array_pc = (array_pc / pop) elif self.geography == "globe": - array_pc = array_pc.sum(dim="region").assign_coords({'region':'globe'}).expand_dims('region') + array_pc = (array_pc / self.collapsed_pop.sum("region")).assign_coords({'region':'globe'}).expand_dims('region') if self.NAME == "equity": # equity recipe's growth is capped to @@ -905,7 +907,7 @@ def global_consumption(self): elif self.geography == "country": pass elif self.geography == "globe": - pop = self.collapsed_pop.sum("region") + pop = pop.sum("region") else: pop = self.collapsed_pop.sel(region = individual_region) pop = pop.reindex( diff --git a/src/dscim/menu/risk_aversion.py b/src/dscim/menu/risk_aversion.py index bbc92ed7..e99783c1 100644 --- a/src/dscim/menu/risk_aversion.py +++ b/src/dscim/menu/risk_aversion.py @@ -122,7 +122,7 @@ def global_consumption_calculation(self, disc_type): global_cons_no_cc = gdp elif disc_type == "constant_model_collapsed": - global_cons_no_cc = self.gdp.sum(dim=["region"]).mean(dim=["model"]) + global_cons_no_cc = gdp.mean(dim=["model"]) elif "gwr" in disc_type: ce_cons = self.ce(self.gdppc, dims=["ssp", "model"]) From 27f0a914a1e30bbd7dfdbfef95582524c1f9d067 Mon Sep 17 00:00:00 2001 From: Jonah Gilbert Date: Fri, 14 Feb 2025 16:05:48 -0600 Subject: [PATCH 05/10] Allow individual region damage function fits --- src/dscim/menu/baseline.py | 67 +++++++++++++++++++++++++++++-- src/dscim/menu/main_recipe.py | 74 +++++++++++------------------------ 
src/dscim/utils/utils.py | 6 +--
 3 files changed, 89 insertions(+), 58 deletions(-)

diff --git a/src/dscim/menu/baseline.py b/src/dscim/menu/baseline.py
index 892c3cc7..291afd10 100644
--- a/src/dscim/menu/baseline.py
+++ b/src/dscim/menu/baseline.py
@@ -1,5 +1,7 @@
+import pandas as pd
+import xarray as xr
 from dscim.menu.main_recipe import MainRecipe
-
+import os

 class Baseline(MainRecipe):
     """Adding up option"""
@@ -16,9 +18,68 @@ def ce_no_cc(self):
     def global_damages_calculation(self):
         """Call global damages"""
         return self.adding_up_damages.to_dataframe("damages").reset_index()
-
+
+    def damages_calculation(self, geography) -> xr.Dataset:
+        """Aggregate damages to country level
+
+        Returns
+        --------
+        xr.Dataset
+        """
+
+        self.logger.info("Calculating damages")
+        if self.individual_region:
+            dams_collapse = self.calculated_damages * self.collapsed_pop.sel(region = self.individual_region, drop=True)
+        else:
+            dams_collapse = self.calculated_damages * self.collapsed_pop
+
+        if geography == "ir":
+            pass
+        elif geography == "country":
+            territories = []
+            mapping_dict = {}
+            for ii, row in self.countries_mapping.iterrows():
+                mapping_dict[row["ISO"]] = row["MatchedISO"]
+                if row["MatchedISO"] == "nan":
+                    mapping_dict[row["ISO"]] = "nopop"
+
+            for region in dams_collapse.region.values:
+                territories.append(mapping_dict[region[:3]])
+
+            dams_collapse = (dams_collapse
+                             .assign_coords({'region':territories})
+                             .groupby('region')
+                             .sum())
+        elif geography == "globe":
+            dams_collapse = dams_collapse.sum(dim="region").assign_coords({'region':'globe'}).expand_dims('region')
+
+        if "gwr" in self.discounting_type:
+            dams_collapse = dams_collapse.assign_coords(
+                ssp=str(list(self.gdp.ssp.values)),
+                model=str(list(self.gdp.model.values)),
+            )
+
+        return dams_collapse.to_dataset(name = 'damages')
+
+
+    @property
     def calculated_damages(self):
-        pass
+        mean_cc = f"{self.ce_path}/adding_up_cc.zarr"
+        mean_no_cc = f"{self.ce_path}/adding_up_no_cc.zarr"
+
+        if os.path.exists(mean_cc) and os.path.exists(mean_no_cc):
+            self.logger.info(
+                f"Adding up aggregated damages found at {mean_cc}, {mean_no_cc}. These are being loaded..."
+            )
+            damages = (
+                (xr.open_zarr(mean_no_cc).no_cc - xr.open_zarr(mean_cc).cc) * self.pop
+            )
+        else:
+            raise NotImplementedError(
+                f"Adding up reduced damages not found: {mean_no_cc}, {mean_cc}. Please reduce damages for `adding_up`."
+            )
+        if self.individual_region:
+            damages = damages.sel(region=self.individual_region,drop=True)
+        return self.cut(damages)

     def ce_cc_calculation(self):
         pass
diff --git a/src/dscim/menu/main_recipe.py b/src/dscim/menu/main_recipe.py
index 102a6500..5e04244b 100644
--- a/src/dscim/menu/main_recipe.py
+++ b/src/dscim/menu/main_recipe.py
@@ -460,7 +460,7 @@ def collapsed_pop(self, geography = None):
             pop = self.pop.mean(["model", "ssp"])

         if geography == "ir":
-            pass
+            pass
         elif geography == "country":
@@ -599,47 +599,27 @@ def damage_function_calculation(self, damage_function_points, global_consumption
         elif (self.discounting_type == "constant") or (
             "ramsey" in self.discounting_type
         ):
-            for ssp, model in list(
-                product(
-                    damage_function_points.ssp.unique(),
-                    damage_function_points.model.unique(),
-                )
-            ):
-                # Subset dataframe to specific SSP-IAM combination.
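
# The loop being removed here is superseded by a single chunked fit: patch 01's
# model_outputs chunks damage_function_points so that each ssp-model block is
# fit independently via xr.map_blocks. A minimal sketch of that pattern, with
# toy data and a stand-in fit function (fit_one plays the role of fit_project;
# requires dask, which the chunked pipeline already assumes):
#
#     import numpy as np
#     import xarray as xr
#
#     def fit_one(block):
#         # stand-in "fit": collapse each (ssp, model) block to one value per pair
#         return block.mean("year")
#
#     da = xr.DataArray(
#         np.random.rand(2, 2, 3),
#         dims=("ssp", "model", "year"),
#         coords={"ssp": ["SSP2", "SSP3"], "model": ["low", "high"],
#                 "year": [2020, 2021, 2022]},
#     ).chunk({"ssp": 1, "model": 1, "year": -1})
#
#     template = da.mean("year")  # lazy result with matching (1, 1) chunks
#     out = xr.map_blocks(fit_one, da, template=template).compute()
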
- fit_subset = damage_function_points[ - (damage_function_points["ssp"] == ssp) - & (damage_function_points["model"] == model) - ] - - global_c_subset = global_consumption.sel({"ssp": ssp, "model": model}) - - # Fit damage function curves using the data subset - damage_function = model_outputs( - damage_function=fit_subset, - formula=self.formula, - type_estimation=self.fit_type, - global_c=global_c_subset, - extrapolation_type=self.ext_method, - quantiles=self.quantreg_quantiles, - year_range=yrs, - year_start_pred=self.ext_subset_end_year + 1, - ) - - # Add variables - params = damage_function["parameters"].expand_dims( - dict( - discount_type=[self.discounting_type], ssp=[ssp], model=[model] - ) - ) + # Subset dataframe to specific SSP-IAM combination. + fit_subset = damage_function_points + global_c_subset = global_consumption - preds = damage_function["preds"].expand_dims( - dict( - discount_type=[self.discounting_type], ssp=[ssp], model=[model] - ) - ) + # Fit damage function curves using the data subset + damage_function = model_outputs( + damage_function_points=fit_subset, + formula=self.formula, + map_dims=['ssp','model'], + type_estimation=self.fit_type, + global_c=global_c_subset, + extrapolation_type=self.ext_method, + quantiles=self.quantreg_quantiles, + year_range=yrs, + year_start_pred=self.ext_subset_end_year + 1, + ) + + # Add variables + params = damage_function - params_list.append(params) - preds_list.append(preds) + params_list.append(params) elif "gwr" in self.discounting_type: # Fit damage function across all SSP-IAM combinations, as expected @@ -667,20 +647,10 @@ def damage_function_calculation(self, damage_function_points, global_consumption ) ) - preds = damage_function["preds"].expand_dims( - dict( - discount_type=[self.discounting_type], - ssp=[str(list(self.gdp.ssp.values))], - model=[str(list(self.gdp.model.values))], - ) - ) - params_list.append(params) - preds_list.append(preds) return dict( params=xr.combine_by_coords(params_list), - preds=xr.combine_by_coords(preds_list), ) @cachedproperty @@ -892,7 +862,7 @@ def global_consumption(self): if 2300 in self.gdp.year: self.logger.debug("Global consumption found up to 2300.") if individual_region is not None: - global_cons = self.gdp.sel(region = individual_region).rename("global_consumption") + global_cons = self.gdp.sel(region = individual_region,drop=True).rename("global_consumption") else: global_cons = self.gdp.sum("region").rename("global_consumption") else: @@ -922,7 +892,7 @@ def global_consumption(self): ) else: global_cons = ( - self.global_consumption_per_capita(self.discounting_type).sel(region = individual_region) * pop + self.global_consumption_per_capita(self.discounting_type).sel(region = individual_region,drop=True) * pop ) # Add dimension diff --git a/src/dscim/utils/utils.py b/src/dscim/utils/utils.py index e77b32c1..12d0af9d 100644 --- a/src/dscim/utils/utils.py +++ b/src/dscim/utils/utils.py @@ -436,9 +436,9 @@ def model_outputs( ds_points, template = ds_template, kwargs = dict( - formula = menu_item.formula, - type_estimation = menu_item.fit_type, - quantiles = menu_item.quantreg_quantiles, + formula = formula, + type_estimation = type_estimation, + quantiles = quantiles, index_dims = map_dims,)) extrapolation_type = "global_c_ratio" From 549e0fe83cf9f4aa6489842bceaaa29d6dbde714 Mon Sep 17 00:00:00 2001 From: Jonah Gilbert Date: Tue, 1 Apr 2025 14:50:11 -0500 Subject: [PATCH 06/10] Fix double counted pop --- src/dscim/menu/baseline.py | 51 +++++++++++- src/dscim/menu/main_recipe.py | 72 
++++++++++++----- src/dscim/menu/simple_storage.py | 2 +- src/dscim/utils/utils.py | 129 ++++++++++++++++++------------- 4 files changed, 174 insertions(+), 80 deletions(-) diff --git a/src/dscim/menu/baseline.py b/src/dscim/menu/baseline.py index 291afd10..4a7c199c 100644 --- a/src/dscim/menu/baseline.py +++ b/src/dscim/menu/baseline.py @@ -71,7 +71,7 @@ def calculated_damages(self): f"Adding up aggregated damages found at {mean_cc}, {mean_no_cc}. These are being loaded..." ) damages = ( - (xr.open_zarr(mean_no_cc).no_cc - xr.open_zarr(mean_cc).cc) * self.pop + (xr.open_zarr(mean_no_cc).no_cc - xr.open_zarr(mean_cc).cc) ) else: raise NotImplementedError( @@ -93,15 +93,58 @@ def ce_no_cc_calculation(self): def global_consumption_calculation(self, disc_type): """Calculate global consumption""" + if self.geography == 'ir': + gdp = self.gdp + elif self.geography == 'country': + #group gdp by some grouping and collapse + gdp = self.gdp.sum(dim=["region"]) + else: + gdp = self.gdp.sum(dim=["region"]) + if (disc_type == "constant") or ("ramsey" in disc_type): - global_cons_no_cc = self.gdp.sum(dim=["region"]) + global_cons_no_cc = gdp elif disc_type == "constant_model_collapsed": - global_cons_no_cc = self.gdp.sum(dim=["region"]).mean(dim=["model"]) + global_cons_no_cc = gdp.mean(dim=["model"]) elif "gwr" in disc_type: - global_cons_no_cc = self.gdp.sum(dim=["region"]).mean(dim=["model", "ssp"]) + global_cons_no_cc = gdp.mean(dim=["model", "ssp"]) global_cons_no_cc.name = f"global_cons_{disc_type}" return global_cons_no_cc + +# def global_consumption_calculation(self, disc_type): +# """Calculate global consumption + +# Returns +# ------- +# xr.DataArray +# """ + +# if self.geography == 'ir': +# gdp = self.gdp +# elif self.geography == 'country': +# #group gdp by some grouping and collapse +# gdp = self.gdp.sum(dim=["region"]) +# else: +# gdp = self.gdp.sum(dim=["region"]) + +# if (disc_type == "constant") or ("ramsey" in disc_type): +# global_cons_no_cc = gdp + +# elif disc_type == "constant_model_collapsed": +# global_cons_no_cc = gdp.mean(dim=["model"]) + +# elif "gwr" in disc_type: +# ce_cons = self.ce(self.gdppc, dims=["ssp", "model"]) +# global_cons_no_cc = (ce_cons * self.collapsed_pop).sum(dim=["region"]) + +# # Convert to array in case xarray becames temperamental. This is a hack +# # that need to be changed +# if isinstance(global_cons_no_cc, xr.Dataset): +# global_cons_no_cc = global_cons_no_cc.to_array() + +# global_cons_no_cc.name = f"global_cons_{disc_type}" + +# return global_cons_no_cc diff --git a/src/dscim/menu/main_recipe.py b/src/dscim/menu/main_recipe.py index 5e04244b..f26b76e8 100644 --- a/src/dscim/menu/main_recipe.py +++ b/src/dscim/menu/main_recipe.py @@ -540,6 +540,7 @@ def damage_function_points(self) -> pd.DataFrame: return df + def damage_function_calculation(self, damage_function_points, global_consumption): """The damage function model fit may be : (1) ssp specific, (2) ssp-model specific, (3) unique across ssp-model. This depends on the type of discounting. 
In each case the input data passed to the fitting functions and the formatting of the returned @@ -552,7 +553,6 @@ def damage_function_calculation(self, damage_function_points, global_consumption """ yrs = range(self.climate.pulse_year, self.ext_subset_end_year + 1) - params_list, preds_list = [], [] @@ -599,27 +599,47 @@ def damage_function_calculation(self, damage_function_points, global_consumption elif (self.discounting_type == "constant") or ( "ramsey" in self.discounting_type ): - # Subset dataframe to specific SSP-IAM combination. - fit_subset = damage_function_points - global_c_subset = global_consumption + for ssp, model in list( + product( + damage_function_points.ssp.unique(), + damage_function_points.model.unique(), + ) + ): + # Subset dataframe to specific SSP-IAM combination. + fit_subset = damage_function_points[ + (damage_function_points["ssp"] == ssp) + & (damage_function_points["model"] == model) + ] - # Fit damage function curves using the data subset - damage_function = model_outputs( - damage_function_points=fit_subset, - formula=self.formula, - map_dims=['ssp','model'], - type_estimation=self.fit_type, - global_c=global_c_subset, - extrapolation_type=self.ext_method, - quantiles=self.quantreg_quantiles, - year_range=yrs, - year_start_pred=self.ext_subset_end_year + 1, - ) - - # Add variables - params = damage_function + global_c_subset = global_consumption.sel({"ssp": ssp, "model": model}) - params_list.append(params) + # Fit damage function curves using the data subset + damage_function = model_outputs( + damage_function=fit_subset, + formula=self.formula, + type_estimation=self.fit_type, + global_c=global_c_subset, + extrapolation_type=self.ext_method, + quantiles=self.quantreg_quantiles, + year_range=yrs, + year_start_pred=self.ext_subset_end_year + 1, + ) + + # Add variables + params = damage_function["parameters"].expand_dims( + dict( + discount_type=[self.discounting_type], ssp=[ssp], model=[model] + ) + ) + + preds = damage_function["preds"].expand_dims( + dict( + discount_type=[self.discounting_type], ssp=[ssp], model=[model] + ) + ) + + params_list.append(params) + preds_list.append(preds) elif "gwr" in self.discounting_type: # Fit damage function across all SSP-IAM combinations, as expected @@ -647,10 +667,20 @@ def damage_function_calculation(self, damage_function_points, global_consumption ) ) + preds = damage_function["preds"].expand_dims( + dict( + discount_type=[self.discounting_type], + ssp=[str(list(self.gdp.ssp.values))], + model=[str(list(self.gdp.model.values))], + ) + ) + params_list.append(params) + preds_list.append(preds) return dict( params=xr.combine_by_coords(params_list), + preds=xr.combine_by_coords(preds_list), ) @cachedproperty @@ -879,7 +909,7 @@ def global_consumption(self): elif self.geography == "globe": pop = pop.sum("region") else: - pop = self.collapsed_pop.sel(region = individual_region) + pop = self.collapsed_pop.sel(region = individual_region, drop=True) pop = pop.reindex( year=range(pop.year.min().values, self.ext_end_year + 1), method="ffill", diff --git a/src/dscim/menu/simple_storage.py b/src/dscim/menu/simple_storage.py index 27333c53..5e9b4b76 100644 --- a/src/dscim/menu/simple_storage.py +++ b/src/dscim/menu/simple_storage.py @@ -96,7 +96,7 @@ def gmst(self): @property def gmsl(self): """Cached GMSL anomalies""" - gmsl = xr.open_zarr(self.gmsl_path).gmsl.to_dataframe().reset_index() + gmsl = xr.open_zarr(self.gmsl_path).to_dataframe().reset_index() return gmsl diff --git a/src/dscim/utils/utils.py 
b/src/dscim/utils/utils.py index 12d0af9d..18d2a15b 100644 --- a/src/dscim/utils/utils.py +++ b/src/dscim/utils/utils.py @@ -41,7 +41,7 @@ def fit_project(chunk_reg, formula, type_estimation, quantiles, index_dims): params = params.set_index(index_dims + ['year',]) return(params.to_xarray()) -def modeler(df, formula, type_estimation, quantiles=None): +def modeler(df, formula, type_estimation, exog, quantiles=None): """Wrapper function for statsmodels functions (OLS and quantiles) For the quantiles, the function will fit for all quantiles between 0 and 1 @@ -74,11 +74,11 @@ def modeler(df, formula, type_estimation, quantiles=None): pd.DataFrame with predictions """ - import numpy as np - if type_estimation == "ols": mod = smf.ols(formula=formula, data=df).fit() params = pd.DataFrame(mod.params).T + y_hat = pd.DataFrame(dict(exog, y_hat=mod.predict(exog=exog))) + elif type_estimation == "quantreg": # silence the 'max recursions' warning with warnings.catch_warnings(): @@ -86,7 +86,7 @@ def modeler(df, formula, type_estimation, quantiles=None): # set up the model mod = smf.quantreg(formula=formula, data=df) - q_params = [] + q_params, q_y_hat = [], [] # calculate each quantile regression # save parameters and predictions @@ -95,12 +95,17 @@ def modeler(df, formula, type_estimation, quantiles=None): params_quantile = pd.DataFrame(quant_mod.params).T params_quantile = params_quantile.assign(q=quant) q_params.append(params_quantile) + + fitted = pd.DataFrame(dict(exog, y_hat=quant_mod.predict(exog=exog))) + #q_y_hat.append(fitted.assign(q=quant)) + params = pd.concat(q_params) + #y_hat = pd.DataFrame(pd.concat(q_y_hat)) else: raise NotImplementedError(f"{type_estimation} is not valid option!") - return params + return params, None#y_hat def get_weights(quantiles): @@ -356,26 +361,26 @@ def c_equivalence(array, dims, eta, weights=None, func_args=None, func=None): def model_outputs( - damage_function_points, + damage_function, extrapolation_type, formula, - map_dims, year_range, year_start_pred, quantiles, global_c=None, + min_anomaly=0, + max_anomaly=20, + step_anomaly=0.2, + min_gmsl=0, + max_gmsl=300, + step_gmsl=3, type_estimation="ols", ): """Estimate damage function coefficients and predictions using passed formula. This model is estimated for each year in our timeframe (2010 to 2099) and - using a rolling window across time (5-yr by default). Dims are placed into - three categories: - - map_dims: these dimensions index the final coefficients - - years: as described above - - everything else: these dimensions populate the full number of points in - the regression (i.e. gcm places all 33 gcms into the regression) + using a rolling window across time (5-yr by default). Damage functions are extrapolated for each year between 2099 and 2300. @@ -383,10 +388,6 @@ def model_outputs( ---------- damage_function: pandas.DataFrame A global damage function. - formula: str - Regression formula using R regression syntax. - map_dims: - Dims to index the final coefficients. type_estimation: str Type of model use for damage function fitting: `ols`, `quantreg` extrapolation_type : str @@ -407,54 +408,68 @@ def model_outputs( DataFrame with yearly damage functions (coefficients and predictions respectively). 
""" - - map_chunks = {'year': -1} - out_chunks = {'year': -1} - - for dim in map_dims: - map_chunks[dim] = 1 - out_chunks[dim] = 1 - - for dim in set(damage_function_points.coords) - set(map_chunks.keys()): - map_chunks[dim] = -1 - + # set year of prediction for global C extrapolation fix_global_c = year_start_pred - 1 - - ds_template = xr.Dataset( - data_vars={ - 'anomaly':(map_dims + ['year',], np.empty([damage_function_points[dim].size for dim in map_dims + ['year',]])), - 'np.power(anomaly, 2)':(map_dims + ['year',], np.empty([damage_function_points[dim].size for dim in map_dims + ['year',]])), - }, - coords={dim: damage_function_points[dim].values for dim in map_dims + ['year',]}, - ).chunk(out_chunks) - # print(1) - ds_points = (damage_function_points - .chunk(map_chunks)) - # print(1) - param_df = xr.map_blocks(fit_project, - ds_points, - template = ds_template, - kwargs = dict( - formula = formula, - type_estimation = type_estimation, - quantiles = quantiles, - index_dims = map_dims,)) - - extrapolation_type = "global_c_ratio" + + # set exogenous variables for predictions + gmsl = np.arange(min_gmsl, max_gmsl, step_gmsl) + temps = np.arange(min_anomaly, max_anomaly, step_anomaly) + + if ("anomaly" in damage_function.columns) and ("gmsl" in damage_function.columns): + exog_X = pd.DataFrame(product(temps, gmsl)) + exog = dict(anomaly=exog_X.values[:, 0], gmsl=exog_X.values[:, 1]) + elif "anomaly" in damage_function.columns: + exog = dict(anomaly=temps) + elif "gmsl" in damage_function.columns: + exog = dict(gmsl=gmsl) + else: + print("Independent variables not found.") + + # Rolling window estimation (5-yr) + list_params, list_y_hats = [], [] + + for year in year_range: + time_window = range(year - 2, year + 3) + df = damage_function[damage_function.year.isin(time_window)] + params, y_hat = modeler( + df=df, + formula=formula, + type_estimation=type_estimation, + exog=exog, + quantiles=quantiles, + ) + + params, y_hat = params.assign(year=year), y_hat.assign(year=year) + list_params.append(params) + list_y_hats.append(y_hat) + + # Concatenate results + param_df = pd.concat(list_params) + y_hat_df = pd.concat(list_y_hats) + if extrapolation_type == "global_c_ratio": + # convert to xarray immediately + index = ["year", "q"] if type_estimation == "quantreg" else ["year"] + y_hat_df = y_hat_df.set_index( + [i for i in y_hat_df.columns if "y_hat" not in i] + ).to_xarray() + param_df = param_df.set_index(index).to_xarray() + # Calculate global consumption ratios to fixed year global_c_factors = ( global_c.sel(year=slice(fix_global_c + 1, None)) / global_c.sel(year=fix_global_c) ).squeeze() - + # Extrapolate by multiplying fixed params by ratios + #extrap_preds = y_hat_df.sel(year=fix_global_c) * global_c_factors extrap_params = param_df.sel(year=fix_global_c) * global_c_factors - + # concatenate extrapolation and pre-2100 + #preds = xr.concat([y_hat_df, extrap_preds], dim="year") parameters = xr.concat([param_df, extrap_params], dim="year") - + # For the local case we don't care about the time dimension, the # extrapolation will index on the last year available of damages (2099) # by design (changed using the fix_global_c parameter) and will use @@ -463,7 +478,13 @@ def model_outputs( if global_c_factors.year.dims == (): raise NotImplementedError("WATCH OUT! 
Local is scrapped.") - return(parameters) + # Return all results + res = { + "parameters": parameters, + "preds": None, + } + + return res def compute_damages(anomaly, betas, formula): """Calculate damages using FAIR anomalies (either control or pulse). From 97040f3a2626d91dbea0988080beab923b101c92 Mon Sep 17 00:00:00 2001 From: Jonah Gilbert Date: Tue, 15 Apr 2025 13:39:35 -0500 Subject: [PATCH 07/10] Add global default --- src/dscim/menu/main_recipe.py | 38 ++++++++++++++++++++++++-------- src/dscim/menu/simple_storage.py | 4 ++-- 2 files changed, 31 insertions(+), 11 deletions(-) diff --git a/src/dscim/menu/main_recipe.py b/src/dscim/menu/main_recipe.py index f26b76e8..a8b7fa0f 100644 --- a/src/dscim/menu/main_recipe.py +++ b/src/dscim/menu/main_recipe.py @@ -106,7 +106,7 @@ def __init__( extrap_formula=None, fair_dims=None, save_files=None, - geography=None, + geography="globe", individual_region=None, **kwargs, ): @@ -507,9 +507,7 @@ def ce_no_cc(self): """Certainty equivalent of consumption without climate change damages""" return self.ce_no_cc_calculation() - @cachedproperty - @save(name="damage_function_points") - def damage_function_points(self) -> pd.DataFrame: + def damage_function_points_xr(self) -> pd.DataFrame: """Global damages by RCP/GCM or SLR Returns @@ -539,6 +537,19 @@ def damage_function_points(self) -> pd.DataFrame: df = xr.merge([df, climate_ds]).sel(year = df.year) return df + + @cachedproperty + @save(name="damage_function_points") + def damage_function_points(self) -> pd.DataFrame: + """Global damages by RCP/GCM or SLR + + Returns + -------- + pd.DataFrame + """ + pts = self.damage_function_points_xr().to_dataframe().reset_index() + + return pts def damage_function_calculation(self, damage_function_points, global_consumption): @@ -1331,11 +1342,20 @@ def calculate_stream_discount_factors(self, discounting_type, fair_aggregation): # holding population constant # from 2100 to 2300 with 2099 values - full_pop = self.collapsed_pop.sum("region") - full_pop = full_pop.reindex( - year=range(full_pop.year.min().values, self.ext_end_year + 1), - method="ffill", - ) + + individual_region = self.individual_region + if individual_region is None: + full_pop = self.collapsed_pop.sum("region") + full_pop = full_pop.reindex( + year=range(full_pop.year.min().values, self.ext_end_year + 1), + method="ffill", + ) + else: + full_pop = self.collapsed_pop.sel(region = individual_region,drop=True) + full_pop = full_pop.reindex( + year=range(full_pop.year.min().values, self.ext_end_year + 1), + method="ffill", + ) # for aggregations other than uncollapsed, # we need to collapse over pop dimensions diff --git a/src/dscim/menu/simple_storage.py b/src/dscim/menu/simple_storage.py index 5e9b4b76..a6e8ae9f 100644 --- a/src/dscim/menu/simple_storage.py +++ b/src/dscim/menu/simple_storage.py @@ -431,8 +431,8 @@ def gdppc(self): def adding_up_damages(self): """This property calls pre-calculated adding-up IR-level 'mean' over batches.""" - mean_cc = f"{self.ce_path}/adding_up_cc.zarr" - mean_no_cc = f"{self.ce_path}/adding_up_no_cc.zarr" + mean_cc = f"{self.ce_path}/adding_up_cc_eta{self.eta}.zarr" + mean_no_cc = f"{self.ce_path}/adding_up_no_cc_eta{self.eta}.zarr" if os.path.exists(mean_cc) and os.path.exists(mean_no_cc): self.logger.info( From 63eaa7b157bc7f58a20f1b96e523f579bb3e5864 Mon Sep 17 00:00:00 2001 From: Jonah Gilbert Date: Wed, 11 Jun 2025 14:19:10 -0400 Subject: [PATCH 08/10] Revert "Add global default" This reverts commit 97040f3a2626d91dbea0988080beab923b101c92. 
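
The eta-specific store naming removed by this revert returns in the next two
commits, which point both the adding-up damages loader and the risk-neutral
damages at per-eta zarr stores. A minimal sketch of the naming convention they
converge on (paths and the eta value are illustrative only):

    import os

    def adding_up_stores(ce_path, eta):
        # Reduced adding-up damages are stored once per eta, e.g.
        # <ce_path>/adding_up_cc_eta2.0.zarr and adding_up_no_cc_eta2.0.zarr
        mean_cc = f"{ce_path}/adding_up_cc_eta{eta}.zarr"
        mean_no_cc = f"{ce_path}/adding_up_no_cc_eta{eta}.zarr"
        if not (os.path.exists(mean_cc) and os.path.exists(mean_no_cc)):
            raise FileNotFoundError(
                f"Reduce damages for `adding_up` first: {mean_cc}, {mean_no_cc}"
            )
        return mean_cc, mean_no_cc
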
--- src/dscim/menu/main_recipe.py | 38 ++++++++------------------------ src/dscim/menu/simple_storage.py | 4 ++-- 2 files changed, 11 insertions(+), 31 deletions(-) diff --git a/src/dscim/menu/main_recipe.py b/src/dscim/menu/main_recipe.py index a8b7fa0f..f26b76e8 100644 --- a/src/dscim/menu/main_recipe.py +++ b/src/dscim/menu/main_recipe.py @@ -106,7 +106,7 @@ def __init__( extrap_formula=None, fair_dims=None, save_files=None, - geography="globe", + geography=None, individual_region=None, **kwargs, ): @@ -507,7 +507,9 @@ def ce_no_cc(self): """Certainty equivalent of consumption without climate change damages""" return self.ce_no_cc_calculation() - def damage_function_points_xr(self) -> pd.DataFrame: + @cachedproperty + @save(name="damage_function_points") + def damage_function_points(self) -> pd.DataFrame: """Global damages by RCP/GCM or SLR Returns @@ -537,19 +539,6 @@ def damage_function_points_xr(self) -> pd.DataFrame: df = xr.merge([df, climate_ds]).sel(year = df.year) return df - - @cachedproperty - @save(name="damage_function_points") - def damage_function_points(self) -> pd.DataFrame: - """Global damages by RCP/GCM or SLR - - Returns - -------- - pd.DataFrame - """ - pts = self.damage_function_points_xr().to_dataframe().reset_index() - - return pts def damage_function_calculation(self, damage_function_points, global_consumption): @@ -1342,20 +1331,11 @@ def calculate_stream_discount_factors(self, discounting_type, fair_aggregation): # holding population constant # from 2100 to 2300 with 2099 values - - individual_region = self.individual_region - if individual_region is None: - full_pop = self.collapsed_pop.sum("region") - full_pop = full_pop.reindex( - year=range(full_pop.year.min().values, self.ext_end_year + 1), - method="ffill", - ) - else: - full_pop = self.collapsed_pop.sel(region = individual_region,drop=True) - full_pop = full_pop.reindex( - year=range(full_pop.year.min().values, self.ext_end_year + 1), - method="ffill", - ) + full_pop = self.collapsed_pop.sum("region") + full_pop = full_pop.reindex( + year=range(full_pop.year.min().values, self.ext_end_year + 1), + method="ffill", + ) # for aggregations other than uncollapsed, # we need to collapse over pop dimensions diff --git a/src/dscim/menu/simple_storage.py b/src/dscim/menu/simple_storage.py index a6e8ae9f..5e9b4b76 100644 --- a/src/dscim/menu/simple_storage.py +++ b/src/dscim/menu/simple_storage.py @@ -431,8 +431,8 @@ def gdppc(self): def adding_up_damages(self): """This property calls pre-calculated adding-up IR-level 'mean' over batches.""" - mean_cc = f"{self.ce_path}/adding_up_cc_eta{self.eta}.zarr" - mean_no_cc = f"{self.ce_path}/adding_up_no_cc_eta{self.eta}.zarr" + mean_cc = f"{self.ce_path}/adding_up_cc.zarr" + mean_no_cc = f"{self.ce_path}/adding_up_no_cc.zarr" if os.path.exists(mean_cc) and os.path.exists(mean_no_cc): self.logger.info( From 20f11f521d958fdb8b96280c1c9626f04148895c Mon Sep 17 00:00:00 2001 From: Jonah Gilbert Date: Wed, 11 Jun 2025 14:20:39 -0400 Subject: [PATCH 09/10] Add alternative risk neutral etas --- src/dscim/menu/simple_storage.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/dscim/menu/simple_storage.py b/src/dscim/menu/simple_storage.py index 5e9b4b76..51f2ad91 100644 --- a/src/dscim/menu/simple_storage.py +++ b/src/dscim/menu/simple_storage.py @@ -431,9 +431,9 @@ def gdppc(self): def adding_up_damages(self): """This property calls pre-calculated adding-up IR-level 'mean' over batches.""" - mean_cc = f"{self.ce_path}/adding_up_cc.zarr" - mean_no_cc = 
f"{self.ce_path}/adding_up_no_cc.zarr" - + mean_cc = f"{self.ce_path}/adding_up_cc_eta{self.eta}.zarr" + mean_no_cc = f"{self.ce_path}/adding_up_no_cc_eta{self.eta}.zarr" + if os.path.exists(mean_cc) and os.path.exists(mean_no_cc): self.logger.info( f"Adding up aggregated damages found at {mean_cc}, {mean_no_cc}. These are being loaded..." From eedd397ddee7730237bebf6990df5158ef9c9c57 Mon Sep 17 00:00:00 2001 From: Jonah Gilbert Date: Wed, 18 Jun 2025 11:12:57 -0400 Subject: [PATCH 10/10] Add eta to risk neutral damages --- src/dscim/menu/baseline.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dscim/menu/baseline.py b/src/dscim/menu/baseline.py index 4a7c199c..8455c84d 100644 --- a/src/dscim/menu/baseline.py +++ b/src/dscim/menu/baseline.py @@ -63,8 +63,8 @@ def damages_calculation(self, geography) -> xr.Dataset: @property def calculated_damages(self): - mean_cc = f"{self.ce_path}/adding_up_cc.zarr" - mean_no_cc = f"{self.ce_path}/adding_up_no_cc.zarr" + mean_cc = f"{self.ce_path}/adding_up_cc_eta{self.eta}.zarr" + mean_no_cc = f"{self.ce_path}/adding_up_no_cc_eta{self.eta}.zarr" if os.path.exists(mean_cc) and os.path.exists(mean_no_cc): self.logger.info(