Skip to content
Open
106 changes: 102 additions & 4 deletions scout/ecm_prep.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,73 @@ def update_active_measures(cls,
return run_setup


def add_internal_gains_aggregate(msegs: dict, years, ig_names=None, new_name="internal gains"):
"""Aggregate internal gain thermal load components into one node per mseg.

Non-destructive: original component nodes are preserved under
['internal gains']['components_original'].

Parameters
----------
msegs : dict
Baseline microsegment stock/energy structure (loaded from JSON).
years : iterable[str]
AEO modeling year strings used as keys in energy dicts.
ig_names : list[str], optional
Component names to aggregate. Defaults to standard set.
new_name : str
Name of aggregated node to create.
"""
if ig_names is None:
# Only aggregate people + equipment gains per requirements
ig_names = ["people gain", "equipment gain"]

# Traverse top-level geography keys
for geo_key, geo_val in (msegs.items() if isinstance(msegs, dict) else []):
if not isinstance(geo_val, dict):
continue
for bldg_key, bldg_val in geo_val.items():
if not isinstance(bldg_val, dict):
continue
for fuel_key, fuel_val in bldg_val.items():
if not isinstance(fuel_val, dict):
continue
# print(f"Processing {geo_key} {bldg_key} {fuel_key}")
for eu in ("heating", "secondary heating", "cooling"):
eu_dict = fuel_val.get(eu)
if not isinstance(eu_dict, dict):
continue
demand = eu_dict.get("demand")
if not isinstance(demand, dict):
continue
# Skip if already aggregated
if new_name in demand:
continue
comp_energy_pairs = [] # (name, energy_dict)
for nm in ig_names:
node = demand.get(nm)
if isinstance(node, dict) and isinstance(node.get("energy"), dict):
comp_energy_pairs.append((nm, node["energy"]))
if not comp_energy_pairs:
continue
# Sum per year (missing years treated as zero)
summed = {yr: float(sum(ed.get(yr, 0.0)
for _, ed in comp_energy_pairs)) for yr in years}
demand[new_name] = {
"stock": "NA",
"energy": summed,
# Preserve originals for traceability
"components_original": {nm: demand[nm] for nm, _ in comp_energy_pairs}
}
# Remove original component nodes to prevent double counting elsewhere
for nm, _ in comp_energy_pairs:
try:
del demand[nm]
except Exception:
pass
return msegs


class UsefulInputFiles(object):
"""Class of input file paths to be used by this routine.

Expand Down Expand Up @@ -578,7 +645,13 @@ def __init__(self, base_dir, handyfiles, opts):
self.demand_tech = [
'roof', 'ground', 'lighting gain', 'windows conduction',
'equipment gain', 'floor', 'infiltration', 'people gain',
'windows solar', 'ventilation', 'other heat gain', 'wall']
'windows solar', 'ventilation', 'other heat gain', 'wall',
'internal gains'] # 'internal gains' is aggregated from people + equipment gains
# Map legacy internal gain component names to the aggregated node
self.demand_tech_alias = {
'people gain': 'internal gains',
'equipment gain': 'internal gains',
}
# Note: ASHP costs are zero by convention in EIA data for new
# construction
self.zero_cost_tech = ['infiltration', 'ASHP']
Expand Down Expand Up @@ -1222,7 +1295,7 @@ def __init__(self, base_dir, handyfiles, opts):
"demand": [
'roof', 'ground', 'windows solar',
'windows conduction', 'equipment gain',
'people gain', 'wall', 'infiltration']},
'people gain', 'internal gains', 'wall', 'infiltration']},
"commercial": {
"supply": {
"electricity": {
Expand Down Expand Up @@ -1319,7 +1392,7 @@ def __init__(self, base_dir, handyfiles, opts):
'roof', 'ground', 'lighting gain',
'windows conduction', 'equipment gain',
'floor', 'infiltration', 'people gain',
'windows solar', 'ventilation',
'internal gains', 'windows solar', 'ventilation',
'other heat gain', 'wall']}}}
# Find the full set of valid names for describing a measure's
# applicable baseline that do not begin with 'all'
Expand Down Expand Up @@ -1787,7 +1860,7 @@ def __init__(self, base_dir, handyfiles, opts):
delimiter=',', dtype=(['<U25'] * 3 + ['<f8'] * 4))
self.env_heat_ls_scrn = (
"windows solar", "equipment gain", "people gain",
"other heat gain")
"other heat gain", "internal gains")
self.skipped_ecms = []
self.save_shp_warn = []

Expand Down Expand Up @@ -10200,6 +10273,19 @@ def create_keychain(self, mseg_type):
self.fuel_type[mseg_type], self.end_use[mseg_type],
self.technology_type[mseg_type],
self.technology[mseg_type], self.structure_type]]
# Map legacy internal gain component names to the aggregated node, if present
try:
alias_map = getattr(self.handyvars, 'demand_tech_alias', {})
except Exception:
alias_map = {}
if isinstance(self.technology[mseg_type], list) and alias_map:
mapped = []
for t in self.technology[mseg_type]:
mt = alias_map.get(t, t)
if mt not in mapped:
mapped.append(mt)
self.technology[mseg_type] = mapped

# Flag heating/cooling end use microsegments. For heating/cooling
# cases, an extra 'supply' or 'demand' key is required in the key
# chain; this key indicates the supply-side and demand-side variants
Expand Down Expand Up @@ -14818,6 +14904,18 @@ def main(opts: argparse.NameSpace): # noqa: F821
msegs = json.loads(zip_ref.read().decode('utf-8'))
else:
msegs = Utils.load_json(handyfiles.msegs_in)
# Aggregate internal gains components (people + equipment only)
# into a single 'internal gains' node for heating/secondary heating/cooling demand
# microsegments. Original component nodes are preserved under
# ['internal gains']['components_original'] for traceability. This prevents
# downstream double counting once logic skips originals when aggregate present.
try:
msegs = add_internal_gains_aggregate(msegs, handyvars.aeo_years)
logger.info("Applied internal gains aggregation (people + equipment)")

except Exception as e:
logger.warning(
f"Internal gains aggregation failed; proceeding without aggregation: {e}")
# Import baseline cost, performance, and lifetime data
bjszip = handyfiles.msegs_cpl_in
with gzip.GzipFile(bjszip, 'r') as zip_ref:
Expand Down
Loading