diff --git a/changelog.d/826.added.md b/changelog.d/826.added.md new file mode 100644 index 000000000..cf828b9a0 --- /dev/null +++ b/changelog.d/826.added.md @@ -0,0 +1 @@ +Preserve Forbes top-tail residence states through PUF local geography assignment. diff --git a/policyengine_us_data/calibration/clone_and_assign.py b/policyengine_us_data/calibration/clone_and_assign.py index cee43704a..6350e36f6 100644 --- a/policyengine_us_data/calibration/clone_and_assign.py +++ b/policyengine_us_data/calibration/clone_and_assign.py @@ -100,6 +100,7 @@ def assign_random_geography( household_agi: np.ndarray = None, cd_agi_targets: dict = None, agi_threshold_pctile: float = 90.0, + fixed_state_fips: np.ndarray = None, ) -> GeographyAssignment: """Assign random census block geography to cloned CPS records. @@ -114,12 +115,20 @@ def assign_random_geography( dataset. n_clones: Number of clones (default 10). seed: Random seed for reproducibility. + fixed_state_fips: Optional state FIPS per base record. Positive + values constrain every clone of that record to blocks in the + requested state; zero or missing values remain unrestricted. Returns: GeographyAssignment with arrays of length n_records * n_clones. """ blocks, cds, states, probs = load_global_block_distribution() + fixed_states = _validate_fixed_state_fips( + fixed_state_fips, + n_records=n_records, + available_states=states, + ) n_total = n_records * n_clones rng = np.random.default_rng(seed) @@ -137,7 +146,30 @@ def assign_random_geography( threshold, ) - def _sample(size, mask_slice=None): + state_draw_cache: dict[tuple[int, str], tuple[np.ndarray, np.ndarray]] = {} + + def _state_draw_inputs(state: int, probability_source: str): + key = (int(state), probability_source) + cached = state_draw_cache.get(key) + if cached is not None: + return cached + + state_indices = np.flatnonzero(states == state) + base_probs = agi_probs if probability_source == "agi" else probs + state_probs = base_probs[state_indices].astype(np.float64) + if not np.isfinite(state_probs).all() or state_probs.sum() <= 0: + state_probs = probs[state_indices].astype(np.float64) + if not np.isfinite(state_probs).all() or state_probs.sum() <= 0: + state_probs = np.ones(len(state_indices), dtype=np.float64) + state_probs = state_probs / state_probs.sum() + state_draw_cache[key] = (state_indices, state_probs) + return state_indices, state_probs + + def _sample_state(state: int, size: int, probability_source: str): + state_indices, state_probs = _state_draw_inputs(state, probability_source) + return rng.choice(state_indices, size=size, p=state_probs) + + def _sample_unrestricted(size, mask_slice=None): """Sample block indices, using AGI-weighted probs for extreme HHs.""" if ( extreme_mask is not None @@ -155,17 +187,53 @@ def _sample(size, mask_slice=None): return out return rng.choice(len(blocks), size=size, p=probs) + def _sample(size, mask_slice=None, fixed_slice=None): + out = np.empty(size, dtype=np.int64) + remaining = np.ones(size, dtype=bool) + + if fixed_slice is not None: + fixed_slice = np.asarray(fixed_slice, dtype=np.int32) + for state in np.unique(fixed_slice[fixed_slice > 0]): + state_mask = fixed_slice == state + if mask_slice is not None and agi_probs is not None: + extreme_state_mask = state_mask & mask_slice + normal_state_mask = state_mask & ~mask_slice + if extreme_state_mask.any(): + out[extreme_state_mask] = _sample_state( + int(state), + int(extreme_state_mask.sum()), + "agi", + ) + if normal_state_mask.any(): + out[normal_state_mask] = _sample_state( + int(state), + int(normal_state_mask.sum()), + "pop", + ) + else: + out[state_mask] = _sample_state( + int(state), + int(state_mask.sum()), + "pop", + ) + remaining[state_mask] = False + + if remaining.any(): + remaining_mask = mask_slice[remaining] if mask_slice is not None else None + out[remaining] = _sample_unrestricted(int(remaining.sum()), remaining_mask) + return out + indices = np.empty(n_total, dtype=np.int64) # Clone 0: unrestricted draw - indices[:n_records] = _sample(n_records, extreme_mask) + indices[:n_records] = _sample(n_records, extreme_mask, fixed_states) assigned_cds = np.empty((n_clones, n_records), dtype=object) assigned_cds[0] = cds[indices[:n_records]] for clone_idx in range(1, n_clones): start = clone_idx * n_records - clone_indices = _sample(n_records, extreme_mask) + clone_indices = _sample(n_records, extreme_mask, fixed_states) clone_cds = cds[clone_indices] collisions = np.zeros(n_records, dtype=bool) @@ -178,18 +246,11 @@ def _sample(size, mask_slice=None): break bad_mask = collisions if extreme_mask is not None and agi_probs is not None: - bad_ext = bad_mask & extreme_mask - bad_norm = bad_mask & ~extreme_mask - if bad_ext.sum() > 0: - clone_indices[bad_ext] = rng.choice( - len(blocks), size=bad_ext.sum(), p=agi_probs - ) - if bad_norm.sum() > 0: - clone_indices[bad_norm] = rng.choice( - len(blocks), size=bad_norm.sum(), p=probs - ) + replacement = _sample(n_records, extreme_mask, fixed_states) + clone_indices[bad_mask] = replacement[bad_mask] else: - clone_indices[collisions] = rng.choice(len(blocks), size=n_bad, p=probs) + replacement = _sample(n_records, fixed_slice=fixed_states) + clone_indices[collisions] = replacement[collisions] clone_cds = cds[clone_indices] collisions = np.zeros(n_records, dtype=bool) for prev in range(clone_idx): @@ -209,6 +270,44 @@ def _sample(size, mask_slice=None): ) +def _validate_fixed_state_fips( + fixed_state_fips: np.ndarray | None, + n_records: int, + available_states: np.ndarray, +) -> np.ndarray | None: + """Validate optional record-level state constraints.""" + + if fixed_state_fips is None: + return None + + fixed = np.asarray(fixed_state_fips) + if len(fixed) != n_records: + raise ValueError( + "fixed_state_fips must have one value per base record: " + f"got {len(fixed)} for {n_records} records." + ) + + fixed = np.nan_to_num(fixed.astype(float), nan=0.0).astype(np.int32) + positive = np.unique(fixed[fixed > 0]) + if len(positive) == 0: + return None + + available = set(np.asarray(available_states, dtype=np.int32).tolist()) + missing = [int(state) for state in positive if int(state) not in available] + if missing: + raise ValueError( + "fixed_state_fips contains states absent from the block " + f"distribution: {missing}" + ) + + logger.info( + "Preserving fixed state geography for %d of %d records", + int((fixed > 0).sum()), + n_records, + ) + return fixed + + def save_geography(geography: GeographyAssignment, path) -> None: """Save a GeographyAssignment to a compressed .npz file. diff --git a/policyengine_us_data/calibration/unified_calibration.py b/policyengine_us_data/calibration/unified_calibration.py index dbe5282d3..bf902a408 100644 --- a/policyengine_us_data/calibration/unified_calibration.py +++ b/policyengine_us_data/calibration/unified_calibration.py @@ -1054,6 +1054,80 @@ def compute_diagnostics( ) +def _raw_time_period_array( + raw_dataset: dict, + variable: str, + time_period: int, +) -> np.ndarray | None: + """Extract one variable array from a raw Dataset.load_dataset() dict.""" + + if variable not in raw_dataset: + return None + + values = raw_dataset[variable] + if isinstance(values, dict): + if time_period in values: + values = values[time_period] + elif str(time_period) in values: + values = values[str(time_period)] + else: + return None + + try: + return np.asarray(values[...]) + except (TypeError, ValueError): + return np.asarray(values) + + +def _extract_forbes_state_fips_overrides( + raw_dataset: dict, + time_period: int, + n_records: int, +) -> np.ndarray | None: + """Return fixed-state overrides for Forbes synthetic PUF households.""" + + from policyengine_us_data.datasets.puf.aggregate_record_utils import ( + SYNTHETIC_RECID_START, + ) + + household_id = _raw_time_period_array(raw_dataset, "household_id", time_period) + forbes_state_fips = _raw_time_period_array( + raw_dataset, + "forbes_state_fips", + time_period, + ) + if household_id is None or forbes_state_fips is None: + return None + if len(household_id) != n_records or len(forbes_state_fips) != n_records: + logger.info( + "Skipping Forbes fixed-state overrides because " + "household_id/forbes_state_fips " + "lengths do not match household records: %s/%s vs %s", + len(household_id), + len(forbes_state_fips), + n_records, + ) + return None + + forbes_state_fips = np.nan_to_num( + np.asarray(forbes_state_fips, dtype=float), + nan=0.0, + ).astype(np.int32) + household_id = np.asarray(household_id, dtype=float) + + fixed_mask = (forbes_state_fips > 0) & (household_id >= SYNTHETIC_RECID_START) + if not fixed_mask.any(): + return None + + fixed_state_fips = np.zeros(n_records, dtype=np.int32) + fixed_state_fips[fixed_mask] = forbes_state_fips[fixed_mask] + logger.info( + "Detected %d Forbes synthetic households with fixed state_fips", + int(fixed_mask.sum()), + ) + return fixed_state_fips + + def run_calibration( dataset_path: str, db_path: str, @@ -1193,7 +1267,8 @@ def run_calibration( logger.info("Loading dataset from %s", dataset_path) sim = Microsimulation(dataset=dataset_path) n_records = len(sim.calculate("household_id", map_to="household").values) - raw_keys = sim.dataset.load_dataset()["household_id"] + raw_dataset = sim.dataset.load_dataset() + raw_keys = raw_dataset["household_id"] if isinstance(raw_keys, dict): time_period = int(next(iter(raw_keys))) else: @@ -1221,6 +1296,11 @@ def run_calibration( "Loaded %d CD AGI targets for conditional assignment", len(cd_agi_targets), ) + fixed_state_fips = _extract_forbes_state_fips_overrides( + raw_dataset=raw_dataset, + time_period=time_period, + n_records=n_records, + ) # Step 2: Clone and assign geography logger.info( @@ -1235,6 +1315,7 @@ def run_calibration( seed=seed, household_agi=base_agi, cd_agi_targets=cd_agi_targets, + fixed_state_fips=fixed_state_fips, ) # Step 3: Source imputation (if requested) @@ -1245,7 +1326,7 @@ def run_calibration( base_states = geography.state_fips[:n_records] - raw_data = sim.dataset.load_dataset() + raw_data = raw_dataset data_dict = {} for var in raw_data: val = raw_data[var] diff --git a/policyengine_us_data/datasets/puf/disaggregate_puf.py b/policyengine_us_data/datasets/puf/disaggregate_puf.py index e7667dc28..51759f892 100644 --- a/policyengine_us_data/datasets/puf/disaggregate_puf.py +++ b/policyengine_us_data/datasets/puf/disaggregate_puf.py @@ -18,7 +18,10 @@ import numpy as np import pandas as pd from . import aggregate_record_utils as utils -from .forbes_backbone import build_forbes_top_tail_bucket +from .forbes_backbone import ( + FORBES_TOP_TAIL_METADATA_DEFAULTS, + build_forbes_top_tail_bucket, +) logger = logging.getLogger(__name__) @@ -60,6 +63,17 @@ def disaggregate_aggregate_records( if agg_mask.sum() == 0: return puf + if use_forbes_top_tail: + missing_metadata = [ + column + for column in FORBES_TOP_TAIL_METADATA_DEFAULTS + if column not in puf.columns + ] + if missing_metadata: + puf = puf.copy() + for column in missing_metadata: + puf[column] = FORBES_TOP_TAIL_METADATA_DEFAULTS[column] + agg_rows = puf[agg_mask].copy().set_index("RECID") regular = puf[~agg_mask].copy() amount_columns = _get_amount_columns(puf.columns) diff --git a/policyengine_us_data/datasets/puf/forbes_backbone.py b/policyengine_us_data/datasets/puf/forbes_backbone.py index c3f06f75c..b97c5f32c 100644 --- a/policyengine_us_data/datasets/puf/forbes_backbone.py +++ b/policyengine_us_data/datasets/puf/forbes_backbone.py @@ -28,6 +28,10 @@ from policyengine_us_data.datasets.puf import aggregate_record_utils as utils from policyengine_us_data.storage import STORAGE_FOLDER +from policyengine_us_data.utils.census import ( + STATE_ABBREV_TO_FIPS, + STATE_NAME_TO_FIPS, +) logger = logging.getLogger(__name__) @@ -48,6 +52,24 @@ f"forbes_us_top_400_{FORBES_DEFAULT_SNAPSHOT_DATE}_{FORBES_RTB_API_REF[:12]}.json" ) SCF_PACKAGED_DONOR_NAME = f"scf_forbes_donors_{FORBES_TOP_TAIL_SCF_YEAR}.json.gz" +FORBES_TOP_TAIL_METADATA_DEFAULTS = { + "forbes_alias": "", + "forbes_name": "", + "forbes_snapshot_date": "", + "forbes_marital_status": "", + "forbes_rank": 0, + "forbes_unit_id": -1, + "forbes_replicate_id": -1, + "forbes_age": 0, + "forbes_children": 0, + "forbes_state_fips": 0, +} +FORBES_STRING_METADATA_COLUMNS = { + "forbes_alias", + "forbes_name", + "forbes_snapshot_date", + "forbes_marital_status", +} SCF_JOINT_INCOME_COLUMNS = ( "wageinc", @@ -1498,7 +1520,7 @@ def apply_forbes_structural_overrides( synthetic: pd.DataFrame, forbes: pd.DataFrame, ) -> None: - """Set tax-unit structure directly from Forbes metadata where available.""" + """Set tax-unit structure and known state from Forbes metadata.""" if "MARS" in synthetic.columns: married = forbes["is_married"].fillna(False).to_numpy(dtype=bool) @@ -1520,6 +1542,76 @@ def apply_forbes_structural_overrides( if "EIC" in synthetic.columns: synthetic["EIC"] = 0 + _apply_forbes_metadata(synthetic, forbes) + + +def _apply_forbes_metadata( + synthetic: pd.DataFrame, + forbes: pd.DataFrame, +) -> None: + """Carry source Forbes metadata as household-level sidecar columns.""" + + string_sources = { + "forbes_alias": "alias", + "forbes_name": "name", + "forbes_snapshot_date": "snapshot_date", + "forbes_marital_status": "marital_status", + } + for target, source in string_sources.items(): + if source in forbes.columns: + synthetic[target] = forbes[source].fillna("").astype(str) + else: + synthetic[target] = FORBES_TOP_TAIL_METADATA_DEFAULTS[target] + + numeric_sources = { + "forbes_rank": "rank", + "forbes_unit_id": "forbes_unit_id", + "forbes_replicate_id": "replicate_id", + "forbes_age": "age", + "forbes_children": "children", + } + for target, source in numeric_sources.items(): + if source in forbes.columns: + synthetic[target] = ( + pd.to_numeric(forbes[source], errors="coerce") + .fillna(FORBES_TOP_TAIL_METADATA_DEFAULTS[target]) + .astype(int) + ) + else: + synthetic[target] = FORBES_TOP_TAIL_METADATA_DEFAULTS[target] + + if "residence_state" in forbes.columns: + synthetic["forbes_state_fips"] = forbes["residence_state"].map( + _resolve_state_fips, + ) + else: + synthetic["forbes_state_fips"] = FORBES_TOP_TAIL_METADATA_DEFAULTS[ + "forbes_state_fips" + ] + + +def _resolve_state_fips(value) -> int: + """Resolve a Forbes residence state name/abbreviation to integer FIPS.""" + + if value is None or pd.isna(value): + return 0 + + text = str(value).strip() + if not text: + return 0 + if text.isdigit(): + return int(text) + + fips = STATE_NAME_TO_FIPS.get(text) + if fips is not None: + return int(fips) + + fips = STATE_ABBREV_TO_FIPS.get(text.upper()) + if fips is not None: + return int(fips) + + return 0 + def _build_calibration_diagnostics( synthetic: pd.DataFrame, diff --git a/policyengine_us_data/datasets/puf/puf.py b/policyengine_us_data/datasets/puf/puf.py index dc89c4a9a..2d80e1343 100644 --- a/policyengine_us_data/datasets/puf/puf.py +++ b/policyengine_us_data/datasets/puf/puf.py @@ -13,6 +13,10 @@ from policyengine_us_data.datasets.puf.disaggregate_puf import ( disaggregate_aggregate_records, ) +from policyengine_us_data.datasets.puf.forbes_backbone import ( + FORBES_STRING_METADATA_COLUMNS, + FORBES_TOP_TAIL_METADATA_DEFAULTS, +) from policyengine_us_data.utils.mortgage_interest import ( STRUCTURAL_MORTGAGE_VARIABLES, convert_mortgage_interest_to_structural_inputs, @@ -58,6 +62,16 @@ def conditionally_sample_lognormal(flag, target_mean, log_sigma, rng): ) +def as_utf8_bytes_array(values): + """Return a fixed-width UTF-8 bytes array suitable for HDF5.""" + + encoded = [ + ("" if value is None else str(value)).encode("utf-8") for value in values + ] + width = max(1, *(len(value) for value in encoded)) + return np.array(encoded, dtype=f"S{width}") + + def simulate_w2_and_ubia_from_puf(puf, *, seed=None, diagnostics=True): """ Simulate two Section 199A guard-rail quantities for every record @@ -748,6 +762,9 @@ def generate(self): puf = impute_missing_demographics(puf, demographics) # Derive age and is_male for pension imputation predictors puf["age"] = puf["AGERANGE"].apply(decode_age_filer) + if "forbes_age" in puf.columns: + forbes_age = pd.to_numeric(puf["forbes_age"], errors="coerce").fillna(0) + puf["age"] = np.where(forbes_age > 0, forbes_age, puf["age"]) puf["is_male"] = (puf["GENDER"] == 1).astype(float) puf["pre_tax_contributions"] = impute_pension_contributions_to_puf( puf[["employment_income", "age", "is_male"]] @@ -787,7 +804,11 @@ def generate(self): "is_tax_unit_head", "is_tax_unit_spouse", "is_tax_unit_dependent", - ] + self.available_financial_vars + ] + for column in FORBES_TOP_TAIL_METADATA_DEFAULTS: + if column in puf.columns: + VARIABLES.append(column) + VARIABLES += self.available_financial_vars self.holder = {variable: [] for variable in VARIABLES} @@ -818,8 +839,8 @@ def generate(self): self.holder[f"person_{group}_id"] = self.holder["person_tax_unit_id"] for key in self.holder: - if key == "filing_status": - self.holder[key] = np.array(self.holder[key]).astype("S") + if key == "filing_status" or key in FORBES_STRING_METADATA_COLUMNS: + self.holder[key] = as_utf8_bytes_array(self.holder[key]) else: self.holder[key] = np.array(self.holder[key]).astype(float) assert not np.isnan(self.holder[key]).any(), f"{key} has NaNs." @@ -840,6 +861,9 @@ def generate(self): def add_tax_unit(self, row, tax_unit_id): self.holder["tax_unit_id"].append(tax_unit_id) + for key, default in FORBES_TOP_TAIL_METADATA_DEFAULTS.items(): + if key in self.holder: + self.holder[key].append(row.get(key, default)) for key in self.available_financial_vars: if self.variable_to_entity[key] == "tax_unit": @@ -872,7 +896,10 @@ def add_filer(self, row, tax_unit_id): self.holder["is_tax_unit_spouse"].append(False) self.holder["is_tax_unit_dependent"].append(False) - self.holder["age"].append(decode_age_filer(round(row["AGERANGE"]))) + age = row.get("age", np.nan) + if pd.isna(age) or age <= 0: + age = decode_age_filer(round(row["AGERANGE"])) + self.holder["age"].append(age) self.holder["household_weight"].append(row["household_weight"]) self.holder["is_male"].append(row["GENDER"] == 1) diff --git a/tests/unit/calibration/test_clone_and_assign.py b/tests/unit/calibration/test_clone_and_assign.py index e7e70e719..b47d54211 100644 --- a/tests/unit/calibration/test_clone_and_assign.py +++ b/tests/unit/calibration/test_clone_and_assign.py @@ -170,6 +170,39 @@ def test_no_cd_collisions_across_clones(self, mock_load): f"Record {rec} has duplicate CDs: {rec_cds}" ) + @patch( + "policyengine_us_data.calibration.clone_and_assign" + ".load_global_block_distribution" + ) + def test_fixed_state_fips_constrains_all_clones(self, mock_load): + mock_load.return_value = _mock_distribution() + fixed_state_fips = np.array([0, 36, 2], dtype=np.int32) + + r = assign_random_geography( + n_records=3, + n_clones=4, + seed=42, + fixed_state_fips=fixed_state_fips, + ) + + for clone in range(r.n_clones): + start = clone * r.n_records + assert r.state_fips[start + 1] == 36 + assert r.state_fips[start + 2] == 2 + + @patch( + "policyengine_us_data.calibration.clone_and_assign" + ".load_global_block_distribution" + ) + def test_fixed_state_fips_rejects_wrong_length(self, mock_load): + mock_load.return_value = _mock_distribution() + with pytest.raises(ValueError, match="one value per base record"): + assign_random_geography( + n_records=3, + n_clones=2, + fixed_state_fips=np.array([6, 36]), + ) + def test_missing_file_raises(self, tmp_path): fake = tmp_path / "nonexistent" fake.mkdir() diff --git a/tests/unit/calibration/test_unified_calibration.py b/tests/unit/calibration/test_unified_calibration.py index 9914fc6c1..e5f55f198 100644 --- a/tests/unit/calibration/test_unified_calibration.py +++ b/tests/unit/calibration/test_unified_calibration.py @@ -38,6 +38,44 @@ ) +class TestForbesStateOverrides: + def test_extracts_only_synthetic_puf_state_fips(self): + from policyengine_us_data.calibration.unified_calibration import ( + _extract_forbes_state_fips_overrides, + ) + + raw_dataset = { + "household_id": {2024: np.array([10, 1_000_000, 1_000_001])}, + "forbes_state_fips": {2024: np.array([6, 36, 0])}, + } + + result = _extract_forbes_state_fips_overrides( + raw_dataset=raw_dataset, + time_period=2024, + n_records=3, + ) + + np.testing.assert_array_equal(result, np.array([0, 36, 0])) + + def test_ignores_ordinary_positive_state_fips(self): + from policyengine_us_data.calibration.unified_calibration import ( + _extract_forbes_state_fips_overrides, + ) + + raw_dataset = { + "household_id": {2024: np.array([10, 20, 30])}, + "forbes_state_fips": {2024: np.array([6, 36, 48])}, + } + + result = _extract_forbes_state_fips_overrides( + raw_dataset=raw_dataset, + time_period=2024, + n_records=3, + ) + + assert result is None + + class TestRerandomizeTakeupSeeding: """Verify seeded_rng(var, salt=block) produces reproducible, block-dependent draws.""" diff --git a/tests/unit/datasets/test_disaggregate_puf.py b/tests/unit/datasets/test_disaggregate_puf.py index 49e86934e..25af5e44c 100644 --- a/tests/unit/datasets/test_disaggregate_puf.py +++ b/tests/unit/datasets/test_disaggregate_puf.py @@ -882,6 +882,32 @@ def test_forbes_bucket_uses_marital_metadata(self, mini_puf, forbes_result): assert (bucket.DSI == 0).all() assert (bucket.EIC == 0).all() + def test_forbes_bucket_preserves_residence_state_fips( + self, mini_puf, forbes_result + ): + bucket = _synthetic_bucket( + forbes_result, mini_puf, 999999, use_forbes_top_tail=True + ) + assert "forbes_state_fips" in forbes_result.columns + assert set(bucket.forbes_state_fips.unique()) == {6} + assert ( + forbes_result.loc[forbes_result.RECID < 999996, "forbes_state_fips"] == 0 + ).all() + + def test_forbes_bucket_preserves_source_metadata(self, mini_puf, forbes_result): + bucket = _synthetic_bucket( + forbes_result, mini_puf, 999999, use_forbes_top_tail=True + ) + + assert {"forbes_alias", "forbes_name", "forbes_age"}.issubset( + forbes_result.columns + ) + assert bucket["forbes_alias"].str.startswith("mock-").all() + assert bucket["forbes_name"].str.startswith("Mock Forbes").all() + assert bucket["forbes_age"].between(45, 69).all() + assert bucket["forbes_unit_id"].min() == 0 + assert set(bucket["forbes_replicate_id"].unique()) == set(range(10)) + def test_scf_joint_profiles_scale_ratios_to_forbes_wealth(self): from policyengine_us_data.datasets.puf.forbes_backbone import ( sample_scf_joint_profiles, @@ -1036,6 +1062,49 @@ def test_scf_pension_signal_maps_to_puf_pension_lines(self): assert selected["E01700"].iloc[0] == pytest.approx(600_000.0) +class TestPUFForbesMetadata: + def test_utf8_metadata_encoding_preserves_names(self): + from policyengine_us_data.datasets.puf.puf import as_utf8_bytes_array + + result = as_utf8_bytes_array(["Françoise", ""]) + + assert result[0].decode("utf-8") == "Françoise" + assert result[1].decode("utf-8") == "" + + def test_filer_age_uses_forbes_age_when_available(self): + from policyengine_us_data.datasets.puf.puf import PUF + + puf = PUF.__new__(PUF) + puf.holder = { + "person_id": [], + "person_tax_unit_id": [], + "person_marital_unit_id": [], + "marital_unit_id": [], + "is_tax_unit_head": [], + "is_tax_unit_spouse": [], + "is_tax_unit_dependent": [], + "age": [], + "household_weight": [], + "is_male": [], + "deductible_mortgage_interest": [], + } + puf.available_financial_vars = [] + puf.variable_to_entity = {} + row = pd.Series( + { + "AGERANGE": 1, + "age": 53, + "household_weight": 1, + "GENDER": 1, + "interest_deduction": 0, + } + ) + + puf.add_filer(row, tax_unit_id=123) + + assert puf.holder["age"] == [53] + + class TestForbesCache: def test_default_load_uses_packaged_snapshot_without_network(self, monkeypatch): from policyengine_us_data.datasets.puf import forbes_backbone