diff --git a/src/dscim/menu/baseline.py b/src/dscim/menu/baseline.py index 892c3cc7..8455c84d 100644 --- a/src/dscim/menu/baseline.py +++ b/src/dscim/menu/baseline.py @@ -1,5 +1,7 @@ +import pandas as pd +import xarray as xr from dscim.menu.main_recipe import MainRecipe - +import os class Baseline(MainRecipe): """Adding up option""" @@ -16,9 +18,68 @@ def ce_no_cc(self): def global_damages_calculation(self): """Call global damages""" return self.adding_up_damages.to_dataframe("damages").reset_index() - + + def damages_calculation(self, geography) -> xr.Dataset: + """Aggregate damages to country level + + Returns + -------- + pd.DataFrame + """ + + self.logger.info(f"Calculating damages") + if self.individual_region: + dams_collapse = self.calculated_damages * self.collapsed_pop.sel(region = self.individual_region, drop=True) + else: + dams_collapse = self.calculated_damages * self.collapsed_pop + + if geography == "ir": + pass + elif geography == "country": + territories = [] + mapping_dict = {} + for ii, row in self.countries_mapping.iterrows(): + mapping_dict[row["ISO"]] = row["MatchedISO"] + if row["MatchedISO"] == "nan": + mapping_dict[row["ISO"]] = "nopop" + + for region in dams_collapse.region.values: + territories.append(mapping_dict[region[:3]]) + + dams_collapse = (dams_collapse + .assign_coords({'region':territories}) + .groupby('region') + .sum()) + elif geography == "globe": + dams_collapse = dams_collapse.sum(dim="region").assign_coords({'region':'globe'}).expand_dims('region') + + if "gwr" in self.discounting_type: + dams_collapse = dams_collapse.assign( + ssp=str(list(self.gdp.ssp.values)), + model=str(list(self.gdp.model.values)), + ) + + return dams_collapse.to_dataset(name = 'damages') + + @property def calculated_damages(self): - pass + mean_cc = f"{self.ce_path}/adding_up_cc_eta{self.eta}.zarr" + mean_no_cc = f"{self.ce_path}/adding_up_no_cc_eta{self.eta}.zarr" + + if os.path.exists(mean_cc) and os.path.exists(mean_no_cc): + self.logger.info( + f"Adding up aggregated damages found at {mean_cc}, {mean_no_cc}. These are being loaded..." + ) + damages = ( + (xr.open_zarr(mean_no_cc).no_cc - xr.open_zarr(mean_cc).cc) + ) + else: + raise NotImplementedError( + f"Adding up reduced damages not found: {mean_no_cc}, {mean_cc}. Please reduce damages for for `adding_up`." + ) + if self.individual_region: + damages = damages.sel(region=self.individual_region,drop=True) + return self.cut(damages) def ce_cc_calculation(self): pass @@ -32,15 +93,58 @@ def ce_no_cc_calculation(self): def global_consumption_calculation(self, disc_type): """Calculate global consumption""" + if self.geography == 'ir': + gdp = self.gdp + elif self.geography == 'country': + #group gdp by some grouping and collapse + gdp = self.gdp.sum(dim=["region"]) + else: + gdp = self.gdp.sum(dim=["region"]) + if (disc_type == "constant") or ("ramsey" in disc_type): - global_cons_no_cc = self.gdp.sum(dim=["region"]) + global_cons_no_cc = gdp elif disc_type == "constant_model_collapsed": - global_cons_no_cc = self.gdp.sum(dim=["region"]).mean(dim=["model"]) + global_cons_no_cc = gdp.mean(dim=["model"]) elif "gwr" in disc_type: - global_cons_no_cc = self.gdp.sum(dim=["region"]).mean(dim=["model", "ssp"]) + global_cons_no_cc = gdp.mean(dim=["model", "ssp"]) global_cons_no_cc.name = f"global_cons_{disc_type}" return global_cons_no_cc + +# def global_consumption_calculation(self, disc_type): +# """Calculate global consumption + +# Returns +# ------- +# xr.DataArray +# """ + +# if self.geography == 'ir': +# gdp = self.gdp +# elif self.geography == 'country': +# #group gdp by some grouping and collapse +# gdp = self.gdp.sum(dim=["region"]) +# else: +# gdp = self.gdp.sum(dim=["region"]) + +# if (disc_type == "constant") or ("ramsey" in disc_type): +# global_cons_no_cc = gdp + +# elif disc_type == "constant_model_collapsed": +# global_cons_no_cc = gdp.mean(dim=["model"]) + +# elif "gwr" in disc_type: +# ce_cons = self.ce(self.gdppc, dims=["ssp", "model"]) +# global_cons_no_cc = (ce_cons * self.collapsed_pop).sum(dim=["region"]) + +# # Convert to array in case xarray becames temperamental. This is a hack +# # that need to be changed +# if isinstance(global_cons_no_cc, xr.Dataset): +# global_cons_no_cc = global_cons_no_cc.to_array() + +# global_cons_no_cc.name = f"global_cons_{disc_type}" + +# return global_cons_no_cc diff --git a/src/dscim/menu/main_recipe.py b/src/dscim/menu/main_recipe.py index 0ab531d7..f26b76e8 100644 --- a/src/dscim/menu/main_recipe.py +++ b/src/dscim/menu/main_recipe.py @@ -106,6 +106,8 @@ def __init__( extrap_formula=None, fair_dims=None, save_files=None, + geography=None, + individual_region=None, **kwargs, ): if scc_quantiles is None: @@ -193,6 +195,10 @@ def __init__( "global_consumption", "global_consumption_no_pulse", ] + + if "country_ISOs" in kwargs: + self.countries_mapping = pd.read_csv(kwargs["country_ISOs"]) + self.countries = self.countries_mapping.MatchedISO.dropna().unique() super().__init__( sector_path=sector_path, @@ -203,8 +209,11 @@ def __init__( eta=eta, subset_dict=subset_dict, ce_path=ce_path, + geography=geography, + kwargs=kwargs, ) + self.rho = rho self.eta = eta self.fit_type = fit_type @@ -231,6 +240,8 @@ def __init__( self.extrap_formula = extrap_formula self.fair_dims = fair_dims self.save_files = save_files + self.geography = geography + self.individual_region = individual_region self.__dict__.update(**kwargs) self.kwargs = kwargs @@ -439,7 +450,7 @@ def output_attrs(self): return meta @cachedproperty - def collapsed_pop(self): + def collapsed_pop(self, geography = None): """Collapse population according to discount type.""" if (self.discounting_type == "constant") or ("ramsey" in self.discounting_type): pop = self.pop @@ -447,6 +458,27 @@ def collapsed_pop(self): pop = self.pop.mean("model") elif "gwr" in self.discounting_type: pop = self.pop.mean(["model", "ssp"]) + + if geography == "ir": + pass + elif geography == "country": + territories = [] + mapping_dict = {} + for ii, row in self.countries_mapping.iterrows(): + mapping_dict[row["ISO"]] = row["MatchedISO"] + if row["MatchedISO"] == "nan": + mapping_dict[row["ISO"]] = "nopop" + + for region in dams_collapse.region.values: + territories.append(mapping_dict(region[:3])) + + pop = (pop + .assign_coords({'region':territories}) + .groupby('region') + .sum()) + elif geography == "globe": + pop = pop.sum(dim="region").assign_coords({'region':'globe'}).expand_dims('region') + return pop @abstractmethod @@ -484,30 +516,31 @@ def damage_function_points(self) -> pd.DataFrame: -------- pd.DataFrame """ - df = self.global_damages_calculation() + df = self.damages_calculation(geography = self.geography) - if "slr" in df.columns: - df = df.merge(self.climate.gmsl, on=["year", "slr"]) - if "gcm" in df.columns: - df = df.merge(self.climate.gmst, on=["year", "gcm", "rcp"]) + climate_vars = [] + if "slr" in df.coords: + climate_vars.append(self.climate.gmsl.set_index(['slr','year']).to_xarray()) + if "gcm" in df.coords: + climate_vars.append(self.climate.gmst.set_index(['gcm','rcp','year']).to_xarray()) + climate_ds = xr.merge(climate_vars) # removing illegal combinations from estimation - if any([i in df.ssp.unique() for i in ["SSP1", "SSP5"]]): + if any([i in df.ssp.values for i in ["SSP1", "SSP5"]]): self.logger.info("Dropping illegal model combinations.") - for var in [i for i in df.columns if i in ["anomaly", "gmsl"]]: - df.loc[ - ((df.ssp == "SSP1") & (df.rcp == "rcp85")) - | ((df.ssp == "SSP5") & (df.rcp == "rcp45")), - var, - ] = np.nan + xr.where(((df.ssp == "SSP1") & (df.rcp == "rcp85")) + | ((df.ssp == "SSP5") & (df.rcp == "rcp45")),np.nan,df) # agriculture lacks ACCESS0-1/rcp85 combo if "agriculture" in self.sector: self.logger.info("Dropping illegal model combinations for agriculture.") - df.loc[(df.gcm == "ACCESS1-0") & (df.rcp == "rcp85"), "anomaly"] = np.nan + xr.where((climate_ds.gcm == "ACCESS1-0") & (climate_ds.rcp == "rcp85"), np.nan, climate_ds) + + df = xr.merge([df, climate_ds]).sel(year = df.year) return df + def damage_function_calculation(self, damage_function_points, global_consumption): """The damage function model fit may be : (1) ssp specific, (2) ssp-model specific, (3) unique across ssp-model. This depends on the type of discounting. In each case the input data passed to the fitting functions and the formatting of the returned @@ -790,17 +823,39 @@ def global_consumption_calculation(self, disc_type): def global_consumption_per_capita(self, disc_type): """Global consumption per capita - + Returns ------- xr.DataArray """ - + # Calculate global consumption per capita array_pc = self.global_consumption_calculation( disc_type - ) / self.collapsed_pop.sum("region") - + ) + + if self.geography == "ir": + array_pc = array_pc / self.collapsed_pop + elif self.geography == "country": + territories = [] + mapping_dict = {} + for ii, row in self.countries_mapping.iterrows(): + mapping_dict[row["ISO"]] = row["MatchedISO"] + if row["MatchedISO"] == "nan": + mapping_dict[row["ISO"]] = "nopop" + + for region in array_pc.region.values: + territories.append(mapping_dict[region[:3]]) + + pop = (self.collapsed_pop + .assign_coords({'region':territories}) + .groupby('region') + .sum()) + + array_pc = (array_pc / pop) + elif self.geography == "globe": + array_pc = (array_pc / self.collapsed_pop.sum("region")).assign_coords({'region':'globe'}).expand_dims('region') + if self.NAME == "equity": # equity recipe's growth is capped to # risk aversion recipe's growth rates @@ -812,7 +867,7 @@ def global_consumption_per_capita(self, disc_type): method="growth_constant", cap=self.risk_aversion_growth_rates(), ) - + else: extrapolated = extrapolate( xr_array=array_pc, @@ -821,9 +876,9 @@ def global_consumption_per_capita(self, disc_type): interp_year=self.ext_end_year, method="growth_constant", ) - + complete_array = xr.concat([array_pc, extrapolated], dim="year") - + return complete_array @cachedproperty @@ -831,25 +886,44 @@ def global_consumption_per_capita(self, disc_type): def global_consumption(self): """Global consumption without climate change""" + individual_region = self.individual_region + # rff simulation means that GDP already exists out to 2300 if 2300 in self.gdp.year: self.logger.debug("Global consumption found up to 2300.") - global_cons = self.gdp.sum("region").rename("global_consumption") + if individual_region is not None: + global_cons = self.gdp.sel(region = individual_region,drop=True).rename("global_consumption") + else: + global_cons = self.gdp.sum("region").rename("global_consumption") else: self.logger.info("Extrapolating global consumption.") # holding population constant # from 2100 to 2300 with 2099 values - pop = self.collapsed_pop.sum("region") + if individual_region is None: + pop = self.collapsed_pop + if self.geography == "ir": + pass + elif self.geography == "country": + pass + elif self.geography == "globe": + pop = pop.sum("region") + else: + pop = self.collapsed_pop.sel(region = individual_region, drop=True) pop = pop.reindex( year=range(pop.year.min().values, self.ext_end_year + 1), method="ffill", ) # Calculate global consumption back by - global_cons = ( - self.global_consumption_per_capita(self.discounting_type) * pop - ) + if individual_region is None: + global_cons = ( + self.global_consumption_per_capita(self.discounting_type) * pop + ) + else: + global_cons = ( + self.global_consumption_per_capita(self.discounting_type).sel(region = individual_region,drop=True) * pop + ) # Add dimension # @TODO: remove this line altogether @@ -957,18 +1031,25 @@ def gmsl_max(self): @save(name="global_consumption_no_pulse") def global_consumption_no_pulse(self): """Global consumption under FAIR control scenario.""" - + individual_region = self.individual_region fair_control = self.climate.fair_control if self.clip_gmsl: fair_control["gmsl"] = np.minimum(fair_control["gmsl"], self.gmsl_max) - damages = compute_damages( - fair_control, - betas=self.damage_function_coefficients, - formula=self.formula, - ) - + if individual_region is not None: + damages = compute_damages( + fair_control, + betas=self.damage_function_coefficients.sel(region = individual_region), + formula=self.formula, + ) + else: + damages = compute_damages( + fair_control, + betas=self.damage_function_coefficients, + formula=self.formula, + ) + cc_cons = self.global_consumption - damages gc_no_pulse = [] @@ -988,18 +1069,25 @@ def global_consumption_no_pulse(self): @save(name="global_consumption_pulse") def global_consumption_pulse(self): """Global consumption under FAIR pulse scenario.""" - + individual_region = self.individual_region fair_pulse = self.climate.fair_pulse if self.clip_gmsl: fair_pulse["gmsl"] = np.minimum(fair_pulse["gmsl"], self.gmsl_max) - damages = compute_damages( - fair_pulse, - betas=self.damage_function_coefficients, - formula=self.formula, - ) - + if individual_region is not None: + damages = compute_damages( + fair_pulse, + betas=self.damage_function_coefficients.sel(region = individual_region), + formula=self.formula, + ) + else: + damages = compute_damages( + fair_pulse, + betas=self.damage_function_coefficients, + formula=self.formula, + ) + cc_cons = self.global_consumption - damages gc_no_pulse = [] for wp in self.weitzman_parameter: diff --git a/src/dscim/menu/risk_aversion.py b/src/dscim/menu/risk_aversion.py index a06f9030..e99783c1 100644 --- a/src/dscim/menu/risk_aversion.py +++ b/src/dscim/menu/risk_aversion.py @@ -43,6 +43,46 @@ def calculated_damages(self) -> xr.DataArray: """Calculate damages (difference between CEs)""" return self.ce_no_cc - self.ce_cc + def damages_calculation(self, geography) -> xr.Dataset: + """Aggregate damages to country level + + Returns + -------- + pd.DataFrame + """ + + self.logger.info(f"Calculating damages") + + dams_collapse = self.calculated_damages * self.collapsed_pop + + if geography == "ir": + pass + elif geography == "country": + territories = [] + mapping_dict = {} + for ii, row in self.countries_mapping.iterrows(): + mapping_dict[row["ISO"]] = row["MatchedISO"] + if row["MatchedISO"] == "nan": + mapping_dict[row["ISO"]] = "nopop" + + for region in dams_collapse.region.values: + territories.append(mapping_dict[region[:3]]) + + dams_collapse = (dams_collapse + .assign_coords({'region':territories}) + .groupby('region') + .sum()) + elif geography == "globe": + dams_collapse = dams_collapse.sum(dim="region").assign_coords({'region':'globe'}).expand_dims('region') + + if "gwr" in self.discounting_type: + dams_collapse = dams_collapse.assign( + ssp=str(list(self.gdp.ssp.values)), + model=str(list(self.gdp.model.values)), + ) + + return dams_collapse.to_dataset(name = 'damages') + def global_damages_calculation(self) -> pd.DataFrame: """Aggregate damages to global level @@ -61,7 +101,7 @@ def global_damages_calculation(self) -> pd.DataFrame: ) return df - + def global_consumption_calculation(self, disc_type): """Calculate global consumption @@ -70,11 +110,19 @@ def global_consumption_calculation(self, disc_type): xr.DataArray """ + if self.geography == 'ir': + gdp = self.gdp + elif self.geography == 'country': + #group gdp by some grouping and collapse + gdp = self.gdp.sum(dim=["region"]) + else: + gdp = self.gdp.sum(dim=["region"]) + if (disc_type == "constant") or ("ramsey" in disc_type): - global_cons_no_cc = self.gdp.sum(dim=["region"]) + global_cons_no_cc = gdp elif disc_type == "constant_model_collapsed": - global_cons_no_cc = self.gdp.sum(dim=["region"]).mean(dim=["model"]) + global_cons_no_cc = gdp.mean(dim=["model"]) elif "gwr" in disc_type: ce_cons = self.ce(self.gdppc, dims=["ssp", "model"]) diff --git a/src/dscim/menu/simple_storage.py b/src/dscim/menu/simple_storage.py index 37dd7ce3..51f2ad91 100644 --- a/src/dscim/menu/simple_storage.py +++ b/src/dscim/menu/simple_storage.py @@ -96,7 +96,7 @@ def gmst(self): @property def gmsl(self): """Cached GMSL anomalies""" - gmsl = xr.open_zarr(self.gmsl_path).gmsl.to_dataframe().reset_index() + gmsl = xr.open_zarr(self.gmsl_path).to_dataframe().reset_index() return gmsl @@ -306,7 +306,12 @@ def __init__( histclim=None, ce_path=None, subset_dict=None, + geography=None, + **kwargs, ): + if geography is None: + geography = "global" + self.sector_path = sector_path self.save_path = save_path self.gdppc_bottom_code = gdppc_bottom_code @@ -317,7 +322,15 @@ def __init__( self.histclim = histclim self.ce_path = ce_path self.eta = eta + self.__dict__.update(**kwargs) + self.geography = geography + self.kwargs = kwargs + + if "country_ISOs" in kwargs: + self.countries_mapping = pd.read_csv(kwargs["country_ISOs"]) + self.countries = self.countries_mapping.MatchedISO.dropna().unique() + self.logger = logging.getLogger(__name__) def cut(self, xr_array, end_year=2099): @@ -351,7 +364,41 @@ def cut(self, xr_array, end_year=2099): ) return xr_data + + @property + def geography_collapsed_econ_vars(self): + + pop_collapse = self.pop + + if self.geography == "ir": + pass + elif self.geography == "country": + territories = [] + mapping_dict = {} + for ii, row in self.countries_mapping.iterrows(): + mapping_dict[row["ISO"]] = row["MatchedISO"] + if row["MatchedISO"] == "nan": + mapping_dict[row["ISO"]] = "nopop" + + for region in pop_collapse.region.values: + territories.append(mapping_dict(region[:3])) + + pop_collapse = (pop_collapse + .assign_coords({'region':territories}) + .groupby('region') + .sum()) + elif self.geography == "global": + pop_collapse = pop_collapse.sum(dim="region").assign_coords({'region':'globe'}).expand_dims('region') + + if "gwr" in self.discounting_type: + pop_collapse = pop_collapse.assign( + ssp=str(list(self.gdp.ssp.values)), + model=str(list(self.gdp.model.values)), + ) + + return pop_collapse.to_dataset(name = 'damages') + @property def cut_econ_vars(self): """Economic variables from SSP object""" @@ -384,9 +431,9 @@ def gdppc(self): def adding_up_damages(self): """This property calls pre-calculated adding-up IR-level 'mean' over batches.""" - mean_cc = f"{self.ce_path}/adding_up_cc.zarr" - mean_no_cc = f"{self.ce_path}/adding_up_no_cc.zarr" - + mean_cc = f"{self.ce_path}/adding_up_cc_eta{self.eta}.zarr" + mean_no_cc = f"{self.ce_path}/adding_up_no_cc_eta{self.eta}.zarr" + if os.path.exists(mean_cc) and os.path.exists(mean_no_cc): self.logger.info( f"Adding up aggregated damages found at {mean_cc}, {mean_no_cc}. These are being loaded..." diff --git a/src/dscim/preprocessing/input_damages.py b/src/dscim/preprocessing/input_damages.py index e55f6bb0..b84f5808 100644 --- a/src/dscim/preprocessing/input_damages.py +++ b/src/dscim/preprocessing/input_damages.py @@ -808,6 +808,8 @@ def prep( if damages[v].dtype == object: damages[v] = damages[v].astype("unicode") + damages.coords['gcm'] = damages.coords['gcm'].astype('object') + if i == 0: damages.to_zarr( f"{outpath}/impacts-darwin-montecarlo-damages-v{mortality_version}.zarr", diff --git a/src/dscim/utils/utils.py b/src/dscim/utils/utils.py index eff80d9a..18d2a15b 100644 --- a/src/dscim/utils/utils.py +++ b/src/dscim/utils/utils.py @@ -10,6 +10,37 @@ logger = logging.getLogger(__name__) +def fit_project(chunk_reg, formula, type_estimation, quantiles, index_dims): + # individual block's mapping function + df_reg = chunk_reg.to_dataframe().reset_index() + + year_range = df_reg.year.values + + df_tot = [] + for year in year_range: + time_window = range(year - 2, year + 3) + df = df_reg[df_reg.year.isin(time_window)] + params = modeler( + df=df, + formula=formula, + type_estimation=type_estimation, + quantiles=quantiles, + ) + params.assign(year = year) + df_tot.append(params) + + params = pd.concat(df_tot) + for dim in index_dims: + if chunk_reg[dim].size == 1: + params[dim] = chunk_reg[dim].values + else: + params[dim] = str(list(chunk_reg[dim].values)) + + print(params, flush=True) + + params = params.set_index(index_dims + ['year',]) + return(params.to_xarray()) + def modeler(df, formula, type_estimation, exog, quantiles=None): """Wrapper function for statsmodels functions (OLS and quantiles) @@ -66,15 +97,15 @@ def modeler(df, formula, type_estimation, exog, quantiles=None): q_params.append(params_quantile) fitted = pd.DataFrame(dict(exog, y_hat=quant_mod.predict(exog=exog))) - q_y_hat.append(fitted.assign(q=quant)) + #q_y_hat.append(fitted.assign(q=quant)) params = pd.concat(q_params) - y_hat = pd.DataFrame(pd.concat(q_y_hat)) + #y_hat = pd.DataFrame(pd.concat(q_y_hat)) else: raise NotImplementedError(f"{type_estimation} is not valid option!") - return params, y_hat + return params, None#y_hat def get_weights(quantiles): @@ -432,11 +463,11 @@ def model_outputs( ).squeeze() # Extrapolate by multiplying fixed params by ratios - extrap_preds = y_hat_df.sel(year=fix_global_c) * global_c_factors + #extrap_preds = y_hat_df.sel(year=fix_global_c) * global_c_factors extrap_params = param_df.sel(year=fix_global_c) * global_c_factors # concatenate extrapolation and pre-2100 - preds = xr.concat([y_hat_df, extrap_preds], dim="year") + #preds = xr.concat([y_hat_df, extrap_preds], dim="year") parameters = xr.concat([param_df, extrap_params], dim="year") # For the local case we don't care about the time dimension, the @@ -450,12 +481,11 @@ def model_outputs( # Return all results res = { "parameters": parameters, - "preds": preds, + "preds": None, } return res - def compute_damages(anomaly, betas, formula): """Calculate damages using FAIR anomalies (either control or pulse).