diff --git a/changelog.d/837.fixed b/changelog.d/837.fixed
new file mode 100644
index 000000000..2a2b470f8
--- /dev/null
+++ b/changelog.d/837.fixed
@@ -0,0 +1 @@
+Impute unavailable CPS prior-year wage and self-employment income instead of emitting sentinel values.
diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py
index d12ba7eef..94e57209c 100644
--- a/policyengine_us_data/datasets/cps/cps.py
+++ b/policyengine_us_data/datasets/cps/cps.py
@@ -1108,6 +1108,8 @@ def add_previous_year_income(self, cps: h5py.File) -> None:
         )
         return
 
+    prior_year_income_sentinels = {-1, -9999}
+
     with (
         _open_dataset_read_only(self.raw_cps) as cps_current_year_data,
         _open_dataset_read_only(self.previous_year_raw_cps) as cps_previous_year_data,
@@ -1137,19 +1139,48 @@ def add_previous_year_income(self, cps: h5py.File) -> None:
 
     joined_data = cps_current_year.join(previous_year_data)[
         [
+            "WSAL_VAL",
+            "SEMP_VAL",
             "employment_income_last_year",
             "self_employment_income_last_year",
-            "I_ERNVAL",
-            "I_SEVAL",
         ]
-    ]
+    ].rename(
+        {
+            "WSAL_VAL": "current_year_employment_income",
+            "SEMP_VAL": "current_year_self_employment_income",
+        },
+        axis=1,
+    )
+
+    invalid_previous_year_income = joined_data.employment_income_last_year.isin(
+        prior_year_income_sentinels
+    ) | joined_data.self_employment_income_last_year.isin(prior_year_income_sentinels)
+    joined_data.loc[
+        invalid_previous_year_income,
+        ["employment_income_last_year", "self_employment_income_last_year"],
+    ] = np.nan
+    joined_data.loc[
+        joined_data.current_year_employment_income.isin(prior_year_income_sentinels),
+        "current_year_employment_income",
+    ] = np.nan
+    joined_data.loc[
+        joined_data.current_year_self_employment_income.isin(
+            prior_year_income_sentinels
+        ),
+        "current_year_self_employment_income",
+    ] = np.nan
+
     joined_data["previous_year_income_available"] = (
         ~joined_data.employment_income_last_year.isna()
         & ~joined_data.self_employment_income_last_year.isna()
-        & (joined_data.I_ERNVAL == 0)
-        & (joined_data.I_SEVAL == 0)
     )
-    joined_data = joined_data.fillna(-1).drop(["I_ERNVAL", "I_SEVAL"], axis=1)
+    joined_data["employment_income_last_year"] = joined_data[
+        "employment_income_last_year"
+    ].fillna(joined_data["current_year_employment_income"])
+    joined_data["self_employment_income_last_year"] = joined_data[
+        "self_employment_income_last_year"
+    ].fillna(joined_data["current_year_self_employment_income"])
+    joined_data = joined_data.fillna(0)
 
     # CPS already ordered by PERIDNUM, so the join wouldn't change the order.
     cps["employment_income_last_year"] = joined_data[
diff --git a/tests/unit/datasets/test_cps_file_handles.py b/tests/unit/datasets/test_cps_file_handles.py
index db5add17f..62eaeeec7 100644
--- a/tests/unit/datasets/test_cps_file_handles.py
+++ b/tests/unit/datasets/test_cps_file_handles.py
@@ -42,6 +42,8 @@ def test_add_previous_year_income_closes_raw_cps_handles():
     current_person = pd.DataFrame(
         {
             "PERIDNUM": [10, 20],
+            "WSAL_VAL": [1_100, 2_100],
+            "SEMP_VAL": [110, 210],
             "I_ERNVAL": [0, 0],
             "I_SEVAL": [0, 0],
         }
@@ -79,6 +81,56 @@ def test_add_previous_year_income_closes_raw_cps_handles():
     assert previous_store.closed is True
 
 
+def test_add_previous_year_income_imputes_unavailable_rows():
+    current_person = pd.DataFrame(
+        {
+            "PERIDNUM": [10, 20, 30, 40, 50],
+            "WSAL_VAL": [1_100, 2_100, 3_100, 4_100, -1],
+            "SEMP_VAL": [110, 210, 310, 410, -9999],
+            "I_ERNVAL": [0, 0, 0, 0, 0],
+            "I_SEVAL": [0, 0, 0, 0, 0],
+        }
+    )
+    previous_person = pd.DataFrame(
+        {
+            "PERIDNUM": [10, 20, 30],
+            "WSAL_VAL": [1_000, 2_000, -9999],
+            "SEMP_VAL": [100, -1, 300],
+            "I_ERNVAL": [0, 0, 0],
+            "I_SEVAL": [0, 0, 0],
+        }
+    )
+
+    current_store = _FakeStore(current_person)
+    previous_store = _FakeStore(previous_person)
+
+    current_dataset = type("CurrentDataset", (_FakeDataset,), {"store": current_store})
+    previous_dataset = type(
+        "PreviousDataset", (_FakeDataset,), {"store": previous_store}
+    )
+
+    holder = SimpleNamespace(
+        raw_cps=current_dataset,
+        previous_year_raw_cps=previous_dataset,
+    )
+    cps = {}
+
+    add_previous_year_income(holder, cps)
+
+    np.testing.assert_array_equal(
+        cps["employment_income_last_year"],
+        [1_000, 2_100, 3_100, 4_100, 0],
+    )
+    np.testing.assert_array_equal(
+        cps["self_employment_income_last_year"],
+        [100, 210, 310, 410, 0],
+    )
+    np.testing.assert_array_equal(
+        cps["previous_year_income_available"],
+        [True, False, False, False, False],
+    )
+
+
 def test_add_previous_year_income_opens_hdfstores_read_only(tmp_path, monkeypatch):
     current_path = tmp_path / "current.h5"
     previous_path = tmp_path / "previous.h5"
@@ -87,6 +139,8 @@ def test_add_previous_year_income_opens_hdfstores_read_only(tmp_path, monkeypatc
         store["person"] = pd.DataFrame(
             {
                 "PERIDNUM": [10, 20],
+                "WSAL_VAL": [1_100, 2_100],
+                "SEMP_VAL": [110, 210],
                 "I_ERNVAL": [0, 0],
                 "I_SEVAL": [0, 0],
             }