Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/841.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Populate American Opportunity Credit eligibility inputs in Enhanced CPS from the PUF-imputed AOTC signal.
251 changes: 251 additions & 0 deletions policyengine_us_data/datasets/cps/extended_cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
)
from policyengine_us_data.datasets.puf import PUF, PUF_2024
from policyengine_us_data.storage import STORAGE_FOLDER
from policyengine_us_data.utils.aotc import (
maximum_american_opportunity_credit_per_student,
qualifying_expenses_from_american_opportunity_credit,
)
from policyengine_us_data.utils.mortgage_interest import (
STRUCTURAL_MORTGAGE_VARIABLES,
convert_mortgage_interest_to_structural_inputs,
Expand All @@ -30,6 +34,32 @@
logger = logging.getLogger(__name__)


# Person-level American Opportunity Tax Credit (AOTC) input variable names.
# _supports_aotc_eligibility_inputs() passes these to
# has_policyengine_us_variables, and _impute_aotc_eligibility_inputs writes
# each of them when that check succeeds.
AOTC_ELIGIBILITY_INPUTS = (
"is_pursuing_credential_for_american_opportunity_credit",
"attends_eligible_educational_institution_for_american_opportunity_credit",
"is_enrolled_at_least_half_time_for_american_opportunity_credit",
"has_american_opportunity_credit_1098_t_or_exception",
"has_american_opportunity_credit_institution_ein",
"has_completed_first_four_years_of_postsecondary_education",
"has_felony_drug_conviction",
"american_opportunity_credit_claimed_prior_years",
)


# Person-level Lifetime Learning Credit (LLC) input variable names.
# _supports_llc_eligibility_inputs() passes these to
# has_policyengine_us_variables, and _impute_llc_eligibility_inputs sets each
# of them to True for people with positive tuition who are not AOTC students.
LLC_ELIGIBILITY_INPUTS = (
"attends_eligible_educational_institution_for_lifetime_learning_credit",
"has_lifetime_learning_credit_1098_t_or_exception",
)


def _supports_aotc_eligibility_inputs() -> bool:
    """Return ``has_policyengine_us_variables`` applied to the AOTC inputs.

    NOTE(review): presumably True only when the installed policyengine-us
    model defines every name in ``AOTC_ELIGIBILITY_INPUTS`` -- confirm
    against the helper's implementation.
    """
    required_variables = AOTC_ELIGIBILITY_INPUTS
    return has_policyengine_us_variables(*required_variables)


def _supports_llc_eligibility_inputs() -> bool:
    """Return ``has_policyengine_us_variables`` applied to the LLC inputs.

    NOTE(review): presumably True only when the installed policyengine-us
    model defines every name in ``LLC_ELIGIBILITY_INPUTS`` -- confirm
    against the helper's implementation.
    """
    required_variables = LLC_ELIGIBILITY_INPUTS
    return has_policyengine_us_variables(*required_variables)


def _supports_structural_mortgage_inputs() -> bool:
    """Return ``has_policyengine_us_variables`` for the mortgage inputs.

    NOTE(review): presumably True only when the installed policyengine-us
    model defines every name in ``STRUCTURAL_MORTGAGE_VARIABLES`` -- confirm
    against the helper's implementation.
    """
    required_variables = STRUCTURAL_MORTGAGE_VARIABLES
    return has_policyengine_us_variables(*required_variables)

Expand Down Expand Up @@ -898,6 +928,8 @@ def generate(self):
dataset_path=str(self.cps.file_path),
)

