Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
eaa105d
Initial implementation of using dynamic grid carbon intensity
davidkopp Sep 22, 2025
b793d97
Improve dynamic grid intensity implementation
davidkopp Sep 23, 2025
a1d5de2
Improve storing carbon intensity data
davidkopp Sep 23, 2025
372ec94
Enhance carbon intensity with timeseries support for phases
davidkopp Sep 23, 2025
939edfc
Remove unnecessary storing of carbon phase stats
davidkopp Sep 24, 2025
b77520e
Cleanup
davidkopp Sep 24, 2025
22ae9c1
Ensure dynamic carbon intensity storage always has data points at the…
davidkopp Sep 24, 2025
e89e1d5
Simplify get timestamp function - use nearest data point instead of i…
davidkopp Sep 24, 2025
c6bf1de
Improve error handling - fail hard if an error occurs
davidkopp Sep 25, 2025
8c4dd99
Use config.yml instead of user settings
davidkopp Sep 25, 2025
8ae1860
Merge branch 'main' into dynamic-grid-carbon-intensity
davidkopp Sep 25, 2025
65b863a
Cleanup
davidkopp Sep 25, 2025
167ca6b
Add grid carbon intensity as metric
davidkopp Sep 25, 2025
40ee6a5
Refactor carbon calculations in phase stats by moving it to the end
davidkopp Sep 25, 2025
8c834da
Fix tests
davidkopp Sep 25, 2025
1ae96cb
Remove carbon intensity methods not needed anymore
davidkopp Sep 25, 2025
a95c662
Fix storage of dynamic carbon intensity data at all relevant timestamps
davidkopp Sep 25, 2025
2976455
Remove code duplications
davidkopp Sep 25, 2025
a90e897
Minor fixes and improvements
davidkopp Sep 25, 2025
92fc2d9
Override sampling rate with custom one
davidkopp Sep 25, 2025
623f989
Cleanup
davidkopp Sep 25, 2025
94b8384
Fix smoke test test_db_rows_are_written_and_presented
davidkopp Sep 26, 2025
e00b70e
Only process grid carbon intensity if phase stats are calculated
davidkopp Sep 26, 2025
0eddcf2
Remove unnecessary docstrings and comments
davidkopp Sep 27, 2025
17be841
Refactor import test function for carbon intensity metrics
davidkopp Sep 27, 2025
6f3e1b2
Round carbon intensity value
davidkopp Sep 27, 2025
d9c8df6
Rename metric values to have proper scope definition
davidkopp Sep 27, 2025
bf57e0e
Improve error output
davidkopp Sep 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions config.yml.example
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ sci:
# The default is the value for a developer machine (Pro Laptop - https://dataviz.boavizta.org/terminalimpact)
TE: 181000
# I is the Carbon Intensity at the location of this machine
# The value can either be a number in gCO2e/kWh or a carbon intensity provider that fetches this number dynamically
# https://docs.green-coding.io/docs/measuring/carbon-intensity-providers/carbon-intensity-providers-overview/ (TODO)
# This is a static value in gCO2e/kWh. If you want to use the current dynamic grid carbon intensity,
# uncomment the option 'dynamic_grid_carbon_intensity' below.
# For fixed world-wide values get the number from https://ember-climate.org/insights/research/global-electricity-review-2025/
# The number worldwide for 2024 is 473
# The number 334 that comes as default is for Germany from 2024 and comes from https://app.electricitymaps.com/zone/DE/all/yearly
Expand All @@ -248,15 +248,27 @@ sci:
# See https://www.green-coding.io/co2-formulas/ for details
N: 0.04106063

#optimization:
# ignore:
# - example_optimization_test
# If you want to use the current dynamic grid carbon intensity for the carbon metrics instead of the fixed number above (SCI.I),
# uncomment the following lines and set your location and ensure the Elephant service is set up correctly.
# The location needs to be a valid grid zone code.
# For more information see our documentation: https://docs.green-coding.io/docs/measuring/carbon/grid-carbon-intensity/
#dynamic_grid_carbon_intensity:
# location: 'DE'
# elephant:
# host: localhost
# port: 8000
# protocol: http

# The following configuration is an enterprise feature:
# In order to get the carbon intensity we use electricity maps which requires a token.
# You can get this under https://api-portal.electricitymaps.com/
# This is a free service, but please note that you need to pay if you want to use it commercially!
#electricity_maps_token: '123'

