Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/705.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Populate CPS inputs for the SPM childcare expense formula.
1 change: 1 addition & 0 deletions policyengine_us_data/datasets/cps/census_cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ class CensusCPS_2018(CensusCPS):
"A_LINENO",
"A_SPOUSE",
"A_EXPRRP",
"PERRP",
"A_FAMREL",
"A_FAMTYP",
"A_AGE",
Expand Down
23 changes: 23 additions & 0 deletions policyengine_us_data/datasets/cps/cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,15 @@
),
}

# PERRP values that identify a person as an unmarried partner of the
# household head, keyed by code with a descriptive label as the value.
# Source: Census CPS ASEC 2024 technical documentation, PERRP:
# https://www2.census.gov/programs-surveys/cps/techdocs/cpsmar24.pdf
PERRP_UNMARRIED_PARTNER_OF_HOUSEHOLD_HEAD_CODES = {
43: "Opposite Sex Unmarried Partner with Relatives",
44: "Opposite Sex Unmarried Partner without Relatives",
46: "Same Sex Unmarried Partner with Relatives",
47: "Same Sex Unmarried Partner without Relatives",
}

ESI_POLICYHOLDER_VARIABLE = (
"reported_owns_employer_sponsored_health_insurance_at_interview"
)
Expand Down Expand Up @@ -864,6 +873,7 @@ def _validate_raw_cps_schema(
) -> None:
required_person_columns = {
"CENSUS_TAX_ID",
"PERRP",
*ESI_SOURCE_COLUMNS,
}
required_tax_unit_columns = set()
Expand Down Expand Up @@ -1067,6 +1077,14 @@ def children_per_parent(col: str) -> pd.DataFrame:

