From fa52ad881aec40bab145663680f3252ef5f518dd Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Mon, 27 Apr 2026 14:32:33 -0400 Subject: [PATCH 1/2] Impute unavailable prior-year CPS income --- policyengine_us_data/datasets/cps/cps.py | 36 ++++++++++--- tests/unit/datasets/test_cps_file_handles.py | 54 ++++++++++++++++++++ 2 files changed, 84 insertions(+), 6 deletions(-) diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index d12ba7eef..133cb9311 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -1108,6 +1108,8 @@ def add_previous_year_income(self, cps: h5py.File) -> None: ) return + prior_year_income_sentinels = {-1, -9999} + with ( _open_dataset_read_only(self.raw_cps) as cps_current_year_data, _open_dataset_read_only(self.previous_year_raw_cps) as cps_previous_year_data, @@ -1137,19 +1139,41 @@ def add_previous_year_income(self, cps: h5py.File) -> None: joined_data = cps_current_year.join(previous_year_data)[ [ + "WSAL_VAL", + "SEMP_VAL", "employment_income_last_year", "self_employment_income_last_year", - "I_ERNVAL", - "I_SEVAL", ] - ] + ].rename( + { + "WSAL_VAL": "current_year_employment_income", + "SEMP_VAL": "current_year_self_employment_income", + }, + axis=1, + ) + + invalid_previous_year_income = ( + joined_data.employment_income_last_year.isin(prior_year_income_sentinels) + | joined_data.self_employment_income_last_year.isin( + prior_year_income_sentinels + ) + ) + joined_data.loc[ + invalid_previous_year_income, + ["employment_income_last_year", "self_employment_income_last_year"], + ] = np.nan + joined_data["previous_year_income_available"] = ( ~joined_data.employment_income_last_year.isna() & ~joined_data.self_employment_income_last_year.isna() - & (joined_data.I_ERNVAL == 0) - & (joined_data.I_SEVAL == 0) ) - joined_data = joined_data.fillna(-1).drop(["I_ERNVAL", "I_SEVAL"], axis=1) + joined_data["employment_income_last_year"] = joined_data[ + "employment_income_last_year" + ].fillna(joined_data["current_year_employment_income"]) + joined_data["self_employment_income_last_year"] = joined_data[ + "self_employment_income_last_year" + ].fillna(joined_data["current_year_self_employment_income"]) + joined_data = joined_data.fillna(0) # CPS already ordered by PERIDNUM, so the join wouldn't change the order. cps["employment_income_last_year"] = joined_data[ diff --git a/tests/unit/datasets/test_cps_file_handles.py b/tests/unit/datasets/test_cps_file_handles.py index db5add17f..29b9cff4f 100644 --- a/tests/unit/datasets/test_cps_file_handles.py +++ b/tests/unit/datasets/test_cps_file_handles.py @@ -42,6 +42,8 @@ def test_add_previous_year_income_closes_raw_cps_handles(): current_person = pd.DataFrame( { "PERIDNUM": [10, 20], + "WSAL_VAL": [1_100, 2_100], + "SEMP_VAL": [110, 210], "I_ERNVAL": [0, 0], "I_SEVAL": [0, 0], } @@ -79,6 +81,56 @@ def test_add_previous_year_income_closes_raw_cps_handles(): assert previous_store.closed is True +def test_add_previous_year_income_imputes_unavailable_rows(): + current_person = pd.DataFrame( + { + "PERIDNUM": [10, 20, 30, 40], + "WSAL_VAL": [1_100, 2_100, 3_100, 4_100], + "SEMP_VAL": [110, 210, 310, 410], + "I_ERNVAL": [0, 0, 0, 0], + "I_SEVAL": [0, 0, 0, 0], + } + ) + previous_person = pd.DataFrame( + { + "PERIDNUM": [10, 20, 30], + "WSAL_VAL": [1_000, 2_000, -9999], + "SEMP_VAL": [100, -1, 300], + "I_ERNVAL": [0, 0, 0], + "I_SEVAL": [0, 0, 0], + } + ) + + current_store = _FakeStore(current_person) + previous_store = _FakeStore(previous_person) + + current_dataset = type("CurrentDataset", (_FakeDataset,), {"store": current_store}) + previous_dataset = type( + "PreviousDataset", (_FakeDataset,), {"store": previous_store} + ) + + holder = SimpleNamespace( + raw_cps=current_dataset, + previous_year_raw_cps=previous_dataset, + ) + cps = {} + + add_previous_year_income(holder, cps) + + np.testing.assert_array_equal( + cps["employment_income_last_year"], + [1_000, 2_100, 3_100, 4_100], + ) + np.testing.assert_array_equal( + cps["self_employment_income_last_year"], + [100, 210, 310, 410], + ) + np.testing.assert_array_equal( + cps["previous_year_income_available"], + [True, False, False, False], + ) + + def test_add_previous_year_income_opens_hdfstores_read_only(tmp_path, monkeypatch): current_path = tmp_path / "current.h5" previous_path = tmp_path / "previous.h5" @@ -87,6 +139,8 @@ def test_add_previous_year_income_opens_hdfstores_read_only(tmp_path, monkeypatc store["person"] = pd.DataFrame( { "PERIDNUM": [10, 20], + "WSAL_VAL": [1_100, 2_100], + "SEMP_VAL": [110, 210], "I_ERNVAL": [0, 0], "I_SEVAL": [0, 0], } From be988232885b2e2f97084af61c38672beffff1e1 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Mon, 27 Apr 2026 14:41:50 -0400 Subject: [PATCH 2/2] Sanitize prior-year income fallback sentinels --- changelog.d/837.fixed | 1 + policyengine_us_data/datasets/cps/cps.py | 19 +++++++++++++------ tests/unit/datasets/test_cps_file_handles.py | 16 ++++++++-------- 3 files changed, 22 insertions(+), 14 deletions(-) create mode 100644 changelog.d/837.fixed diff --git a/changelog.d/837.fixed b/changelog.d/837.fixed new file mode 100644 index 000000000..2a2b470f8 --- /dev/null +++ b/changelog.d/837.fixed @@ -0,0 +1 @@ +Impute unavailable CPS prior-year wage and self-employment income instead of emitting sentinel values. diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index 133cb9311..94e57209c 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -1152,16 +1152,23 @@ def add_previous_year_income(self, cps: h5py.File) -> None: axis=1, ) - invalid_previous_year_income = ( - joined_data.employment_income_last_year.isin(prior_year_income_sentinels) - | joined_data.self_employment_income_last_year.isin( - prior_year_income_sentinels - ) - ) + invalid_previous_year_income = joined_data.employment_income_last_year.isin( + prior_year_income_sentinels + ) | joined_data.self_employment_income_last_year.isin(prior_year_income_sentinels) joined_data.loc[ invalid_previous_year_income, ["employment_income_last_year", "self_employment_income_last_year"], ] = np.nan + joined_data.loc[ + joined_data.current_year_employment_income.isin(prior_year_income_sentinels), + "current_year_employment_income", + ] = np.nan + joined_data.loc[ + joined_data.current_year_self_employment_income.isin( + prior_year_income_sentinels + ), + "current_year_self_employment_income", + ] = np.nan joined_data["previous_year_income_available"] = ( ~joined_data.employment_income_last_year.isna() diff --git a/tests/unit/datasets/test_cps_file_handles.py b/tests/unit/datasets/test_cps_file_handles.py index 29b9cff4f..62eaeeec7 100644 --- a/tests/unit/datasets/test_cps_file_handles.py +++ b/tests/unit/datasets/test_cps_file_handles.py @@ -84,11 +84,11 @@ def test_add_previous_year_income_closes_raw_cps_handles(): def test_add_previous_year_income_imputes_unavailable_rows(): current_person = pd.DataFrame( { - "PERIDNUM": [10, 20, 30, 40], - "WSAL_VAL": [1_100, 2_100, 3_100, 4_100], - "SEMP_VAL": [110, 210, 310, 410], - "I_ERNVAL": [0, 0, 0, 0], - "I_SEVAL": [0, 0, 0, 0], + "PERIDNUM": [10, 20, 30, 40, 50], + "WSAL_VAL": [1_100, 2_100, 3_100, 4_100, -1], + "SEMP_VAL": [110, 210, 310, 410, -9999], + "I_ERNVAL": [0, 0, 0, 0, 0], + "I_SEVAL": [0, 0, 0, 0, 0], } ) previous_person = pd.DataFrame( @@ -119,15 +119,15 @@ def test_add_previous_year_income_imputes_unavailable_rows(): np.testing.assert_array_equal( cps["employment_income_last_year"], - [1_000, 2_100, 3_100, 4_100], + [1_000, 2_100, 3_100, 4_100, 0], ) np.testing.assert_array_equal( cps["self_employment_income_last_year"], - [100, 210, 310, 410], + [100, 210, 310, 410, 0], ) np.testing.assert_array_equal( cps["previous_year_income_available"], - [True, False, False, False], + [True, False, False, False, False], )