#optimization:
# ignore:
# - example_optimization_test

# Modules API / Frontend
# GMT can selectively activate some API and frontend components. This is asked in the install process and should NOT
# only be changed here as files in different locations are changed too. Please re-run the install process.
Expand Down
10 changes: 10 additions & 0 deletions frontend/js/helpers/config.js.example
Original file line number Diff line number Diff line change
Expand Up @@ -621,5 +621,15 @@ METRIC_MAPPINGS = {
"clean_name": "Total System Disk Writes",
"source": "cgroup",
"explanation": "Total data written to disk for the system via cgroup"
},
"grid_carbon_intensity_config_location": {
"clean_name": "Grid Carbon Intensity",
"source": "Config (Static)",
"explanation": "Static grid carbon intensity used to calculate the carbon emissions"
},
"grid_carbon_intensity_api_location": {
"clean_name": "Grid Carbon Intensity",
"source": "External Provider (Dynamic)",
"explanation": "Dynamic grid carbon intensity during the run retrieved from external carbon intensity provider"
}
} // PLEASE DO NOT REMOVE THIS COMMENT -- END METRIC_MAPPINGS
259 changes: 259 additions & 0 deletions lib/carbon_intensity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import requests
from datetime import datetime, timezone
from typing import List, Dict, Any
from io import StringIO
from lib.global_config import GlobalConfig
from lib.db import DB


class CarbonIntensityClient:
    """HTTP client for the Elephant carbon intensity service."""

    def __init__(self, base_url: str = None):
        """
        Initialize carbon intensity client for Elephant service.

        Args:
            base_url: Base URL of the Elephant service. If None, reads from config.yml
        """
        if base_url is None:
            config = GlobalConfig().config
            dynamic_config = config.get('dynamic_grid_carbon_intensity', {})
            elephant_config = dynamic_config.get('elephant', {})
            protocol = elephant_config.get('protocol', 'http')
            host = elephant_config.get('host', 'localhost')
            port = elephant_config.get('port', 8000)
            base_url = f"{protocol}://{host}:{port}"

        self.base_url = base_url.rstrip('/')

    def get_carbon_intensity_history(self, location: str, start_time: str, end_time: str) -> List[Dict[str, Any]]:
        """
        Fetch carbon intensity data points for a location and time range.

        Args:
            location: Grid zone code (e.g., "DE", "ES-IB-MA")
            start_time: ISO 8601 timestamp (e.g., "2025-09-22T10:50:00Z")
            end_time: ISO 8601 timestamp (e.g., "2025-09-22T10:55:00Z")

        Returns:
            List of dicts, each with at least 'location', 'time' and 'carbon_intensity' keys.

        Raises:
            requests.HTTPError: if the service responds with a non-2xx status.
            ValueError: if the response body is not a list of valid data points.
        """
        url = f"{self.base_url}/carbon-intensity/history"
        params = {
            'location': location, # Location code (e.g., "DE", "ES-IB-MA")
            'startTime': start_time, # ISO 8601 format (e.g., "2025-09-22T10:50:00Z")
            'endTime': end_time, # ISO 8601 format (e.g., "2025-09-22T10:55:00Z")
            'interpolate': 'true' # also request data points adjacent to the time range, to ensure we always get at least one data point
        }

        response = requests.get(url, params=params, timeout=30)

        if not response.ok:
            error_detail = "No additional error details available"
            try:
                error_json = response.json()
                if isinstance(error_json, dict) and 'detail' in error_json:
                    error_detail = error_json['detail']
                else:
                    # Merged the previously redundant elif/else branches: both fell back to str()
                    error_detail = str(error_json)
            except ValueError: # body is not JSON; KeyError cannot occur here
                error_detail = response.text if response.text else "No response body"

            raise requests.HTTPError(
                f"Carbon intensity API request failed with status {response.status_code}. "
                f"Error details: {error_detail}"
            )

        data = response.json()

        if not isinstance(data, list):
            raise ValueError(f"Expected list response from carbon intensity service, got {type(data)}")

        for item in data:
            if not all(key in item for key in ['location', 'time', 'carbon_intensity']):
                raise ValueError(f"Invalid carbon intensity data format: missing required fields in {item}")

        return data