cps["is_surviving_spouse"] = person.A_MARITL == 4
cps["is_separated"] = person.A_MARITL == 6
perrp = (
person.PERRP
if "PERRP" in person
else pd.Series(0, index=person.index, dtype=np.int16)
)
cps["is_unmarried_partner_of_household_head"] = perrp.isin(
PERRP_UNMARRIED_PARTNER_OF_HOUSEHOLD_HEAD_CODES.keys()
)
# High school or college/university enrollment status.
if "A_FTPT" in person.columns:
cps["is_full_time_college_student"] = (person.A_HSCOL == 2) & (
Expand All @@ -1081,6 +1099,10 @@ def children_per_parent(col: str) -> pd.DataFrame:
add_overtime_occupation(cps, person)


def derive_weeks_worked(weeks_worked: Series | np.ndarray) -> Series | np.ndarray:
return np.clip(weeks_worked, 0, 52)


@pipeline_node(
PipelineNode(
id="add_personal_income_variables",
Expand Down Expand Up @@ -1119,6 +1141,7 @@ def add_personal_income_variables(cps: h5py.File, person: DataFrame, year: int):

cps["weekly_hours_worked"] = person.HRSWK
cps["hours_worked_last_week"] = person.A_HRS1
cps["weeks_worked"] = derive_weeks_worked(person.WKSWORK)

cps["taxable_interest_income"] = person.INT_VAL * (p["taxable_interest_fraction"])
cps["tax_exempt_interest_income"] = person.INT_VAL * (
Expand Down
139 changes: 2 additions & 137 deletions policyengine_us_data/datasets/cps/extended_cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ def _calculate_spm_thresholds_from_assigned_geography(
# Hours/employment
"weekly_hours_worked",
"hours_worked_last_week",
"weeks_worked",
# ORG labor-market variables
"hourly_wage",
"is_paid_hourly",
Expand Down Expand Up @@ -734,125 +735,6 @@ def reconcile_ss_subcomponents(predictions, total_ss):
}


def _clone_parent_earnings_cap(
    clone_person_data: pd.DataFrame,
    clone_spm_unit_ids: np.ndarray,
) -> np.ndarray:
    """Return the earnings cap for each unit in ``clone_spm_unit_ids``.

    Within each SPM unit only rows flagged ``is_parent_proxy`` count. The
    two oldest such rows are kept; the cap is the lower of their earnings
    (or the single row's earnings when there is only one). Negative
    earnings are floored at zero first. Units with no qualifying row get
    a cap of 0.
    """
    if clone_person_data.empty:
        return np.zeros(len(clone_spm_unit_ids), dtype=float)

    is_parent = clone_person_data["is_parent_proxy"].astype(bool)
    parents = clone_person_data.loc[
        is_parent, ["spm_unit_id", "age", "earnings"]
    ].copy()
    if parents.empty:
        return np.zeros(len(clone_spm_unit_ids), dtype=float)

    parents["earnings"] = parents["earnings"].clip(lower=0.0)
    # Rank 1 is the oldest parent proxy in the unit; ties are broken by
    # row order because of method="first".
    parents["age_rank"] = parents.groupby("spm_unit_id")["age"].rank(
        method="first", ascending=False
    )
    two_oldest = parents[parents["age_rank"] <= 2].sort_values(
        ["spm_unit_id", "age_rank"]
    )

    def _lower_of_pair(values) -> float:
        # A lone parent is capped by their own earnings; a couple by the
        # lower earner.
        if len(values) == 1:
            return float(values.iloc[0])
        return float(np.minimum(values.iloc[0], values.iloc[1]))

    cap_by_unit = two_oldest.groupby("spm_unit_id")["earnings"].agg(_lower_of_pair)
    return cap_by_unit.reindex(clone_spm_unit_ids, fill_value=0.0).to_numpy(
        dtype=float
    )


def derive_clone_capped_childcare_expenses(
    donor_pre_subsidy: np.ndarray,
    donor_capped: np.ndarray,
    clone_pre_subsidy: np.ndarray,
    clone_person_data: pd.DataFrame,
    clone_spm_unit_ids: np.ndarray,
) -> np.ndarray:
    """Compute capped childcare expenses for clone-half SPM units.

    Rather than predicting the capped amount independently, it is rebuilt
    deterministically from the clone's pre-subsidy childcare:

    1. Take the donor's capped / pre-subsidy ratio (0 where the donor has
       no positive pre-subsidy amount), clipped to [0, 1].
    2. Apply that ratio to the clone's non-negative pre-subsidy amount.
    3. Limit the result by the clone unit's earnings cap.

    Because the ratio never exceeds 1, the output cannot exceed the
    clone's pre-subsidy childcare.

    Args:
        donor_pre_subsidy: Donor pre-subsidy childcare, one per SPM unit.
        donor_capped: Donor capped childcare, aligned with
            ``donor_pre_subsidy``.
        clone_pre_subsidy: Clone pre-subsidy childcare, aligned with
            ``clone_spm_unit_ids``.
        clone_person_data: Person rows with columns ``spm_unit_id``,
            ``age``, ``is_parent_proxy`` and ``earnings``; may be empty.
        clone_spm_unit_ids: SPM unit ids defining the output order.

    Returns:
        Float array of capped childcare, one entry per clone SPM unit.
    """
    donor_pre_subsidy = np.asarray(donor_pre_subsidy, dtype=float)
    donor_capped = np.asarray(donor_capped, dtype=float)
    clone_pre_subsidy = np.asarray(clone_pre_subsidy, dtype=float)
    clone_spm_unit_ids = np.asarray(clone_spm_unit_ids)

    # Units where the donor reported no positive spending keep a share of 0.
    donor_share = np.zeros_like(donor_capped, dtype=float)
    np.divide(
        donor_capped,
        donor_pre_subsidy,
        out=donor_share,
        where=donor_pre_subsidy > 0,
    )
    donor_share = np.clip(donor_share, 0.0, 1.0)
    share_based_amount = np.maximum(clone_pre_subsidy, 0.0) * donor_share

    earnings_cap = _clone_parent_earnings_cap(clone_person_data, clone_spm_unit_ids)
    return np.minimum(share_based_amount, earnings_cap)


def _rebuild_clone_capped_childcare_expenses(
    data: dict,
    time_period: int,
    cps_sim,
) -> np.ndarray:
    """Recompute capped childcare expenses for the clone half of ``data``.

    Every array in ``data`` is treated as two equal halves: the first is
    the donor half and the second is the clone half. Person ages and
    tax-unit head/spouse flags come from ``cps_sim``, whose person frame
    must have exactly one row per clone-half person.

    Args:
        data: Mapping of variable name to ``{time_period: array}``.
        time_period: Period key used to index every variable in ``data``.
        cps_sim: Object exposing ``calculate_dataframe`` for person-level
            variables.

    Returns:
        Capped childcare expenses for the clone-half SPM units.

    Raises:
        ValueError: If the person frame from ``cps_sim`` does not match
            the clone-half person count.
    """

    def _values(variable: str):
        return data[variable][time_period]

    n_persons_half = len(_values("person_id")) // 2
    n_spm_units_half = len(_values("spm_unit_id")) // 2

    person_roles = cps_sim.calculate_dataframe(
        ["age", "is_tax_unit_head", "is_tax_unit_spouse"]
    )
    if len(person_roles) != n_persons_half:
        raise ValueError(
            "Unexpected person role frame length while rebuilding clone childcare "
            f"expenses: got {len(person_roles)}, expected {n_persons_half}"
        )

    clone_persons = slice(n_persons_half, None)
    donor_units = slice(None, n_spm_units_half)
    clone_units = slice(n_spm_units_half, None)

    # Tax-unit heads and spouses stand in for the unit's parents.
    is_parent_proxy = (
        person_roles["is_tax_unit_head"].values
        | person_roles["is_tax_unit_spouse"].values
    )
    earnings = (
        _values("employment_income")[clone_persons]
        + _values("self_employment_income")[clone_persons]
    )
    clone_person_data = pd.DataFrame(
        {
            "spm_unit_id": _values("person_spm_unit_id")[clone_persons],
            "age": person_roles["age"].values,
            "is_parent_proxy": is_parent_proxy,
            "earnings": earnings,
        }
    )

    pre_subsidy = _values("spm_unit_pre_subsidy_childcare_expenses")
    capped = _values("spm_unit_capped_work_childcare_expenses")

    return derive_clone_capped_childcare_expenses(
        donor_pre_subsidy=pre_subsidy[donor_units],
        donor_capped=capped[donor_units],
        clone_pre_subsidy=pre_subsidy[clone_units],
        clone_person_data=clone_person_data,
        clone_spm_unit_ids=_values("spm_unit_id")[clone_units],
    )


def _apply_post_processing(predictions, X_test, time_period, data):
"""Apply retirement constraints and SS reconciliation."""
ret_cols = [c for c in predictions.columns if c in _RETIREMENT_VARS]
Expand Down Expand Up @@ -984,24 +866,6 @@ def _splice_cps_only_predictions(
new_values = np.concatenate([cps_half, pred_values])
data[var] = {time_period: new_values}

if (
"spm_unit_capped_work_childcare_expenses" in data
and "spm_unit_pre_subsidy_childcare_expenses" in data
):
n_half = entity_half_lengths.get(
"spm_unit",
len(data["spm_unit_capped_work_childcare_expenses"][time_period]) // 2,
)
cps_half = data["spm_unit_capped_work_childcare_expenses"][time_period][:n_half]
clone_half = _rebuild_clone_capped_childcare_expenses(
data=data,
time_period=time_period,
cps_sim=cps_sim,
)
data["spm_unit_capped_work_childcare_expenses"] = {
time_period: np.concatenate([cps_half, clone_half])
}

del cps_sim
return data

Expand Down Expand Up @@ -1380,6 +1244,7 @@ def _validate_structural_mortgage_conversion(
_KEEP_FORMULA_VARS = {
"person_id",
"spm_unit_spm_threshold",
"weeks_worked",
"self_employed_pension_contribution_ald",
"self_employed_health_insurance_ald",
}
Expand Down
20 changes: 20 additions & 0 deletions tests/unit/datasets/test_cps_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ def test_validate_raw_cps_schema_accepts_constructed_tax_unit_id_column():
person = pd.DataFrame(
{
"CENSUS_TAX_ID": [123],
"PERRP": [43],
"NOW_GRPFTYP": [1],
"NOW_HIPAID": [1],
"NOW_OWNGRP": [1],
Expand All @@ -195,3 +196,22 @@ def test_validate_raw_cps_schema_accepts_constructed_tax_unit_id_column():
tax_unit = pd.DataFrame({"TAX_ID": [1]})

_validate_raw_cps_schema(person, tax_unit, "census_cps_2024")


def test_validate_raw_cps_schema_requires_reference_partner_column():
    """Schema validation rejects person data that has no ``PERRP`` column."""
    from policyengine_us_data.datasets.cps.cps import _validate_raw_cps_schema

    # Same person columns as the accepting schema test, minus PERRP.
    person_without_perrp = pd.DataFrame(
        {
            "CENSUS_TAX_ID": [123],
            "NOW_GRPFTYP": [1],
            "NOW_HIPAID": [1],
            "NOW_OWNGRP": [1],
        }
    )
    tax_unit = pd.DataFrame({"TAX_ID": [1]})

    # The error message must name the missing column.
    with pytest.raises(ValueError, match="PERRP"):
        _validate_raw_cps_schema(person_without_perrp, tax_unit, "census_cps_2024")
2 changes: 2 additions & 0 deletions tests/unit/datasets/test_cps_income_variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ def _minimal_person_income_frame() -> pd.DataFrame:
"WSAL_VAL",
"HRSWK",
"A_HRS1",
"WKSWORK",
"INT_VAL",
"SEMP_VAL",
"FRSE_VAL",
Expand Down Expand Up @@ -54,6 +55,7 @@ def _minimal_person_income_frame() -> pd.DataFrame:
person = pd.DataFrame({column: [0.0, 0.0] for column in columns})
person["A_AGE"] = [30, 45]
person["LKWEEKS"] = [0, 0]
person["WKSWORK"] = [0, 0]
return person


Expand Down
1 change: 1 addition & 0 deletions tests/unit/test_employer_sponsored_insurance_premiums.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ def test_raw_cps_schema_requires_esi_source_columns():
person = pd.DataFrame(
{
"CENSUS_TAX_ID": [1],
"PERRP": [43],
**{column: [1] for column in ESI_SOURCE_COLUMNS},
}
)
Expand Down
Loading