Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/837.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Impute unavailable CPS prior-year wage and self-employment income instead of emitting sentinel values.
43 changes: 37 additions & 6 deletions policyengine_us_data/datasets/cps/cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -1108,6 +1108,8 @@ def add_previous_year_income(self, cps: h5py.File) -> None:
)
return

prior_year_income_sentinels = {-1, -9999}

with (
_open_dataset_read_only(self.raw_cps) as cps_current_year_data,
_open_dataset_read_only(self.previous_year_raw_cps) as cps_previous_year_data,
Expand Down Expand Up @@ -1137,19 +1139,48 @@ def add_previous_year_income(self, cps: h5py.File) -> None:

joined_data = cps_current_year.join(previous_year_data)[
[
"WSAL_VAL",
"SEMP_VAL",
"employment_income_last_year",
"self_employment_income_last_year",
"I_ERNVAL",
"I_SEVAL",
]
]
].rename(
{
"WSAL_VAL": "current_year_employment_income",
"SEMP_VAL": "current_year_self_employment_income",
},
axis=1,
)

invalid_previous_year_income = joined_data.employment_income_last_year.isin(
prior_year_income_sentinels
) | joined_data.self_employment_income_last_year.isin(prior_year_income_sentinels)
joined_data.loc[
invalid_previous_year_income,
["employment_income_last_year", "self_employment_income_last_year"],
] = np.nan
joined_data.loc[
joined_data.current_year_employment_income.isin(prior_year_income_sentinels),
"current_year_employment_income",
] = np.nan
joined_data.loc[
joined_data.current_year_self_employment_income.isin(
prior_year_income_sentinels
),
"current_year_self_employment_income",
] = np.nan

joined_data["previous_year_income_available"] = (
~joined_data.employment_income_last_year.isna()
& ~joined_data.self_employment_income_last_year.isna()
& (joined_data.I_ERNVAL == 0)
& (joined_data.I_SEVAL == 0)
)
joined_data = joined_data.fillna(-1).drop(["I_ERNVAL", "I_SEVAL"], axis=1)
joined_data["employment_income_last_year"] = joined_data[
"employment_income_last_year"
].fillna(joined_data["current_year_employment_income"])
joined_data["self_employment_income_last_year"] = joined_data[
"self_employment_income_last_year"
].fillna(joined_data["current_year_self_employment_income"])
joined_data = joined_data.fillna(0)

# CPS already ordered by PERIDNUM, so the join wouldn't change the order.
cps["employment_income_last_year"] = joined_data[
Expand Down
54 changes: 54 additions & 0 deletions tests/unit/datasets/test_cps_file_handles.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ def test_add_previous_year_income_closes_raw_cps_handles():
current_person = pd.DataFrame(
{
"PERIDNUM": [10, 20],
"WSAL_VAL": [1_100, 2_100],
"SEMP_VAL": [110, 210],
"I_ERNVAL": [0, 0],
"I_SEVAL": [0, 0],
}
Expand Down Expand Up @@ -79,6 +81,56 @@ def test_add_previous_year_income_closes_raw_cps_handles():
assert previous_store.closed is True


def test_add_previous_year_income_imputes_unavailable_rows():
current_person = pd.DataFrame(
{
"PERIDNUM": [10, 20, 30, 40, 50],
"WSAL_VAL": [1_100, 2_100, 3_100, 4_100, -1],
"SEMP_VAL": [110, 210, 310, 410, -9999],
"I_ERNVAL": [0, 0, 0, 0, 0],
"I_SEVAL": [0, 0, 0, 0, 0],
}
)
previous_person = pd.DataFrame(
{
"PERIDNUM": [10, 20, 30],
"WSAL_VAL": [1_000, 2_000, -9999],
"SEMP_VAL": [100, -1, 300],
"I_ERNVAL": [0, 0, 0],
"I_SEVAL": [0, 0, 0],
}
)

current_store = _FakeStore(current_person)
previous_store = _FakeStore(previous_person)

current_dataset = type("CurrentDataset", (_FakeDataset,), {"store": current_store})
previous_dataset = type(
"PreviousDataset", (_FakeDataset,), {"store": previous_store}
)

holder = SimpleNamespace(
raw_cps=current_dataset,
previous_year_raw_cps=previous_dataset,
)
cps = {}

add_previous_year_income(holder, cps)

np.testing.assert_array_equal(
cps["employment_income_last_year"],
[1_000, 2_100, 3_100, 4_100, 0],
)
np.testing.assert_array_equal(
cps["self_employment_income_last_year"],
[100, 210, 310, 410, 0],
)
np.testing.assert_array_equal(
cps["previous_year_income_available"],
[True, False, False, False, False],
)


def test_add_previous_year_income_opens_hdfstores_read_only(tmp_path, monkeypatch):
current_path = tmp_path / "current.h5"
previous_path = tmp_path / "previous.h5"
Expand All @@ -87,6 +139,8 @@ def test_add_previous_year_income_opens_hdfstores_read_only(tmp_path, monkeypatc
store["person"] = pd.DataFrame(
{
"PERIDNUM": [10, 20],
"WSAL_VAL": [1_100, 2_100],
"SEMP_VAL": [110, 210],
"I_ERNVAL": [0, 0],
"I_SEVAL": [0, 0],
}
Expand Down
Loading