def _get_run_data_and_phases(run_id):
    """Fetch the phases JSON and measurement start/end timestamps (µs) for a run.

    Raises:
        ValueError: if the run does not exist, or exists but has no phases data.
    """
    run_query = """
        SELECT phases, start_measurement, end_measurement
        FROM runs
        WHERE id = %s
    """
    run_data = DB().fetch_one(run_query, (run_id,))
    # Distinguish "run missing" from "run has no phases" for clearer errors
    if not run_data:
        raise ValueError(f"Run {run_id} not found")
    if not run_data[0]:
        raise ValueError(f"Run {run_id} does not have phases data")

    phases, start_time_us, end_time_us = run_data
    return phases, start_time_us, end_time_us


# Insert a new measurement_metrics row and return its generated id.
def _create_measurement_metric(run_id, metric_name, detail_name, unit, sampling_rate):
    insert_query = '''
        INSERT INTO measurement_metrics (run_id, metric, detail_name, unit, sampling_rate_configured)
        VALUES (%s, %s, %s, %s, %s)
        RETURNING id
    '''
    inserted_row = DB().fetch_one(insert_query, params=(run_id, metric_name, detail_name, unit, sampling_rate))
    return inserted_row[0]

# Defines for which timestamps a carbon intensity value is needed: run start/end & phase middles
def _get_base_timestamps(phases, start_time_us, end_time_us):
timestamps = set()

# Add overall run start and end times
if start_time_us and end_time_us:
timestamps.add(start_time_us)
timestamps.add(end_time_us)

# Add middle timestamp for each phase
for phase in phases:
middle_timestamp = (phase['start'] + phase['end']) // 2
timestamps.add(middle_timestamp)

return timestamps


# Insert (value, timestamp) pairs as measurement_values rows for one metric.
# Small batches use a multi-row INSERT; larger batches use COPY FROM, which is
# faster in PostgreSQL.
def _bulk_insert_measurement_values(measurement_metric_id, value_timestamp_pairs):
    if not value_timestamp_pairs:
        return

    rows = [(measurement_metric_id, round(value), timestamp)
            for value, timestamp in value_timestamp_pairs]

    if len(rows) <= 10:
        # Small dataset: regular INSERT with multiple VALUES tuples
        placeholders = ', '.join(['(%s, %s, %s)'] * len(rows))
        query = f"INSERT INTO measurement_values (measurement_metric_id, value, time) VALUES {placeholders}"
        flat_params = [param for row in rows for param in row]
        DB().query(query, tuple(flat_params))
    else:
        # Larger dataset: stream a CSV buffer via COPY FROM
        csv_data = '\n'.join([f"{metric_id},{value},{timestamp}" for metric_id, value, timestamp in rows])
        f = StringIO(csv_data)
        DB().copy_from(
            file=f,
            table='measurement_values',
            columns=['measurement_metric_id', 'value', 'time'],
            sep=','
        )
        f.close()


def store_static_carbon_intensity(run_id, static_value):
    """Store the statically configured carbon intensity as a metric for a run.

    Writes one measurement_metrics row plus measurement_values at every base
    timestamp (run start/end and the middle of each phase).
    """
    phases, start_time_us, end_time_us = _get_run_data_and_phases(run_id)

    measurement_metric_id = _create_measurement_metric(
        run_id,
        'grid_carbon_intensity_config_location',
        '[CONFIG]',
        'gCO2e/kWh',
        0  # Static value has no sampling rate
    )

    # Base timestamps, for which we definitely need a value:
    # start/end of run + middle of each phase
    timestamps = _get_base_timestamps(phases, start_time_us, end_time_us)

    pairs = [(static_value, ts) for ts in timestamps]
    _bulk_insert_measurement_values(measurement_metric_id, pairs)

    print(f"Stored static carbon intensity value {static_value} gCO2e/kWh at {len(timestamps)} timestamps (run start/end + phase middles)")