new_data = self._impute_aotc_eligibility_inputs(new_data, self.time_period)
new_data = self._impute_llc_eligibility_inputs(new_data, self.time_period)
new_data = self._rename_imputed_to_inputs(new_data)
if _supports_structural_mortgage_inputs():
had_positive_mortgage_input = self._has_positive_mortgage_input(
Expand All @@ -920,6 +952,225 @@ def generate(self):
new_data = self._drop_formula_variables(new_data)
self.save_dataset(new_data)

@classmethod
def _impute_aotc_eligibility_inputs(cls, data, time_period):
    """Convert imputed tax-unit AOTC amounts to person eligibility inputs.

    For every tax unit whose ``american_opportunity_credit`` is positive,
    the credit is handed out to the unit's members one person at a time,
    at most ``maximum_american_opportunity_credit_per_student(time_period)``
    each, until it is exhausted. Members are tried in this order (each
    person at most once): those with positive ``qualified_tuition_expenses``,
    then full-time college students, then tax-unit dependents, then every
    remaining member. Each selected person is flagged as an AOTC student
    and their tuition is overwritten with the value returned by
    ``qualifying_expenses_from_american_opportunity_credit`` for the share
    of credit assigned to them.

    The flags are then written either to the legacy
    ``is_eligible_for_american_opportunity_credit`` input, or -- when
    ``_supports_aotc_eligibility_inputs()`` is true -- to the detailed
    AOTC inputs (five set to True, two set to False, and
    ``american_opportunity_credit_claimed_prior_years`` capped at 3 for
    flagged people).

    Args:
        data: Mapping of variable name to ``{period: array}``. It is
            updated in place and also returned.
        time_period: Period key used to read and write every variable.

    Returns:
        ``data``. It is returned untouched when a required variable is
        missing for ``time_period``, when the entity lengths disagree, or
        when no tax unit has a positive credit.
    """
    credit = data.get("american_opportunity_credit", {}).get(time_period)
    tax_unit_ids = data.get("tax_unit_id", {}).get(time_period)
    person_tax_unit_ids = data.get("person_tax_unit_id", {}).get(time_period)
    tuition = data.get("qualified_tuition_expenses", {}).get(time_period)
    if (
        credit is None
        or tax_unit_ids is None
        or person_tax_unit_ids is None
        or tuition is None
    ):
        return data

    credit = np.asarray(credit)
    tax_unit_ids = np.asarray(tax_unit_ids)
    person_tax_unit_ids = np.asarray(person_tax_unit_ids)
    # Work on a copy so the stored tuition array is only replaced on success.
    tuition = np.array(tuition, copy=True)
    if len(credit) != len(tax_unit_ids) or len(tuition) != len(person_tax_unit_ids):
        logger.warning(
            "Skipping AOTC eligibility imputation due to entity length mismatch"
        )
        return data

    n_people = len(person_tax_unit_ids)

    def _bool_input(variable):
        """Return a writable boolean copy of ``variable``; all False if absent."""
        existing = data.get(variable, {}).get(time_period)
        if existing is None:
            return np.zeros(n_people, dtype=bool)
        return np.asarray(existing, dtype=bool).copy()

    aotc_student = np.zeros(n_people, dtype=bool)
    full_time = _bool_input("is_full_time_college_student")
    dependent = _bool_input("is_tax_unit_dependent")

    positive_credit = credit > 0
    if not positive_credit.any():
        return data

    positive_credit_units = tax_unit_ids[positive_credit]
    credit_by_tax_unit_id = dict(zip(tax_unit_ids, credit))
    adjusted_tuition_count = 0
    max_student_credit = maximum_american_opportunity_credit_per_student(
        time_period
    )

    if max_student_credit <= 0:
        # No per-student credit is available, so nothing can be allocated.
        units_to_allocate = positive_credit_units[:0]
    else:
        units_to_allocate = positive_credit_units

    # Group people by tax unit once instead of rescanning the whole person
    # array for every unit. A stable sort keeps each unit's members in
    # ascending person-index order, matching a per-unit np.flatnonzero scan.
    order = np.argsort(person_tax_unit_ids, kind="stable")
    sorted_person_units = person_tax_unit_ids[order]
    starts = np.searchsorted(sorted_person_units, units_to_allocate, side="left")
    stops = np.searchsorted(sorted_person_units, units_to_allocate, side="right")

    for tax_unit_id, start, stop in zip(units_to_allocate, starts, stops):
        member_indices = order[start:stop]
        if member_indices.size == 0:
            continue

        # Priority order for receiving credit; dict.fromkeys drops repeats
        # while keeping each person's first (highest-priority) position.
        candidate_groups = (
            member_indices[tuition[member_indices] > 0],
            member_indices[full_time[member_indices]],
            member_indices[dependent[member_indices]],
            member_indices,
        )
        ordered_candidates = dict.fromkeys(
            np.concatenate(candidate_groups).tolist()
        )

        remaining_credit = float(credit_by_tax_unit_id[tax_unit_id])
        for selected in ordered_candidates:
            if remaining_credit <= 0:
                break
            student_credit = min(remaining_credit, max_student_credit)
            target_tuition = qualifying_expenses_from_american_opportunity_credit(
                student_credit,
                time_period,
            )
            if tuition[selected] != target_tuition:
                adjusted_tuition_count += 1
            aotc_student[selected] = True
            tuition[selected] = target_tuition
            remaining_credit -= student_credit

    if not _supports_aotc_eligibility_inputs():
        # Legacy model: a single person-level eligibility flag.
        values = _bool_input("is_eligible_for_american_opportunity_credit")
        values[aotc_student] = True
        data["is_eligible_for_american_opportunity_credit"] = {time_period: values}
        data["qualified_tuition_expenses"] = {time_period: tuition}
        logger.info(
            "AOTC eligibility imputation populated the legacy "
            "eligibility input for %d people across %d tax units "
            "and adjusted tuition for %d people",
            int(aotc_student.sum()),
            int(positive_credit.sum()),
            adjusted_tuition_count,
        )
        return data

    # Inputs forced to True, then inputs forced to False, for AOTC students;
    # values for everyone else are left as they were (or default to False).
    flag_assignments = (
        (
            (
                "is_pursuing_credential_for_american_opportunity_credit",
                "attends_eligible_educational_institution_for_american_opportunity_credit",
                "is_enrolled_at_least_half_time_for_american_opportunity_credit",
                "has_american_opportunity_credit_1098_t_or_exception",
                "has_american_opportunity_credit_institution_ein",
            ),
            True,
        ),
        (
            (
                "has_completed_first_four_years_of_postsecondary_education",
                "has_felony_drug_conviction",
            ),
            False,
        ),
    )
    for variables, flag in flag_assignments:
        for variable in variables:
            values = _bool_input(variable)
            values[aotc_student] = flag
            data[variable] = {time_period: values}

    existing_prior_years = data.get(
        "american_opportunity_credit_claimed_prior_years", {}
    ).get(time_period)
    prior_years = (
        np.asarray(existing_prior_years).copy()
        if existing_prior_years is not None
        else np.zeros(n_people, dtype=np.int8)
    )
    # Cap prior claims at 3 for students receiving the credit this period.
    prior_years[aotc_student] = np.minimum(prior_years[aotc_student], 3)
    data["american_opportunity_credit_claimed_prior_years"] = {
        time_period: prior_years
    }
    data["qualified_tuition_expenses"] = {time_period: tuition}
    logger.info(
        "AOTC eligibility imputation populated inputs for %d people "
        "across %d tax units and adjusted tuition for %d people",
        int(aotc_student.sum()),
        int(positive_credit.sum()),
        adjusted_tuition_count,
    )
    return data

@classmethod
def _impute_llc_eligibility_inputs(cls, data, time_period):
    """Populate LLC factual eligibility inputs for non-AOTC tuition records.

    People with positive ``qualified_tuition_expenses`` who are not marked
    as AOTC students get every variable in ``LLC_ELIGIBILITY_INPUTS`` set
    to True. AOTC status is read from
    ``is_pursuing_credential_for_american_opportunity_credit`` when present,
    otherwise from ``is_eligible_for_american_opportunity_credit``; if
    neither exists, nobody is treated as an AOTC student.

    Args:
        data: Mapping of variable name to ``{period: array}``. It is
            updated in place and also returned.
        time_period: Period key used to read and write every variable.

    Returns:
        ``data``. It is returned untouched when the LLC inputs are not
        supported, a required variable is missing, the person-level
        lengths disagree, or nobody qualifies.
    """
    if not _supports_llc_eligibility_inputs():
        return data

    unit_of_person = data.get("person_tax_unit_id", {}).get(time_period)
    expenses = data.get("qualified_tuition_expenses", {}).get(time_period)
    if unit_of_person is None or expenses is None:
        return data

    unit_of_person = np.asarray(unit_of_person)
    expenses = np.asarray(expenses)
    if len(expenses) != len(unit_of_person):
        logger.warning(
            "Skipping LLC eligibility imputation due to entity length mismatch"
        )
        return data
    n_people = len(unit_of_person)

    # Prefer the detailed AOTC flag; fall back to the legacy one.
    claims_aotc = None
    for flag_name in (
        "is_pursuing_credential_for_american_opportunity_credit",
        "is_eligible_for_american_opportunity_credit",
    ):
        claims_aotc = data.get(flag_name, {}).get(time_period)
        if claims_aotc is not None:
            break
    if claims_aotc is None:
        claims_aotc = np.zeros(n_people, dtype=bool)
    else:
        claims_aotc = np.asarray(claims_aotc, dtype=bool)

    needs_llc = (expenses > 0) & ~claims_aotc
    if not needs_llc.any():
        return data

    for input_name in LLC_ELIGIBILITY_INPUTS:
        current = data.get(input_name, {}).get(time_period)
        if current is None:
            flags = np.zeros(n_people, dtype=bool)
        else:
            flags = np.asarray(current, dtype=bool).copy()
        flags[needs_llc] = True
        data[input_name] = {time_period: flags}

    people_flagged = int(needs_llc.sum())
    units_flagged = int(np.unique(unit_of_person[needs_llc]).size)
    logger.info(
        "LLC eligibility imputation populated inputs for %d people "
        "across %d tax units",
        people_flagged,
        units_flagged,
    )
    return data

@classmethod
def _rename_imputed_to_inputs(cls, data):
"""Rename QRF-imputed formula vars to their leaf inputs.
Expand Down
Loading
Loading