diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 000000000..b842f03a4 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,4 @@ +# Repository Agent Instructions + +- Open PRs from branches in `PolicyEngine/policyengine-us-data`, not forks. This repository's PR CI rejects fork PRs in `check-fork`, so push agent-created branches to the canonical `PolicyEngine/policyengine-us-data` remote before creating a PR. +- Do not put `[codex]` in PR titles. diff --git a/changelog.d/835.changed b/changelog.d/835.changed new file mode 100644 index 000000000..ca78826b1 --- /dev/null +++ b/changelog.d/835.changed @@ -0,0 +1 @@ +Compute CPS net worth from SCF-anchored balance-sheet components, including SCF/SIPP-blended vehicle values, instead of an SCF aggregate residual. diff --git a/docs/data.md b/docs/data.md index bbce2c4a5..d8c16d3a4 100644 --- a/docs/data.md +++ b/docs/data.md @@ -68,16 +68,26 @@ missing from the CPS: ### Survey of Income and Program Participation (SIPP) The SIPP provides income and program participation data. We use SIPP primarily to impute tip income -through a Quantile Regression Forest model trained on SIPP data, using employment income, age, and -household composition as predictors. +and policy-relevant asset inputs through Quantile Regression Forest models trained on SIPP data. +The asset imputations currently cover bank accounts, stocks, bonds, household vehicle counts, and +household vehicle values. Bank accounts, stocks, bonds, and household vehicle values are then +combined with comparable SCF predictions through a stable household-level 50/50 source-model draw. +These fields are not a complete household balance sheet; they are exposed so policy models can +select the resources that matter for a specific program. ### Survey of Consumer Finances (SCF) The SCF provides wealth and debt information that we use to impute several financial variables missing from the CPS. We match auto loan balances based on household demographics and income, then -calculate interest on auto loans from these imputed balances. Additionally, we impute various net -worth components and other wealth measures not available in CPS. The SCF imputation uses their -reference person definition to ensure proper matching. +calculate interest on auto loans from these imputed balances. We also impute the SCF balance-sheet +components needed to express `net_worth` as a formula: certificates of deposit, savings bonds, +retirement assets, cash-value life insurance, managed assets, other financial assets, home value, +other real estate, business equity, other nonfinancial assets, mortgages, other residential debt, +lines of credit, credit card debt, vehicle installment debt, student debt, other installment debt, +and other debt. We also impute a direct SCF net-worth anchor, then proportionally rebalance the +SCF-only leaves so `net_worth` remains a component formula while preserving the final +SIPP/SCF-blended policy leaves. The SCF imputation uses their reference person definition to ensure +proper matching. ### American Community Survey (ACS) diff --git a/docs/methodology.md b/docs/methodology.md index 730196ff7..a37356ac3 100644 --- a/docs/methodology.md +++ b/docs/methodology.md @@ -237,12 +237,26 @@ as a predictor, which allows the imputed values to reflect geographic variation rates and rent levels. **SIPP (Survey of Income and Program Participation)**: Tip income, bank account assets, stock -assets, bond assets. The SIPP lacks state identifiers, so these imputations are state-blind at the -microdata level — geographic variation in tip income and assets enters only through calibration -weights, not through the imputed values themselves. - -**SCF (Survey of Consumer Finances)**: Net worth, auto loan balances, auto loan interest. The SCF -also lacks state identifiers, so these imputations are likewise state-blind. +assets, bond assets, household vehicle counts, and household vehicle values. The SIPP lacks state +identifiers, so these imputations are state-blind at the microdata level - geographic variation in +tip income and assets enters only through calibration weights, not through the imputed values +themselves. + +**SCF (Survey of Consumer Finances)**: Aggregate net worth, auto loan balances, auto loan interest, +and balance-sheet components needed to express net worth as a formula. The SCF also lacks state +identifiers, so these imputations are likewise state-blind. + +The asset fields are a mixed-source balance sheet. The SIPP liquid-asset and vehicle fields are +policy-relevant inputs in their own right. For overlapping bank-account, stock, bond, and vehicle +value variables, we use a stable household-level 50/50 source-model draw between the SIPP QRF +prediction and the comparable SCF QRF prediction, with a single draw shared across the asset block. +We then impute the non-overlapping SCF balance-sheet components - home value, mortgage debt, +retirement assets, business equity, other real estate, other financial assets, other debts, and +related categories including vehicle, student, and other installment debt. Because independently +imputed leaves do not preserve the SCF balance-sheet covariance exactly, we impute a direct SCF net +worth anchor and proportionally rebalance the SCF-only leaves to that anchor. This gives downstream +code a direct component formula without an accounting residual while preserving resource-tested +policy leaves. The output of this stage is the source-imputed stratified CPS (`source_imputed_stratified_extended_cps_2024.h5`), which serves as the input to the diff --git a/policyengine_us_data/calibration/source_impute.py b/policyengine_us_data/calibration/source_impute.py index a543eff4a..6a605a80a 100644 --- a/policyengine_us_data/calibration/source_impute.py +++ b/policyengine_us_data/calibration/source_impute.py @@ -12,7 +12,9 @@ household_vehicles_value (no state predictor) ORG -> hourly_wage, is_paid_hourly, is_union_member_or_covered - SCF -> net_worth, auto_loan_balance, auto_loan_interest + SCF -> net_worth, auto_loan_balance, auto_loan_interest, + SCF-only balance-sheet components, and + 50/50 source-model averaging for overlapping financial assets (no state predictor) Usage in unified calibration pipeline: @@ -45,7 +47,22 @@ predict_org_features, ) from policyengine_us_data.utils.asset_imputation import ( + SCF_NET_WORTH_TARGET, + SCF_FINANCIAL_ASSET_POLICY_VARIABLES, + SCF_HOUSEHOLD_ASSET_POLICY_VARIABLES, + SCF_NET_WORTH_COMPONENT_VARIABLES, + add_scf_financial_asset_targets, + add_scf_household_asset_targets, + add_scf_net_worth_target, + add_scf_net_worth_component_targets, + aggregate_person_values_to_reference_households, + align_household_values_to_reference_households, build_household_vehicle_receiver, + combine_sipp_and_scf_financial_assets, + combine_sipp_and_scf_household_assets, + compute_net_worth_from_components, + rebalance_scf_net_worth_components, + require_scf_net_worth_formula_targets, ) logger = logging.getLogger(__name__) @@ -64,12 +81,17 @@ "household_vehicles_value", ] -SCF_IMPUTED_VARIABLES = [ - "net_worth", +SCF_CORE_IMPUTED_VARIABLES = [ "auto_loan_balance", "auto_loan_interest", ] +SCF_IMPUTED_VARIABLES = [ + "net_worth", + *SCF_CORE_IMPUTED_VARIABLES, + *SCF_NET_WORTH_COMPONENT_VARIABLES, +] + ALL_SOURCE_VARIABLES = ( ACS_IMPUTED_VARIABLES + SIPP_IMPUTED_VARIABLES @@ -763,17 +785,32 @@ def _impute_scf( logger.warning("SCF missing predictors: %s", missing_preds) scf_predictors = available_preds - if "networth" in scf_df.columns and "net_worth" not in scf_df.columns: - scf_df["net_worth"] = scf_df["networth"] + scf_net_worth_targets = add_scf_net_worth_target(scf_df) + scf_financial_asset_targets = add_scf_financial_asset_targets(scf_df) + scf_household_asset_targets = add_scf_household_asset_targets(scf_df) + scf_component_targets = add_scf_net_worth_component_targets(scf_df) + require_scf_net_worth_formula_targets( + scf_financial_asset_targets=scf_financial_asset_targets, + scf_household_asset_targets=scf_household_asset_targets, + scf_component_targets=scf_component_targets, + scf_net_worth_targets=scf_net_worth_targets, + ) - available_vars = [v for v in SCF_IMPUTED_VARIABLES if v in scf_df.columns] - if not available_vars: + available_vars = [v for v in SCF_CORE_IMPUTED_VARIABLES if v in scf_df.columns] + qrf_vars = ( + [v for v in scf_net_worth_targets if v in scf_df.columns] + + available_vars + + [v for v in scf_financial_asset_targets if v in scf_df.columns] + + [v for v in scf_household_asset_targets if v in scf_df.columns] + + [v for v in scf_component_targets if v in scf_df.columns] + ) + if not qrf_vars: logger.warning("No SCF imputed variables available. Skipping.") return data weights = scf_df.get("wgt") - donor = scf_df[scf_predictors + available_vars].copy() + donor = scf_df[scf_predictors + qrf_vars].copy() if weights is not None: donor["wgt"] = weights donor = donor.dropna(subset=scf_predictors) @@ -834,12 +871,12 @@ def _impute_scf( "SCF QRF: %d train, %d test, vars=%s", len(donor), len(cps_df), - available_vars, + qrf_vars, ) fitted = qrf.fit( X_train=donor, predictors=scf_predictors, - imputed_variables=available_vars, + imputed_variables=qrf_vars, weight_col="wgt" if weights is not None else None, tune_hyperparameters=False, ) @@ -870,10 +907,82 @@ def _impute_scf( else: data[var] = {time_period: person_vals} + person_hh_ids = data.get("person_household_id", {}).get(time_period) + if person_hh_ids is not None: + first_person_mask = ~pd.Series(person_hh_ids).duplicated().values + reference_household_ids = person_hh_ids[first_person_mask] + for var in SCF_NET_WORTH_COMPONENT_VARIABLES: + if var in preds: + data[var] = { + time_period: preds.loc[first_person_mask, var].values.astype( + np.float32 + ) + } + for scf_var, policy_var in SCF_FINANCIAL_ASSET_POLICY_VARIABLES.items(): + if scf_var not in preds or policy_var not in data: + continue + data[policy_var] = { + time_period: combine_sipp_and_scf_financial_assets( + sipp_values=data[policy_var][time_period], + scf_household_values=preds.loc[first_person_mask, scf_var].values, + person_household_ids=person_hh_ids, + reference_person_mask=first_person_mask, + time_period=time_period, + ) + } + for scf_var, policy_var in SCF_HOUSEHOLD_ASSET_POLICY_VARIABLES.items(): + if scf_var not in preds or policy_var not in data: + continue + data[policy_var] = { + time_period: combine_sipp_and_scf_household_assets( + sipp_household_values=data[policy_var][time_period], + scf_household_values=preds.loc[first_person_mask, scf_var].values, + household_ids=hh_ids, + reference_household_ids=reference_household_ids, + time_period=time_period, + ) + } + net_worth_components = {} + for var in ("bank_account_assets", "stock_assets", "bond_assets"): + if var in data: + net_worth_components[var] = ( + aggregate_person_values_to_reference_households( + data[var][time_period], + person_hh_ids, + first_person_mask, + ) + ) + if "household_vehicles_value" in data: + net_worth_components["household_vehicles_value"] = ( + align_household_values_to_reference_households( + data["household_vehicles_value"][time_period], + hh_ids, + reference_household_ids, + ) + ) + for var in SCF_NET_WORTH_COMPONENT_VARIABLES: + if var in data: + net_worth_components[var] = data[var][time_period] + if SCF_NET_WORTH_TARGET in preds: + net_worth_components = rebalance_scf_net_worth_components( + components=net_worth_components, + target_net_worth=preds.loc[ + first_person_mask, SCF_NET_WORTH_TARGET + ].values.astype(np.float32), + ) + for var in SCF_NET_WORTH_COMPONENT_VARIABLES: + if var in net_worth_components: + data[var] = {time_period: net_worth_components[var]} + data["net_worth"] = { + time_period: compute_net_worth_from_components( + components=net_worth_components, + ) + } + del fitted, preds gc.collect() - logger.info("SCF imputation complete: %s", available_vars) + logger.info("SCF imputation complete: %s", SCF_IMPUTED_VARIABLES) return data diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index d12ba7eef..7b054dfda 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -48,7 +48,22 @@ reported_subsidized_marketplace_by_tax_unit, ) from policyengine_us_data.utils.asset_imputation import ( + SCF_NET_WORTH_TARGET, + SCF_FINANCIAL_ASSET_POLICY_VARIABLES, + SCF_HOUSEHOLD_ASSET_POLICY_VARIABLES, + SCF_NET_WORTH_COMPONENT_VARIABLES, + add_scf_financial_asset_targets, + add_scf_household_asset_targets, + add_scf_net_worth_target, + add_scf_net_worth_component_targets, + aggregate_person_values_to_reference_households, + align_household_values_to_reference_households, build_household_vehicle_receiver, + combine_sipp_and_scf_financial_assets, + combine_sipp_and_scf_household_assets, + compute_net_worth_from_components, + rebalance_scf_net_worth_components, + require_scf_net_worth_formula_targets, ) from policyengine_us_data.utils.policyengine import ( supports_medicare_enrollment_input, @@ -2156,7 +2171,9 @@ def add_tips(self, cps: h5py.File): mean_quantile=0.5, ).tip_income.values - # Impute liquid assets from SIPP (bank accounts, stocks, bonds) + # Impute SIPP liquid assets used directly by resource-tested policy rules. + # The SCF step below applies a stable 50/50 source-model draw for the + # overlapping bank, stock, and bond leaves. from policyengine_us_data.datasets.sipp import get_asset_model @@ -2315,7 +2332,8 @@ def add_overtime_occupation(cps: h5py.File, person: DataFrame) -> None: def add_auto_loan_interest_and_net_worth(self, cps: h5py.File) -> None: """ "Add auto loan balance, interest and net_worth variable.""" self.save_dataset(cps) - cps_data = self.load_dataset() + full_cps_data = self.load_dataset() + cps_data = dict(full_cps_data) # Access raw CPS for additional variables with _open_dataset_read_only(self.raw_cps) as raw_data: @@ -2473,6 +2491,7 @@ def determine_reference_person(group): mask = create_scf_reference_person_mask(cps_data, person_data) mask_len = mask.shape[0] + original_person_household_ids = np.asarray(cps_data["person_household_id"]) cps_data = { var: data[mask] if data.shape[0] == mask_len else data @@ -2543,7 +2562,9 @@ def determine_reference_person(group): reference_persons = person_data[mask] receiver_data["is_married"] = reference_persons.A_MARITL.isin([1, 2]).values - # Impute auto loan balance from the SCF + # Impute selected auto-loan fields, SCF equivalents for overlapping + # financial asset leaves, and SCF-only balance-sheet leaves. We compute + # net_worth from those components rather than storing an SCF aggregate. from policyengine_us_data.datasets.scf.scf import SCF_2022 scf_dataset = SCF_2022() @@ -2560,7 +2581,26 @@ def determine_reference_person(group): "interest_dividend_income", "social_security_pension_income", ] - IMPUTED_VARIABLES = ["networth", "auto_loan_balance", "auto_loan_interest"] + scf_net_worth_targets = add_scf_net_worth_target(scf_data) + scf_financial_asset_targets = add_scf_financial_asset_targets(scf_data) + scf_household_asset_targets = add_scf_household_asset_targets(scf_data) + scf_component_targets = add_scf_net_worth_component_targets(scf_data) + require_scf_net_worth_formula_targets( + scf_financial_asset_targets=scf_financial_asset_targets, + scf_household_asset_targets=scf_household_asset_targets, + scf_component_targets=scf_component_targets, + scf_net_worth_targets=scf_net_worth_targets, + ) + IMPUTED_VARIABLES = ( + list(scf_net_worth_targets) + + [ + "auto_loan_balance", + "auto_loan_interest", + ] + + list(scf_financial_asset_targets) + + list(scf_household_asset_targets) + + list(scf_component_targets) + ) weights = ["wgt"] donor_data = scf_data[PREDICTORS + IMPUTED_VARIABLES + weights].copy() @@ -2587,10 +2627,86 @@ def determine_reference_person(group): imputations = fitted_model.predict(X_test=receiver_data) for var in IMPUTED_VARIABLES: + if var == SCF_NET_WORTH_TARGET: + continue cps[var] = imputations[var] - cps["net_worth"] = cps["networth"] - del cps["networth"] + for scf_var, policy_var in SCF_FINANCIAL_ASSET_POLICY_VARIABLES.items(): + if scf_var not in imputations or policy_var not in full_cps_data: + continue + blended_values = combine_sipp_and_scf_financial_assets( + sipp_values=full_cps_data[policy_var], + scf_household_values=imputations[scf_var].values, + person_household_ids=original_person_household_ids, + reference_person_mask=mask, + time_period=self.time_period, + ) + full_cps_data[policy_var] = blended_values + if policy_var in cps: + del cps[policy_var] + cps[policy_var] = blended_values + if scf_var in cps: + del cps[scf_var] + + reference_household_ids = original_person_household_ids[mask] + for scf_var, policy_var in SCF_HOUSEHOLD_ASSET_POLICY_VARIABLES.items(): + if ( + scf_var not in imputations + or policy_var not in full_cps_data + or "household_id" not in full_cps_data + ): + continue + blended_values = combine_sipp_and_scf_household_assets( + sipp_household_values=full_cps_data[policy_var], + scf_household_values=imputations[scf_var].values, + household_ids=full_cps_data["household_id"], + reference_household_ids=reference_household_ids, + time_period=self.time_period, + ) + full_cps_data[policy_var] = blended_values + if policy_var in cps: + del cps[policy_var] + cps[policy_var] = blended_values + if scf_var in cps: + del cps[scf_var] + + net_worth_components = {} + for variable in ("bank_account_assets", "stock_assets", "bond_assets"): + if variable in full_cps_data: + net_worth_components[variable] = ( + aggregate_person_values_to_reference_households( + full_cps_data[variable], + original_person_household_ids, + mask, + ) + ) + if "household_vehicles_value" in full_cps_data and "household_id" in full_cps_data: + net_worth_components["household_vehicles_value"] = ( + align_household_values_to_reference_households( + full_cps_data["household_vehicles_value"], + full_cps_data["household_id"], + reference_household_ids, + ) + ) + for variable in SCF_NET_WORTH_COMPONENT_VARIABLES: + if variable in cps: + net_worth_components[variable] = cps[variable] + if SCF_NET_WORTH_TARGET in imputations: + net_worth_components = rebalance_scf_net_worth_components( + components=net_worth_components, + target_net_worth=imputations[SCF_NET_WORTH_TARGET].values, + ) + for variable in SCF_NET_WORTH_COMPONENT_VARIABLES: + if variable not in net_worth_components: + continue + if variable in cps: + del cps[variable] + cps[variable] = net_worth_components[variable] + if "net_worth" in cps: + del cps["net_worth"] + cps["net_worth"] = compute_net_worth_from_components( + components=net_worth_components + ) self.save_dataset(cps) diff --git a/policyengine_us_data/utils/asset_imputation.py b/policyengine_us_data/utils/asset_imputation.py index 60c1572e0..ee3f34cb6 100644 --- a/policyengine_us_data/utils/asset_imputation.py +++ b/policyengine_us_data/utils/asset_imputation.py @@ -1,7 +1,505 @@ +from dataclasses import dataclass +import hashlib +from typing import Mapping, Sequence + import numpy as np import pandas as pd +SIPP_LIQUID_ASSET_VARIABLES = ( + "bank_account_assets", + "stock_assets", + "bond_assets", +) +SIPP_VEHICLE_ASSET_VARIABLES = ("household_vehicles_value",) +SCF_NET_WORTH_VARIABLE = "net_worth" +SCF_NET_WORTH_TARGET = "scf_net_worth" +SCF_NET_WORTH_TARGETS = { + SCF_NET_WORTH_TARGET: ("networth",), +} +SCF_FINANCIAL_ASSET_TARGETS = { + "scf_bank_account_assets": ("liq",), + "scf_stock_assets": ("stocks", "nmmf"), + "scf_bond_assets": ("bond",), +} +SCF_FINANCIAL_ASSET_POLICY_VARIABLES = { + "scf_bank_account_assets": "bank_account_assets", + "scf_stock_assets": "stock_assets", + "scf_bond_assets": "bond_assets", +} +SCF_HOUSEHOLD_ASSET_TARGETS = { + "scf_household_vehicles_value": ("vehic",), +} +SCF_HOUSEHOLD_ASSET_POLICY_VARIABLES = { + "scf_household_vehicles_value": "household_vehicles_value", +} +SCF_NET_WORTH_COMPONENT_TARGETS = { + "scf_certificates_of_deposit": ("cds",), + "scf_savings_bonds": ("savbnd",), + "scf_retirement_assets": ("retqliq",), + "scf_cash_value_life_insurance": ("cashli",), + "scf_other_managed_assets": ("othma",), + "scf_other_financial_assets": ("othfin",), + "scf_primary_residence_value": ("houses",), + "scf_other_residential_real_estate": ("oresre",), + "scf_nonresidential_real_estate_equity": ("nnresre",), + "scf_business_equity": ("bus",), + "scf_other_nonfinancial_assets": ("othnfin",), + "scf_mortgage_debt": ("mrthel",), + "scf_other_residential_debt": ("resdbt",), + "scf_other_lines_of_credit": ("othloc",), + "scf_credit_card_debt": ("ccbal",), + "scf_vehicle_installment_debt": ("veh_inst",), + "scf_student_loan_debt": ("edn_inst",), + "scf_other_installment_debt": ("oth_inst",), + "scf_other_debt": ("odebt",), +} +SCF_NET_WORTH_COMPONENT_VARIABLES = tuple(SCF_NET_WORTH_COMPONENT_TARGETS) +SCF_OTHER_ASSET_COMPONENT = "scf_other_financial_assets" +SCF_OTHER_DEBT_COMPONENT = "scf_other_debt" + +EXPOSED_NET_WORTH_COMPONENT_VARIABLES = ( + SIPP_LIQUID_ASSET_VARIABLES + + SIPP_VEHICLE_ASSET_VARIABLES + + SCF_NET_WORTH_COMPONENT_VARIABLES +) +NET_WORTH_COMPONENT_SIGNS = { + "auto_loan_balance": -1.0, + "scf_mortgage_debt": -1.0, + "scf_other_residential_debt": -1.0, + "scf_other_lines_of_credit": -1.0, + "scf_credit_card_debt": -1.0, + "scf_vehicle_installment_debt": -1.0, + "scf_student_loan_debt": -1.0, + "scf_other_installment_debt": -1.0, + "scf_other_debt": -1.0, +} +UNOBSERVED_NET_WORTH_COMPONENT_GROUPS = () +NET_WORTH_COMPONENTS_ARE_COMPLETE = True +FINANCIAL_ASSET_SOURCE_SCF_PROBABILITY = 0.5 + + +@dataclass(frozen=True) +class NetWorthReconciliationReport: + """Summary of a household-level net worth reconciliation check.""" + + components_are_complete: bool + available_component_variables: tuple[str, ...] + missing_component_variables: tuple[str, ...] + unobserved_component_groups: tuple[str, ...] + max_abs_difference: float | None + is_reconciled: bool | None + message: str + + +def check_household_net_worth_reconciliation( + data: Mapping[str, Sequence[float]], + *, + component_variables: Sequence[str] = EXPOSED_NET_WORTH_COMPONENT_VARIABLES, + net_worth_variable: str = SCF_NET_WORTH_VARIABLE, + component_signs: Mapping[str, float] = NET_WORTH_COMPONENT_SIGNS, + components_are_complete: bool = NET_WORTH_COMPONENTS_ARE_COMPLETE, + rtol: float = 1e-6, + atol: float = 1.0, +) -> NetWorthReconciliationReport: + """Check whether household net worth equals signed balance-sheet components. + + The current CPS asset fields use blended SIPP/SCF liquid assets plus SCF-only + balance-sheet components. They are intended to reconstruct net worth without + an accounting residual when aligned to household rows. + """ + component_variables = tuple(component_variables) + available_components = tuple( + variable for variable in component_variables if variable in data + ) + missing_components = tuple( + variable for variable in component_variables if variable not in data + ) + + if not components_are_complete: + return NetWorthReconciliationReport( + components_are_complete=False, + available_component_variables=available_components, + missing_component_variables=missing_components, + unobserved_component_groups=UNOBSERVED_NET_WORTH_COMPONENT_GROUPS, + max_abs_difference=None, + is_reconciled=None, + message=( + "Net worth component reconciliation was skipped because the " + "component set was marked incomplete." + ), + ) + + if net_worth_variable not in data: + raise KeyError(f"Missing net worth variable: {net_worth_variable}") + if missing_components: + raise KeyError( + "Cannot reconcile net worth with a complete component set because " + f"these component variables are missing: {', '.join(missing_components)}" + ) + + net_worth = np.asarray(data[net_worth_variable], dtype=float) + component_total = np.zeros_like(net_worth, dtype=float) + + for variable in component_variables: + values = np.asarray(data[variable], dtype=float) + if values.shape != net_worth.shape: + raise ValueError( + f"{variable} has shape {values.shape}, but {net_worth_variable} " + f"has shape {net_worth.shape}. Reconciliation data must already " + "be aligned to household rows." + ) + component_total += component_signs.get(variable, 1.0) * values + + difference = net_worth - component_total + max_abs_difference = ( + float(np.nanmax(np.abs(difference))) if difference.size else 0.0 + ) + is_reconciled = bool( + np.allclose(net_worth, component_total, rtol=rtol, atol=atol, equal_nan=True) + ) + + return NetWorthReconciliationReport( + components_are_complete=True, + available_component_variables=available_components, + missing_component_variables=(), + unobserved_component_groups=(), + max_abs_difference=max_abs_difference, + is_reconciled=is_reconciled, + message=( + "Net worth reconciles to the signed component variables." + if is_reconciled + else "Net worth does not reconcile to the signed component variables." + ), + ) + + +def add_scf_financial_asset_targets(scf: pd.DataFrame) -> tuple[str, ...]: + """Add SCF financial asset targets comparable to SIPP policy leaves.""" + return _add_scf_targets(scf, SCF_FINANCIAL_ASSET_TARGETS) + + +def add_scf_net_worth_target(scf: pd.DataFrame) -> tuple[str, ...]: + """Add a direct SCF net worth anchor target for component rebalancing.""" + return _add_scf_targets(scf, SCF_NET_WORTH_TARGETS) + + +def add_scf_net_worth_component_targets(scf: pd.DataFrame) -> tuple[str, ...]: + """Add SCF-only balance-sheet targets needed for a net worth formula.""" + return _add_scf_targets(scf, SCF_NET_WORTH_COMPONENT_TARGETS) + + +def add_scf_household_asset_targets(scf: pd.DataFrame) -> tuple[str, ...]: + """Add SCF asset targets comparable to household-level SIPP leaves.""" + return _add_scf_targets(scf, SCF_HOUSEHOLD_ASSET_TARGETS) + + +def require_scf_net_worth_formula_targets( + *, + scf_financial_asset_targets: Sequence[str], + scf_household_asset_targets: Sequence[str], + scf_component_targets: Sequence[str], + scf_net_worth_targets: Sequence[str] = (), +) -> None: + """Fail loudly if the SCF source cannot supply the formula leaves.""" + available_targets = ( + set(scf_financial_asset_targets) + | set(scf_household_asset_targets) + | set(scf_component_targets) + | set(scf_net_worth_targets) + ) + missing_targets = [ + target + for target in ( + *SCF_NET_WORTH_TARGETS, + *SCF_FINANCIAL_ASSET_TARGETS, + *SCF_HOUSEHOLD_ASSET_TARGETS, + *SCF_NET_WORTH_COMPONENT_TARGETS, + ) + if target not in available_targets + ] + if missing_targets: + raise KeyError( + "SCF data is missing source columns needed to build these net " + f"worth formula targets: {', '.join(missing_targets)}" + ) + + +def _add_scf_targets( + scf: pd.DataFrame, + target_map: Mapping[str, tuple[str, ...]], +) -> tuple[str, ...]: + added_targets = [] + for target, source_columns in target_map.items(): + if all(column in scf.columns for column in source_columns): + scf[target] = sum(scf[column].fillna(0) for column in source_columns) + added_targets.append(target) + return tuple(added_targets) + + +def _stable_unit_interval(key: str) -> float: + digest = hashlib.blake2b(key.encode("utf-8"), digest_size=8).digest() + return int.from_bytes(digest, "big") / 2**64 + + +def financial_asset_source_is_scf( + household_ids: Sequence, + *, + time_period: int, + probability: float = FINANCIAL_ASSET_SOURCE_SCF_PROBABILITY, +) -> np.ndarray: + """Return a stable 50/50 source-model draw for overlapping assets. + + The draw is at the household asset-block level, so bank accounts, stocks, + bonds, and vehicle value all come from the same source for a household. + """ + if not 0 <= probability <= 1: + raise ValueError("probability must be between 0 and 1") + + household_ids = np.asarray(household_ids) + draws_by_household = { + household_id: ( + _stable_unit_interval( + f"financial_asset_source:{time_period}:{household_id}" + ) + < probability + ) + for household_id in pd.unique(household_ids) + } + return np.array( + [draws_by_household[household_id] for household_id in household_ids], + dtype=bool, + ) + + +def combine_sipp_and_scf_financial_assets( + *, + sipp_values: Sequence[float], + scf_household_values: Sequence[float], + person_household_ids: Sequence, + reference_person_mask: Sequence[bool], + time_period: int, +) -> np.ndarray: + """Apply a stable 50/50 SIPP/SCF source draw to a person-level asset leaf.""" + sipp_values = np.asarray(sipp_values, dtype=np.float32) + scf_household_values = np.asarray(scf_household_values, dtype=np.float32) + person_household_ids = np.asarray(person_household_ids) + reference_person_mask = np.asarray(reference_person_mask, dtype=bool) + + if sipp_values.shape != person_household_ids.shape: + raise ValueError( + "sipp_values and person_household_ids must have the same shape" + ) + if reference_person_mask.shape != person_household_ids.shape: + raise ValueError( + "reference_person_mask and person_household_ids must have the same shape" + ) + if scf_household_values.shape[0] != reference_person_mask.sum(): + raise ValueError( + "scf_household_values must contain one value per reference person" + ) + + scf_person_values = np.zeros_like(sipp_values, dtype=np.float32) + scf_person_values[reference_person_mask] = scf_household_values + use_scf = financial_asset_source_is_scf( + person_household_ids, + time_period=time_period, + ) + return np.where(use_scf, scf_person_values, sipp_values).astype(np.float32) + + +def combine_sipp_and_scf_household_assets( + *, + sipp_household_values: Sequence[float], + scf_household_values: Sequence[float], + household_ids: Sequence, + reference_household_ids: Sequence, + time_period: int, +) -> np.ndarray: + """Apply the stable SIPP/SCF source draw to a household-level asset leaf.""" + sipp_household_values = np.asarray(sipp_household_values, dtype=np.float32) + scf_household_values = np.asarray(scf_household_values, dtype=np.float32) + household_ids = np.asarray(household_ids) + reference_household_ids = np.asarray(reference_household_ids) + + if sipp_household_values.shape != household_ids.shape: + raise ValueError("sipp_household_values and household_ids must align") + if scf_household_values.shape != reference_household_ids.shape: + raise ValueError("scf_household_values and reference_household_ids must align") + + scf_values = ( + pd.Series(scf_household_values, index=reference_household_ids) + .reindex(household_ids) + .fillna(0) + .to_numpy(dtype=np.float32) + ) + use_scf = financial_asset_source_is_scf( + household_ids, + time_period=time_period, + ) + return np.where(use_scf, scf_values, sipp_household_values).astype(np.float32) + + +def aggregate_person_values_to_reference_households( + person_values: Sequence[float], + person_household_ids: Sequence, + reference_person_mask: Sequence[bool], +) -> np.ndarray: + """Aggregate person values to households in reference-person order.""" + person_values = np.asarray(person_values, dtype=np.float32) + person_household_ids = np.asarray(person_household_ids) + reference_person_mask = np.asarray(reference_person_mask, dtype=bool) + reference_household_ids = person_household_ids[reference_person_mask] + totals = pd.Series(person_values).groupby(person_household_ids).sum() + return totals.reindex(reference_household_ids).fillna(0).to_numpy(dtype=np.float32) + + +def align_household_values_to_reference_households( + household_values: Sequence[float], + household_ids: Sequence, + reference_household_ids: Sequence, +) -> np.ndarray: + """Align household values from household-id order to reference-person order.""" + household_values = np.asarray(household_values, dtype=np.float32) + household_ids = np.asarray(household_ids) + reference_household_ids = np.asarray(reference_household_ids) + values = pd.Series(household_values, index=household_ids) + return values.reindex(reference_household_ids).fillna(0).to_numpy(dtype=np.float32) + + +def compute_net_worth_from_components( + *, + components: Mapping[str, Sequence[float]], + component_signs: Mapping[str, float] = NET_WORTH_COMPONENT_SIGNS, +) -> np.ndarray: + """Compute household net worth from signed balance-sheet components.""" + iterator = iter(components.items()) + try: + first_variable, first_values = next(iterator) + except StopIteration: + return np.array([], dtype=np.float32) + + first_values = np.asarray(first_values, dtype=np.float32) + component_total = (component_signs.get(first_variable, 1.0) * first_values).astype( + np.float32 + ) + + for variable, values in iterator: + values = np.asarray(values, dtype=np.float32) + if values.shape != component_total.shape: + raise ValueError( + f"{variable} has shape {values.shape}, but expected " + f"{component_total.shape}." + ) + component_total += component_signs.get(variable, 1.0) * values + + return component_total.astype(np.float32) + + +def rebalance_scf_net_worth_components( + *, + components: Mapping[str, Sequence[float]], + target_net_worth: Sequence[float], + adjustable_variables: Sequence[str] = SCF_NET_WORTH_COMPONENT_VARIABLES, + protected_variables: Sequence[str] = ( + SIPP_LIQUID_ASSET_VARIABLES + SIPP_VEHICLE_ASSET_VARIABLES + ), + component_signs: Mapping[str, float] = NET_WORTH_COMPONENT_SIGNS, +) -> dict[str, np.ndarray]: + """Rebalance SCF-only leaves so the component formula matches net worth. + + Component QRFs are fit sequentially but still predict each leaf separately, + so their sum can drift from the direct SCF net worth distribution. Preserve + the final SIPP/SCF-blended policy leaves and proportionally scale SCF-only + same-sign leaves to the direct SCF net worth anchor. + """ + adjusted = { + variable: np.asarray(values, dtype=np.float32).copy() + for variable, values in components.items() + } + if not adjusted: + return adjusted + + target_net_worth = np.asarray(target_net_worth, dtype=np.float32) + first_shape = target_net_worth.shape + for variable, values in adjusted.items(): + if values.shape != first_shape: + raise ValueError( + f"{variable} has shape {values.shape}, but target_net_worth " + f"has shape {first_shape}." + ) + + protected_variables = set(protected_variables) + adjustable_variables = tuple( + variable + for variable in adjustable_variables + if variable in adjusted and variable not in protected_variables + ) + if not adjustable_variables: + return adjusted + + fixed_total = np.zeros_like(target_net_worth, dtype=np.float32) + for variable, values in adjusted.items(): + if variable not in adjustable_variables: + fixed_total += component_signs.get(variable, 1.0) * values + + asset_variables = [ + variable + for variable in adjustable_variables + if component_signs.get(variable, 1.0) >= 0 + ] + debt_variables = [ + variable + for variable in adjustable_variables + if component_signs.get(variable, 1.0) < 0 + ] + + asset_total = np.zeros_like(target_net_worth, dtype=np.float32) + for variable in asset_variables: + asset_total += adjusted[variable] + + debt_total = np.zeros_like(target_net_worth, dtype=np.float32) + for variable in debt_variables: + debt_total += adjusted[variable] + + desired_adjustable_total = target_net_worth - fixed_total + positive_target = desired_adjustable_total >= 0 + + required_assets = np.maximum(desired_adjustable_total + debt_total, 0) + asset_scale = np.divide( + required_assets, + asset_total, + out=np.ones_like(required_assets, dtype=np.float32), + where=(asset_total > 0) & positive_target, + ) + for variable in asset_variables: + adjusted[variable][positive_target] *= asset_scale[positive_target] + + needs_asset_fallback = positive_target & (asset_total <= 0) & (required_assets > 0) + if needs_asset_fallback.any() and SCF_OTHER_ASSET_COMPONENT in adjusted: + adjusted[SCF_OTHER_ASSET_COMPONENT][needs_asset_fallback] = required_assets[ + needs_asset_fallback + ] + + required_debts = np.maximum(asset_total - desired_adjustable_total, 0) + debt_scale = np.divide( + required_debts, + debt_total, + out=np.ones_like(required_debts, dtype=np.float32), + where=(debt_total > 0) & ~positive_target, + ) + for variable in debt_variables: + adjusted[variable][~positive_target] *= debt_scale[~positive_target] + + needs_debt_fallback = (~positive_target) & (debt_total <= 0) & (required_debts > 0) + if needs_debt_fallback.any() and SCF_OTHER_DEBT_COMPONENT in adjusted: + adjusted[SCF_OTHER_DEBT_COMPONENT][needs_debt_fallback] = required_debts[ + needs_debt_fallback + ] + + return adjusted + + def build_household_vehicle_receiver( person_df: pd.DataFrame, tenure_type: np.ndarray | None = None, diff --git a/tests/unit/calibration/test_source_impute.py b/tests/unit/calibration/test_source_impute.py index 7324351e6..188141753 100644 --- a/tests/unit/calibration/test_source_impute.py +++ b/tests/unit/calibration/test_source_impute.py @@ -87,6 +87,9 @@ def test_scf_variables_defined(self): assert "net_worth" in SCF_IMPUTED_VARIABLES assert "auto_loan_balance" in SCF_IMPUTED_VARIABLES assert "auto_loan_interest" in SCF_IMPUTED_VARIABLES + assert "scf_retirement_assets" in SCF_IMPUTED_VARIABLES + assert "scf_vehicle_installment_debt" in SCF_IMPUTED_VARIABLES + assert "scf_mortgage_debt" in SCF_IMPUTED_VARIABLES def test_org_variables_defined(self): assert "hourly_wage" in ORG_IMPUTED_VARIABLES diff --git a/tests/unit/datasets/test_cps_file_handles.py b/tests/unit/datasets/test_cps_file_handles.py index db5add17f..eeaecc5e6 100644 --- a/tests/unit/datasets/test_cps_file_handles.py +++ b/tests/unit/datasets/test_cps_file_handles.py @@ -9,6 +9,10 @@ add_auto_loan_interest_and_net_worth, add_previous_year_income, ) +from policyengine_us_data.utils.asset_imputation import ( + combine_sipp_and_scf_financial_assets, + financial_asset_source_is_scf, +) class _FakeStore: @@ -157,17 +161,23 @@ class FakeDataset: def __init__(self): self.raw_cps = FakeRawCPS() self.saved_dataset = None + self.time_period = 2024 def save_dataset(self, data): self.saved_dataset = data def load_dataset(self): return { + "household_id": np.array([10, 20]), "person_household_id": np.array([10, 20]), "age": np.array([35, 40]), "is_female": np.array([False, True]), "cps_race": np.array([1, 2]), "own_children_in_household": np.array([0, 1]), + "bank_account_assets": np.array([1_000.0, 2_000.0]), + "stock_assets": np.array([300.0, 400.0]), + "bond_assets": np.array([50.0, 60.0]), + "household_vehicles_value": np.array([5_000.0, 1_000.0]), "employment_income": np.array([40_000.0, 25_000.0]), "taxable_interest_income": np.array([100.0, 0.0]), "tax_exempt_interest_income": np.array([0.0, 0.0]), @@ -189,7 +199,31 @@ def load_dataset(self): "employment_income": np.array([35_000.0, 20_000.0]), "interest_dividend_income": np.array([100.0, 50.0]), "social_security_pension_income": np.array([0.0, 0.0]), - "networth": np.array([10_000.0, 5_000.0]), + "networth": np.array([100_000.0, 80_000.0]), + "liq": np.array([10_000.0, 20_000.0]), + "stocks": np.array([100.0, 200.0]), + "nmmf": np.array([1.0, 2.0]), + "bond": np.array([10.0, 20.0]), + "vehic": np.array([3_000.0, 2_000.0]), + "cds": np.array([12_000.0, 6_000.0]), + "savbnd": np.array([0.0, 0.0]), + "retqliq": np.array([0.0, 0.0]), + "cashli": np.array([0.0, 0.0]), + "othma": np.array([0.0, 0.0]), + "othfin": np.array([0.0, 0.0]), + "houses": np.array([0.0, 0.0]), + "oresre": np.array([0.0, 0.0]), + "nnresre": np.array([0.0, 0.0]), + "bus": np.array([0.0, 0.0]), + "othnfin": np.array([0.0, 0.0]), + "mrthel": np.array([0.0, 0.0]), + "resdbt": np.array([0.0, 0.0]), + "othloc": np.array([0.0, 0.0]), + "ccbal": np.array([0.0, 0.0]), + "veh_inst": np.array([2_000.0, 1_000.0]), + "edn_inst": np.array([0.0, 0.0]), + "oth_inst": np.array([0.0, 0.0]), + "odebt": np.array([0.0, 0.0]), "auto_loan_balance": np.array([2_000.0, 1_000.0]), "auto_loan_interest": np.array([200.0, 100.0]), "wgt": np.array([1.0, 1.0]), @@ -211,13 +245,21 @@ def fit( def predict(self, X_test): assert X_test["is_married"].tolist() == [True, False] - return pd.DataFrame( + values = {var: [0.0, 0.0] for var in self.imputed_variables} + values.update( { - "networth": [10_000.0, 5_000.0], + "scf_certificates_of_deposit": [12_000.0, 6_000.0], + "scf_net_worth": [100_000.0, 80_000.0], + "scf_bank_account_assets": [10_000.0, 20_000.0], + "scf_stock_assets": [101.0, 202.0], + "scf_bond_assets": [10.0, 20.0], + "scf_household_vehicles_value": [3_000.0, 2_000.0], + "scf_vehicle_installment_debt": [2_000.0, 1_000.0], "auto_loan_balance": [2_000.0, 1_000.0], "auto_loan_interest": [200.0, 100.0], } ) + return pd.DataFrame(values) import policyengine_us_data.datasets.scf.scf as scf_module import microimpute.models.qrf as qrf_module @@ -229,8 +271,44 @@ def predict(self, X_test): add_auto_loan_interest_and_net_worth(dataset, {}) assert raw_store.closed is True + vehicle_values = np.where( + financial_asset_source_is_scf(np.array([10, 20]), time_period=2024), + [3_000.0, 2_000.0], + [5_000.0, 1_000.0], + ) + bank_assets = combine_sipp_and_scf_financial_assets( + sipp_values=np.array([1_000.0, 2_000.0]), + scf_household_values=np.array([10_000.0, 20_000.0]), + person_household_ids=np.array([10, 20]), + reference_person_mask=np.array([True, True]), + time_period=2024, + ) + stock_assets = combine_sipp_and_scf_financial_assets( + sipp_values=np.array([300.0, 400.0]), + scf_household_values=np.array([101.0, 202.0]), + person_household_ids=np.array([10, 20]), + reference_person_mask=np.array([True, True]), + time_period=2024, + ) + bond_assets = combine_sipp_and_scf_financial_assets( + sipp_values=np.array([50.0, 60.0]), + scf_household_values=np.array([10.0, 20.0]), + person_household_ids=np.array([10, 20]), + reference_person_mask=np.array([True, True]), + time_period=2024, + ) + np.testing.assert_array_equal( + dataset.saved_dataset["net_worth"], + np.array([100_000.0, 80_000.0], dtype=np.float32), + ) + assert "scf_net_worth" not in dataset.saved_dataset + np.testing.assert_array_equal( + dataset.saved_dataset["bank_account_assets"], bank_assets + ) + np.testing.assert_array_equal(dataset.saved_dataset["stock_assets"], stock_assets) + np.testing.assert_array_equal(dataset.saved_dataset["bond_assets"], bond_assets) np.testing.assert_array_equal( - dataset.saved_dataset["net_worth"], [10_000.0, 5_000.0] + dataset.saved_dataset["household_vehicles_value"], vehicle_values ) np.testing.assert_array_equal( dataset.saved_dataset["auto_loan_interest"], [200.0, 100.0] diff --git a/tests/unit/test_asset_imputation.py b/tests/unit/test_asset_imputation.py index 6178450c2..6d5aa4b86 100644 --- a/tests/unit/test_asset_imputation.py +++ b/tests/unit/test_asset_imputation.py @@ -2,7 +2,20 @@ import pandas as pd from policyengine_us_data.utils.asset_imputation import ( + NET_WORTH_COMPONENTS_ARE_COMPLETE, + add_scf_financial_asset_targets, + add_scf_household_asset_targets, + add_scf_net_worth_target, + add_scf_net_worth_component_targets, + aggregate_person_values_to_reference_households, + align_household_values_to_reference_households, build_household_vehicle_receiver, + check_household_net_worth_reconciliation, + combine_sipp_and_scf_financial_assets, + combine_sipp_and_scf_household_assets, + compute_net_worth_from_components, + financial_asset_source_is_scf, + rebalance_scf_net_worth_components, ) @@ -37,3 +50,259 @@ def test_build_household_vehicle_receiver_aggregates_person_inputs(): assert receiver["reference_is_female"].tolist() == [1.0, 0.0] assert receiver["reference_is_married"].tolist() == [1.0, 0.0] assert receiver["is_homeowner"].tolist() == [1.0, 0.0] + + +def test_net_worth_reconciliation_can_be_explicitly_skipped(): + data = { + "net_worth": np.array([500_000.0]), + "bank_account_assets": np.array([10_000.0]), + "stock_assets": np.array([5_000.0]), + "bond_assets": np.array([1_000.0]), + "household_vehicles_value": np.array([15_000.0]), + "auto_loan_balance": np.array([2_000.0]), + } + + report = check_household_net_worth_reconciliation( + data, + components_are_complete=False, + ) + + assert NET_WORTH_COMPONENTS_ARE_COMPLETE is True + assert report.components_are_complete is False + assert report.is_reconciled is None + assert report.max_abs_difference is None + assert "marked incomplete" in report.message + + +def test_net_worth_reconciliation_checks_complete_household_components(): + data = { + "net_worth": np.array([125.0, -10.0]), + "bank_account_assets": np.array([100.0, 10.0]), + "stock_assets": np.array([50.0, 0.0]), + "auto_loan_balance": np.array([25.0, 20.0]), + } + + report = check_household_net_worth_reconciliation( + data, + component_variables=( + "bank_account_assets", + "stock_assets", + "auto_loan_balance", + ), + components_are_complete=True, + atol=0.0, + ) + + assert report.components_are_complete is True + assert report.is_reconciled is True + assert report.max_abs_difference == 0.0 + + +def test_net_worth_reconciliation_reports_complete_component_mismatch(): + data = { + "net_worth": np.array([126.0]), + "bank_account_assets": np.array([100.0]), + "stock_assets": np.array([50.0]), + "auto_loan_balance": np.array([25.0]), + } + + report = check_household_net_worth_reconciliation( + data, + component_variables=( + "bank_account_assets", + "stock_assets", + "auto_loan_balance", + ), + components_are_complete=True, + atol=0.0, + ) + + assert report.is_reconciled is False + assert report.max_abs_difference == 1.0 + + +def test_add_scf_financial_asset_targets_builds_sipp_comparable_columns(): + scf = pd.DataFrame( + { + "liq": [100.0, 200.0], + "stocks": [10.0, 20.0], + "nmmf": [1.0, 2.0], + "bond": [5.0, 6.0], + } + ) + + targets = add_scf_financial_asset_targets(scf) + + assert targets == ( + "scf_bank_account_assets", + "scf_stock_assets", + "scf_bond_assets", + ) + assert scf["scf_bank_account_assets"].tolist() == [100.0, 200.0] + assert scf["scf_stock_assets"].tolist() == [11.0, 22.0] + assert scf["scf_bond_assets"].tolist() == [5.0, 6.0] + + +def test_add_scf_household_asset_targets_builds_sipp_comparable_columns(): + scf = pd.DataFrame({"vehic": [12_000.0, 6_000.0]}) + + targets = add_scf_household_asset_targets(scf) + + assert targets == ("scf_household_vehicles_value",) + assert scf["scf_household_vehicles_value"].tolist() == [12_000.0, 6_000.0] + + +def test_add_scf_net_worth_target_builds_direct_anchor(): + scf = pd.DataFrame({"networth": [125_000.0, -10_000.0]}) + + targets = add_scf_net_worth_target(scf) + + assert targets == ("scf_net_worth",) + assert scf["scf_net_worth"].tolist() == [125_000.0, -10_000.0] + + +def test_add_scf_net_worth_component_targets_builds_formula_columns(): + scf = pd.DataFrame( + { + "cds": [1.0], + "savbnd": [1.5], + "retqliq": [2.0], + "cashli": [3.0], + "othma": [4.0], + "othfin": [5.0], + "houses": [100.0], + "oresre": [20.0], + "nnresre": [30.0], + "bus": [40.0], + "othnfin": [6.0], + "mrthel": [50.0], + "resdbt": [7.0], + "othloc": [8.0], + "ccbal": [9.0], + "veh_inst": [9.5], + "edn_inst": [10.0], + "oth_inst": [11.0], + "odebt": [13.0], + } + ) + + targets = add_scf_net_worth_component_targets(scf) + + assert "scf_savings_bonds" in targets + assert "scf_retirement_assets" in targets + assert "scf_vehicle_installment_debt" in targets + assert "scf_mortgage_debt" in targets + assert "scf_buy_now_pay_later_debt" not in targets + assert scf["scf_savings_bonds"].tolist() == [1.5] + assert scf["scf_retirement_assets"].tolist() == [2.0] + assert scf["scf_vehicle_installment_debt"].tolist() == [9.5] + assert scf["scf_mortgage_debt"].tolist() == [50.0] + + +def test_financial_asset_source_draw_is_household_stable(): + household_ids = np.array([10, 10, 20, 30]) + + first = financial_asset_source_is_scf(household_ids, time_period=2024) + second = financial_asset_source_is_scf(household_ids, time_period=2024) + + assert first.tolist() == second.tolist() + assert first[0] == first[1] + + +def test_combine_sipp_and_scf_financial_assets_preserves_household_scf_total(): + person_household_ids = np.array([10, 10, 20, 20]) + reference_person_mask = np.array([True, False, True, False]) + use_scf = financial_asset_source_is_scf( + person_household_ids, + time_period=2024, + ) + + combined = combine_sipp_and_scf_financial_assets( + sipp_values=np.array([1.0, 2.0, 3.0, 4.0]), + scf_household_values=np.array([100.0, 200.0]), + person_household_ids=person_household_ids, + reference_person_mask=reference_person_mask, + time_period=2024, + ) + + for household_id, scf_total in [(10, 100.0), (20, 200.0)]: + household_mask = person_household_ids == household_id + if use_scf[household_mask][0]: + assert combined[household_mask].sum() == scf_total + assert combined[household_mask & ~reference_person_mask].sum() == 0.0 + else: + np.testing.assert_array_equal( + combined[household_mask], + np.array([1.0, 2.0, 3.0, 4.0])[household_mask], + ) + + +def test_combine_sipp_and_scf_household_assets_uses_same_source_draw(): + household_ids = np.array([10, 20]) + use_scf = financial_asset_source_is_scf(household_ids, time_period=2024) + + combined = combine_sipp_and_scf_household_assets( + sipp_household_values=np.array([1.0, 2.0]), + scf_household_values=np.array([100.0, 200.0]), + household_ids=household_ids, + reference_household_ids=np.array([10, 20]), + time_period=2024, + ) + + expected = np.where(use_scf, [100.0, 200.0], [1.0, 2.0]) + np.testing.assert_array_equal(combined, expected.astype(np.float32)) + + +def test_aggregate_and_align_household_components(): + person_household_ids = np.array([20, 10, 20, 10]) + reference_person_mask = np.array([True, True, False, False]) + + aggregated = aggregate_person_values_to_reference_households( + [1.0, 2.0, 3.0, 4.0], + person_household_ids, + reference_person_mask, + ) + aligned = align_household_values_to_reference_households( + household_values=[100.0, 200.0], + household_ids=np.array([10, 20]), + reference_household_ids=person_household_ids[reference_person_mask], + ) + + assert aggregated.tolist() == [4.0, 6.0] + assert aligned.tolist() == [200.0, 100.0] + + +def test_compute_net_worth_from_components_applies_signs(): + net_worth = compute_net_worth_from_components( + components={ + "bank_account_assets": np.array([100.0]), + "scf_retirement_assets": np.array([300.0]), + "auto_loan_balance": np.array([50.0]), + "scf_mortgage_debt": np.array([200.0]), + }, + ) + + assert net_worth.tolist() == [150.0] + + +def test_rebalance_scf_net_worth_components_preserves_policy_leaves(): + components = { + "bank_account_assets": np.array([100.0, 400.0]), + "scf_retirement_assets": np.array([300.0, 100.0]), + "scf_other_financial_assets": np.array([0.0, 0.0]), + "scf_other_debt": np.array([50.0, 0.0]), + } + + adjusted = rebalance_scf_net_worth_components( + components=components, + target_net_worth=np.array([200.0, 50.0]), + ) + + np.testing.assert_array_equal( + adjusted["bank_account_assets"], + np.array([100.0, 400.0], dtype=np.float32), + ) + net_worth = compute_net_worth_from_components(components=adjusted) + np.testing.assert_allclose(net_worth, np.array([200.0, 50.0])) + assert adjusted["scf_retirement_assets"].tolist() == [150.0, 100.0] + assert adjusted["scf_other_debt"].tolist() == [50.0, 450.0]