def store_dynamic_carbon_intensity(run_id, location):
    """Fetch dynamic grid carbon intensity for a run's timeframe and store it as a metric.

    Queries the Elephant service for the given grid zone over the run's
    measurement window, then writes one measurement_metrics row and
    measurement_values at the base timestamps (run start/end, phase middles)
    plus every API data point inside the window. Values are mapped to
    timestamps via nearest-data-point lookup (no interpolation).

    Raises:
        ValueError: if the run has no phases data or the service returns no data.
        requests.HTTPError: if the service request fails.
    """
    phases, start_time_us, end_time_us = _get_run_data_and_phases(run_id)
    start_time_iso = _microseconds_to_iso8601(start_time_us)
    end_time_iso = _microseconds_to_iso8601(end_time_us)

    carbon_client = CarbonIntensityClient()
    carbon_intensity_data = carbon_client.get_carbon_intensity_history(
        location, start_time_iso, end_time_iso
    )
    # Fail hard on an empty dataset: carbon metrics cannot be computed without it
    if not carbon_intensity_data:
        raise ValueError(
            f"No carbon intensity data received from service for location '{location}' "
            f"between {start_time_iso} and {end_time_iso}. The service returned an empty dataset."
        )

    values = [float(dp['carbon_intensity']) for dp in carbon_intensity_data]
    print(f"Retrieved {len(carbon_intensity_data)} API data points for {location}: "
          f"range {min(values):.1f}-{max(values):.1f} gCO2e/kWh")

    metric_name = 'grid_carbon_intensity_api_location'
    detail_name = location
    unit = 'gCO2e/kWh'
    # Sampling rate is derived from the spacing of the first two API data points
    sampling_rate = _calculate_sampling_rate_from_data(carbon_intensity_data)

    measurement_metric_id = _create_measurement_metric(
        run_id, metric_name, detail_name, unit, sampling_rate
    )

    # Convert API data to format we need within GMT
    carbon_data_for_lookup = []
    for data_point in carbon_intensity_data:
        # Convert ISO timestamp to microseconds
        # ('Z' suffix replaced because fromisoformat only accepts it from Python 3.11)
        iso_time = data_point['time']
        dt = datetime.fromisoformat(iso_time.replace('Z', '+00:00'))
        timestamp_us = int(dt.timestamp() * 1_000_000)

        carbon_data_for_lookup.append({
            'timestamp_us': timestamp_us,
            'carbon_intensity': float(data_point['carbon_intensity'])
        })

    carbon_data_for_lookup.sort(key=lambda x: x['timestamp_us'])

    # Calculate base timestamps, for which we definitely need a value:
    # start/end of run + middle of each phase
    timestamps = _get_base_timestamps(phases, start_time_us, end_time_us)

    # Add any intermediate API data points that fall within measurement timeframe
    for data_point in carbon_data_for_lookup:
        timestamp_us = data_point['timestamp_us']
        if start_time_us <= timestamp_us <= end_time_us:
            timestamps.add(timestamp_us)

    value_timestamp_pairs = []
    if len(carbon_data_for_lookup) == 1:
        # If only one data point, use it for all timestamps
        carbon_intensity = carbon_data_for_lookup[0]['carbon_intensity']
        value_timestamp_pairs = [(carbon_intensity, timestamp) for timestamp in timestamps]
    else:
        # Convert timestamps to values using nearest data point logic
        for timestamp in timestamps:
            carbon_intensity = _get_carbon_intensity_at_timestamp(timestamp, carbon_data_for_lookup)
            value_timestamp_pairs.append((carbon_intensity, timestamp))

    _bulk_insert_measurement_values(measurement_metric_id, value_timestamp_pairs)

    unique_values = len(set(int(value) for value, _ in value_timestamp_pairs))
    print(f"Stored dynamic carbon intensity for location {location}: {len(value_timestamp_pairs)} timestamps, {unique_values} unique values")


# Find the data point with timestamp closest to target timestamp.
# Interpolation is not used on purpose here.
def _get_carbon_intensity_at_timestamp(timestamp_us: int, carbon_data: List[Dict[str, Any]]) -> float:
closest_point = min(
carbon_data,
key=lambda point: abs(point['timestamp_us'] - timestamp_us)
)

return float(closest_point['carbon_intensity'])


def _calculate_sampling_rate_from_data(carbon_intensity_data: List[Dict[str, Any]]) -> int:
if not carbon_intensity_data or len(carbon_intensity_data) < 2:
return 0

try:
time1 = datetime.fromisoformat(carbon_intensity_data[0]['time'].replace('Z', '+00:00'))
time2 = datetime.fromisoformat(carbon_intensity_data[1]['time'].replace('Z', '+00:00'))
interval_seconds = abs((time2 - time1).total_seconds())
sampling_rate = int(interval_seconds * 1000)
return sampling_rate
except (KeyError, ValueError, IndexError):
return 0


def _microseconds_to_iso8601(timestamp_us: int) -> str:
timestamp_seconds = timestamp_us / 1_000_000
dt = datetime.fromtimestamp(timestamp_seconds, timezone.utc)
return dt.strftime('%Y-%m-%dT%H:%M:%SZ')
Loading
Loading