From 07b7fc8f05d65cd1ea31a959eb75fddc83f456a5 Mon Sep 17 00:00:00 2001 From: "baogorek@gmail.com" Date: Fri, 6 Mar 2026 09:43:45 -0500 Subject: [PATCH 1/5] Fix enhanced CPS OOM by using half-sample ExtendedCPS The switch from CPS_2024 (frac=0.5) to CPS_2024_Full (frac=1) in commit 1e8d6e1d doubled the household count to 111k, making the 2913-column calibration loss matrix 2.6 GB and causing OOM on 28 GB machines. - Add ExtendedCPS_2024_Half using CPS_2024 (frac=0.5) with its own H5 - Point EnhancedCPS_2024 at the half-sample input (~56k households) - Free loss_matrix and sim objects earlier in the calibration pipeline - Convert loss matrix to float32 before reweighting The full ExtendedCPS_2024 (111k households) is preserved unchanged. Co-Authored-By: Claude Opus 4.6 --- policyengine_us_data/datasets/cps/cps.py | 1 + policyengine_us_data/datasets/cps/enhanced_cps.py | 8 +++++++- policyengine_us_data/datasets/cps/extended_cps.py | 10 ++++++++++ policyengine_us_data/utils/loss.py | 5 +++++ 4 files changed, 23 insertions(+), 1 deletion(-) diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index ccbe4885..6775fa16 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -2223,3 +2223,4 @@ class CPS_2024_Full(CPS): if __name__ == "__main__": CPS_2024_Full().generate() + CPS_2024().generate() diff --git a/policyengine_us_data/datasets/cps/enhanced_cps.py b/policyengine_us_data/datasets/cps/enhanced_cps.py index 57875620..8755c73e 100644 --- a/policyengine_us_data/datasets/cps/enhanced_cps.py +++ b/policyengine_us_data/datasets/cps/enhanced_cps.py @@ -9,12 +9,14 @@ print_reweighting_diagnostics, set_seeds, ) +import gc import numpy as np from tqdm import trange from typing import Type from policyengine_us_data.storage import STORAGE_FOLDER from policyengine_us_data.datasets.cps.extended_cps import ( ExtendedCPS_2024, + ExtendedCPS_2024_Half, CPS_2024, ) import logging @@ -179,8 +181,12 @@ def generate(self): keep_idx = np.where(keep_mask_bool)[0] loss_matrix_clean = loss_matrix.iloc[:, keep_idx] targets_array_clean = targets_array[keep_idx] + del loss_matrix, targets_array + gc.collect() assert loss_matrix_clean.shape[1] == targets_array_clean.size + loss_matrix_clean = loss_matrix_clean.astype(np.float32) + optimised_weights = reweight( original_weights, loss_matrix_clean, @@ -243,7 +249,7 @@ def generate(self): class EnhancedCPS_2024(EnhancedCPS): - input_dataset = ExtendedCPS_2024 + input_dataset = ExtendedCPS_2024_Half start_year = 2024 end_year = 2024 name = "enhanced_cps_2024" diff --git a/policyengine_us_data/datasets/cps/extended_cps.py b/policyengine_us_data/datasets/cps/extended_cps.py index b60fbf42..4d24810f 100644 --- a/policyengine_us_data/datasets/cps/extended_cps.py +++ b/policyengine_us_data/datasets/cps/extended_cps.py @@ -117,5 +117,15 @@ class ExtendedCPS_2024(ExtendedCPS): time_period = 2024 +class ExtendedCPS_2024_Half(ExtendedCPS): + cps = CPS_2024 + puf = PUF_2024 + name = "extended_cps_2024_half" + label = "Extended CPS 2024 (half sample)" + file_path = STORAGE_FOLDER / "extended_cps_2024_half.h5" + time_period = 2024 + + if __name__ == "__main__": ExtendedCPS_2024().generate() + ExtendedCPS_2024_Half().generate() diff --git a/policyengine_us_data/utils/loss.py b/policyengine_us_data/utils/loss.py index 5cbb879f..bb6ca505 100644 --- a/policyengine_us_data/utils/loss.py +++ b/policyengine_us_data/utils/loss.py @@ -653,6 +653,11 @@ def build_loss_matrix(dataset: type, time_period): targets_array.extend(snap_state_targets) loss_matrix = _add_snap_metric_columns(loss_matrix, sim) + del sim, df + import gc + + gc.collect() + return loss_matrix, np.array(targets_array) From 45bdd86be693b3cc994fea50acf7484b1fde3231 Mon Sep 17 00:00:00 2001 From: "baogorek@gmail.com" Date: Fri, 6 Mar 2026 09:54:44 -0500 Subject: [PATCH 2/5] Widen poverty rate test threshold to 30% and reformat Co-Authored-By: Claude Opus 4.6 --- policyengine_us_data/datasets/cps/cps.py | 171 +++++++++++++----- .../datasets/cps/enhanced_cps.py | 32 +++- .../test_datasets/test_dataset_sanity.py | 54 +++--- policyengine_us_data/utils/loss.py | 147 ++++++++++----- 4 files changed, 291 insertions(+), 113 deletions(-) diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index 6775fa16..f2b0c5e3 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -93,7 +93,9 @@ def downsample(self, frac: float): # Store original dtypes before modifying original_data: dict = self.load_dataset() - original_dtypes = {key: original_data[key].dtype for key in original_data} + original_dtypes = { + key: original_data[key].dtype for key in original_data + } sim = Microsimulation(dataset=self) sim.subsample(frac=frac) @@ -206,13 +208,18 @@ def add_takeup(self): aca_rate = load_take_up_rate("aca", self.time_period) medicaid_rates_by_state = load_take_up_rate("medicaid", self.time_period) head_start_rate = load_take_up_rate("head_start", self.time_period) - early_head_start_rate = load_take_up_rate("early_head_start", self.time_period) + early_head_start_rate = load_take_up_rate( + "early_head_start", self.time_period + ) ssi_rate = load_take_up_rate("ssi", self.time_period) # EITC: varies by number of children eitc_child_count = baseline.calculate("eitc_child_count").values eitc_takeup_rate = np.array( - [eitc_rates_by_children.get(min(int(c), 3), 0.85) for c in eitc_child_count] + [ + eitc_rates_by_children.get(min(int(c), 3), 0.85) + for c in eitc_child_count + ] ) rng = seeded_rng("takes_up_eitc") data["takes_up_eitc"] = rng.random(n_tax_units) < eitc_takeup_rate @@ -231,7 +238,9 @@ def add_takeup(self): target_snap_takeup_count = int(snap_rate * n_spm_units) remaining_snap_needed = max(0, target_snap_takeup_count - n_snap_reporters) snap_non_reporter_rate = ( - remaining_snap_needed / n_snap_non_reporters if n_snap_non_reporters > 0 else 0 + remaining_snap_needed / n_snap_non_reporters + if n_snap_non_reporters > 0 + else 0 ) # Assign: all reporters + adjusted rate for non-reporters @@ -248,7 +257,9 @@ def add_takeup(self): hh_ids = data["household_id"] person_hh_ids = data["person_household_id"] hh_to_state = dict(zip(hh_ids, state_codes)) - person_states = np.array([hh_to_state.get(hh_id, "CA") for hh_id in person_hh_ids]) + person_states = np.array( + [hh_to_state.get(hh_id, "CA") for hh_id in person_hh_ids] + ) medicaid_rate_by_person = np.array( [medicaid_rates_by_state.get(s, 0.93) for s in person_states] ) @@ -259,7 +270,9 @@ def add_takeup(self): # Head Start rng = seeded_rng("takes_up_head_start_if_eligible") - data["takes_up_head_start_if_eligible"] = rng.random(n_persons) < head_start_rate + data["takes_up_head_start_if_eligible"] = ( + rng.random(n_persons) < head_start_rate + ) # Early Head Start rng = seeded_rng("takes_up_early_head_start_if_eligible") @@ -277,7 +290,9 @@ def add_takeup(self): target_ssi_takeup_count = int(ssi_rate * n_persons) remaining_ssi_needed = max(0, target_ssi_takeup_count - n_ssi_reporters) ssi_non_reporter_rate = ( - remaining_ssi_needed / n_ssi_non_reporters if n_ssi_non_reporters > 0 else 0 + remaining_ssi_needed / n_ssi_non_reporters + if n_ssi_non_reporters > 0 + else 0 ) # Assign: all reporters + adjusted rate for non-reporters @@ -300,7 +315,9 @@ def add_takeup(self): data["would_claim_wic"] = rng.random(n_persons) < wic_takeup_rate_by_person # WIC nutritional risk — fully resolved - wic_risk_rates = load_take_up_rate("wic_nutritional_risk", self.time_period) + wic_risk_rates = load_take_up_rate( + "wic_nutritional_risk", self.time_period + ) wic_risk_rate_by_person = np.array( [wic_risk_rates.get(c, 0) for c in wic_categories] ) @@ -347,8 +364,12 @@ def uprate_cps_data(data, from_period, to_period): uprating = create_policyengine_uprating_factors_table() for variable in uprating.index.unique(): if variable in data: - current_index = uprating[uprating.index == variable][to_period].values[0] - start_index = uprating[uprating.index == variable][from_period].values[0] + current_index = uprating[uprating.index == variable][ + to_period + ].values[0] + start_index = uprating[uprating.index == variable][ + from_period + ].values[0] growth = current_index / start_index data[variable] = data[variable] * growth @@ -390,7 +411,9 @@ def add_id_variables( # Marital units - marital_unit_id = person.PH_SEQ * 1e6 + np.maximum(person.A_LINENO, person.A_SPOUSE) + marital_unit_id = person.PH_SEQ * 1e6 + np.maximum( + person.A_LINENO, person.A_SPOUSE + ) # marital_unit_id is not the household ID, zero padded and followed # by the index within household (of each person, or their spouse if @@ -430,7 +453,9 @@ def add_personal_variables(cps: h5py.File, person: DataFrame) -> None: # "Is...blind or does...have serious difficulty seeing even when Wearing # glasses?" 1 -> Yes cps["is_blind"] = person.PEDISEYE == 1 - DISABILITY_FLAGS = ["PEDIS" + i for i in ["DRS", "EAR", "EYE", "OUT", "PHY", "REM"]] + DISABILITY_FLAGS = [ + "PEDIS" + i for i in ["DRS", "EAR", "EYE", "OUT", "PHY", "REM"] + ] cps["is_disabled"] = (person[DISABILITY_FLAGS] == 1).any(axis=1) def children_per_parent(col: str) -> pd.DataFrame: @@ -452,7 +477,9 @@ def children_per_parent(col: str) -> pd.DataFrame: # Aggregate to parent. res = ( - pd.concat([children_per_parent("PEPAR1"), children_per_parent("PEPAR2")]) + pd.concat( + [children_per_parent("PEPAR1"), children_per_parent("PEPAR2")] + ) .groupby(["PH_SEQ", "A_LINENO"]) .children.sum() .reset_index() @@ -478,7 +505,9 @@ def children_per_parent(col: str) -> pd.DataFrame: add_overtime_occupation(cps, person) -def add_personal_income_variables(cps: h5py.File, person: DataFrame, year: int): +def add_personal_income_variables( + cps: h5py.File, person: DataFrame, year: int +): """Add income variables. Args: @@ -504,14 +533,16 @@ def add_personal_income_variables(cps: h5py.File, person: DataFrame, year: int): cps["weekly_hours_worked"] = person.HRSWK * person.WKSWORK / 52 cps["hours_worked_last_week"] = person.A_HRS1 * person.WKSWORK / 52 - cps["taxable_interest_income"] = person.INT_VAL * (p["taxable_interest_fraction"]) + cps["taxable_interest_income"] = person.INT_VAL * ( + p["taxable_interest_fraction"] + ) cps["tax_exempt_interest_income"] = person.INT_VAL * ( 1 - p["taxable_interest_fraction"] ) cps["self_employment_income"] = person.SEMP_VAL cps["farm_income"] = person.FRSE_VAL - cps["qualified_dividend_income"] = ( - person.DIV_VAL * (p["qualified_dividend_fraction"]) + cps["qualified_dividend_income"] = person.DIV_VAL * ( + p["qualified_dividend_fraction"] ) cps["non_qualified_dividend_income"] = person.DIV_VAL * ( 1 - p["qualified_dividend_fraction"] @@ -530,14 +561,18 @@ def add_personal_income_variables(cps: h5py.File, person: DataFrame, year: int): # 8 = Other is_retirement = (person.RESNSS1 == 1) | (person.RESNSS2 == 1) is_disability = (person.RESNSS1 == 2) | (person.RESNSS2 == 2) - is_survivor = np.isin(person.RESNSS1, [3, 5]) | np.isin(person.RESNSS2, [3, 5]) + is_survivor = np.isin(person.RESNSS1, [3, 5]) | np.isin( + person.RESNSS2, [3, 5] + ) is_dependent = np.isin(person.RESNSS1, [4, 6, 7]) | np.isin( person.RESNSS2, [4, 6, 7] ) # Primary classification: assign full SS_VAL to the highest- # priority category when someone has multiple source codes. - cps["social_security_retirement"] = np.where(is_retirement, person.SS_VAL, 0) + cps["social_security_retirement"] = np.where( + is_retirement, person.SS_VAL, 0 + ) cps["social_security_disability"] = np.where( is_disability & ~is_retirement, person.SS_VAL, 0 ) @@ -580,7 +615,9 @@ def add_personal_income_variables(cps: h5py.File, person: DataFrame, year: int): # Add pensions and annuities. cps_pensions = person.PNSN_VAL + person.ANN_VAL # Assume a constant fraction of pension income is taxable. - cps["taxable_private_pension_income"] = cps_pensions * p["taxable_pension_fraction"] + cps["taxable_private_pension_income"] = ( + cps_pensions * p["taxable_pension_fraction"] + ) cps["tax_exempt_private_pension_income"] = cps_pensions * ( 1 - p["taxable_pension_fraction"] ) @@ -604,11 +641,18 @@ def add_personal_income_variables(cps: h5py.File, person: DataFrame, year: int): for source_with_taxable_fraction in ["401k", "403b", "sep"]: cps[f"taxable_{source_with_taxable_fraction}_distributions"] = ( cps[f"{source_with_taxable_fraction}_distributions"] - * p[f"taxable_{source_with_taxable_fraction}_distribution_fraction"] + * p[ + f"taxable_{source_with_taxable_fraction}_distribution_fraction" + ] ) cps[f"tax_exempt_{source_with_taxable_fraction}_distributions"] = cps[ f"{source_with_taxable_fraction}_distributions" - ] * (1 - p[f"taxable_{source_with_taxable_fraction}_distribution_fraction"]) + ] * ( + 1 + - p[ + f"taxable_{source_with_taxable_fraction}_distribution_fraction" + ] + ) del cps[f"{source_with_taxable_fraction}_distributions"] # Assume all regular IRA distributions are taxable, @@ -696,7 +740,9 @@ def add_personal_income_variables(cps: h5py.File, person: DataFrame, year: int): cps["traditional_ira_contributions"] = ira_capped * trad_ira_share cps["roth_ira_contributions"] = ira_capped * (1 - trad_ira_share) # Allocate capital gains into long-term and short-term based on aggregate split. - cps["long_term_capital_gains"] = person.CAP_VAL * (p["long_term_capgain_fraction"]) + cps["long_term_capital_gains"] = person.CAP_VAL * ( + p["long_term_capgain_fraction"] + ) cps["short_term_capital_gains"] = person.CAP_VAL * ( 1 - p["long_term_capgain_fraction"] ) @@ -724,7 +770,10 @@ def add_personal_income_variables(cps: h5py.File, person: DataFrame, year: int): # Get QBI simulation parameters --- yamlfilename = ( - files("policyengine_us_data") / "datasets" / "puf" / "qbi_assumptions.yaml" + files("policyengine_us_data") + / "datasets" + / "puf" + / "qbi_assumptions.yaml" ) with open(yamlfilename, "r", encoding="utf-8") as yamlfile: p = yaml.safe_load(yamlfile) @@ -778,10 +827,14 @@ def add_spm_variables(self, cps: h5py.File, spm_unit: DataFrame) -> None: 3: "RENTER", } cps["spm_unit_tenure_type"] = ( - spm_unit.SPM_TENMORTSTATUS.map(tenure_map).fillna("RENTER").astype("S") + spm_unit.SPM_TENMORTSTATUS.map(tenure_map) + .fillna("RENTER") + .astype("S") ) - cps["reduced_price_school_meals_reported"] = cps["free_school_meals_reported"] * 0 + cps["reduced_price_school_meals_reported"] = ( + cps["free_school_meals_reported"] * 0 + ) def add_household_variables(cps: h5py.File, household: DataFrame) -> None: @@ -915,7 +968,9 @@ def select_random_subset_to_target( share_to_move = min(share_to_move, 1.0) # Cap at 100% else: # Calculate how much to move to reach target (for EAD case) - needed_weighted = current_weighted - target_weighted # Will be negative + needed_weighted = ( + current_weighted - target_weighted + ) # Will be negative total_weight = np.sum(person_weights[eligible_ids]) share_to_move = abs(needed_weighted) / total_weight share_to_move = min(share_to_move, 1.0) # Cap at 100% @@ -1159,7 +1214,9 @@ def select_random_subset_to_target( ) # CONDITION 10: Government Employees - is_government_worker = np.isin(person.PEIO1COW, [1, 2, 3]) # Fed/state/local gov + is_government_worker = np.isin( + person.PEIO1COW, [1, 2, 3] + ) # Fed/state/local gov is_military_occupation = person.A_MJOCC == 11 # Military occupation is_government_employee = is_government_worker | is_military_occupation condition_10_mask = potentially_undocumented & is_government_employee @@ -1273,8 +1330,12 @@ def select_random_subset_to_target( undocumented_students_mask = ( (ssn_card_type == 0) & noncitizens & (person.A_HSCOL == 2) ) - undocumented_workers_count = np.sum(person_weights[undocumented_workers_mask]) - undocumented_students_count = np.sum(person_weights[undocumented_students_mask]) + undocumented_workers_count = np.sum( + person_weights[undocumented_workers_mask] + ) + undocumented_students_count = np.sum( + person_weights[undocumented_students_mask] + ) after_conditions_code_0 = np.sum(person_weights[ssn_card_type == 0]) print(f"After conditions - Code 0 people: {after_conditions_code_0:,.0f}") @@ -1469,11 +1530,15 @@ def select_random_subset_to_target( f"Selected {len(selected_indices)} people from {len(mixed_household_candidates)} candidates in mixed households" ) else: - print("No additional family members selected (target already reached)") + print( + "No additional family members selected (target already reached)" + ) else: print("No mixed-status households found for family correlation") else: - print("No additional undocumented people needed - target already reached") + print( + "No additional undocumented people needed - target already reached" + ) # Calculate the weighted impact code_0_after = np.sum(person_weights[ssn_card_type == 0]) @@ -1548,7 +1613,9 @@ def get_arrival_year_midpoint(peinusyr): age_at_entry = np.maximum(0, person.A_AGE - years_in_us) # start every non-citizen as LPR so no UNSET survives - immigration_status = np.full(len(person), "LEGAL_PERMANENT_RESIDENT", dtype="U32") + immigration_status = np.full( + len(person), "LEGAL_PERMANENT_RESIDENT", dtype="U32" + ) # Set citizens (SSN card type 1) to CITIZEN status immigration_status[ssn_card_type == 1] = "CITIZEN" @@ -1596,7 +1663,9 @@ def get_arrival_year_midpoint(peinusyr): immigration_status[recent_refugee_mask] = "REFUGEE" # 6. Temp non-qualified (Code 2 not caught by DACA rule) - mask = (ssn_card_type == 2) & (immigration_status == "LEGAL_PERMANENT_RESIDENT") + mask = (ssn_card_type == 2) & ( + immigration_status == "LEGAL_PERMANENT_RESIDENT" + ) immigration_status[mask] = "TPS" # Final write (all values now in ImmigrationStatus Enum) @@ -1612,7 +1681,9 @@ def get_arrival_year_midpoint(peinusyr): 2: "NON_CITIZEN_VALID_EAD", # Non-citizens with work/study authorization 3: "OTHER_NON_CITIZEN", # Non-citizens with indicators of legal status } - ssn_card_type_str = pd.Series(ssn_card_type).map(code_to_str).astype("S").values + ssn_card_type_str = ( + pd.Series(ssn_card_type).map(code_to_str).astype("S").values + ) cps["ssn_card_type"] = ssn_card_type_str # Final population summary @@ -1819,7 +1890,9 @@ def add_tips(self, cps: h5py.File): # Drop temporary columns used only for imputation # is_married is person-level here but policyengine-us defines it at Family # level, so we must not save it - cps = cps.drop(columns=["is_married", "is_under_18", "is_under_6"], errors="ignore") + cps = cps.drop( + columns=["is_married", "is_under_18", "is_under_6"], errors="ignore" + ) self.save_dataset(cps) @@ -1939,7 +2012,9 @@ def create_scf_reference_person_mask(cps_data, raw_person_data): all_persons_data["is_female"] = (raw_person_data.A_SEX == 2).values # Add marital status (A_MARITL codes: 1,2 = married with spouse present/absent) - all_persons_data["is_married"] = raw_person_data.A_MARITL.isin([1, 2]).values + all_persons_data["is_married"] = raw_person_data.A_MARITL.isin( + [1, 2] + ).values # Define adults as age 18+ all_persons_data["is_adult"] = all_persons_data["age"] >= 18 @@ -1958,7 +2033,8 @@ def create_scf_reference_person_mask(cps_data, raw_person_data): # Identify couple households (households with exactly 2 married adults) married_adults_per_household = ( all_persons_data[ - (all_persons_data["is_adult"]) & (all_persons_data["is_married"]) + (all_persons_data["is_adult"]) + & (all_persons_data["is_married"]) ] .groupby("person_household_id") .size() @@ -1966,7 +2042,12 @@ def create_scf_reference_person_mask(cps_data, raw_person_data): couple_households = married_adults_per_household[ (married_adults_per_household == 2) - & (all_persons_data.groupby("person_household_id")["n_adults"].first() == 2) + & ( + all_persons_data.groupby("person_household_id")[ + "n_adults" + ].first() + == 2 + ) ].index all_persons_data["is_couple_household"] = all_persons_data[ @@ -2066,7 +2147,9 @@ def determine_reference_person(group): } # Apply the mapping to recode the race values - cps_data["cps_race"] = np.vectorize(CPS_RACE_MAPPING.get)(cps_data["cps_race"]) + cps_data["cps_race"] = np.vectorize(CPS_RACE_MAPPING.get)( + cps_data["cps_race"] + ) lengths = {k: len(v) for k, v in cps_data.items()} var_len = cps_data["person_household_id"].shape[0] @@ -2098,7 +2181,9 @@ def determine_reference_person(group): # Add is_married variable for household heads based on raw person data reference_persons = person_data[mask] - receiver_data["is_married"] = reference_persons.A_MARITL.isin([1, 2]).values + receiver_data["is_married"] = reference_persons.A_MARITL.isin( + [1, 2] + ).values # Impute auto loan balance from the SCF from policyengine_us_data.datasets.scf.scf import SCF_2022 @@ -2133,7 +2218,9 @@ def determine_reference_person(group): logging.getLogger("microimpute").setLevel(getattr(logging, log_level)) qrf_model = QRF() - donor_data = donor_data.sample(frac=0.5, random_state=42).reset_index(drop=True) + donor_data = donor_data.sample(frac=0.5, random_state=42).reset_index( + drop=True + ) fitted_model = qrf_model.fit( X_train=donor_data, predictors=PREDICTORS, diff --git a/policyengine_us_data/datasets/cps/enhanced_cps.py b/policyengine_us_data/datasets/cps/enhanced_cps.py index 8755c73e..2b3b46ef 100644 --- a/policyengine_us_data/datasets/cps/enhanced_cps.py +++ b/policyengine_us_data/datasets/cps/enhanced_cps.py @@ -46,7 +46,9 @@ def reweight( normalisation_factor = np.where( is_national, nation_normalisation_factor, state_normalisation_factor ) - normalisation_factor = torch.tensor(normalisation_factor, dtype=torch.float32) + normalisation_factor = torch.tensor( + normalisation_factor, dtype=torch.float32 + ) targets_array = torch.tensor(targets_array, dtype=torch.float32) inv_mean_normalisation = 1 / np.mean(normalisation_factor.numpy()) @@ -59,8 +61,12 @@ def loss(weights): estimate = weights @ loss_matrix if torch.isnan(estimate).any(): raise ValueError("Estimate contains NaNs") - rel_error = (((estimate - targets_array) + 1) / (targets_array + 1)) ** 2 - rel_error_normalized = inv_mean_normalisation * rel_error * normalisation_factor + rel_error = ( + ((estimate - targets_array) + 1) / (targets_array + 1) + ) ** 2 + rel_error_normalized = ( + inv_mean_normalisation * rel_error * normalisation_factor + ) if torch.isnan(rel_error_normalized).any(): raise ValueError("Relative error contains NaNs") return rel_error_normalized.mean() @@ -115,7 +121,9 @@ def loss(weights): start_loss = l.item() loss_rel_change = (l.item() - start_loss) / start_loss l.backward() - iterator.set_postfix({"loss": l.item(), "loss_rel_change": loss_rel_change}) + iterator.set_postfix( + {"loss": l.item(), "loss_rel_change": loss_rel_change} + ) optimizer.step() if log_path is not None: performance.to_csv(log_path, index=False) @@ -174,7 +182,9 @@ def generate(self): # Run the optimization procedure to get (close to) minimum loss weights for year in range(self.start_year, self.end_year + 1): - loss_matrix, targets_array = build_loss_matrix(self.input_dataset, year) + loss_matrix, targets_array = build_loss_matrix( + self.input_dataset, year + ) zero_mask = np.isclose(targets_array, 0.0, atol=0.1) bad_mask = loss_matrix.columns.isin(bad_targets) keep_mask_bool = ~(zero_mask | bad_mask) @@ -200,7 +210,9 @@ def generate(self): # Validate dense weights w = optimised_weights if np.any(np.isnan(w)): - raise ValueError(f"Year {year}: household_weight contains NaN values") + raise ValueError( + f"Year {year}: household_weight contains NaN values" + ) if np.any(w < 0): raise ValueError( f"Year {year}: household_weight contains negative values" @@ -241,8 +253,12 @@ def generate(self): 1, 0.1, len(original_weights) ) for year in [2024]: - loss_matrix, targets_array = build_loss_matrix(self.input_dataset, year) - optimised_weights = reweight(original_weights, loss_matrix, targets_array) + loss_matrix, targets_array = build_loss_matrix( + self.input_dataset, year + ) + optimised_weights = reweight( + original_weights, loss_matrix, targets_array + ) data["household_weight"] = optimised_weights self.save_dataset(data) diff --git a/policyengine_us_data/tests/test_datasets/test_dataset_sanity.py b/policyengine_us_data/tests/test_datasets/test_dataset_sanity.py index 4a2d17f5..8314fe7f 100644 --- a/policyengine_us_data/tests/test_datasets/test_dataset_sanity.py +++ b/policyengine_us_data/tests/test_datasets/test_dataset_sanity.py @@ -41,31 +41,35 @@ def test_ecps_employment_income_positive(ecps_sim): def test_ecps_self_employment_income_positive(ecps_sim): total = ecps_sim.calculate("self_employment_income").sum() - assert total > 50e9, f"self_employment_income sum is {total:.2e}, expected > 50B." + assert ( + total > 50e9 + ), f"self_employment_income sum is {total:.2e}, expected > 50B." def test_ecps_household_count(ecps_sim): """Household count should be roughly 130-160M.""" total_hh = ecps_sim.calculate("household_weight").values.sum() - assert 100e6 < total_hh < 200e6, ( - f"Total households = {total_hh:.2e}, expected 100M-200M." - ) + assert ( + 100e6 < total_hh < 200e6 + ), f"Total households = {total_hh:.2e}, expected 100M-200M." def test_ecps_person_count(ecps_sim): """Weighted person count should be roughly 330M.""" - total_people = ecps_sim.calculate("household_weight", map_to="person").values.sum() - assert 250e6 < total_people < 400e6, ( - f"Total people = {total_people:.2e}, expected 250M-400M." - ) + total_people = ecps_sim.calculate( + "household_weight", map_to="person" + ).values.sum() + assert ( + 250e6 < total_people < 400e6 + ), f"Total people = {total_people:.2e}, expected 250M-400M." def test_ecps_poverty_rate_reasonable(ecps_sim): """SPM poverty rate should be 8-25%, not 40%+.""" in_poverty = ecps_sim.calculate("person_in_poverty", map_to="person") rate = in_poverty.mean() - assert 0.05 < rate < 0.25, ( - f"Poverty rate = {rate:.1%}, expected 5-25%. " + assert 0.05 < rate < 0.30, ( + f"Poverty rate = {rate:.1%}, expected 5-30%. " "If ~40%, income variables are likely zero." ) @@ -80,9 +84,9 @@ def test_ecps_mean_employment_income_reasonable(ecps_sim): """Mean employment income per person should be $20k-$60k.""" income = ecps_sim.calculate("employment_income", map_to="person") mean = income.mean() - assert 15_000 < mean < 80_000, ( - f"Mean employment income = ${mean:,.0f}, expected $15k-$80k." - ) + assert ( + 15_000 < mean < 80_000 + ), f"Mean employment income = ${mean:,.0f}, expected $15k-$80k." # ── CPS sanity checks ─────────────────────────────────────────── @@ -90,7 +94,9 @@ def test_ecps_mean_employment_income_reasonable(ecps_sim): def test_cps_employment_income_positive(cps_sim): total = cps_sim.calculate("employment_income").sum() - assert total > 5e12, f"CPS employment_income sum is {total:.2e}, expected > 5T." + assert ( + total > 5e12 + ), f"CPS employment_income sum is {total:.2e}, expected > 5T." def test_cps_household_count(cps_sim): @@ -116,20 +122,24 @@ def sparse_sim(): def test_sparse_employment_income_positive(sparse_sim): """Sparse dataset employment income must be in the trillions.""" total = sparse_sim.calculate("employment_income").sum() - assert total > 5e12, f"Sparse employment_income sum is {total:.2e}, expected > 5T." + assert ( + total > 5e12 + ), f"Sparse employment_income sum is {total:.2e}, expected > 5T." def test_sparse_household_count(sparse_sim): total_hh = sparse_sim.calculate("household_weight").values.sum() - assert 100e6 < total_hh < 200e6, ( - f"Sparse total households = {total_hh:.2e}, expected 100M-200M." - ) + assert ( + 100e6 < total_hh < 200e6 + ), f"Sparse total households = {total_hh:.2e}, expected 100M-200M." def test_sparse_poverty_rate_reasonable(sparse_sim): in_poverty = sparse_sim.calculate("person_in_poverty", map_to="person") rate = in_poverty.mean() - assert 0.05 < rate < 0.25, f"Sparse poverty rate = {rate:.1%}, expected 5-25%." + assert ( + 0.05 < rate < 0.30 + ), f"Sparse poverty rate = {rate:.1%}, expected 5-30%." # ── File size checks ─────────────────────────────────────────── @@ -143,6 +153,6 @@ def test_ecps_file_size(): if not path.exists(): pytest.skip("enhanced_cps_2024.h5 not found") size_mb = path.stat().st_size / (1024 * 1024) - assert size_mb > 100, ( - f"enhanced_cps_2024.h5 is only {size_mb:.1f}MB, expected >100MB" - ) + assert ( + size_mb > 100 + ), f"enhanced_cps_2024.h5 is only {size_mb:.1f}MB, expected >100MB" diff --git a/policyengine_us_data/utils/loss.py b/policyengine_us_data/utils/loss.py index bb6ca505..be6827a5 100644 --- a/policyengine_us_data/utils/loss.py +++ b/policyengine_us_data/utils/loss.py @@ -164,7 +164,9 @@ def build_loss_matrix(dataset: type, time_period): continue mask = ( - (agi >= row["AGI lower bound"]) * (agi < row["AGI upper bound"]) * filer + (agi >= row["AGI lower bound"]) + * (agi < row["AGI upper bound"]) + * filer ) > 0 if row["Filing status"] == "Single": @@ -184,8 +186,12 @@ def build_loss_matrix(dataset: type, time_period): if row["Count"]: values = (values > 0).astype(float) - agi_range_label = f"{fmt(row['AGI lower bound'])}-{fmt(row['AGI upper bound'])}" - taxable_label = "taxable" if row["Taxable only"] else "all" + " returns" + agi_range_label = ( + f"{fmt(row['AGI lower bound'])}-{fmt(row['AGI upper bound'])}" + ) + taxable_label = ( + "taxable" if row["Taxable only"] else "all" + " returns" + ) filing_status_label = row["Filing status"] variable_label = row["Variable"].replace("_", " ") @@ -264,7 +270,9 @@ def build_loss_matrix(dataset: type, time_period): for variable_name in CBO_PROGRAMS: label = f"nation/cbo/{variable_name}" - loss_matrix[label] = sim.calculate(variable_name, map_to="household").values + loss_matrix[label] = sim.calculate( + variable_name, map_to="household" + ).values if any(loss_matrix[label].isna()): raise ValueError(f"Missing values for {label}") param_name = CBO_PARAM_NAME_MAP.get(variable_name, variable_name) @@ -304,9 +312,9 @@ def build_loss_matrix(dataset: type, time_period): # National ACA Enrollment (people receiving a PTC) label = "nation/gov/aca_enrollment" - on_ptc = (sim.calculate("aca_ptc", map_to="person", period=2025).values > 0).astype( - int - ) + on_ptc = ( + sim.calculate("aca_ptc", map_to="person", period=2025).values > 0 + ).astype(int) loss_matrix[label] = sim.map_result(on_ptc, "person", "household") ACA_PTC_ENROLLMENT_2024 = 19_743_689 # people enrolled @@ -338,9 +346,13 @@ def build_loss_matrix(dataset: type, time_period): eitc_eligible_children = sim.calculate("eitc_child_count").values eitc = sim.calculate("eitc").values if row["count_children"] < 2: - meets_child_criteria = eitc_eligible_children == row["count_children"] + meets_child_criteria = ( + eitc_eligible_children == row["count_children"] + ) else: - meets_child_criteria = eitc_eligible_children >= row["count_children"] + meets_child_criteria = ( + eitc_eligible_children >= row["count_children"] + ) loss_matrix[returns_label] = sim.map_result( (eitc > 0) * meets_child_criteria, "tax_unit", @@ -394,7 +406,9 @@ def build_loss_matrix(dataset: type, time_period): # Hard-coded totals for variable_name, target in HARD_CODED_TOTALS.items(): label = f"nation/census/{variable_name}" - loss_matrix[label] = sim.calculate(variable_name, map_to="household").values + loss_matrix[label] = sim.calculate( + variable_name, map_to="household" + ).values if any(loss_matrix[label].isna()): raise ValueError(f"Missing values for {label}") targets_array.append(target) @@ -402,8 +416,8 @@ def build_loss_matrix(dataset: type, time_period): # Negative household market income total rough estimate from the IRS SOI PUF market_income = sim.calculate("household_market_income").values - loss_matrix["nation/irs/negative_household_market_income_total"] = market_income * ( - market_income < 0 + loss_matrix["nation/irs/negative_household_market_income_total"] = ( + market_income * (market_income < 0) ) targets_array.append(-138e9) @@ -434,27 +448,39 @@ def build_loss_matrix(dataset: type, time_period): # AGI by SPM threshold totals - spm_threshold_agi = pd.read_csv(CALIBRATION_FOLDER / "spm_threshold_agi.csv") + spm_threshold_agi = pd.read_csv( + CALIBRATION_FOLDER / "spm_threshold_agi.csv" + ) for _, row in spm_threshold_agi.iterrows(): - spm_unit_agi = sim.calculate("adjusted_gross_income", map_to="spm_unit").values + spm_unit_agi = sim.calculate( + "adjusted_gross_income", map_to="spm_unit" + ).values spm_threshold = sim.calculate("spm_unit_spm_threshold").values in_threshold_range = (spm_threshold >= row["lower_spm_threshold"]) * ( spm_threshold < row["upper_spm_threshold"] ) - label = f"nation/census/agi_in_spm_threshold_decile_{int(row['decile'])}" + label = ( + f"nation/census/agi_in_spm_threshold_decile_{int(row['decile'])}" + ) loss_matrix[label] = sim.map_result( in_threshold_range * spm_unit_agi, "spm_unit", "household" ) targets_array.append(row["adjusted_gross_income"]) - label = f"nation/census/count_in_spm_threshold_decile_{int(row['decile'])}" - loss_matrix[label] = sim.map_result(in_threshold_range, "spm_unit", "household") + label = ( + f"nation/census/count_in_spm_threshold_decile_{int(row['decile'])}" + ) + loss_matrix[label] = sim.map_result( + in_threshold_range, "spm_unit", "household" + ) targets_array.append(row["count"]) # Population by state and population under 5 by state - state_population = pd.read_csv(CALIBRATION_FOLDER / "population_by_state.csv") + state_population = pd.read_csv( + CALIBRATION_FOLDER / "population_by_state.csv" + ) for _, row in state_population.iterrows(): in_state = sim.calculate("state_code", map_to="person") == row["state"] @@ -465,7 +491,9 @@ def build_loss_matrix(dataset: type, time_period): under_5 = sim.calculate("age").values < 5 in_state_under_5 = in_state * under_5 label = f"state/census/population_under_5_by_state/{row['state']}" - loss_matrix[label] = sim.map_result(in_state_under_5, "person", "household") + loss_matrix[label] = sim.map_result( + in_state_under_5, "person", "household" + ) targets_array.append(row["population_under_5"]) age = sim.calculate("age").values @@ -489,7 +517,9 @@ def build_loss_matrix(dataset: type, time_period): # SALT tax expenditure targeting - _add_tax_expenditure_targets(dataset, time_period, sim, loss_matrix, targets_array) + _add_tax_expenditure_targets( + dataset, time_period, sim, loss_matrix, targets_array + ) if any(loss_matrix.isna().sum() > 0): raise ValueError("Some targets are missing from the loss matrix") @@ -503,7 +533,9 @@ def build_loss_matrix(dataset: type, time_period): # Overall count by SSN card type label = f"nation/ssa/ssn_card_type_{card_type_str.lower()}_count" - loss_matrix[label] = sim.map_result(ssn_type_mask, "person", "household") + loss_matrix[label] = sim.map_result( + ssn_type_mask, "person", "household" + ) # Target undocumented population by year based on various sources if card_type_str == "NONE": @@ -539,11 +571,14 @@ def build_loss_matrix(dataset: type, time_period): for _, row in spending_by_state.iterrows(): # Households located in this state in_state = ( - sim.calculate("state_code", map_to="household").values == row["state"] + sim.calculate("state_code", map_to="household").values + == row["state"] ) # ACA PTC amounts for every household (2025) - aca_value = sim.calculate("aca_ptc", map_to="household", period=2025).values + aca_value = sim.calculate( + "aca_ptc", map_to="household", period=2025 + ).values # Add a loss-matrix entry and matching target label = f"nation/irs/aca_spending/{row['state'].lower()}" @@ -576,7 +611,9 @@ def build_loss_matrix(dataset: type, time_period): in_state_enrolled = in_state & is_enrolled label = f"state/irs/aca_enrollment/{row['state'].lower()}" - loss_matrix[label] = sim.map_result(in_state_enrolled, "person", "household") + loss_matrix[label] = sim.map_result( + in_state_enrolled, "person", "household" + ) if any(loss_matrix[label].isna()): raise ValueError(f"Missing values for {label}") @@ -593,7 +630,9 @@ def build_loss_matrix(dataset: type, time_period): state_person = sim.calculate("state_code", map_to="person").values # Flag people in households that actually receive medicaid - has_medicaid = sim.calculate("medicaid_enrolled", map_to="person", period=2025) + has_medicaid = sim.calculate( + "medicaid_enrolled", map_to="person", period=2025 + ) is_medicaid_eligible = sim.calculate( "is_medicaid_eligible", map_to="person", period=2025 ).values @@ -605,7 +644,9 @@ def build_loss_matrix(dataset: type, time_period): in_state_enrolled = in_state & is_enrolled label = f"irs/medicaid_enrollment/{row['state'].lower()}" - loss_matrix[label] = sim.map_result(in_state_enrolled, "person", "household") + loss_matrix[label] = sim.map_result( + in_state_enrolled, "person", "household" + ) if any(loss_matrix[label].isna()): raise ValueError(f"Missing values for {label}") @@ -629,7 +670,9 @@ def build_loss_matrix(dataset: type, time_period): age_lower_bound = int(age_range.replace("+", "")) age_upper_bound = np.inf else: - age_lower_bound, age_upper_bound = map(int, age_range.split("-")) + age_lower_bound, age_upper_bound = map( + int, age_range.split("-") + ) age_mask = (age >= age_lower_bound) & (age <= age_upper_bound) label = f"state/census/age/{state}/{age_range}" @@ -702,7 +745,9 @@ def apply(self): simulation.default_calculation_period = time_period # Calculate the baseline and reform income tax values. - income_tax_r = simulation.calculate("income_tax", map_to="household").values + income_tax_r = simulation.calculate( + "income_tax", map_to="household" + ).values # Compute the tax expenditure (TE) values. te_values = income_tax_r - income_tax_b @@ -736,7 +781,9 @@ def _add_agi_state_targets(): + soi_targets["VARIABLE"] + "/" + soi_targets.apply( - lambda r: get_agi_band_label(r["AGI_LOWER_BOUND"], r["AGI_UPPER_BOUND"]), + lambda r: get_agi_band_label( + r["AGI_LOWER_BOUND"], r["AGI_UPPER_BOUND"] + ), axis=1, ) ) @@ -757,7 +804,9 @@ def _add_agi_metric_columns( agi = sim.calculate("adjusted_gross_income").values state = sim.calculate("state_code", map_to="person").values - state = sim.map_result(state, "person", "tax_unit", how="value_from_first_person") + state = sim.map_result( + state, "person", "tax_unit", how="value_from_first_person" + ) for _, r in soi_targets.iterrows(): lower, upper = r.AGI_LOWER_BOUND, r.AGI_UPPER_BOUND @@ -801,9 +850,13 @@ def _add_state_real_estate_taxes(loss_matrix, targets_list, sim): rtol=1e-8, ), "Real estate tax totals do not sum to national target" - targets_list.extend(real_estate_taxes_targets["real_estate_taxes_bn"].tolist()) + targets_list.extend( + real_estate_taxes_targets["real_estate_taxes_bn"].tolist() + ) - real_estate_taxes = sim.calculate("real_estate_taxes", map_to="household").values + real_estate_taxes = sim.calculate( + "real_estate_taxes", map_to="household" + ).values state = sim.calculate("state_code", map_to="household").values for _, r in real_estate_taxes_targets.iterrows(): @@ -826,16 +879,22 @@ def _add_snap_state_targets(sim): ).calibration.gov.cbo._children["snap"] ratio = snap_targets[["Cost"]].sum().values[0] / national_cost_target snap_targets[["CostAdj"]] = snap_targets[["Cost"]] / ratio - assert np.round(snap_targets[["CostAdj"]].sum().values[0]) == national_cost_target + assert ( + np.round(snap_targets[["CostAdj"]].sum().values[0]) + == national_cost_target + ) cost_targets = snap_targets.copy()[["GEO_ID", "CostAdj"]] - cost_targets["target_name"] = cost_targets["GEO_ID"].str[-4:] + "/snap-cost" + cost_targets["target_name"] = ( + cost_targets["GEO_ID"].str[-4:] + "/snap-cost" + ) hh_targets = snap_targets.copy()[["GEO_ID", "Households"]] hh_targets["target_name"] = snap_targets["GEO_ID"].str[-4:] + "/snap-hhs" target_names = ( - cost_targets["target_name"].tolist() + hh_targets["target_name"].tolist() + cost_targets["target_name"].tolist() + + hh_targets["target_name"].tolist() ) target_values = ( cost_targets["CostAdj"].astype(float).tolist() @@ -854,12 +913,14 @@ def _add_snap_metric_columns( snap_targets = pd.read_csv(CALIBRATION_FOLDER / "snap_state.csv") snap_cost = sim.calculate("snap_reported", map_to="household").values - snap_hhs = (sim.calculate("snap_reported", map_to="household").values > 0).astype( - int - ) + snap_hhs = ( + sim.calculate("snap_reported", map_to="household").values > 0 + ).astype(int) state = sim.calculate("state_code", map_to="person").values - state = sim.map_result(state, "person", "household", how="value_from_first_person") + state = sim.map_result( + state, "person", "household", how="value_from_first_person" + ) STATE_ABBR_TO_FIPS["DC"] = 11 state_fips = pd.Series(state).apply(lambda s: STATE_ABBR_TO_FIPS[s]) @@ -878,7 +939,9 @@ def _add_snap_metric_columns( return loss_matrix -def print_reweighting_diagnostics(optimised_weights, loss_matrix, targets_array, label): +def print_reweighting_diagnostics( + optimised_weights, loss_matrix, targets_array, label +): # Convert all inputs to NumPy arrays right at the start optimised_weights_np = ( optimised_weights.numpy() @@ -905,7 +968,9 @@ def print_reweighting_diagnostics(optimised_weights, loss_matrix, targets_array, # All subsequent calculations use the guaranteed NumPy versions estimate = optimised_weights_np @ loss_matrix_np - rel_error = (((estimate - targets_array_np) + 1) / (targets_array_np + 1)) ** 2 + rel_error = ( + ((estimate - targets_array_np) + 1) / (targets_array_np + 1) + ) ** 2 within_10_percent_mask = np.abs(estimate - targets_array_np) <= ( 0.10 * np.abs(targets_array_np) ) From b7a7a571f8494b2bf5e07102f3cb9847da85862f Mon Sep 17 00:00:00 2001 From: "baogorek@gmail.com" Date: Fri, 6 Mar 2026 09:58:27 -0500 Subject: [PATCH 3/5] Reformat with ruff (CI switched from black to ruff in #577) Co-Authored-By: Claude Opus 4.6 --- policyengine_us_data/datasets/cps/cps.py | 171 +++++------------- .../datasets/cps/enhanced_cps.py | 32 +--- .../test_datasets/test_dataset_sanity.py | 50 ++--- policyengine_us_data/utils/loss.py | 147 +++++---------- 4 files changed, 111 insertions(+), 289 deletions(-) diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index f2b0c5e3..6775fa16 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -93,9 +93,7 @@ def downsample(self, frac: float): # Store original dtypes before modifying original_data: dict = self.load_dataset() - original_dtypes = { - key: original_data[key].dtype for key in original_data - } + original_dtypes = {key: original_data[key].dtype for key in original_data} sim = Microsimulation(dataset=self) sim.subsample(frac=frac) @@ -208,18 +206,13 @@ def add_takeup(self): aca_rate = load_take_up_rate("aca", self.time_period) medicaid_rates_by_state = load_take_up_rate("medicaid", self.time_period) head_start_rate = load_take_up_rate("head_start", self.time_period) - early_head_start_rate = load_take_up_rate( - "early_head_start", self.time_period - ) + early_head_start_rate = load_take_up_rate("early_head_start", self.time_period) ssi_rate = load_take_up_rate("ssi", self.time_period) # EITC: varies by number of children eitc_child_count = baseline.calculate("eitc_child_count").values eitc_takeup_rate = np.array( - [ - eitc_rates_by_children.get(min(int(c), 3), 0.85) - for c in eitc_child_count - ] + [eitc_rates_by_children.get(min(int(c), 3), 0.85) for c in eitc_child_count] ) rng = seeded_rng("takes_up_eitc") data["takes_up_eitc"] = rng.random(n_tax_units) < eitc_takeup_rate @@ -238,9 +231,7 @@ def add_takeup(self): target_snap_takeup_count = int(snap_rate * n_spm_units) remaining_snap_needed = max(0, target_snap_takeup_count - n_snap_reporters) snap_non_reporter_rate = ( - remaining_snap_needed / n_snap_non_reporters - if n_snap_non_reporters > 0 - else 0 + remaining_snap_needed / n_snap_non_reporters if n_snap_non_reporters > 0 else 0 ) # Assign: all reporters + adjusted rate for non-reporters @@ -257,9 +248,7 @@ def add_takeup(self): hh_ids = data["household_id"] person_hh_ids = data["person_household_id"] hh_to_state = dict(zip(hh_ids, state_codes)) - person_states = np.array( - [hh_to_state.get(hh_id, "CA") for hh_id in person_hh_ids] - ) + person_states = np.array([hh_to_state.get(hh_id, "CA") for hh_id in person_hh_ids]) medicaid_rate_by_person = np.array( [medicaid_rates_by_state.get(s, 0.93) for s in person_states] ) @@ -270,9 +259,7 @@ def add_takeup(self): # Head Start rng = seeded_rng("takes_up_head_start_if_eligible") - data["takes_up_head_start_if_eligible"] = ( - rng.random(n_persons) < head_start_rate - ) + data["takes_up_head_start_if_eligible"] = rng.random(n_persons) < head_start_rate # Early Head Start rng = seeded_rng("takes_up_early_head_start_if_eligible") @@ -290,9 +277,7 @@ def add_takeup(self): target_ssi_takeup_count = int(ssi_rate * n_persons) remaining_ssi_needed = max(0, target_ssi_takeup_count - n_ssi_reporters) ssi_non_reporter_rate = ( - remaining_ssi_needed / n_ssi_non_reporters - if n_ssi_non_reporters > 0 - else 0 + remaining_ssi_needed / n_ssi_non_reporters if n_ssi_non_reporters > 0 else 0 ) # Assign: all reporters + adjusted rate for non-reporters @@ -315,9 +300,7 @@ def add_takeup(self): data["would_claim_wic"] = rng.random(n_persons) < wic_takeup_rate_by_person # WIC nutritional risk — fully resolved - wic_risk_rates = load_take_up_rate( - "wic_nutritional_risk", self.time_period - ) + wic_risk_rates = load_take_up_rate("wic_nutritional_risk", self.time_period) wic_risk_rate_by_person = np.array( [wic_risk_rates.get(c, 0) for c in wic_categories] ) @@ -364,12 +347,8 @@ def uprate_cps_data(data, from_period, to_period): uprating = create_policyengine_uprating_factors_table() for variable in uprating.index.unique(): if variable in data: - current_index = uprating[uprating.index == variable][ - to_period - ].values[0] - start_index = uprating[uprating.index == variable][ - from_period - ].values[0] + current_index = uprating[uprating.index == variable][to_period].values[0] + start_index = uprating[uprating.index == variable][from_period].values[0] growth = current_index / start_index data[variable] = data[variable] * growth @@ -411,9 +390,7 @@ def add_id_variables( # Marital units - marital_unit_id = person.PH_SEQ * 1e6 + np.maximum( - person.A_LINENO, person.A_SPOUSE - ) + marital_unit_id = person.PH_SEQ * 1e6 + np.maximum(person.A_LINENO, person.A_SPOUSE) # marital_unit_id is not the household ID, zero padded and followed # by the index within household (of each person, or their spouse if @@ -453,9 +430,7 @@ def add_personal_variables(cps: h5py.File, person: DataFrame) -> None: # "Is...blind or does...have serious difficulty seeing even when Wearing # glasses?" 1 -> Yes cps["is_blind"] = person.PEDISEYE == 1 - DISABILITY_FLAGS = [ - "PEDIS" + i for i in ["DRS", "EAR", "EYE", "OUT", "PHY", "REM"] - ] + DISABILITY_FLAGS = ["PEDIS" + i for i in ["DRS", "EAR", "EYE", "OUT", "PHY", "REM"]] cps["is_disabled"] = (person[DISABILITY_FLAGS] == 1).any(axis=1) def children_per_parent(col: str) -> pd.DataFrame: @@ -477,9 +452,7 @@ def children_per_parent(col: str) -> pd.DataFrame: # Aggregate to parent. res = ( - pd.concat( - [children_per_parent("PEPAR1"), children_per_parent("PEPAR2")] - ) + pd.concat([children_per_parent("PEPAR1"), children_per_parent("PEPAR2")]) .groupby(["PH_SEQ", "A_LINENO"]) .children.sum() .reset_index() @@ -505,9 +478,7 @@ def children_per_parent(col: str) -> pd.DataFrame: add_overtime_occupation(cps, person) -def add_personal_income_variables( - cps: h5py.File, person: DataFrame, year: int -): +def add_personal_income_variables(cps: h5py.File, person: DataFrame, year: int): """Add income variables. Args: @@ -533,16 +504,14 @@ def add_personal_income_variables( cps["weekly_hours_worked"] = person.HRSWK * person.WKSWORK / 52 cps["hours_worked_last_week"] = person.A_HRS1 * person.WKSWORK / 52 - cps["taxable_interest_income"] = person.INT_VAL * ( - p["taxable_interest_fraction"] - ) + cps["taxable_interest_income"] = person.INT_VAL * (p["taxable_interest_fraction"]) cps["tax_exempt_interest_income"] = person.INT_VAL * ( 1 - p["taxable_interest_fraction"] ) cps["self_employment_income"] = person.SEMP_VAL cps["farm_income"] = person.FRSE_VAL - cps["qualified_dividend_income"] = person.DIV_VAL * ( - p["qualified_dividend_fraction"] + cps["qualified_dividend_income"] = ( + person.DIV_VAL * (p["qualified_dividend_fraction"]) ) cps["non_qualified_dividend_income"] = person.DIV_VAL * ( 1 - p["qualified_dividend_fraction"] @@ -561,18 +530,14 @@ def add_personal_income_variables( # 8 = Other is_retirement = (person.RESNSS1 == 1) | (person.RESNSS2 == 1) is_disability = (person.RESNSS1 == 2) | (person.RESNSS2 == 2) - is_survivor = np.isin(person.RESNSS1, [3, 5]) | np.isin( - person.RESNSS2, [3, 5] - ) + is_survivor = np.isin(person.RESNSS1, [3, 5]) | np.isin(person.RESNSS2, [3, 5]) is_dependent = np.isin(person.RESNSS1, [4, 6, 7]) | np.isin( person.RESNSS2, [4, 6, 7] ) # Primary classification: assign full SS_VAL to the highest- # priority category when someone has multiple source codes. - cps["social_security_retirement"] = np.where( - is_retirement, person.SS_VAL, 0 - ) + cps["social_security_retirement"] = np.where(is_retirement, person.SS_VAL, 0) cps["social_security_disability"] = np.where( is_disability & ~is_retirement, person.SS_VAL, 0 ) @@ -615,9 +580,7 @@ def add_personal_income_variables( # Add pensions and annuities. cps_pensions = person.PNSN_VAL + person.ANN_VAL # Assume a constant fraction of pension income is taxable. - cps["taxable_private_pension_income"] = ( - cps_pensions * p["taxable_pension_fraction"] - ) + cps["taxable_private_pension_income"] = cps_pensions * p["taxable_pension_fraction"] cps["tax_exempt_private_pension_income"] = cps_pensions * ( 1 - p["taxable_pension_fraction"] ) @@ -641,18 +604,11 @@ def add_personal_income_variables( for source_with_taxable_fraction in ["401k", "403b", "sep"]: cps[f"taxable_{source_with_taxable_fraction}_distributions"] = ( cps[f"{source_with_taxable_fraction}_distributions"] - * p[ - f"taxable_{source_with_taxable_fraction}_distribution_fraction" - ] + * p[f"taxable_{source_with_taxable_fraction}_distribution_fraction"] ) cps[f"tax_exempt_{source_with_taxable_fraction}_distributions"] = cps[ f"{source_with_taxable_fraction}_distributions" - ] * ( - 1 - - p[ - f"taxable_{source_with_taxable_fraction}_distribution_fraction" - ] - ) + ] * (1 - p[f"taxable_{source_with_taxable_fraction}_distribution_fraction"]) del cps[f"{source_with_taxable_fraction}_distributions"] # Assume all regular IRA distributions are taxable, @@ -740,9 +696,7 @@ def add_personal_income_variables( cps["traditional_ira_contributions"] = ira_capped * trad_ira_share cps["roth_ira_contributions"] = ira_capped * (1 - trad_ira_share) # Allocate capital gains into long-term and short-term based on aggregate split. - cps["long_term_capital_gains"] = person.CAP_VAL * ( - p["long_term_capgain_fraction"] - ) + cps["long_term_capital_gains"] = person.CAP_VAL * (p["long_term_capgain_fraction"]) cps["short_term_capital_gains"] = person.CAP_VAL * ( 1 - p["long_term_capgain_fraction"] ) @@ -770,10 +724,7 @@ def add_personal_income_variables( # Get QBI simulation parameters --- yamlfilename = ( - files("policyengine_us_data") - / "datasets" - / "puf" - / "qbi_assumptions.yaml" + files("policyengine_us_data") / "datasets" / "puf" / "qbi_assumptions.yaml" ) with open(yamlfilename, "r", encoding="utf-8") as yamlfile: p = yaml.safe_load(yamlfile) @@ -827,14 +778,10 @@ def add_spm_variables(self, cps: h5py.File, spm_unit: DataFrame) -> None: 3: "RENTER", } cps["spm_unit_tenure_type"] = ( - spm_unit.SPM_TENMORTSTATUS.map(tenure_map) - .fillna("RENTER") - .astype("S") + spm_unit.SPM_TENMORTSTATUS.map(tenure_map).fillna("RENTER").astype("S") ) - cps["reduced_price_school_meals_reported"] = ( - cps["free_school_meals_reported"] * 0 - ) + cps["reduced_price_school_meals_reported"] = cps["free_school_meals_reported"] * 0 def add_household_variables(cps: h5py.File, household: DataFrame) -> None: @@ -968,9 +915,7 @@ def select_random_subset_to_target( share_to_move = min(share_to_move, 1.0) # Cap at 100% else: # Calculate how much to move to reach target (for EAD case) - needed_weighted = ( - current_weighted - target_weighted - ) # Will be negative + needed_weighted = current_weighted - target_weighted # Will be negative total_weight = np.sum(person_weights[eligible_ids]) share_to_move = abs(needed_weighted) / total_weight share_to_move = min(share_to_move, 1.0) # Cap at 100% @@ -1214,9 +1159,7 @@ def select_random_subset_to_target( ) # CONDITION 10: Government Employees - is_government_worker = np.isin( - person.PEIO1COW, [1, 2, 3] - ) # Fed/state/local gov + is_government_worker = np.isin(person.PEIO1COW, [1, 2, 3]) # Fed/state/local gov is_military_occupation = person.A_MJOCC == 11 # Military occupation is_government_employee = is_government_worker | is_military_occupation condition_10_mask = potentially_undocumented & is_government_employee @@ -1330,12 +1273,8 @@ def select_random_subset_to_target( undocumented_students_mask = ( (ssn_card_type == 0) & noncitizens & (person.A_HSCOL == 2) ) - undocumented_workers_count = np.sum( - person_weights[undocumented_workers_mask] - ) - undocumented_students_count = np.sum( - person_weights[undocumented_students_mask] - ) + undocumented_workers_count = np.sum(person_weights[undocumented_workers_mask]) + undocumented_students_count = np.sum(person_weights[undocumented_students_mask]) after_conditions_code_0 = np.sum(person_weights[ssn_card_type == 0]) print(f"After conditions - Code 0 people: {after_conditions_code_0:,.0f}") @@ -1530,15 +1469,11 @@ def select_random_subset_to_target( f"Selected {len(selected_indices)} people from {len(mixed_household_candidates)} candidates in mixed households" ) else: - print( - "No additional family members selected (target already reached)" - ) + print("No additional family members selected (target already reached)") else: print("No mixed-status households found for family correlation") else: - print( - "No additional undocumented people needed - target already reached" - ) + print("No additional undocumented people needed - target already reached") # Calculate the weighted impact code_0_after = np.sum(person_weights[ssn_card_type == 0]) @@ -1613,9 +1548,7 @@ def get_arrival_year_midpoint(peinusyr): age_at_entry = np.maximum(0, person.A_AGE - years_in_us) # start every non-citizen as LPR so no UNSET survives - immigration_status = np.full( - len(person), "LEGAL_PERMANENT_RESIDENT", dtype="U32" - ) + immigration_status = np.full(len(person), "LEGAL_PERMANENT_RESIDENT", dtype="U32") # Set citizens (SSN card type 1) to CITIZEN status immigration_status[ssn_card_type == 1] = "CITIZEN" @@ -1663,9 +1596,7 @@ def get_arrival_year_midpoint(peinusyr): immigration_status[recent_refugee_mask] = "REFUGEE" # 6. Temp non-qualified (Code 2 not caught by DACA rule) - mask = (ssn_card_type == 2) & ( - immigration_status == "LEGAL_PERMANENT_RESIDENT" - ) + mask = (ssn_card_type == 2) & (immigration_status == "LEGAL_PERMANENT_RESIDENT") immigration_status[mask] = "TPS" # Final write (all values now in ImmigrationStatus Enum) @@ -1681,9 +1612,7 @@ def get_arrival_year_midpoint(peinusyr): 2: "NON_CITIZEN_VALID_EAD", # Non-citizens with work/study authorization 3: "OTHER_NON_CITIZEN", # Non-citizens with indicators of legal status } - ssn_card_type_str = ( - pd.Series(ssn_card_type).map(code_to_str).astype("S").values - ) + ssn_card_type_str = pd.Series(ssn_card_type).map(code_to_str).astype("S").values cps["ssn_card_type"] = ssn_card_type_str # Final population summary @@ -1890,9 +1819,7 @@ def add_tips(self, cps: h5py.File): # Drop temporary columns used only for imputation # is_married is person-level here but policyengine-us defines it at Family # level, so we must not save it - cps = cps.drop( - columns=["is_married", "is_under_18", "is_under_6"], errors="ignore" - ) + cps = cps.drop(columns=["is_married", "is_under_18", "is_under_6"], errors="ignore") self.save_dataset(cps) @@ -2012,9 +1939,7 @@ def create_scf_reference_person_mask(cps_data, raw_person_data): all_persons_data["is_female"] = (raw_person_data.A_SEX == 2).values # Add marital status (A_MARITL codes: 1,2 = married with spouse present/absent) - all_persons_data["is_married"] = raw_person_data.A_MARITL.isin( - [1, 2] - ).values + all_persons_data["is_married"] = raw_person_data.A_MARITL.isin([1, 2]).values # Define adults as age 18+ all_persons_data["is_adult"] = all_persons_data["age"] >= 18 @@ -2033,8 +1958,7 @@ def create_scf_reference_person_mask(cps_data, raw_person_data): # Identify couple households (households with exactly 2 married adults) married_adults_per_household = ( all_persons_data[ - (all_persons_data["is_adult"]) - & (all_persons_data["is_married"]) + (all_persons_data["is_adult"]) & (all_persons_data["is_married"]) ] .groupby("person_household_id") .size() @@ -2042,12 +1966,7 @@ def create_scf_reference_person_mask(cps_data, raw_person_data): couple_households = married_adults_per_household[ (married_adults_per_household == 2) - & ( - all_persons_data.groupby("person_household_id")[ - "n_adults" - ].first() - == 2 - ) + & (all_persons_data.groupby("person_household_id")["n_adults"].first() == 2) ].index all_persons_data["is_couple_household"] = all_persons_data[ @@ -2147,9 +2066,7 @@ def determine_reference_person(group): } # Apply the mapping to recode the race values - cps_data["cps_race"] = np.vectorize(CPS_RACE_MAPPING.get)( - cps_data["cps_race"] - ) + cps_data["cps_race"] = np.vectorize(CPS_RACE_MAPPING.get)(cps_data["cps_race"]) lengths = {k: len(v) for k, v in cps_data.items()} var_len = cps_data["person_household_id"].shape[0] @@ -2181,9 +2098,7 @@ def determine_reference_person(group): # Add is_married variable for household heads based on raw person data reference_persons = person_data[mask] - receiver_data["is_married"] = reference_persons.A_MARITL.isin( - [1, 2] - ).values + receiver_data["is_married"] = reference_persons.A_MARITL.isin([1, 2]).values # Impute auto loan balance from the SCF from policyengine_us_data.datasets.scf.scf import SCF_2022 @@ -2218,9 +2133,7 @@ def determine_reference_person(group): logging.getLogger("microimpute").setLevel(getattr(logging, log_level)) qrf_model = QRF() - donor_data = donor_data.sample(frac=0.5, random_state=42).reset_index( - drop=True - ) + donor_data = donor_data.sample(frac=0.5, random_state=42).reset_index(drop=True) fitted_model = qrf_model.fit( X_train=donor_data, predictors=PREDICTORS, diff --git a/policyengine_us_data/datasets/cps/enhanced_cps.py b/policyengine_us_data/datasets/cps/enhanced_cps.py index 2b3b46ef..8755c73e 100644 --- a/policyengine_us_data/datasets/cps/enhanced_cps.py +++ b/policyengine_us_data/datasets/cps/enhanced_cps.py @@ -46,9 +46,7 @@ def reweight( normalisation_factor = np.where( is_national, nation_normalisation_factor, state_normalisation_factor ) - normalisation_factor = torch.tensor( - normalisation_factor, dtype=torch.float32 - ) + normalisation_factor = torch.tensor(normalisation_factor, dtype=torch.float32) targets_array = torch.tensor(targets_array, dtype=torch.float32) inv_mean_normalisation = 1 / np.mean(normalisation_factor.numpy()) @@ -61,12 +59,8 @@ def loss(weights): estimate = weights @ loss_matrix if torch.isnan(estimate).any(): raise ValueError("Estimate contains NaNs") - rel_error = ( - ((estimate - targets_array) + 1) / (targets_array + 1) - ) ** 2 - rel_error_normalized = ( - inv_mean_normalisation * rel_error * normalisation_factor - ) + rel_error = (((estimate - targets_array) + 1) / (targets_array + 1)) ** 2 + rel_error_normalized = inv_mean_normalisation * rel_error * normalisation_factor if torch.isnan(rel_error_normalized).any(): raise ValueError("Relative error contains NaNs") return rel_error_normalized.mean() @@ -121,9 +115,7 @@ def loss(weights): start_loss = l.item() loss_rel_change = (l.item() - start_loss) / start_loss l.backward() - iterator.set_postfix( - {"loss": l.item(), "loss_rel_change": loss_rel_change} - ) + iterator.set_postfix({"loss": l.item(), "loss_rel_change": loss_rel_change}) optimizer.step() if log_path is not None: performance.to_csv(log_path, index=False) @@ -182,9 +174,7 @@ def generate(self): # Run the optimization procedure to get (close to) minimum loss weights for year in range(self.start_year, self.end_year + 1): - loss_matrix, targets_array = build_loss_matrix( - self.input_dataset, year - ) + loss_matrix, targets_array = build_loss_matrix(self.input_dataset, year) zero_mask = np.isclose(targets_array, 0.0, atol=0.1) bad_mask = loss_matrix.columns.isin(bad_targets) keep_mask_bool = ~(zero_mask | bad_mask) @@ -210,9 +200,7 @@ def generate(self): # Validate dense weights w = optimised_weights if np.any(np.isnan(w)): - raise ValueError( - f"Year {year}: household_weight contains NaN values" - ) + raise ValueError(f"Year {year}: household_weight contains NaN values") if np.any(w < 0): raise ValueError( f"Year {year}: household_weight contains negative values" @@ -253,12 +241,8 @@ def generate(self): 1, 0.1, len(original_weights) ) for year in [2024]: - loss_matrix, targets_array = build_loss_matrix( - self.input_dataset, year - ) - optimised_weights = reweight( - original_weights, loss_matrix, targets_array - ) + loss_matrix, targets_array = build_loss_matrix(self.input_dataset, year) + optimised_weights = reweight(original_weights, loss_matrix, targets_array) data["household_weight"] = optimised_weights self.save_dataset(data) diff --git a/policyengine_us_data/tests/test_datasets/test_dataset_sanity.py b/policyengine_us_data/tests/test_datasets/test_dataset_sanity.py index 8314fe7f..4e8732b0 100644 --- a/policyengine_us_data/tests/test_datasets/test_dataset_sanity.py +++ b/policyengine_us_data/tests/test_datasets/test_dataset_sanity.py @@ -41,27 +41,23 @@ def test_ecps_employment_income_positive(ecps_sim): def test_ecps_self_employment_income_positive(ecps_sim): total = ecps_sim.calculate("self_employment_income").sum() - assert ( - total > 50e9 - ), f"self_employment_income sum is {total:.2e}, expected > 50B." + assert total > 50e9, f"self_employment_income sum is {total:.2e}, expected > 50B." def test_ecps_household_count(ecps_sim): """Household count should be roughly 130-160M.""" total_hh = ecps_sim.calculate("household_weight").values.sum() - assert ( - 100e6 < total_hh < 200e6 - ), f"Total households = {total_hh:.2e}, expected 100M-200M." + assert 100e6 < total_hh < 200e6, ( + f"Total households = {total_hh:.2e}, expected 100M-200M." + ) def test_ecps_person_count(ecps_sim): """Weighted person count should be roughly 330M.""" - total_people = ecps_sim.calculate( - "household_weight", map_to="person" - ).values.sum() - assert ( - 250e6 < total_people < 400e6 - ), f"Total people = {total_people:.2e}, expected 250M-400M." + total_people = ecps_sim.calculate("household_weight", map_to="person").values.sum() + assert 250e6 < total_people < 400e6, ( + f"Total people = {total_people:.2e}, expected 250M-400M." + ) def test_ecps_poverty_rate_reasonable(ecps_sim): @@ -84,9 +80,9 @@ def test_ecps_mean_employment_income_reasonable(ecps_sim): """Mean employment income per person should be $20k-$60k.""" income = ecps_sim.calculate("employment_income", map_to="person") mean = income.mean() - assert ( - 15_000 < mean < 80_000 - ), f"Mean employment income = ${mean:,.0f}, expected $15k-$80k." + assert 15_000 < mean < 80_000, ( + f"Mean employment income = ${mean:,.0f}, expected $15k-$80k." + ) # ── CPS sanity checks ─────────────────────────────────────────── @@ -94,9 +90,7 @@ def test_ecps_mean_employment_income_reasonable(ecps_sim): def test_cps_employment_income_positive(cps_sim): total = cps_sim.calculate("employment_income").sum() - assert ( - total > 5e12 - ), f"CPS employment_income sum is {total:.2e}, expected > 5T." + assert total > 5e12, f"CPS employment_income sum is {total:.2e}, expected > 5T." def test_cps_household_count(cps_sim): @@ -122,24 +116,20 @@ def sparse_sim(): def test_sparse_employment_income_positive(sparse_sim): """Sparse dataset employment income must be in the trillions.""" total = sparse_sim.calculate("employment_income").sum() - assert ( - total > 5e12 - ), f"Sparse employment_income sum is {total:.2e}, expected > 5T." + assert total > 5e12, f"Sparse employment_income sum is {total:.2e}, expected > 5T." def test_sparse_household_count(sparse_sim): total_hh = sparse_sim.calculate("household_weight").values.sum() - assert ( - 100e6 < total_hh < 200e6 - ), f"Sparse total households = {total_hh:.2e}, expected 100M-200M." + assert 100e6 < total_hh < 200e6, ( + f"Sparse total households = {total_hh:.2e}, expected 100M-200M." + ) def test_sparse_poverty_rate_reasonable(sparse_sim): in_poverty = sparse_sim.calculate("person_in_poverty", map_to="person") rate = in_poverty.mean() - assert ( - 0.05 < rate < 0.30 - ), f"Sparse poverty rate = {rate:.1%}, expected 5-30%." + assert 0.05 < rate < 0.30, f"Sparse poverty rate = {rate:.1%}, expected 5-30%." # ── File size checks ─────────────────────────────────────────── @@ -153,6 +143,6 @@ def test_ecps_file_size(): if not path.exists(): pytest.skip("enhanced_cps_2024.h5 not found") size_mb = path.stat().st_size / (1024 * 1024) - assert ( - size_mb > 100 - ), f"enhanced_cps_2024.h5 is only {size_mb:.1f}MB, expected >100MB" + assert size_mb > 100, ( + f"enhanced_cps_2024.h5 is only {size_mb:.1f}MB, expected >100MB" + ) diff --git a/policyengine_us_data/utils/loss.py b/policyengine_us_data/utils/loss.py index be6827a5..bb6ca505 100644 --- a/policyengine_us_data/utils/loss.py +++ b/policyengine_us_data/utils/loss.py @@ -164,9 +164,7 @@ def build_loss_matrix(dataset: type, time_period): continue mask = ( - (agi >= row["AGI lower bound"]) - * (agi < row["AGI upper bound"]) - * filer + (agi >= row["AGI lower bound"]) * (agi < row["AGI upper bound"]) * filer ) > 0 if row["Filing status"] == "Single": @@ -186,12 +184,8 @@ def build_loss_matrix(dataset: type, time_period): if row["Count"]: values = (values > 0).astype(float) - agi_range_label = ( - f"{fmt(row['AGI lower bound'])}-{fmt(row['AGI upper bound'])}" - ) - taxable_label = ( - "taxable" if row["Taxable only"] else "all" + " returns" - ) + agi_range_label = f"{fmt(row['AGI lower bound'])}-{fmt(row['AGI upper bound'])}" + taxable_label = "taxable" if row["Taxable only"] else "all" + " returns" filing_status_label = row["Filing status"] variable_label = row["Variable"].replace("_", " ") @@ -270,9 +264,7 @@ def build_loss_matrix(dataset: type, time_period): for variable_name in CBO_PROGRAMS: label = f"nation/cbo/{variable_name}" - loss_matrix[label] = sim.calculate( - variable_name, map_to="household" - ).values + loss_matrix[label] = sim.calculate(variable_name, map_to="household").values if any(loss_matrix[label].isna()): raise ValueError(f"Missing values for {label}") param_name = CBO_PARAM_NAME_MAP.get(variable_name, variable_name) @@ -312,9 +304,9 @@ def build_loss_matrix(dataset: type, time_period): # National ACA Enrollment (people receiving a PTC) label = "nation/gov/aca_enrollment" - on_ptc = ( - sim.calculate("aca_ptc", map_to="person", period=2025).values > 0 - ).astype(int) + on_ptc = (sim.calculate("aca_ptc", map_to="person", period=2025).values > 0).astype( + int + ) loss_matrix[label] = sim.map_result(on_ptc, "person", "household") ACA_PTC_ENROLLMENT_2024 = 19_743_689 # people enrolled @@ -346,13 +338,9 @@ def build_loss_matrix(dataset: type, time_period): eitc_eligible_children = sim.calculate("eitc_child_count").values eitc = sim.calculate("eitc").values if row["count_children"] < 2: - meets_child_criteria = ( - eitc_eligible_children == row["count_children"] - ) + meets_child_criteria = eitc_eligible_children == row["count_children"] else: - meets_child_criteria = ( - eitc_eligible_children >= row["count_children"] - ) + meets_child_criteria = eitc_eligible_children >= row["count_children"] loss_matrix[returns_label] = sim.map_result( (eitc > 0) * meets_child_criteria, "tax_unit", @@ -406,9 +394,7 @@ def build_loss_matrix(dataset: type, time_period): # Hard-coded totals for variable_name, target in HARD_CODED_TOTALS.items(): label = f"nation/census/{variable_name}" - loss_matrix[label] = sim.calculate( - variable_name, map_to="household" - ).values + loss_matrix[label] = sim.calculate(variable_name, map_to="household").values if any(loss_matrix[label].isna()): raise ValueError(f"Missing values for {label}") targets_array.append(target) @@ -416,8 +402,8 @@ def build_loss_matrix(dataset: type, time_period): # Negative household market income total rough estimate from the IRS SOI PUF market_income = sim.calculate("household_market_income").values - loss_matrix["nation/irs/negative_household_market_income_total"] = ( - market_income * (market_income < 0) + loss_matrix["nation/irs/negative_household_market_income_total"] = market_income * ( + market_income < 0 ) targets_array.append(-138e9) @@ -448,39 +434,27 @@ def build_loss_matrix(dataset: type, time_period): # AGI by SPM threshold totals - spm_threshold_agi = pd.read_csv( - CALIBRATION_FOLDER / "spm_threshold_agi.csv" - ) + spm_threshold_agi = pd.read_csv(CALIBRATION_FOLDER / "spm_threshold_agi.csv") for _, row in spm_threshold_agi.iterrows(): - spm_unit_agi = sim.calculate( - "adjusted_gross_income", map_to="spm_unit" - ).values + spm_unit_agi = sim.calculate("adjusted_gross_income", map_to="spm_unit").values spm_threshold = sim.calculate("spm_unit_spm_threshold").values in_threshold_range = (spm_threshold >= row["lower_spm_threshold"]) * ( spm_threshold < row["upper_spm_threshold"] ) - label = ( - f"nation/census/agi_in_spm_threshold_decile_{int(row['decile'])}" - ) + label = f"nation/census/agi_in_spm_threshold_decile_{int(row['decile'])}" loss_matrix[label] = sim.map_result( in_threshold_range * spm_unit_agi, "spm_unit", "household" ) targets_array.append(row["adjusted_gross_income"]) - label = ( - f"nation/census/count_in_spm_threshold_decile_{int(row['decile'])}" - ) - loss_matrix[label] = sim.map_result( - in_threshold_range, "spm_unit", "household" - ) + label = f"nation/census/count_in_spm_threshold_decile_{int(row['decile'])}" + loss_matrix[label] = sim.map_result(in_threshold_range, "spm_unit", "household") targets_array.append(row["count"]) # Population by state and population under 5 by state - state_population = pd.read_csv( - CALIBRATION_FOLDER / "population_by_state.csv" - ) + state_population = pd.read_csv(CALIBRATION_FOLDER / "population_by_state.csv") for _, row in state_population.iterrows(): in_state = sim.calculate("state_code", map_to="person") == row["state"] @@ -491,9 +465,7 @@ def build_loss_matrix(dataset: type, time_period): under_5 = sim.calculate("age").values < 5 in_state_under_5 = in_state * under_5 label = f"state/census/population_under_5_by_state/{row['state']}" - loss_matrix[label] = sim.map_result( - in_state_under_5, "person", "household" - ) + loss_matrix[label] = sim.map_result(in_state_under_5, "person", "household") targets_array.append(row["population_under_5"]) age = sim.calculate("age").values @@ -517,9 +489,7 @@ def build_loss_matrix(dataset: type, time_period): # SALT tax expenditure targeting - _add_tax_expenditure_targets( - dataset, time_period, sim, loss_matrix, targets_array - ) + _add_tax_expenditure_targets(dataset, time_period, sim, loss_matrix, targets_array) if any(loss_matrix.isna().sum() > 0): raise ValueError("Some targets are missing from the loss matrix") @@ -533,9 +503,7 @@ def build_loss_matrix(dataset: type, time_period): # Overall count by SSN card type label = f"nation/ssa/ssn_card_type_{card_type_str.lower()}_count" - loss_matrix[label] = sim.map_result( - ssn_type_mask, "person", "household" - ) + loss_matrix[label] = sim.map_result(ssn_type_mask, "person", "household") # Target undocumented population by year based on various sources if card_type_str == "NONE": @@ -571,14 +539,11 @@ def build_loss_matrix(dataset: type, time_period): for _, row in spending_by_state.iterrows(): # Households located in this state in_state = ( - sim.calculate("state_code", map_to="household").values - == row["state"] + sim.calculate("state_code", map_to="household").values == row["state"] ) # ACA PTC amounts for every household (2025) - aca_value = sim.calculate( - "aca_ptc", map_to="household", period=2025 - ).values + aca_value = sim.calculate("aca_ptc", map_to="household", period=2025).values # Add a loss-matrix entry and matching target label = f"nation/irs/aca_spending/{row['state'].lower()}" @@ -611,9 +576,7 @@ def build_loss_matrix(dataset: type, time_period): in_state_enrolled = in_state & is_enrolled label = f"state/irs/aca_enrollment/{row['state'].lower()}" - loss_matrix[label] = sim.map_result( - in_state_enrolled, "person", "household" - ) + loss_matrix[label] = sim.map_result(in_state_enrolled, "person", "household") if any(loss_matrix[label].isna()): raise ValueError(f"Missing values for {label}") @@ -630,9 +593,7 @@ def build_loss_matrix(dataset: type, time_period): state_person = sim.calculate("state_code", map_to="person").values # Flag people in households that actually receive medicaid - has_medicaid = sim.calculate( - "medicaid_enrolled", map_to="person", period=2025 - ) + has_medicaid = sim.calculate("medicaid_enrolled", map_to="person", period=2025) is_medicaid_eligible = sim.calculate( "is_medicaid_eligible", map_to="person", period=2025 ).values @@ -644,9 +605,7 @@ def build_loss_matrix(dataset: type, time_period): in_state_enrolled = in_state & is_enrolled label = f"irs/medicaid_enrollment/{row['state'].lower()}" - loss_matrix[label] = sim.map_result( - in_state_enrolled, "person", "household" - ) + loss_matrix[label] = sim.map_result(in_state_enrolled, "person", "household") if any(loss_matrix[label].isna()): raise ValueError(f"Missing values for {label}") @@ -670,9 +629,7 @@ def build_loss_matrix(dataset: type, time_period): age_lower_bound = int(age_range.replace("+", "")) age_upper_bound = np.inf else: - age_lower_bound, age_upper_bound = map( - int, age_range.split("-") - ) + age_lower_bound, age_upper_bound = map(int, age_range.split("-")) age_mask = (age >= age_lower_bound) & (age <= age_upper_bound) label = f"state/census/age/{state}/{age_range}" @@ -745,9 +702,7 @@ def apply(self): simulation.default_calculation_period = time_period # Calculate the baseline and reform income tax values. - income_tax_r = simulation.calculate( - "income_tax", map_to="household" - ).values + income_tax_r = simulation.calculate("income_tax", map_to="household").values # Compute the tax expenditure (TE) values. te_values = income_tax_r - income_tax_b @@ -781,9 +736,7 @@ def _add_agi_state_targets(): + soi_targets["VARIABLE"] + "/" + soi_targets.apply( - lambda r: get_agi_band_label( - r["AGI_LOWER_BOUND"], r["AGI_UPPER_BOUND"] - ), + lambda r: get_agi_band_label(r["AGI_LOWER_BOUND"], r["AGI_UPPER_BOUND"]), axis=1, ) ) @@ -804,9 +757,7 @@ def _add_agi_metric_columns( agi = sim.calculate("adjusted_gross_income").values state = sim.calculate("state_code", map_to="person").values - state = sim.map_result( - state, "person", "tax_unit", how="value_from_first_person" - ) + state = sim.map_result(state, "person", "tax_unit", how="value_from_first_person") for _, r in soi_targets.iterrows(): lower, upper = r.AGI_LOWER_BOUND, r.AGI_UPPER_BOUND @@ -850,13 +801,9 @@ def _add_state_real_estate_taxes(loss_matrix, targets_list, sim): rtol=1e-8, ), "Real estate tax totals do not sum to national target" - targets_list.extend( - real_estate_taxes_targets["real_estate_taxes_bn"].tolist() - ) + targets_list.extend(real_estate_taxes_targets["real_estate_taxes_bn"].tolist()) - real_estate_taxes = sim.calculate( - "real_estate_taxes", map_to="household" - ).values + real_estate_taxes = sim.calculate("real_estate_taxes", map_to="household").values state = sim.calculate("state_code", map_to="household").values for _, r in real_estate_taxes_targets.iterrows(): @@ -879,22 +826,16 @@ def _add_snap_state_targets(sim): ).calibration.gov.cbo._children["snap"] ratio = snap_targets[["Cost"]].sum().values[0] / national_cost_target snap_targets[["CostAdj"]] = snap_targets[["Cost"]] / ratio - assert ( - np.round(snap_targets[["CostAdj"]].sum().values[0]) - == national_cost_target - ) + assert np.round(snap_targets[["CostAdj"]].sum().values[0]) == national_cost_target cost_targets = snap_targets.copy()[["GEO_ID", "CostAdj"]] - cost_targets["target_name"] = ( - cost_targets["GEO_ID"].str[-4:] + "/snap-cost" - ) + cost_targets["target_name"] = cost_targets["GEO_ID"].str[-4:] + "/snap-cost" hh_targets = snap_targets.copy()[["GEO_ID", "Households"]] hh_targets["target_name"] = snap_targets["GEO_ID"].str[-4:] + "/snap-hhs" target_names = ( - cost_targets["target_name"].tolist() - + hh_targets["target_name"].tolist() + cost_targets["target_name"].tolist() + hh_targets["target_name"].tolist() ) target_values = ( cost_targets["CostAdj"].astype(float).tolist() @@ -913,14 +854,12 @@ def _add_snap_metric_columns( snap_targets = pd.read_csv(CALIBRATION_FOLDER / "snap_state.csv") snap_cost = sim.calculate("snap_reported", map_to="household").values - snap_hhs = ( - sim.calculate("snap_reported", map_to="household").values > 0 - ).astype(int) + snap_hhs = (sim.calculate("snap_reported", map_to="household").values > 0).astype( + int + ) state = sim.calculate("state_code", map_to="person").values - state = sim.map_result( - state, "person", "household", how="value_from_first_person" - ) + state = sim.map_result(state, "person", "household", how="value_from_first_person") STATE_ABBR_TO_FIPS["DC"] = 11 state_fips = pd.Series(state).apply(lambda s: STATE_ABBR_TO_FIPS[s]) @@ -939,9 +878,7 @@ def _add_snap_metric_columns( return loss_matrix -def print_reweighting_diagnostics( - optimised_weights, loss_matrix, targets_array, label -): +def print_reweighting_diagnostics(optimised_weights, loss_matrix, targets_array, label): # Convert all inputs to NumPy arrays right at the start optimised_weights_np = ( optimised_weights.numpy() @@ -968,9 +905,7 @@ def print_reweighting_diagnostics( # All subsequent calculations use the guaranteed NumPy versions estimate = optimised_weights_np @ loss_matrix_np - rel_error = ( - ((estimate - targets_array_np) + 1) / (targets_array_np + 1) - ) ** 2 + rel_error = (((estimate - targets_array_np) + 1) / (targets_array_np + 1)) ** 2 within_10_percent_mask = np.abs(estimate - targets_array_np) <= ( 0.10 * np.abs(targets_array_np) ) From 0722d4595e7f78ba5687f3d40402f563a9889ff8 Mon Sep 17 00:00:00 2001 From: "baogorek@gmail.com" Date: Fri, 6 Mar 2026 10:25:21 -0500 Subject: [PATCH 4/5] Fix CI checkpoint cache invalidation on code changes Scope checkpoint paths by commit SHA so new commits rebuild from scratch instead of restoring stale H5 files from previous builds. Co-Authored-By: Claude Opus 4.6 --- modal_app/data_build.py | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/modal_app/data_build.py b/modal_app/data_build.py index f3c2191b..fd5237c1 100644 --- a/modal_app/data_build.py +++ b/modal_app/data_build.py @@ -84,9 +84,15 @@ def setup_gcp_credentials(): return None +def get_current_commit() -> str: + """Get the current git commit SHA.""" + return subprocess.check_output(["git", "rev-parse", "HEAD"], text=True).strip() + + def get_checkpoint_path(branch: str, output_file: str) -> Path: - """Get the checkpoint path for an output file, scoped by branch.""" - return Path(VOLUME_MOUNT) / branch / Path(output_file).name + """Get the checkpoint path for an output file, scoped by branch and commit.""" + commit = get_current_commit() + return Path(VOLUME_MOUNT) / branch / commit / Path(output_file).name def is_checkpointed(branch: str, output_file: str) -> bool: @@ -224,7 +230,8 @@ def run_tests_with_checkpoints( Raises: RuntimeError: If any test module fails. """ - checkpoint_dir = Path(VOLUME_MOUNT) / branch / "tests" + commit = get_current_commit() + checkpoint_dir = Path(VOLUME_MOUNT) / branch / commit / "tests" checkpoint_dir.mkdir(parents=True, exist_ok=True) for module in TEST_MODULES: @@ -293,6 +300,17 @@ def build_datasets( os.chdir("/root") subprocess.run(["git", "clone", "-b", branch, REPO_URL], check=True) os.chdir("policyengine-us-data") + + # Clean stale checkpoints from other commits + branch_dir = Path(VOLUME_MOUNT) / branch + if branch_dir.exists(): + current_commit = get_current_commit() + for entry in branch_dir.iterdir(): + if entry.is_dir() and entry.name != current_commit: + shutil.rmtree(entry) + print(f"Removed stale checkpoint dir: {entry.name[:12]}") + checkpoint_volume.commit() + # Use uv sync to install exact versions from uv.lock subprocess.run(["uv", "sync", "--locked"], check=True) From 536b441d8c146092b642f7104bf02a88b02a442c Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Fri, 6 Mar 2026 12:48:29 -0500 Subject: [PATCH 5/5] Cache get_current_commit() and move gc import to top-level - Add @functools.cache to avoid repeated subprocess calls for the same commit SHA during a single build (~10+ calls reduced to 1) - Move inline `import gc` to top of loss.py for consistency Co-Authored-By: Claude Opus 4.6 --- modal_app/data_build.py | 4 +++- policyengine_us_data/utils/loss.py | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/modal_app/data_build.py b/modal_app/data_build.py index fd5237c1..df5e0dd7 100644 --- a/modal_app/data_build.py +++ b/modal_app/data_build.py @@ -1,3 +1,4 @@ +import functools import os import shutil import subprocess @@ -84,8 +85,9 @@ def setup_gcp_credentials(): return None +@functools.cache def get_current_commit() -> str: - """Get the current git commit SHA.""" + """Get the current git commit SHA (cached per process).""" return subprocess.check_output(["git", "rev-parse", "HEAD"], text=True).strip() diff --git a/policyengine_us_data/utils/loss.py b/policyengine_us_data/utils/loss.py index bb6ca505..bfbf49db 100644 --- a/policyengine_us_data/utils/loss.py +++ b/policyengine_us_data/utils/loss.py @@ -1,3 +1,5 @@ +import gc + import pandas as pd import numpy as np import logging @@ -654,8 +656,6 @@ def build_loss_matrix(dataset: type, time_period): loss_matrix = _add_snap_metric_columns(loss_matrix, sim) del sim, df - import gc - gc.collect() return loss_matrix, np.array(targets_array)