diff --git a/config.yml.example b/config.yml.example
index bd2aafcdf..5d9a47ce8 100644
--- a/config.yml.example
+++ b/config.yml.example
@@ -237,8 +237,8 @@ sci:
     # The default is the value for a developer machine (Pro Laptop - https://dataviz.boavizta.org/terminalimpact)
     TE: 181000
     # I is the Carbon Intensity at the location of this machine
-    # The value can either be a number in gCO2e/kWh or a carbon intensity provider that fetches this number dynamically
-    # https://docs.green-coding.io/docs/measuring/carbon-intensity-providers/carbon-intensity-providers-overview/ (TODO)
+    # This is a static value in gCO2e/kWh. If you want to use the current dynamic grid carbon intensity,
+    # uncomment the option 'dynamic_grid_carbon_intensity' below.
     # For fixed world-wide values get the number from https://ember-climate.org/insights/research/global-electricity-review-2025/
     # The number worldwide for 2024 is 473
    # The number 334 that comes as default is for Germany from 2024 and comes from https://app.electricitymaps.com/zone/DE/all/yearly
@@ -248,15 +248,27 @@ sci:
     # See https://www.green-coding.io/co2-formulas/ for details
     N: 0.04106063

-#optimization:
-#  ignore:
-#    - example_optimization_test
+# If you want to use the current dynamic grid carbon intensity for the carbon metrics instead of the fixed number above (SCI.I),
+# uncomment the following lines, set your location, and ensure the Elephant service is set up correctly.
+# The location needs to be a valid grid zone code.
+# For more information see our documentation: https://docs.green-coding.io/docs/measuring/carbon/grid-carbon-intensity/
+#dynamic_grid_carbon_intensity:
+#  location: 'DE'
+#  elephant:
+#    host: localhost
+#    port: 8000
+#    protocol: http

+# The following configuration is an enterprise feature:
 # In order to get the carbon intensity we use electricity maps which requires a token.
 # You can get this under https://api-portal.electricitymaps.com/
 # This is a free service please note that you need to pay if you want to use this commercially!
 #electricity_maps_token: '123'

+#optimization:
+#  ignore:
+#    - example_optimization_test
+
 # Modules API / Frontend
 # GMT can selectively activate some API and frontend components. This is asked in the install process and should NOT
 # only be changed here as files in different locations are changed too. Please re-run the install process.
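Aside, for orientation (an illustrative sketch, not part of the patch): the SCI values above feed the carbon formulas directly. This is how I (gCO2e/kWh) and N (kWh/GB) combine, mirroring the formulas in lib/phase_stats.py further down in this diff; the byte count is a made-up example:

    from decimal import Decimal

    I = Decimal('334')          # static grid carbon intensity in gCO2e/kWh (the German 2024 default above)
    N = Decimal('0.04106063')   # network energy intensity in kWh/GB

    network_bytes = Decimal(2_000_000_000)           # hypothetical: 2 GB transferred
    network_kwh = network_bytes / 1_000_000_000 * N  # bytes -> GB -> kWh
    carbon_ug = network_kwh * I * 1_000_000          # kWh * gCO2e/kWh -> g, then g -> ug

    print(f"{network_kwh:.4f} kWh -> {carbon_ug:.0f} ugCO2e")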
diff --git a/frontend/js/helpers/config.js.example b/frontend/js/helpers/config.js.example
index b62a99280..cf74a0939 100644
--- a/frontend/js/helpers/config.js.example
+++ b/frontend/js/helpers/config.js.example
@@ -621,5 +621,15 @@ METRIC_MAPPINGS = {
         "clean_name": "Total System Disk Writes",
         "source": "cgroup",
         "explanation": "Total data written to disk for the system via cgroup"
+    },
+    "grid_carbon_intensity_config_location": {
+        "clean_name": "Grid Carbon Intensity",
+        "source": "Config (Static)",
+        "explanation": "Static grid carbon intensity used to calculate the carbon emissions"
+    },
+    "grid_carbon_intensity_api_location": {
+        "clean_name": "Grid Carbon Intensity",
+        "source": "External Provider (Dynamic)",
+        "explanation": "Dynamic grid carbon intensity during the run, retrieved from an external carbon intensity provider"
     }
 } // PLEASE DO NOT REMOVE THIS COMMENT -- END METRIC_MAPPINGS
diff --git a/lib/carbon_intensity.py b/lib/carbon_intensity.py
new file mode 100644
index 000000000..fbb38a8b7
--- /dev/null
+++ b/lib/carbon_intensity.py
@@ -0,0 +1,259 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+import requests
+from datetime import datetime, timezone
+from typing import List, Dict, Any
+from io import StringIO
+from lib.global_config import GlobalConfig
+from lib.db import DB
+
+
+class CarbonIntensityClient:
+    def __init__(self, base_url: str = None):
+        """
+        Initialize carbon intensity client for Elephant service.
+
+        Args:
+            base_url: Base URL of the Elephant service. If None, reads from config.yml
+        """
+        if base_url is None:
+            config = GlobalConfig().config
+            dynamic_config = config.get('dynamic_grid_carbon_intensity', {})
+            elephant_config = dynamic_config.get('elephant', {})
+            protocol = elephant_config.get('protocol', 'http')
+            host = elephant_config.get('host', 'localhost')
+            port = elephant_config.get('port', 8000)
+            base_url = f"{protocol}://{host}:{port}"
+
+        self.base_url = base_url.rstrip('/')
+
+    def get_carbon_intensity_history(self, location: str, start_time: str, end_time: str) -> List[Dict[str, Any]]:
+        url = f"{self.base_url}/carbon-intensity/history"
+        params = {
+            'location': location,     # Location code (e.g., "DE", "ES-IB-MA")
+            'startTime': start_time,  # ISO 8601 format (e.g., "2025-09-22T10:50:00Z")
+            'endTime': end_time,      # ISO 8601 format (e.g., "2025-09-22T10:55:00Z")
+            'interpolate': 'true'     # we also want data points adjacent to the requested time range, to ensure we always get at least one data point
+        }
+
+        response = requests.get(url, params=params, timeout=30)
+
+        if not response.ok:
+            error_detail = "No additional error details available"
+            try:
+                error_json = response.json()
+                if isinstance(error_json, dict) and 'detail' in error_json:
+                    error_detail = error_json['detail']
+                elif isinstance(error_json, dict):
+                    error_detail = str(error_json)
+                else:
+                    error_detail = str(error_json)
+            except (ValueError, KeyError):
+                error_detail = response.text if response.text else "No response body"
+
+            raise requests.HTTPError(
+                f"Carbon intensity API request failed with status {response.status_code}. "
" + f"Error details: {error_detail}" + ) + + data = response.json() + + if not isinstance(data, list): + raise ValueError(f"Expected list response from carbon intensity service, got {type(data)}") + + for item in data: + if not all(key in item for key in ['location', 'time', 'carbon_intensity']): + raise ValueError(f"Invalid carbon intensity data format: missing required fields in {item}") + + return data + + +def _get_run_data_and_phases(run_id): + run_query = """ + SELECT phases, start_measurement, end_measurement + FROM runs + WHERE id = %s + """ + run_data = DB().fetch_one(run_query, (run_id,)) + if not run_data or not run_data[0]: + raise ValueError(f"Run {run_id} does not have phases data") + + phases, start_time_us, end_time_us = run_data + return phases, start_time_us, end_time_us + + +def _create_measurement_metric(run_id, metric_name, detail_name, unit, sampling_rate): + return DB().fetch_one(''' + INSERT INTO measurement_metrics (run_id, metric, detail_name, unit, sampling_rate_configured) + VALUES (%s, %s, %s, %s, %s) + RETURNING id + ''', params=(run_id, metric_name, detail_name, unit, sampling_rate))[0] + +# Defines for which timestamps a carbon intensity value is needed: run start/end & phase middles +def _get_base_timestamps(phases, start_time_us, end_time_us): + timestamps = set() + + # Add overall run start and end times + if start_time_us and end_time_us: + timestamps.add(start_time_us) + timestamps.add(end_time_us) + + # Add middle timestamp for each phase + for phase in phases: + middle_timestamp = (phase['start'] + phase['end']) // 2 + timestamps.add(middle_timestamp) + + return timestamps + + +def _bulk_insert_measurement_values(measurement_metric_id, value_timestamp_pairs): + if not value_timestamp_pairs: + return + + # For small datasets, use regular INSERT with multiple VALUES + if len(value_timestamp_pairs) <= 10: + values_to_insert = [] + for value, timestamp in value_timestamp_pairs: + values_to_insert.extend([measurement_metric_id, round(value), timestamp]) + + placeholders = ', '.join(['(%s, %s, %s)'] * len(value_timestamp_pairs)) + query = f"INSERT INTO measurement_values (measurement_metric_id, value, time) VALUES {placeholders}" + DB().query(query, tuple(values_to_insert)) + # For larger datasets, use COPY FROM for better performance + else: + values_data = [(measurement_metric_id, round(value), timestamp) + for value, timestamp in value_timestamp_pairs] + csv_data = '\n'.join([f"{row[0]},{row[1]},{row[2]}" for row in values_data]) + f = StringIO(csv_data) + DB().copy_from( + file=f, + table='measurement_values', + columns=['measurement_metric_id', 'value', 'time'], + sep=',' + ) + f.close() + + +def store_static_carbon_intensity(run_id, static_value): + phases, start_time_us, end_time_us = _get_run_data_and_phases(run_id) + + metric_name = 'grid_carbon_intensity_config_location' + detail_name = '[CONFIG]' + unit = 'gCO2e/kWh' + sampling_rate = 0 # Static value has no sampling rate + + measurement_metric_id = _create_measurement_metric( + run_id, metric_name, detail_name, unit, sampling_rate + ) + + # Calculate base timestamps, for which we definitely need a value: + # start/end of run + middle of each phase + timestamps = _get_base_timestamps(phases, start_time_us, end_time_us) + + value_timestamp_pairs = [(static_value, timestamp) for timestamp in timestamps] + + _bulk_insert_measurement_values(measurement_metric_id, value_timestamp_pairs) + + print(f"Stored static carbon intensity value {static_value} gCO2e/kWh at {len(timestamps)} timestamps (run 
+
+
+def store_dynamic_carbon_intensity(run_id, location):
+    phases, start_time_us, end_time_us = _get_run_data_and_phases(run_id)
+    start_time_iso = _microseconds_to_iso8601(start_time_us)
+    end_time_iso = _microseconds_to_iso8601(end_time_us)
+
+    carbon_client = CarbonIntensityClient()
+    carbon_intensity_data = carbon_client.get_carbon_intensity_history(
+        location, start_time_iso, end_time_iso
+    )
+    if not carbon_intensity_data:
+        raise ValueError(
+            f"No carbon intensity data received from service for location '{location}' "
+            f"between {start_time_iso} and {end_time_iso}. The service returned an empty dataset."
+        )
+
+    values = [float(dp['carbon_intensity']) for dp in carbon_intensity_data]
+    print(f"Retrieved {len(carbon_intensity_data)} API data points for {location}: "
+          f"range {min(values):.1f}-{max(values):.1f} gCO2e/kWh")
+
+    metric_name = 'grid_carbon_intensity_api_location'
+    detail_name = location
+    unit = 'gCO2e/kWh'
+    sampling_rate = _calculate_sampling_rate_from_data(carbon_intensity_data)
+
+    measurement_metric_id = _create_measurement_metric(
+        run_id, metric_name, detail_name, unit, sampling_rate
+    )
+
+    # Convert API data to the format we need within GMT
+    carbon_data_for_lookup = []
+    for data_point in carbon_intensity_data:
+        # Convert ISO timestamp to microseconds
+        iso_time = data_point['time']
+        dt = datetime.fromisoformat(iso_time.replace('Z', '+00:00'))
+        timestamp_us = int(dt.timestamp() * 1_000_000)
+
+        carbon_data_for_lookup.append({
+            'timestamp_us': timestamp_us,
+            'carbon_intensity': float(data_point['carbon_intensity'])
+        })
+
+    carbon_data_for_lookup.sort(key=lambda x: x['timestamp_us'])
+
+    # Calculate base timestamps, for which we definitely need a value:
+    # start/end of run + middle of each phase
+    timestamps = _get_base_timestamps(phases, start_time_us, end_time_us)
+
+    # Add any intermediate API data points that fall within measurement timeframe
+    for data_point in carbon_data_for_lookup:
+        timestamp_us = data_point['timestamp_us']
+        if start_time_us <= timestamp_us <= end_time_us:
+            timestamps.add(timestamp_us)
+
+    value_timestamp_pairs = []
+    if len(carbon_data_for_lookup) == 1:
+        # If only one data point, use it for all timestamps
+        carbon_intensity = carbon_data_for_lookup[0]['carbon_intensity']
+        value_timestamp_pairs = [(carbon_intensity, timestamp) for timestamp in timestamps]
+    else:
+        # Convert timestamps to values using nearest data point logic
+        for timestamp in timestamps:
+            carbon_intensity = _get_carbon_intensity_at_timestamp(timestamp, carbon_data_for_lookup)
+            value_timestamp_pairs.append((carbon_intensity, timestamp))
+
+    _bulk_insert_measurement_values(measurement_metric_id, value_timestamp_pairs)
+
+    unique_values = len(set(int(value) for value, _ in value_timestamp_pairs))
+    print(f"Stored dynamic carbon intensity for location {location}: {len(value_timestamp_pairs)} timestamps, {unique_values} unique values")
+
+
+# Find the data point with the timestamp closest to the target timestamp.
+# Interpolation is not used on purpose here.
+def _get_carbon_intensity_at_timestamp(timestamp_us: int, carbon_data: List[Dict[str, Any]]) -> float:
+    closest_point = min(
+        carbon_data,
+        key=lambda point: abs(point['timestamp_us'] - timestamp_us)
+    )
+
+    return float(closest_point['carbon_intensity'])
+
+
+def _calculate_sampling_rate_from_data(carbon_intensity_data: List[Dict[str, Any]]) -> int:
+    if not carbon_intensity_data or len(carbon_intensity_data) < 2:
+        return 0
+
+    try:
+        time1 = datetime.fromisoformat(carbon_intensity_data[0]['time'].replace('Z', '+00:00'))
+        time2 = datetime.fromisoformat(carbon_intensity_data[1]['time'].replace('Z', '+00:00'))
+        interval_seconds = abs((time2 - time1).total_seconds())
+        sampling_rate = int(interval_seconds * 1000)
+        return sampling_rate
+    except (KeyError, ValueError, IndexError):
+        return 0
+
+
+def _microseconds_to_iso8601(timestamp_us: int) -> str:
+    timestamp_seconds = timestamp_us / 1_000_000
+    dt = datetime.fromtimestamp(timestamp_seconds, timezone.utc)
+    return dt.strftime('%Y-%m-%dT%H:%M:%SZ')
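For orientation, a minimal usage sketch of the client defined above (illustrative, not part of the patch; assumes an Elephant instance is reachable at the given base URL and returns the list-of-objects shape the tests below rely on):

    from lib.carbon_intensity import CarbonIntensityClient

    client = CarbonIntensityClient("http://localhost:8000")
    history = client.get_carbon_intensity_history("DE", "2025-09-22T10:50:00Z", "2025-09-22T10:55:00Z")
    for point in history:
        # each item looks like: {"location": "DE", "time": "2025-09-22T10:50:00Z", "carbon_intensity": 253.0}
        print(point['time'], point['carbon_intensity'], 'gCO2e/kWh')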
diff --git a/lib/phase_stats.py b/lib/phase_stats.py
index 4421bd975..4d30b3919 100644
--- a/lib/phase_stats.py
+++ b/lib/phase_stats.py
@@ -66,6 +66,8 @@ def generate_csv_line(run_id, metric, detail_name, phase_name, value, value_type
     # else '' resolves to NULL
     return f"{run_id},{metric},{detail_name},{phase_name},{round(value)},{value_type},{round(max_value) if max_value is not None else ''},{round(min_value) if min_value is not None else ''},{round(sampling_rate_avg) if sampling_rate_avg is not None else ''},{round(sampling_rate_max) if sampling_rate_max is not None else ''},{round(sampling_rate_95p) if sampling_rate_95p is not None else ''},{unit},NOW()\n"

+
+
 def build_and_store_phase_stats(run_id, sci=None):
     if not sci:
         sci = {}
@@ -115,6 +117,9 @@ def build_and_store_phase_stats(run_id, sci=None):
     cpu_utilization_machine = None
     network_io_carbon_in_ug = None

+    phase_grid_carbon_intensity = None
+    phase_energy_metrics = []
+
     select_query = """
         WITH lag_table as (
             SELECT time, value, (time - LAG(time) OVER (ORDER BY time ASC)) AS diff
@@ -239,13 +244,16 @@
                         power_min = (min_value * 10**3) / (duration / value_count)
                         csv_buffer.write(generate_csv_line(run_id, f"{metric.replace('_energy_', '_power_')}", detail_name, f"{idx:03}_{phase['name']}", power_avg, 'MEAN', power_max, power_min, sampling_rate_avg, sampling_rate_max, sampling_rate_95p, 'mW'))

-                    if sci.get('I', None) is not None:
-                        value_carbon_ug = (value_sum / 3_600_000) * Decimal(sci['I'])
-
-                        csv_buffer.write(generate_csv_line(run_id, f"{metric.replace('_energy_', '_carbon_')}", detail_name, f"{idx:03}_{phase['name']}", value_carbon_ug, 'TOTAL', None, None, sampling_rate_avg, sampling_rate_max, sampling_rate_95p, 'ug'))
-
-                        if '[' not in phase['name'] and metric.endswith('_machine'): # only for runtime sub phases to not double count ... needs refactor ... see comment at beginning of file
-                            software_carbon_intensity_global['machine_carbon_ug'] = software_carbon_intensity_global.get('machine_carbon_ug', 0) + value_carbon_ug
+                    # Store energy metric data for carbon calculation at the end of the phase loop
+                    phase_energy_metrics.append({
+                        'metric': metric,
+                        'detail_name': detail_name,
+                        'value_sum': value_sum,
+                        'phase_name': f"{idx:03}_{phase['name']}",
+                        'sampling_rate_avg': sampling_rate_avg,
+                        'sampling_rate_max': sampling_rate_max,
+                        'sampling_rate_95p': sampling_rate_95p
+                    })

                     if metric.endswith('_machine'):
@@ -255,32 +263,31 @@
                         machine_energy_current_phase = value_sum
                         machine_power_current_phase = power_avg

+                elif "grid_carbon_intensity" in metric:
+                    # For the average sampling rate use the configured one, for 95p and max we don't use a value
+                    csv_buffer.write(generate_csv_line(run_id, metric, detail_name, f"{idx:03}_{phase['name']}", avg_value, 'MEAN', max_value, min_value, sampling_rate_configured, None, None, unit))
+                    phase_grid_carbon_intensity = avg_value
+
                 else: # Default
                     if metric not in ('cpu_time_powermetrics_vm', ):
                         error_helpers.log_error('Unmapped phase_stat found, using default', metric=metric, detail_name=detail_name, run_id=run_id)
                     csv_buffer.write(generate_csv_line(run_id, metric, detail_name, f"{idx:03}_{phase['name']}", value_sum, 'TOTAL', max_value, min_value, sampling_rate_avg, sampling_rate_max, sampling_rate_95p, unit))

             # after going through detail metrics, create cumulated ones
+            network_io_in_kWh = None
             if network_bytes_total:
-                if sci.get('N', None) is not None and sci.get('I', None) is not None:
+                if sci.get('N', None) is not None:
                     # build the network energy by using a formula: https://www.green-coding.io/co2-formulas/
                     # pylint: disable=invalid-name
                     network_io_in_kWh = Decimal(sum(network_bytes_total)) / 1_000_000_000 * Decimal(sci['N'])
                     network_io_in_uJ = network_io_in_kWh * 3_600_000_000_000
                     csv_buffer.write(generate_csv_line(run_id, 'network_energy_formula_global', '[FORMULA]', f"{idx:03}_{phase['name']}", network_io_in_uJ, 'TOTAL', None, None, None, None, None, 'uJ'))

-                    #power calculations
+                    # power calculations
                     network_io_power_in_mW = (network_io_in_kWh * Decimal('3600000') / Decimal(duration_in_s) * Decimal('1000'))
                     csv_buffer.write(generate_csv_line(run_id, 'network_power_formula_global', '[FORMULA]', f"{idx:03}_{phase['name']}", network_io_power_in_mW, 'TOTAL', None, None, None, None, None, 'mW'))
-
-                    # co2 calculations
-                    network_io_carbon_in_ug = network_io_in_kWh * Decimal(sci['I']) * 1_000_000
-                    csv_buffer.write(generate_csv_line(run_id, 'network_carbon_formula_global', '[FORMULA]', f"{idx:03}_{phase['name']}", network_io_carbon_in_ug, 'TOTAL', None, None, None, None, None, 'ug'))
                 else:
-                    error_helpers.log_error('Cannot calculate the total network energy consumption. SCI values I and N are missing in the config.', run_id=run_id)
-                    network_io_carbon_in_ug = 0
-            else:
-                network_io_carbon_in_ug = 0
+                    error_helpers.log_error('Cannot calculate the total network energy consumption. SCI value N is missing in the config.', run_id=run_id)

             if sci.get('EL', None) is not None and sci.get('TE', None) is not None and sci.get('RS', None) is not None:
                 duration_in_years = duration_in_s / (60 * 60 * 24 * 365)
@@ -308,6 +315,28 @@
                     csv_buffer.write(generate_csv_line(run_id, 'psu_energy_cgroup_container', detail_name, f"{idx:03}_{phase['name']}", surplus_energy_runtime * splitting_ratio, 'TOTAL', None, None, None, None, None, 'uJ'))
                     csv_buffer.write(generate_csv_line(run_id, 'psu_power_cgroup_container', detail_name, f"{idx:03}_{phase['name']}", surplus_power_runtime * splitting_ratio, 'TOTAL', None, None, None, None, None, 'mW'))

+            # Calculate carbon emissions for this phase's energy metrics
+            if phase_grid_carbon_intensity is not None and phase_energy_metrics:
+                for energy_metric in phase_energy_metrics:
+                    metric = energy_metric['metric']
+                    detail_name = energy_metric['detail_name']
+                    value_sum = energy_metric['value_sum']
+                    phase_full_name = energy_metric['phase_name']
+                    sampling_rate_avg = energy_metric['sampling_rate_avg']
+                    sampling_rate_max = energy_metric['sampling_rate_max']
+                    sampling_rate_95p = energy_metric['sampling_rate_95p']
+
+                    value_carbon_ug = (value_sum / 3_600_000) * phase_grid_carbon_intensity
+                    csv_buffer.write(generate_csv_line(run_id, f"{metric.replace('_energy_', '_carbon_')}", detail_name, phase_full_name, value_carbon_ug, 'TOTAL', None, None, sampling_rate_avg, sampling_rate_max, sampling_rate_95p, 'ug'))
+
+                    if '[' not in phase['name'] and metric.endswith('_machine'): # only for runtime sub phases to not double count ... needs refactor ... see comment at beginning of file
+                        software_carbon_intensity_global['machine_carbon_ug'] = software_carbon_intensity_global.get('machine_carbon_ug', 0) + value_carbon_ug
+
+            # Calculate network carbon emissions for this phase
+            if phase_grid_carbon_intensity is not None and network_io_in_kWh is not None:
+                network_io_carbon_in_ug = network_io_in_kWh * phase_grid_carbon_intensity * 1_000_000
+                csv_buffer.write(generate_csv_line(run_id, 'network_carbon_formula_global', '[FORMULA]', f"{idx:03}_{phase['name']}", network_io_carbon_in_ug, 'TOTAL', None, None, None, None, None, 'ug'))
+
     # TODO: refactor to be a metric provider. Than it can also be per phase # pylint: disable=fixme
     if software_carbon_intensity_global.get('machine_carbon_ug', None) is not None \
         and software_carbon_intensity_global.get('embodied_carbon_share_ug', None) is not None \
@@ -317,6 +346,7 @@
         csv_buffer.write(generate_csv_line(run_id, 'software_carbon_intensity_global', '[SYSTEM]', f"{runtime_phase_idx:03}_[RUNTIME]", (software_carbon_intensity_global['machine_carbon_ug'] + software_carbon_intensity_global['embodied_carbon_share_ug']) / Decimal(sci['R']), 'TOTAL', None, None, None, None, None, f"ugCO2e/{sci['R_d']}"))
     # TODO End # pylint: disable=fixme

+
     csv_buffer.seek(0)  # Reset buffer position to the beginning
     DB().copy_from(
         csv_buffer,
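The carbon conversion above is pure unit arithmetic: value_sum is in uJ, and 1 mWh = 3_600_000 uJ, so value_sum / 3_600_000 yields mWh; multiplying by the grid intensity in gCO2e/kWh then yields ug directly, since mWh/kWh = 10^-6 and g * 10^-6 = ug. A quick sanity check (illustrative, not part of the patch):

    from decimal import Decimal

    value_sum = Decimal(3_600_000_000_000)       # exactly 1 kWh, expressed in uJ
    phase_grid_carbon_intensity = Decimal(334)   # gCO2e/kWh
    value_carbon_ug = (value_sum / 3_600_000) * phase_grid_carbon_intensity
    assert value_carbon_ug == 334 * Decimal(1_000_000)  # 334 g == 334_000_000 ug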
diff --git a/lib/scenario_runner.py b/lib/scenario_runner.py
index 470d4c152..a82a54593 100644
--- a/lib/scenario_runner.py
+++ b/lib/scenario_runner.py
@@ -162,6 +162,7 @@ def __init__(self,
             ('_save_notes_runner', {}),
             ('_save_run_logs', {}),
             ('_save_warnings', {}),
+            ('_process_grid_carbon_intensity', {}),
             ('_process_phase_stats', {}),
         )

@@ -2068,6 +2069,34 @@ def _patch_phases(self):
         if self.__phases.get('[RUNTIME]', None) is not None and self.__phases['[RUNTIME]'].get('end', None) is None:
             self.__phases['[RUNTIME]']['end'] = int(time.time_ns() / 1_000)

+    def _process_grid_carbon_intensity(self):
+        if not self._run_id or self._dev_no_phase_stats or self._dev_no_save:
+            return
+
+        print(TerminalColors.HEADER, '\nProcess grid carbon intensity values', TerminalColors.ENDC)
+
+        config = GlobalConfig().config
+        dynamic_grid_carbon_intensity = config.get('dynamic_grid_carbon_intensity', None)
+        if dynamic_grid_carbon_intensity:
+            # Store dynamic carbon intensity from API
+            location = dynamic_grid_carbon_intensity.get('location', None)
+            if location is None:
+                raise ValueError("Dynamic grid carbon intensity is enabled, but location configuration is missing! Ensure it is set in your config.yml.")
+
+            from lib.carbon_intensity import store_dynamic_carbon_intensity # pylint: disable=import-outside-toplevel
+            store_dynamic_carbon_intensity(self._run_id, location)
+        elif self._sci.get('I'):
+            # Store static carbon intensity from config as constant time series
+            from lib.carbon_intensity import store_static_carbon_intensity # pylint: disable=import-outside-toplevel
+            store_static_carbon_intensity(self._run_id, self._sci['I'])
+        else:
+            raise ValueError(
+                "No grid carbon intensity configured. Cannot proceed with carbon footprint calculations. "
+                "Please configure either: (1) Static carbon intensity by setting 'sci.I' in your config, "
+                "or (2) Dynamic carbon intensity by uncommenting 'dynamic_grid_carbon_intensity' and setting "
+                "its 'location' in your config.yml."
+            )
+
     def _process_phase_stats(self):
         if not self._run_id or self._dev_no_phase_stats or self._dev_no_save:
             return
diff --git a/tests/lib/test_carbon_intensity.py b/tests/lib/test_carbon_intensity.py
new file mode 100644
index 000000000..e54868a1d
--- /dev/null
+++ b/tests/lib/test_carbon_intensity.py
@@ -0,0 +1,362 @@
+import calendar
+import os
+import pytest
+import requests
+from unittest.mock import Mock, patch
+from datetime import datetime
+from datetime import timezone
+
+GMT_ROOT_DIR = os.path.dirname(os.path.abspath(__file__))+'/../../'
+
+from tests import test_functions as Tests
+from lib.db import DB
+from lib.carbon_intensity import (
+    CarbonIntensityClient,
+    _microseconds_to_iso8601,
+    _calculate_sampling_rate_from_data,
+    _get_carbon_intensity_at_timestamp,
+    store_static_carbon_intensity,
+    store_dynamic_carbon_intensity
+)
+
+class TestCarbonIntensityClient:
+
+    @patch('lib.carbon_intensity.GlobalConfig')
+    def test_config_based_initialization(self, mock_global_config):
+        # Test that client reads URL from config when not provided
+        mock_config = Mock()
+        mock_config.config = {
+            'dynamic_grid_carbon_intensity': {
+                'elephant': {
+                    'protocol': 'https',
+                    'host': 'example.com',
+                    'port': 9000
+                }
+            }
+        }
+        mock_global_config.return_value = mock_config
+
+        client = CarbonIntensityClient()
+        assert client.base_url == "https://example.com:9000"
+
+    @patch('lib.carbon_intensity.GlobalConfig')
+    def test_config_based_initialization_defaults(self, mock_global_config):
+        # Test that client uses defaults when config is empty
+        mock_config = Mock()
+        mock_config.config = {}
+        mock_global_config.return_value = mock_config
+
+        client = CarbonIntensityClient()
+        assert client.base_url == "http://localhost:8000"
+
+    def test__microseconds_to_iso8601(self):
+        # Test timestamp conversion
+        timestamp_us = 1727003400000000 # Some timestamp
+        result = _microseconds_to_iso8601(timestamp_us)
+        # Just verify format is correct ISO 8601
+        assert len(result) == 20
+        assert result.endswith('Z')
+        assert 'T' in result
+        # Verify it's a valid timestamp that can be parsed back
+        parsed = datetime.fromisoformat(result.replace('Z', '+00:00'))
+        assert parsed is not None
+
+    def test__calculate_sampling_rate_from_data(self):
+        # Test with 1 hour interval using API format with 'time' field
+        carbon_data = [
+            {"location": "DE", "time": "2025-09-23T10:00:00Z", "carbon_intensity": 253.0},
+            {"location": "DE", "time": "2025-09-23T11:00:00Z", "carbon_intensity": 252.0}
+        ]
+        result = _calculate_sampling_rate_from_data(carbon_data)
+        assert result == 3600000 # 1 hour = 3600 seconds = 3600000 ms
+
+        # Test with 30 minute interval
+        carbon_data_30min = [
+            {"location": "DE", "time": "2025-09-23T10:00:00Z", "carbon_intensity": 253.0},
+            {"location": "DE", "time": "2025-09-23T10:30:00Z", "carbon_intensity": 252.0}
+        ]
+        result = _calculate_sampling_rate_from_data(carbon_data_30min)
+        assert result == 1800000 # 30 minutes = 1800 seconds = 1800000 ms
+
+        # Test with empty data (should return fallback)
+        result = _calculate_sampling_rate_from_data([])
+        assert result == 0
+
+        # Test with single data point (should return fallback)
+        result = _calculate_sampling_rate_from_data([{"location": "DE", "time": "2025-09-23T10:00:00Z", "carbon_intensity": 253.0}])
+        assert result == 0
+
+        # Test with invalid data (should return fallback)
+        result = _calculate_sampling_rate_from_data([{"invalid": "data"}, {"also": "invalid"}])
+        assert result == 0
+
+    def test__get_carbon_intensity_at_timestamp_single_point(self):
+        # Test with single data point
+        carbon_data = [
+            {"location": "DE", "timestamp_us": int(datetime(2024, 9, 22, 10, 0, 0, tzinfo=timezone.utc).timestamp() * 1_000_000), "carbon_intensity": 185.0, "sampling_rate_ms": 300000}
+        ]
+        timestamp_us = 1727003400000000 # 2024-09-22T10:50:00Z
+        result = _get_carbon_intensity_at_timestamp(timestamp_us, carbon_data)
+        assert result == 185.0
+
+    def test__get_carbon_intensity_at_timestamp_between_points(self):
+        # Test nearest point selection between two points
+        carbon_data = [
+            {"location": "DE", "timestamp_us": int(datetime(2024, 9, 22, 10, 0, 0, tzinfo=timezone.utc).timestamp() * 1_000_000), "carbon_intensity": 180.0},
+            {"location": "DE", "timestamp_us": int(datetime(2024, 9, 22, 11, 0, 0, tzinfo=timezone.utc).timestamp() * 1_000_000), "carbon_intensity": 200.0}
+        ]
+        # Calculate correct timestamp for 10:30:00 UTC
+        mid_time = datetime(2024, 9, 22, 10, 30, 0) # UTC time
+        timestamp_us = int(calendar.timegm(mid_time.timetuple()) * 1_000_000)
+
+        result = _get_carbon_intensity_at_timestamp(timestamp_us, carbon_data)
+        assert result == 180.0 # 10:30 is equidistant from 10:00 and 11:00; min() picks the first data point on ties
+
+    def test__get_carbon_intensity_at_timestamp_before_range(self):
+        # Test with timestamp before data range
+        carbon_data = [
+            {"location": "DE", "timestamp_us": int(datetime(2024, 9, 22, 11, 0, 0, tzinfo=timezone.utc).timestamp() * 1_000_000), "carbon_intensity": 185.0}
+        ]
+        timestamp_us = 1727001600000000 # 2024-09-22T10:20:00Z (before 11:00)
+        result = _get_carbon_intensity_at_timestamp(timestamp_us, carbon_data)
+        assert result == 185.0 # Should return first value
+
+    def test__get_carbon_intensity_at_timestamp_after_range(self):
+        # Test with timestamp after data range
+        carbon_data = [
+            {"location": "DE", "timestamp_us": int(datetime(2024, 9, 22, 10, 0, 0, tzinfo=timezone.utc).timestamp() * 1_000_000), "carbon_intensity": 185.0}
+        ]
+        timestamp_us = 1727007000000000 # 2024-09-22T11:50:00Z (after 10:00)
+        result = _get_carbon_intensity_at_timestamp(timestamp_us, carbon_data)
+        assert result == 185.0 # Should return last value
+
+    def test__get_carbon_intensity_at_timestamp_empty_data(self):
+        # Test with empty data
+        with pytest.raises(ValueError, match="min\\(\\) iterable argument is empty"):
+            _get_carbon_intensity_at_timestamp(1727003400000000, [])
+
+    @patch('lib.carbon_intensity.requests.get')
+    def test_carbon_intensity_client_success(self, mock_get):
+        # Test successful API call
+        mock_response = Mock()
+        mock_response.raise_for_status.return_value = None
+        mock_response.json.return_value = [
+            {"location": "DE", "time": "2024-09-22T10:00:00Z", "carbon_intensity": 185.0},
+            {"location": "DE", "time": "2024-09-22T11:00:00Z", "carbon_intensity": 183.0}
+        ]
+        mock_get.return_value = mock_response
+
+        client = CarbonIntensityClient("http://localhost:8000")
+        result = client.get_carbon_intensity_history("DE", "2024-09-22T10:50:00Z", "2024-09-22T10:55:00Z")
+
+        assert len(result) == 2
+        assert result[0]['carbon_intensity'] == 185.0
+        assert result[1]['carbon_intensity'] == 183.0
+
+        mock_get.assert_called_once_with(
+            "http://localhost:8000/carbon-intensity/history",
+            params={
+                'location': 'DE',
+                'startTime': '2024-09-22T10:50:00Z',
+                'endTime': '2024-09-22T10:55:00Z',
+                'interpolate': 'true'
+            },
+            timeout=30
+        )
+
+    @patch('lib.carbon_intensity.requests.get')
+    def test_carbon_intensity_client_network_error(self, mock_get):
+        # Test network error handling
+        mock_get.side_effect = requests.exceptions.RequestException("Network error")
+
CarbonIntensityClient("http://localhost:8000") + with pytest.raises(requests.exceptions.RequestException): + client.get_carbon_intensity_history("DE", "2024-09-22T10:50:00Z", "2024-09-22T10:55:00Z") + + @patch('lib.carbon_intensity.requests.get') + def test_carbon_intensity_client_invalid_response(self, mock_get): + # Test invalid response handling + mock_response = Mock() + mock_response.raise_for_status.return_value = None + mock_response.json.return_value = {"invalid": "response"} + mock_get.return_value = mock_response + + client = CarbonIntensityClient("http://localhost:8000") + with pytest.raises(ValueError, match="Expected list response from carbon intensity service"): + client.get_carbon_intensity_history("DE", "2024-09-22T10:50:00Z", "2024-09-22T10:55:00Z") + +class TestStoreCarbonIntensityAsMetrics: + + def test_store_carbon_intensity_static_value(self): + # Test that static carbon intensity is stored correctly at the relevant time points + run_id = Tests.insert_run() + static_carbon_intensity = 250.6 + + store_static_carbon_intensity(run_id, static_carbon_intensity) + + # Verify that measurement_metrics entry was created for static carbon intensity + metric_result = DB().fetch_one( + "SELECT metric, detail_name, unit FROM measurement_metrics WHERE run_id = %s", + (run_id,) + ) + + assert metric_result is not None + assert metric_result[0] == 'grid_carbon_intensity_config_location' + assert metric_result[1] == '[CONFIG]' + assert metric_result[2] == 'gCO2e/kWh' + + # Verify that static value was stored (should have up to 7 data points: start/end of run + middle of 5 phases, deduplicated) + values_result = DB().fetch_all( + """SELECT mv.value + FROM measurement_values mv + JOIN measurement_metrics mm ON mv.measurement_metric_id = mm.id + WHERE mm.run_id = %s AND mm.metric = 'grid_carbon_intensity_config_location'""", + (run_id,) + ) + + run_query = """ + SELECT phases, start_measurement, end_measurement + FROM runs + WHERE id = %s + """ + run_data = DB().fetch_one(run_query, (run_id,)) + print(run_data) + + assert len(values_result) == 8 # 5 phases + 1 flow + start of run + end of run + for result in values_result: + assert result[0] == 251 # 250.6 is rounded up + + def test_store_carbon_intensity_dynamic_grid_enabled(self): + # Test that dynamic grid carbon intensity is stored when enabled in measurement config + run_id = Tests.insert_run() + + # Mock the carbon intensity API call + # Use timestamps that align with the measurement timeframe (2024-12-24T13:33:10Z to 2024-12-24T13:41:00Z) + with patch('lib.carbon_intensity.CarbonIntensityClient') as mock_client_class: + mock_client = Mock() + mock_client_class.return_value = mock_client + mock_client.get_carbon_intensity_history.return_value = [ + {"location": "DE", "time": "2024-12-24T13:32:00Z", "carbon_intensity": 185.0}, # Before start (for extrapolation) + {"location": "DE", "time": "2024-12-24T13:35:00Z", "carbon_intensity": 190.0}, # Within timeframe + {"location": "DE", "time": "2024-12-24T13:38:00Z", "carbon_intensity": 188.0}, # Within timeframe + {"location": "DE", "time": "2024-12-24T13:42:00Z", "carbon_intensity": 183.0} # After end (for extrapolation) + ] + + # Call the function under test + store_dynamic_carbon_intensity(run_id, 'DE') + + # Verify that measurement_metrics entry was created for dynamic carbon intensity + metric_result = DB().fetch_one( + "SELECT metric, detail_name, unit FROM measurement_metrics WHERE run_id = %s", + (run_id,) + ) + + assert metric_result is not None + assert metric_result[0] == 
+        assert metric_result[0] == 'grid_carbon_intensity_api_location'
+        assert metric_result[1] == 'DE'
+        assert metric_result[2] == 'gCO2e/kWh'
+
+        # Verify that measurement values were stored
+        values_result = DB().fetch_all(
+            """SELECT mv.value, mv.time
+               FROM measurement_values mv
+               JOIN measurement_metrics mm ON mv.measurement_metric_id = mm.id
+               WHERE mm.run_id = %s AND mm.metric = 'grid_carbon_intensity_api_location'
+               ORDER BY mv.time""",
+            (run_id,)
+        )
+
+        # Should have at least 7 data points: start/end of run + middle of each phase + API data points
+        # Actual count may vary due to deduplication of timestamps
+        assert len(values_result) >= 7
+        # All values should be integers (nearest data point logic applied)
+        for value, _ in values_result:
+            assert isinstance(value, int)
+
+    def test_store_carbon_intensity_dynamic_single_data_point(self):
+        run_id = Tests.insert_run()
+
+        # Mock the carbon intensity API call with only one data point within timeframe
+        with patch('lib.carbon_intensity.CarbonIntensityClient') as mock_client_class:
+            mock_client = Mock()
+            mock_client_class.return_value = mock_client
+            mock_client.get_carbon_intensity_history.return_value = [
+                {"location": "DE", "time": "2024-12-24T13:37:00Z", "carbon_intensity": 185.0} # Within measurement timeframe
+            ]
+
+            # Call the function under test
+            store_dynamic_carbon_intensity(run_id, 'DE')
+
+        # Verify that measurement_metrics entry was created for dynamic carbon intensity
+        metric_result = DB().fetch_one(
+            "SELECT metric, detail_name, unit FROM measurement_metrics WHERE run_id = %s",
+            (run_id,)
+        )
+
+        assert metric_result is not None
+        assert metric_result[0] == 'grid_carbon_intensity_api_location'
+        assert metric_result[1] == 'DE'
+        assert metric_result[2] == 'gCO2e/kWh'
+
+        # Verify that measurement values were stored
+        values_result = DB().fetch_all(
+            """SELECT mv.value, mv.time
+               FROM measurement_values mv
+               JOIN measurement_metrics mm ON mv.measurement_metric_id = mm.id
+               WHERE mm.run_id = %s AND mm.metric = 'grid_carbon_intensity_api_location'
+               ORDER BY mv.time""",
+            (run_id,)
+        )
+
+        # Should have at least 7 data points: start/end of run + middle of each phase + API data point
+        # All using nearest data point (single API point in this case)
+        assert len(values_result) >= 7
+        # All values should be the same (185) since only one API data point
+        for value, _ in values_result:
+            assert value == 185
+
+    def test_store_carbon_intensity_dynamic_data_outside_timeframe(self):
+        # Test that dynamic carbon intensity properly handles data outside measurement timeframe using extrapolation
+        run_id = Tests.insert_run()
+
+        # Mock API data that is completely outside the measurement timeframe
+        with patch('lib.carbon_intensity.CarbonIntensityClient') as mock_client_class:
+            mock_client = Mock()
+            mock_client_class.return_value = mock_client
+            mock_client.get_carbon_intensity_history.return_value = [
+                {"location": "DE", "time": "2024-12-24T12:00:00Z", "carbon_intensity": 200.0}, # Well before start
+                {"location": "DE", "time": "2024-12-24T12:30:00Z", "carbon_intensity": 210.0}  # Still before start
+            ]
+
+            # Call the function under test
+            store_dynamic_carbon_intensity(run_id, 'DE')
+
+        # Verify that measurement_metrics entry was created
+        metric_result = DB().fetch_one(
+            "SELECT metric, detail_name, unit FROM measurement_metrics WHERE run_id = %s",
+            (run_id,)
+        )
+
+        assert metric_result is not None
+        assert metric_result[0] == 'grid_carbon_intensity_api_location'
+
+        # Verify that measurement values were stored using extrapolation
+        values_result = DB().fetch_all(
+            """SELECT mv.value, mv.time
+               FROM measurement_values mv
+               JOIN measurement_metrics mm ON mv.measurement_metric_id = mm.id
+               WHERE mm.run_id = %s AND mm.metric = 'grid_carbon_intensity_api_location'
+               ORDER BY mv.time""",
+            (run_id,)
+        )
+
+        # Should have at least 7 data points: start/end of run + middle of each phase
+        # All using nearest data point logic with API data outside timeframe
+        assert len(values_result) >= 7
+
+    def test_store_carbon_intensity_dynamic_missing_location(self):
+        # Test error handling when dynamic method is called with None location
+        run_id = Tests.insert_run()
+
+        with pytest.raises(Exception):
+            store_dynamic_carbon_intensity(run_id, None)
diff --git a/tests/lib/test_phase_stats.py b/tests/lib/test_phase_stats.py
index 7cbcc435b..41a001e3f 100644
--- a/tests/lib/test_phase_stats.py
+++ b/tests/lib/test_phase_stats.py
@@ -125,16 +125,18 @@
     assert data[5]['sampling_rate_max'] == 100688, 'MAX sampling rate not in expected range'
     assert data[5]['sampling_rate_95p'] == 99696, '95p sampling rate not in expected range'

-def test_phase_embodied_and_operational_carbon():
+def test_phase_embodied_and_operational_carbon_using_static_intensity():
     run_id = Tests.insert_run()
     Tests.import_machine_energy(run_id)

     sci = {"I":436,"R":0,"EL":4,"RS":1,"TE":181000,"R_d":"page request"}

+    Tests.import_static_carbon_intensity_metrics(run_id, sci['I'])
+
     build_and_store_phase_stats(run_id, sci=sci)

     data = DB().fetch_all('SELECT metric, detail_name, unit, value, type, sampling_rate_avg, sampling_rate_max, sampling_rate_95p, phase FROM phase_stats WHERE phase = %s ', params=('004_[RUNTIME]', ), fetch_mode='dict')

-    assert len(data) == 5
+    assert len(data) == 6
     psu_energy_ac_mcp_machine = data[3]
     assert psu_energy_ac_mcp_machine['metric'] == 'psu_energy_ac_mcp_machine'

@@ -162,6 +164,45 @@
     assert embodied_carbon_share_machine['sampling_rate_max'] is None, 'MAX sampling rate not in expected range'
     assert embodied_carbon_share_machine['sampling_rate_95p'] is None, '95p sampling rate not in expected range'

+def test_phase_embodied_and_operational_carbon_using_dynamic_intensity():
+    run_id = Tests.insert_run()
+    Tests.import_machine_energy(run_id)
+
+    sci = {"R":0,"EL":4,"RS":1,"TE":181000,"R_d":"page request"} # 'I' was removed because it is not relevant here when using dynamic values
+    grid_carbon_intensity = Tests.import_dynamic_carbon_intensity_metrics(run_id)
+
+    build_and_store_phase_stats(run_id, sci=sci)
+
+    data = DB().fetch_all('SELECT metric, detail_name, unit, value, type, sampling_rate_avg, sampling_rate_max, sampling_rate_95p, phase FROM phase_stats WHERE phase = %s ', params=('004_[RUNTIME]', ), fetch_mode='dict')
+
+    assert len(data) == 6
+    psu_energy_ac_mcp_machine = data[3]
+    assert psu_energy_ac_mcp_machine['metric'] == 'psu_energy_ac_mcp_machine'
+
+    psu_carbon_ac_mcp_machine = data[2]
+
+    assert psu_carbon_ac_mcp_machine['metric'] == 'psu_carbon_ac_mcp_machine'
+    assert psu_carbon_ac_mcp_machine['detail_name'] == '[MACHINE]'
+    assert psu_carbon_ac_mcp_machine['unit'] == 'ug'
+
+    operational_carbon_expected = int(psu_energy_ac_mcp_machine['value'] * MICROJOULES_TO_KWH * grid_carbon_intensity * 1_000_000)
+    assert psu_carbon_ac_mcp_machine['value'] == operational_carbon_expected
+    assert psu_carbon_ac_mcp_machine['type'] == 'TOTAL'
+
+    phase_time_in_years = Tests.TEST_MEASUREMENT_DURATION_S / (60 * 60 * 24 * 365)
+    embodied_carbon_expected = int((phase_time_in_years / sci['EL']) * sci['TE'] * sci['RS'] * 1_000_000)
+
+    embodied_carbon_share_machine = data[0]
+    assert embodied_carbon_share_machine['metric'] == 'embodied_carbon_share_machine'
+    assert embodied_carbon_share_machine['detail_name'] == '[SYSTEM]'
+    assert embodied_carbon_share_machine['unit'] == 'ug'
+    assert embodied_carbon_share_machine['value'] == embodied_carbon_expected
+    assert embodied_carbon_share_machine['type'] == 'TOTAL'
+
+    assert embodied_carbon_share_machine['sampling_rate_avg'] is None, 'AVG sampling rate not in expected range'
+    assert embodied_carbon_share_machine['sampling_rate_max'] is None, 'MAX sampling rate not in expected range'
+    assert embodied_carbon_share_machine['sampling_rate_95p'] is None, '95p sampling rate not in expected range'
+
 def test_phase_stats_energy_one_measurement():
     run_id = Tests.insert_run()
     Tests.import_single_cpu_energy_measurement(run_id)
@@ -288,6 +329,7 @@
         'N': 0.001, # Network energy intensity (kWh/GB)
         'I': 500,   # Carbon intensity (gCO2e/kWh)
     }
+    Tests.import_static_carbon_intensity_metrics(run_id, test_sci_config['I'])

     build_and_store_phase_stats(run_id, sci=test_sci_config)

@@ -332,6 +374,25 @@
     assert network_carbon_entry['type'] == 'TOTAL'
     assert math.isclose(network_carbon_entry['value'], expected_network_carbon_ug, rel_tol=1e-5), f"Expected network carbon: {expected_network_carbon_ug}, got: {network_carbon_entry['value']}"

+
+def test_phase_stats_dynamic_grid_carbon_intensity():
+    run_id = Tests.insert_run()
+    Tests.import_dynamic_carbon_intensity_metrics(run_id)
+
+    build_and_store_phase_stats(run_id)
+
+    data = DB().fetch_all('SELECT metric, detail_name, unit, value, type, sampling_rate_avg, sampling_rate_max, sampling_rate_95p FROM phase_stats WHERE phase = %s ', params=('004_[RUNTIME]', ), fetch_mode='dict')
+
+    assert len(data) == 2
+    assert data[1]['metric'] == 'grid_carbon_intensity_api_location'
+    assert data[1]['detail_name'] == 'DE'
+    assert data[1]['unit'] == 'gCO2e/kWh'
+    assert data[1]['value'] == 270
+    assert data[1]['type'] == 'MEAN'
+    assert data[1]['sampling_rate_avg'] == 60000000, 'Configured sampling rate should be used'
+    assert data[1]['sampling_rate_max'] is None, 'MAX sampling rate should not be set'
+    assert data[1]['sampling_rate_95p'] is None, '95p sampling rate should not be set'
+
 def test_sci_calculation():
     run_id = Tests.insert_run()
     Tests.import_machine_energy(run_id) # Machine energy component
@@ -347,6 +408,7 @@
         'R': 10,            # Functional unit count (10 runs)
         'R_d': 'test runs'  # Functional unit description
     }
+    Tests.import_static_carbon_intensity_metrics(run_id, test_sci_config['I'])

     build_and_store_phase_stats(run_id, sci=test_sci_config)

@@ -421,5 +483,5 @@
     data = DB().fetch_all("SELECT value, unit FROM phase_stats WHERE phase = %s AND run_id = %s AND metric = 'software_carbon_intensity_global' ", params=('004_[RUNTIME]', run_id), fetch_mode='dict')

     assert len(data) == 1
-    assert 8 < data[0]['value'] < 20
+    assert 5 < data[0]['value'] < 20
     assert data[0]['unit'] == 'ugCO2e/Cool run'
diff --git a/tests/smoke_test.py b/tests/smoke_test.py
index d75579e34..d872f60cf 100644
--- a/tests/smoke_test.py
+++ b/tests/smoke_test.py
@@ -61,6 +61,7 @@ def test_db_rows_are_written_and_presented():
     # for every metric provider, check that there were rows written in the DB with info for that provider
     # also check (in the same test, to save on a DB call) that the output to STD.OUT
from {metric_provider}" displays the same count as in the DB + # The grid carbon intensity metrics are not provided by a classic metric provider and are therefore excluded from this test run_id = utils.get_run_data(RUN_NAME)['id'] assert(run_id is not None and run_id != '') @@ -69,7 +70,7 @@ def test_db_rows_are_written_and_presented(): FROM measurement_metrics as mm JOIN measurement_values as mv ON mm.id = mv.measurement_metric_id - WHERE mm.run_id = %s + WHERE mm.run_id = %s AND mm.metric NOT LIKE 'grid_carbon_intensity%%' GROUP BY mm.metric """ diff --git a/tests/test_functions.py b/tests/test_functions.py index 3ec0a33ff..d8f906f49 100644 --- a/tests/test_functions.py +++ b/tests/test_functions.py @@ -3,6 +3,7 @@ import hashlib import json +from io import StringIO from lib.db import DB from lib.global_config import GlobalConfig from lib.log_types import LogType @@ -23,6 +24,23 @@ TEST_MEASUREMENT_DURATION_S = TEST_MEASUREMENT_DURATION / 1_000_000 TEST_MEASUREMENT_DURATION_H = TEST_MEASUREMENT_DURATION_S/60/60 +PHASE_TIMESTAMPS = [ + TEST_MEASUREMENT_START_TIME-8, # [BASELINE] start + TEST_MEASUREMENT_START_TIME-7, # [BASELINE] end + TEST_MEASUREMENT_START_TIME-6, # [INSTALL] start + TEST_MEASUREMENT_START_TIME-5, # [INSTALL] end + TEST_MEASUREMENT_START_TIME-4, # [BOOT] start + TEST_MEASUREMENT_START_TIME-3, # [BOOT] end + TEST_MEASUREMENT_START_TIME-2, # [IDLE] start + TEST_MEASUREMENT_START_TIME-1, # [IDLE] end + TEST_MEASUREMENT_START_TIME, # [RUNTIME]/Only Phase start + TEST_MEASUREMENT_START_TIME + 60000000, # Mid-runtime - 1 minute + TEST_MEASUREMENT_START_TIME + 120000000, # Mid-runtime - 2 minutes + TEST_MEASUREMENT_END_TIME, # [RUNTIME]/Only Phase end + TEST_MEASUREMENT_END_TIME+1, # [REMOVE] start + TEST_MEASUREMENT_END_TIME+2, # [REMOVE] end +] + def shorten_sleep_times(duration_in_s): DB().query("UPDATE users SET capabilities = jsonb_set(capabilities,'{measurement,pre_test_sleep}',%s,false)", params=(str(duration_in_s), )) DB().query("UPDATE users SET capabilities = jsonb_set(capabilities,'{measurement,baseline_duration}',%s,false)", params=(str(duration_in_s), )) @@ -42,12 +60,14 @@ def insert_run(*, uri='test-uri', branch='test-branch', filename='test-filename' {"start": TEST_MEASUREMENT_START_TIME, "name": "Only Phase", "end": TEST_MEASUREMENT_END_TIME}, {"start": TEST_MEASUREMENT_END_TIME+1, "name": "[REMOVE]", "end": TEST_MEASUREMENT_END_TIME+2}, ] + start_measurement=TEST_MEASUREMENT_START_TIME-9 + end_measurement=TEST_MEASUREMENT_START_TIME+2 return DB().fetch_one(''' - INSERT INTO runs (uri, branch, filename, phases, user_id, machine_id) + INSERT INTO runs (uri, branch, filename, phases, user_id, machine_id, start_measurement, end_measurement) VALUES - (%s, %s, %s, %s, %s, %s) RETURNING id; - ''', params=(uri, branch, filename, json.dumps(phases), user_id, machine_id))[0] + (%s, %s, %s, %s, %s, %s, %s, %s) RETURNING id; + ''', params=(uri, branch, filename, json.dumps(phases), user_id, machine_id, start_measurement, end_measurement))[0] def import_single_cpu_energy_measurement(run_id): @@ -219,6 +239,71 @@ def import_demo_data_ee(): reset_db() raise RuntimeError('Import of Demo data into DB failed', ps.stderr) +def _import_carbon_intensity_metrics(run_id, static_value=None): + if static_value is not None: + metric_name = 'grid_carbon_intensity_config_location' + sampling_rate_configured = 0 + carbon_intensity_values = [static_value] * len(PHASE_TIMESTAMPS) + metric_type = 'static' + avg_carbon_intensity_during_runtime = static_value + else: + metric_name = 
+        metric_name = 'grid_carbon_intensity_api_location'
+        sampling_rate_configured = 60000000 # 1 minute in microseconds
+        carbon_intensity_values = [
+            180, # 180.0 gCO2e/kWh (baseline - low demand)
+            175, # 175.0 gCO2e/kWh
+            220, # 220.0 gCO2e/kWh (install - higher demand)
+            230, # 230.0 gCO2e/kWh
+            250, # 250.0 gCO2e/kWh (boot - peak demand)
+            240, # 240.0 gCO2e/kWh
+            190, # 190.0 gCO2e/kWh (idle - lower demand)
+            185, # 185.0 gCO2e/kWh
+            300, # 300.0 gCO2e/kWh (runtime start - high demand)
+            280, # 280.0 gCO2e/kWh (mid-runtime)
+            260, # 260.0 gCO2e/kWh (mid-runtime 2)
+            240, # 240.0 gCO2e/kWh (runtime end)
+            200, # 200.0 gCO2e/kWh (remove start)
+            180, # 180.0 gCO2e/kWh (remove end - back to baseline)
+        ]
+        metric_type = 'dynamic'
+        avg_carbon_intensity_during_runtime = (carbon_intensity_values[9] + carbon_intensity_values[10]) / 2
+
+    detail_name = 'DE'
+    unit = 'gCO2e/kWh'
+
+    measurement_metric_id = DB().fetch_one('''
+        INSERT INTO measurement_metrics (run_id, metric, detail_name, unit, sampling_rate_configured)
+        VALUES (%s, %s, %s, %s, %s)
+        RETURNING id
+        ''', params=(run_id, metric_name, detail_name, unit, sampling_rate_configured))[0]
+
+    values_data = []
+    for timestamp, value in zip(PHASE_TIMESTAMPS, carbon_intensity_values):
+        values_data.append((measurement_metric_id, value, timestamp))
+
+    if values_data:
+        csv_data = '\n'.join([f"{row[0]},{row[1]},{row[2]}" for row in values_data])
+        f = StringIO(csv_data)
+        DB().copy_from(
+            file=f,
+            table='measurement_values',
+            columns=['measurement_metric_id', 'value', 'time'],
+            sep=','
+        )
+        f.close()
+
+    print(f"Imported {len(values_data)} {metric_type} carbon intensity data points for run {run_id}")
+
+    return avg_carbon_intensity_during_runtime
+
+def import_static_carbon_intensity_metrics(run_id, static_value):
+    if static_value is None:
+        raise ValueError('Parameter "static_value" is missing!')
+    return _import_carbon_intensity_metrics(run_id, static_value)
+
+def import_dynamic_carbon_intensity_metrics(run_id):
+    return _import_carbon_intensity_metrics(run_id)
+
 def assertion_info(expected, actual):
     return f"Expected: {expected}, Actual: {actual}"
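Taken together, the pieces above wire up as follows in a test; a hypothetical end-to-end sketch mirroring tests/lib/test_phase_stats.py (not part of the patch; helper names are the ones introduced above):

    from tests import test_functions as Tests
    from lib.phase_stats import build_and_store_phase_stats

    run_id = Tests.insert_run()
    Tests.import_machine_energy(run_id)

    # Static path: store sci.I as a constant time series, then build phase stats from it
    sci = {"I": 436, "R": 0, "EL": 4, "RS": 1, "TE": 181000, "R_d": "page request"}
    Tests.import_static_carbon_intensity_metrics(run_id, sci['I'])
    build_and_store_phase_stats(run_id, sci=sci)

    # Dynamic path (alternative): the helper returns the expected [RUNTIME] mean intensity
    # expected_mean = Tests.import_dynamic_carbon_intensity_metrics(run_id)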