From 2a48634fe091613477487a8e0351a8224fe87ff2 Mon Sep 17 00:00:00 2001 From: Tom Bland Date: Fri, 6 Jun 2025 15:58:44 +0100 Subject: [PATCH 01/12] Tidy conftest --- tests/conftest.py | 515 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 355 insertions(+), 160 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index ba082220e..bb6e1bb7b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,22 +1,72 @@ -import random as rand +"""Common test fixtures for MUSE tests.""" + from collections.abc import Mapping +from contextlib import contextmanager +from copy import deepcopy +from importlib import import_module +from logging import CRITICAL, getLogger +from os import walk from pathlib import Path -from typing import Callable +from typing import Callable, Optional, Union from unittest.mock import patch +from warnings import filterwarnings, simplefilter import numpy as np -from pandas import DataFrame +import pandas as pd +from numpy.random import choice, rand, randint from pytest import fixture from xarray import DataArray, Dataset from muse.__main__ import patched_broadcast_compat_data from muse.agents import Agent +from muse.commodities import CommodityUsage +from muse.timeslices import TIMESLICE, setup_module + +# Constants +RANDOM_SEED = 123 +DEFAULT_FACTOR = 100.0 +DEFAULT_TECH_TYPES = ["solid", "liquid", "solid", "liquid"] +DEFAULT_FUELS = ["person", "person", "oil", "person"] + +DEFAULT_TIMESLICES = """ +[timeslices] +winter.weekday.night = 396 +winter.weekday.morning = 396 +winter.weekday.afternoon = 264 +winter.weekday.early-peak = 66 +winter.weekday.late-peak = 66 +winter.weekday.evening = 396 +winter.weekend.night = 156 +winter.weekend.morning = 156 +winter.weekend.afternoon = 156 +winter.weekend.evening = 156 +spring-autumn.weekday.night = 792 +spring-autumn.weekday.morning = 792 +spring-autumn.weekday.afternoon = 528 +spring-autumn.weekday.early-peak = 132 +spring-autumn.weekday.late-peak = 132 +spring-autumn.weekday.evening = 792 +spring-autumn.weekend.night = 300 +spring-autumn.weekend.morning = 300 +spring-autumn.weekend.afternoon = 300 +spring-autumn.weekend.evening = 300 +summer.weekday.night = 396 +summer.weekday.morning = 396 +summer.weekday.afternoon = 264 +summer.weekday.early-peak = 66 +summer.weekday.late-peak = 66 +summer.weekday.evening = 396 +summer.weekend.night = 150 +summer.weekend.morning = 150 +summer.weekend.afternoon = 150 +summer.weekend.evening = 150 +level_names = ["month", "day", "hour"] +""" @fixture(autouse=True) def logger(): - from logging import CRITICAL, getLogger - + """Configure logger for tests.""" logger = getLogger("muse") logger.setLevel(CRITICAL) return logger @@ -24,6 +74,7 @@ def logger(): @fixture(autouse=True) def patch_broadcast_compat_data(): + """Patch broadcast compatibility data.""" with patch( "xarray.core.variable._broadcast_compat_data", patched_broadcast_compat_data ): @@ -33,31 +84,44 @@ def patch_broadcast_compat_data(): @fixture(autouse=True) def random(): """Set random seed for all tests to make them reproducible.""" - rand.seed(123) - np.random.seed(123) + rand.seed(RANDOM_SEED) + np.random.seed(RANDOM_SEED) def compare_df( - expected: DataFrame, - actual: DataFrame, + expected: pd.DataFrame, + actual: pd.DataFrame, rtol: float = 1e-5, atol: float = 1e-8, - equal_nan=False, - msg=None, -): - """Compares two dataframes approximately. - - Uses `numpy.allclose` for columns which are floating points. + equal_nan: bool = False, + msg: Optional[str] = None, +) -> None: + """Compare two dataframes approximately. + + Args: + expected: Expected dataframe + actual: Actual dataframe to compare + rtol: Relative tolerance + atol: Absolute tolerance + equal_nan: Whether to consider NaN values equal + msg: Optional message to display on failure + + Raises: + AssertionError: If dataframes don't match within tolerances """ from pytest import approx - assert set(expected.columns) == set(actual.columns) - assert expected.shape == actual.shape - assert set(expected.index) == set(actual.index) + assert set(expected.columns) == set(actual.columns), "Columns don't match" + assert expected.shape == actual.shape, "Shapes don't match" + assert set(expected.index) == set(actual.index), "Indices don't match" floats = [u for (u, d) in zip(actual.columns, actual.dtypes) if d == "float"] nonfloats = [u for (u, d) in zip(actual.columns, actual.dtypes) if d != "float"] - assert all(expected[nonfloats] == actual.loc[expected.index, nonfloats]) + + assert all(expected[nonfloats] == actual.loc[expected.index, nonfloats]), ( + "Non-float columns don't match" + ) + for col in floats: actual_col = actual.loc[expected.index, col].values expected_col = expected[col].values @@ -80,23 +144,32 @@ def compare_df( @fixture def compare_dirs() -> Callable: - def compare_dirs(actual_dir, expected_dir, **kwargs): - """Compares all the csv files in a directory.""" - from os import walk + """Factory for directory comparison function.""" - from pandas import read_csv + def compare_dirs( + actual_dir: Union[str, Path], expected_dir: Union[str, Path], **kwargs + ) -> None: + """Compare all CSV files in two directories. + Args: + actual_dir: Path to directory with actual files + expected_dir: Path to directory with expected files + **kwargs: Additional arguments passed to compare_df + + Raises: + AssertionError: If directories don't match or test is not set up correctly + """ compared_something = False for dirpath, _, filenames in walk(expected_dir): subdir = Path(actual_dir) / Path(dirpath).relative_to(expected_dir) for filename in filenames: compared_something = True expected_filename = Path(dirpath) / filename - expected = read_csv(expected_filename) + expected = pd.read_csv(expected_filename) actual_filename = Path(subdir) / filename - assert actual_filename.exists() - assert actual_filename.is_file() - actual = read_csv(actual_filename) + assert actual_filename.exists(), f"Missing file: {actual_filename}" + assert actual_filename.is_file(), f"Not a file: {actual_filename}" + actual = pd.read_csv(actual_filename) try: compare_df(expected, actual, msg=filename, **kwargs) except Exception: @@ -116,56 +189,23 @@ def compare_dirs(actual_dir, expected_dir, **kwargs): @fixture def default_timeslice_globals(): - from muse.timeslices import setup_module - - default_timeslices = """ - [timeslices] - winter.weekday.night = 396 - winter.weekday.morning = 396 - winter.weekday.afternoon = 264 - winter.weekday.early-peak = 66 - winter.weekday.late-peak = 66 - winter.weekday.evening = 396 - winter.weekend.night = 156 - winter.weekend.morning = 156 - winter.weekend.afternoon = 156 - winter.weekend.evening = 156 - spring-autumn.weekday.night = 792 - spring-autumn.weekday.morning = 792 - spring-autumn.weekday.afternoon = 528 - spring-autumn.weekday.early-peak = 132 - spring-autumn.weekday.late-peak = 132 - spring-autumn.weekday.evening = 792 - spring-autumn.weekend.night = 300 - spring-autumn.weekend.morning = 300 - spring-autumn.weekend.afternoon = 300 - spring-autumn.weekend.evening = 300 - summer.weekday.night = 396 - summer.weekday.morning = 396 - summer.weekday.afternoon = 264 - summer.weekday.early-peak = 66 - summer.weekday.late-peak = 66 - summer.weekday.evening = 396 - summer.weekend.night = 150 - summer.weekend.morning = 150 - summer.weekend.afternoon = 150 - summer.weekend.evening = 150 - level_names = ["month", "day", "hour"] - """ - - setup_module(default_timeslices) + """Set up default timeslice configuration.""" + setup_module(DEFAULT_TIMESLICES) @fixture def timeslice(default_timeslice_globals) -> Dataset: - from muse.timeslices import TIMESLICE - + """Get the default timeslice dataset.""" return TIMESLICE @fixture def coords() -> Mapping: - """Technoeconomics coordinates.""" + """Return standard coordinates for test cases. + + Returns: + Mapping with technology, region, year, commodity and comm_type coordinates + """ return { "technology": ["burger_flipper", "soda_shaker", "deep_frier", "salad_arranger"], "region": ["ASEAN", "USA"], @@ -194,10 +234,15 @@ def coords() -> Mapping: @fixture -def agent_args(coords) -> Mapping: - """Some standard arguments defining an agent.""" - from numpy.random import choice, rand, randint +def agent_args(coords: Mapping) -> Mapping: + """Generate standard arguments for creating an agent. + + Args: + coords: Standard coordinate mapping + Returns: + Mapping with region, share, enduses and maturity_threshold + """ return { "region": choice(coords["region"]), "share": "agent_share", @@ -209,14 +254,33 @@ def agent_args(coords) -> Mapping: } +def var_generator( + result: Dataset, dims: list[str], factor: float = DEFAULT_FACTOR +) -> tuple: + """Generate random variables for a dataset. + + Args: + result: Dataset to generate variables for + dims: Dimensions to generate variables over + factor: Scaling factor for random values + + Returns: + Tuple of (dims, random_values) + """ + shape = tuple(len(result[u]) for u in dims) + return dims, (rand(*shape) * factor).astype(type(factor)) + + @fixture -def technologies(coords) -> Dataset: - """Randomly generated technology characteristics.""" - from numpy import nonzero, sum - from numpy.random import choice, rand, randint +def technologies(coords: Mapping) -> Dataset: + """Generate random technology characteristics. - from muse.commodities import CommodityUsage + Args: + coords: Standard coordinate mapping + Returns: + Dataset with technology characteristics + """ result = Dataset(coords=coords) result["comm_type"] = ("commodity", coords["comm_type"]) @@ -224,25 +288,22 @@ def technologies(coords) -> Dataset: result = result.set_coords(("comm_type", "tech_type")) - def var(*dims, factor=100.0): - shape = tuple(len(result[u]) for u in dims) - return dims, (rand(*shape) * factor).astype(type(factor)) - - result["agent_share"] = var("technology", "region", "year") - result["agent_share"] /= sum(result.agent_share) + # Generate random variables + result["agent_share"] = var_generator(result, ["technology", "region", "year"]) + result["agent_share"] /= np.sum(result.agent_share) result["agent_share_zero"] = result["agent_share"] * 0 # first create a mask so each tech will have consistent inputs/outputs across years # and regions fuels = result.comm_type == "energy" - result["fixed_inputs"] = var("technology", "commodity") + result["fixed_inputs"] = var_generator(result, ["technology", "commodity"]) result.fixed_inputs[:] = randint(0, 3, result.fixed_inputs.shape) == 0 result.fixed_inputs.loc[{"commodity": ~fuels}] = 0 result["flexible_inputs"] = result.fixed_inputs * ( randint(0, 2, result.fixed_inputs.shape) == 0 ) - result["fixed_outputs"] = var("technology", "commodity") + result["fixed_outputs"] = var_generator(result, ["technology", "commodity"]) result.fixed_outputs[:] = randint(0, 3, result.fixed_outputs.shape) == 0 enduses = result.comm_type == "service" environmentals = result.comm_type == "environmental" @@ -252,15 +313,15 @@ def var(*dims, factor=100.0): for tech in result.technology: fin = result.fixed_inputs if (fin.sel(technology=tech, commodity=fuels) < 1e-12).all(): - i = result.commodity[choice(nonzero(fuels.values)[0])] + i = result.commodity[choice(np.nonzero(fuels.values)[0])] fin.loc[{"technology": tech, "commodity": i}] = 1 fout = result.fixed_outputs if (fout.sel(technology=tech, commodity=enduses) < 1e-12).all(): - i = result.commodity[choice(nonzero(enduses.values)[0])] + i = result.commodity[choice(np.nonzero(enduses.values)[0])] fout.loc[{"technology": tech, "commodity": i}] = 1 - # expand along year and region, and fill with random numbers + # Expand along year and region dimensions ones = (result.year == result.year) * (result.region == result.region) result["fixed_inputs"] = result.fixed_inputs * ones result.fixed_inputs[:] *= rand(*result.fixed_inputs.shape) @@ -269,27 +330,55 @@ def var(*dims, factor=100.0): result["fixed_outputs"] = result.fixed_outputs * ones result.fixed_outputs[:] *= rand(*result.fixed_outputs.shape) - result["total_capacity_limit"] = var("technology", "region", "year") + # Generate capacity and utilization parameters + result["total_capacity_limit"] = var_generator( + result, ["technology", "region", "year"] + ) result.total_capacity_limit.loc[{"year": 2030}] += result.total_capacity_limit.sel( year=2030 ) - result["max_capacity_addition"] = var("technology", "region", "year") - result["max_capacity_growth"] = var("technology", "region", "year") + result["max_capacity_addition"] = var_generator( + result, ["technology", "region", "year"] + ) + result["max_capacity_growth"] = var_generator( + result, ["technology", "region", "year"] + ) - result["utilization_factor"] = var("technology", "region", "year", factor=0.05) + result["utilization_factor"] = var_generator( + result, ["technology", "region", "year"], factor=0.05 + ) result.utilization_factor.values += 0.95 - result["fix_par"] = var("technology", "region", "year", factor=2.0) - result["cap_par"] = var("technology", "region", "year", factor=30.0) - result["var_par"] = var("technology", "region", "year", factor=1.0) - result["fix_exp"] = var("technology", "region", "year", factor=1.0) - result["cap_exp"] = var("technology", "region", "year", factor=1.0) - result["var_exp"] = var("technology", "region", "year", factor=1.0) - - result["technical_life"] = var("technology", "region", "year", factor=10) - result["technical_life"] = result.technical_life.astype(int).clip(min=1) - result["interest_rate"] = var("technology", "region", "year", factor=0.1) + # Generate cost parameters + result["fix_par"] = var_generator( + result, ["technology", "region", "year"], factor=2.0 + ) + result["cap_par"] = var_generator( + result, ["technology", "region", "year"], factor=30.0 + ) + result["var_par"] = var_generator( + result, ["technology", "region", "year"], factor=1.0 + ) + result["fix_exp"] = var_generator( + result, ["technology", "region", "year"], factor=1.0 + ) + result["cap_exp"] = var_generator( + result, ["technology", "region", "year"], factor=1.0 + ) + result["var_exp"] = var_generator( + result, ["technology", "region", "year"], factor=1.0 + ) + + # Generate technical parameters + result["technical_life"] = var_generator( + result, ["technology", "region", "year"], factor=10 + ) + result["technical_life"] = result.technical_life.astype(int).clip(min=1) + result["interest_rate"] = var_generator( + result, ["technology", "region", "year"], factor=0.1 + ) + # Set commodity usage result["comm_usage"] = "commodity", CommodityUsage.from_technologies(result).values result = result.set_coords("comm_usage").drop_vars("comm_type") @@ -297,54 +386,88 @@ def var(*dims, factor=100.0): @fixture -def agent_market(coords, timeslice) -> Dataset: - from numpy.random import rand +def agent_market(coords: Mapping, timeslice: Dataset) -> Dataset: + """Generate market data for agent testing. + Args: + coords: Standard coordinate mapping + timeslice: Timeslice dataset + + Returns: + Dataset with market data for agents + """ result = Dataset(coords=timeslice.coords) result["commodity"] = "commodity", coords["commodity"] result["region"] = "region", coords["region"] result["technology"] = "technology", coords["technology"] result["year"] = "year", coords["year"] - def var(*dims, factor=100.0): - shape = tuple(len(result[u]) for u in dims) - return dims, (rand(*shape) * factor).astype(type(factor)) - - result["capacity"] = var("technology", "region", "year") - result["supply"] = var("commodity", "region", "timeslice", "year") - result["consumption"] = var("commodity", "region", "timeslice", "year") - result["prices"] = var("commodity", "region", "year", "timeslice") + # Generate market variables + result["capacity"] = var_generator(result, ["technology", "region", "year"]) + result["supply"] = var_generator( + result, ["commodity", "region", "timeslice", "year"] + ) + result["consumption"] = var_generator( + result, ["commodity", "region", "timeslice", "year"] + ) + result["prices"] = var_generator( + result, ["commodity", "region", "year", "timeslice"] + ) return result @fixture -def market(coords, timeslice) -> Dataset: - from numpy.random import rand +def market(coords: Mapping, timeslice: Dataset) -> Dataset: + """Generate market data for testing. + + Args: + coords: Standard coordinate mapping + timeslice: Timeslice dataset + Returns: + Dataset with market data + """ result = Dataset(coords=timeslice.coords) result["commodity"] = "commodity", coords["commodity"] result["region"] = "region", coords["region"] result["year"] = "year", coords["year"] - def var(*dims, factor=100.0): - shape = tuple(len(result[u]) for u in dims) - return dims, (rand(*shape) * factor).astype(type(factor)) - - result["consumption"] = var("commodity", "region", "year", "timeslice") - result["supply"] = var("commodity", "region", "year", "timeslice") - result["prices"] = var("commodity", "region", "year", "timeslice") + # Generate market variables + result["consumption"] = var_generator( + result, ["commodity", "region", "year", "timeslice"] + ) + result["supply"] = var_generator( + result, ["commodity", "region", "year", "timeslice"] + ) + result["prices"] = var_generator( + result, ["commodity", "region", "year", "timeslice"] + ) return result -def create_agent(agent_args, technologies, stock, agent_type="retrofit") -> Agent: - from numpy.random import choice +def create_agent( + agent_args: Mapping, + technologies: Dataset, + stock: Dataset, + agent_type: str = "retrofit", +) -> Agent: + """Create an agent for testing. + + Args: + agent_args: Arguments for agent creation + technologies: Technology characteristics + stock: Stock data + agent_type: Type of agent to create ("retrofit" or "newcapa") - from muse.agents.factories import create_agent + Returns: + Created agent instance + """ + from muse.agents.factories import create_agent as factory_create_agent region = agent_args["region"] - agent = create_agent( + agent = factory_create_agent( agent_type=agent_type, technologies=technologies.sel(region=region), capacity=stock.where(stock.region == region, drop=True).assign_coords( @@ -364,37 +487,71 @@ def create_agent(agent_args, technologies, stock, agent_type="retrofit") -> Agen list(technology_names), len(technology_names) // 2, replace=False ) agent.assets = agent.assets.where(agent.assets.technology.isin(techs)) + return agent @fixture -def newcapa_agent(agent_args, technologies, stock) -> Agent: +def newcapa_agent(agent_args: Mapping, technologies: Dataset, stock: Dataset) -> Agent: + """Create a new capacity agent for testing. + + Args: + agent_args: Arguments for agent creation + technologies: Technology characteristics + stock: Stock data + + Returns: + New capacity agent instance + """ return create_agent(agent_args, technologies, stock.capacity, "newcapa") @fixture -def retro_agent(agent_args, technologies, stock) -> Agent: +def retro_agent(agent_args: Mapping, technologies: Dataset, stock: Dataset) -> Agent: + """Create a retrofit agent for testing. + + Args: + agent_args: Arguments for agent creation + technologies: Technology characteristics + stock: Stock data + + Returns: + Retrofit agent instance + """ return create_agent(agent_args, technologies, stock.capacity, "retrofit") @fixture -def stock(coords, technologies) -> Dataset: +def stock(coords: Mapping, technologies: Dataset) -> Dataset: + """Generate stock data for testing. + + Args: + coords: Standard coordinate mapping + technologies: Technology characteristics + + Returns: + Dataset with stock data + """ return _stock(coords, technologies) -def _stock( - coords, - technologies, -) -> Dataset: +def _stock(coords: Mapping, technologies: Dataset) -> Dataset: + """Internal function to generate stock data. + + Args: + coords: Standard coordinate mapping + technologies: Technology characteristics + + Returns: + Dataset with stock data + """ from numpy import cumprod, stack - from numpy.random import choice, rand - from xarray import Dataset from muse.utilities import broadcast_over_assets n_assets = 10 - # Create assets + # Create asset coordinates asset_coords = { "technology": ("asset", choice(coords["technology"], n_assets, replace=True)), "region": ("asset", choice(coords["region"], n_assets, replace=True)), @@ -402,7 +559,7 @@ def _stock( } assets = Dataset(coords=asset_coords) - # Create random capacity data + # Generate random capacity data capacity_limits = broadcast_over_assets(technologies.total_capacity_limit, assets) factors = cumprod(rand(n_assets, len(coords["year"])) / 4 + 0.75, axis=1).clip( max=1 @@ -412,7 +569,7 @@ def _stock( axis=1, ) - # Create capacity dataset + # Create final dataset result = assets.copy() result["year"] = "year", coords["year"] result["capacity"] = ("asset", "year"), capacity @@ -420,10 +577,16 @@ def _stock( @fixture -def demand_share(coords, timeslice): - """Example demand share, as would be computed by an agent.""" - from numpy.random import choice, rand +def demand_share(coords: Mapping, timeslice: Dataset) -> DataArray: + """Generate demand share data for testing. + + Args: + coords: Standard coordinate mapping + timeslice: Timeslice dataset + Returns: + DataArray with demand share data + """ n_assets = 5 axes = { "commodity": coords["commodity"], @@ -431,24 +594,26 @@ def demand_share(coords, timeslice): "technology": (["asset"], choice(coords["technology"], n_assets, replace=True)), "region": (["asset"], choice(coords["region"], n_assets, replace=True)), } - shape = ( - len(axes["commodity"]), - len(axes["timeslice"]), - n_assets, - ) - result = DataArray( + shape = (len(axes["commodity"]), len(axes["timeslice"]), n_assets) + + return DataArray( rand(*shape), dims=["commodity", "timeslice", "asset"], coords=axes, name="demand_share", ) - return result def create_fake_capacity(n: int, technologies: Dataset) -> DataArray: - from numpy.random import choice, rand - from xarray import Dataset + """Create fake capacity data for testing. + + Args: + n: Number of assets to create + technologies: Technology characteristics + Returns: + DataArray with fake capacity data + """ years = technologies.year techs = choice(technologies.technology.values, 5) regions = choice(technologies.region.values, 5) @@ -465,21 +630,35 @@ def create_fake_capacity(n: int, technologies: Dataset) -> DataArray: @fixture def capacity(technologies: Dataset) -> DataArray: + """Generate capacity data for testing. + + Args: + technologies: Technology characteristics + + Returns: + DataArray with capacity data + """ return create_fake_capacity(20, technologies) @fixture def settings(tmpdir) -> dict: - """Creates a dummy settings dictionary out of the default settings.""" + """Generate settings for testing. + + Args: + tmpdir: Temporary directory path + + Returns: + Dictionary with test settings + """ import toml from muse.readers import DEFAULT_SETTINGS_PATH from muse.readers.toml import format_paths - def drop_optionals(settings): - from copy import copy - - for k, v in copy(settings).items(): + def drop_optionals(settings: dict) -> None: + """Remove optional settings from dictionary.""" + for k, v in list(settings.items()): if v == "OPTIONAL": settings.pop(k) elif isinstance(v, Mapping): @@ -489,6 +668,7 @@ def drop_optionals(settings): drop_optionals(settings) out = format_paths(settings, cwd=tmpdir, path=tmpdir, muse_sectors=tmpdir) + # Add required settings required = { "time_framework": [2010, 2015, 2020], "regions": ["MEX"], @@ -499,11 +679,11 @@ def drop_optionals(settings): } out.update(required) + # Add required carbon budget settings carbon_budget_required = { "budget": [420000, 413000, 403000], "commodities": ["CO2f", "CO2r", "CH4", "N2O"], } - out["carbon_budget_control"].update(carbon_budget_required) return out @@ -511,16 +691,19 @@ def drop_optionals(settings): @fixture(autouse=True) def warnings_as_errors(request): - from warnings import filterwarnings, simplefilter + """Configure warnings to be treated as errors during testing. - # disable fixture for some tests + Args: + request: Pytest request object + """ + # Disable fixture for specific tests if ( request.module.__name__ == "test_outputs" and request.node.name == "test_save_with_fullpath_to_excel_with_sink" ): return - # Fail test if the following warnings are raised + # Configure warning filters simplefilter("error", FutureWarning) simplefilter("error", DeprecationWarning) simplefilter("error", PendingDeprecationWarning) @@ -538,19 +721,23 @@ def warnings_as_errors(request): @fixture def save_registries(): - from contextlib import contextmanager + """Save and restore registry state during tests.""" @contextmanager def saveme(module_name: str, registry_name: str): - from copy import deepcopy - from importlib import import_module + """Save and restore a specific registry. + Args: + module_name: Name of module containing registry + registry_name: Name of registry to save/restore + """ module = import_module(module_name) old = getattr(module, registry_name) setattr(module, registry_name, deepcopy(old)) yield setattr(module, registry_name, deepcopy(old)) + # List of registries to save/restore iterators = [ saveme("muse.sectors", "SECTORS_REGISTERED"), saveme("muse.objectives", "OBJECTIVES"), @@ -582,6 +769,14 @@ def saveme(module_name: str, registry_name: str): @fixture def rng(request): + """Create a random number generator for testing. + + Args: + request: Pytest request object + + Returns: + Random number generator instance + """ from numpy.random import default_rng return default_rng(getattr(request.config.option, "randomly_seed", None)) From d606bf982ef9bc412a11725eb398dabe5d8f4963 Mon Sep 17 00:00:00 2001 From: Tom Bland Date: Fri, 6 Jun 2025 16:05:18 +0100 Subject: [PATCH 02/12] Fix error with seed --- tests/conftest.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index bb6e1bb7b..dce3e48c5 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -13,7 +13,7 @@ import numpy as np import pandas as pd -from numpy.random import choice, rand, randint +from numpy.random import choice, default_rng, rand, randint from pytest import fixture from xarray import DataArray, Dataset @@ -84,8 +84,9 @@ def patch_broadcast_compat_data(): @fixture(autouse=True) def random(): """Set random seed for all tests to make them reproducible.""" - rand.seed(RANDOM_SEED) + rng = default_rng(RANDOM_SEED) np.random.seed(RANDOM_SEED) + return rng def compare_df( From 7c7555442f8df8ef5c4b3ceb8fa737a8e4387435 Mon Sep 17 00:00:00 2001 From: Tom Bland Date: Fri, 6 Jun 2025 16:13:43 +0100 Subject: [PATCH 03/12] Fix mistake with timeslice fixture --- tests/conftest.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index dce3e48c5..1fac1b6f8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -20,7 +20,7 @@ from muse.__main__ import patched_broadcast_compat_data from muse.agents import Agent from muse.commodities import CommodityUsage -from muse.timeslices import TIMESLICE, setup_module +from muse.timeslices import TIMESLICE, read_timeslices, setup_module # Constants RANDOM_SEED = 123 @@ -192,11 +192,15 @@ def compare_dirs( def default_timeslice_globals(): """Set up default timeslice configuration.""" setup_module(DEFAULT_TIMESLICES) + return DEFAULT_TIMESLICES @fixture -def timeslice(default_timeslice_globals) -> Dataset: +def timeslice(default_timeslice_globals) -> DataArray: """Get the default timeslice dataset.""" + if TIMESLICE is None: + # If TIMESLICE is not set, create it from the default timeslices + return read_timeslices(default_timeslice_globals) return TIMESLICE @@ -387,7 +391,7 @@ def technologies(coords: Mapping) -> Dataset: @fixture -def agent_market(coords: Mapping, timeslice: Dataset) -> Dataset: +def agent_market(coords: Mapping, timeslice: DataArray) -> Dataset: """Generate market data for agent testing. Args: @@ -419,7 +423,7 @@ def agent_market(coords: Mapping, timeslice: Dataset) -> Dataset: @fixture -def market(coords: Mapping, timeslice: Dataset) -> Dataset: +def market(coords: Mapping, timeslice: DataArray) -> Dataset: """Generate market data for testing. Args: @@ -578,7 +582,7 @@ def _stock(coords: Mapping, technologies: Dataset) -> Dataset: @fixture -def demand_share(coords: Mapping, timeslice: Dataset) -> DataArray: +def demand_share(coords: Mapping, timeslice: DataArray) -> DataArray: """Generate demand share data for testing. Args: From 389fd5ebae640a5e84e1f4afba806554380074c2 Mon Sep 17 00:00:00 2001 From: Tom Bland Date: Fri, 6 Jun 2025 16:29:45 +0100 Subject: [PATCH 04/12] Minor manual changes --- tests/conftest.py | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 1fac1b6f8..825941cd0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -20,13 +20,10 @@ from muse.__main__ import patched_broadcast_compat_data from muse.agents import Agent from muse.commodities import CommodityUsage -from muse.timeslices import TIMESLICE, read_timeslices, setup_module +from muse.timeslices import setup_module # Constants RANDOM_SEED = 123 -DEFAULT_FACTOR = 100.0 -DEFAULT_TECH_TYPES = ["solid", "liquid", "solid", "liquid"] -DEFAULT_FUELS = ["person", "person", "oil", "person"] DEFAULT_TIMESLICES = """ [timeslices] @@ -74,7 +71,6 @@ def logger(): @fixture(autouse=True) def patch_broadcast_compat_data(): - """Patch broadcast compatibility data.""" with patch( "xarray.core.variable._broadcast_compat_data", patched_broadcast_compat_data ): @@ -192,15 +188,13 @@ def compare_dirs( def default_timeslice_globals(): """Set up default timeslice configuration.""" setup_module(DEFAULT_TIMESLICES) - return DEFAULT_TIMESLICES @fixture def timeslice(default_timeslice_globals) -> DataArray: """Get the default timeslice dataset.""" - if TIMESLICE is None: - # If TIMESLICE is not set, create it from the default timeslices - return read_timeslices(default_timeslice_globals) + from muse.timeslices import TIMESLICE + return TIMESLICE @@ -259,9 +253,7 @@ def agent_args(coords: Mapping) -> Mapping: } -def var_generator( - result: Dataset, dims: list[str], factor: float = DEFAULT_FACTOR -) -> tuple: +def var_generator(result: Dataset, dims: list[str], factor: float = 100.0) -> tuple: """Generate random variables for a dataset. Args: @@ -782,6 +774,4 @@ def rng(request): Returns: Random number generator instance """ - from numpy.random import default_rng - return default_rng(getattr(request.config.option, "randomly_seed", None)) From bf5d4534b938e0cc8017168c5dcd954315cc60c6 Mon Sep 17 00:00:00 2001 From: Tom Bland Date: Sat, 7 Jun 2025 12:03:04 +0100 Subject: [PATCH 05/12] Move some fixtures --- tests/conftest.py | 74 ++++--------------------------------------- tests/test_filters.py | 16 +++++----- tests/test_readers.py | 30 ++++++++++++++++++ 3 files changed, 44 insertions(+), 76 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 825941cd0..c3d5a987b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -22,7 +22,6 @@ from muse.commodities import CommodityUsage from muse.timeslices import setup_module -# Constants RANDOM_SEED = 123 DEFAULT_TIMESLICES = """ @@ -207,6 +206,8 @@ def coords() -> Mapping: """ return { "technology": ["burger_flipper", "soda_shaker", "deep_frier", "salad_arranger"], + "tech_type": ["solid", "liquid", "solid", "liquid"], + "fuel": ["person", "person", "oil", "person"], "region": ["ASEAN", "USA"], "year": [2010, 2030], "commodity": [ @@ -279,16 +280,14 @@ def technologies(coords: Mapping) -> Dataset: Dataset with technology characteristics """ result = Dataset(coords=coords) - - result["comm_type"] = ("commodity", coords["comm_type"]) - result["tech_type"] = "technology", ["solid", "liquid", "solid", "liquid"] - + result["comm_type"] = "commodity", coords["comm_type"] + result["tech_type"] = "technology", coords["tech_type"] + result["fuel"] = "technology", coords["fuel"] result = result.set_coords(("comm_type", "tech_type")) - # Generate random variables + # We have a single agent with a share of 1 for all technologies result["agent_share"] = var_generator(result, ["technology", "region", "year"]) result["agent_share"] /= np.sum(result.agent_share) - result["agent_share_zero"] = result["agent_share"] * 0 # first create a mask so each tech will have consistent inputs/outputs across years # and regions @@ -638,54 +637,6 @@ def capacity(technologies: Dataset) -> DataArray: return create_fake_capacity(20, technologies) -@fixture -def settings(tmpdir) -> dict: - """Generate settings for testing. - - Args: - tmpdir: Temporary directory path - - Returns: - Dictionary with test settings - """ - import toml - - from muse.readers import DEFAULT_SETTINGS_PATH - from muse.readers.toml import format_paths - - def drop_optionals(settings: dict) -> None: - """Remove optional settings from dictionary.""" - for k, v in list(settings.items()): - if v == "OPTIONAL": - settings.pop(k) - elif isinstance(v, Mapping): - drop_optionals(v) - - settings = toml.load(DEFAULT_SETTINGS_PATH) - drop_optionals(settings) - out = format_paths(settings, cwd=tmpdir, path=tmpdir, muse_sectors=tmpdir) - - # Add required settings - required = { - "time_framework": [2010, 2015, 2020], - "regions": ["MEX"], - "equilibrium": False, - "maximum_iterations": 3, - "tolerance": 0.1, - "interpolation_mode": "linear", - } - out.update(required) - - # Add required carbon budget settings - carbon_budget_required = { - "budget": [420000, 413000, 403000], - "commodities": ["CO2f", "CO2r", "CH4", "N2O"], - } - out["carbon_budget_control"].update(carbon_budget_required) - - return out - - @fixture(autouse=True) def warnings_as_errors(request): """Configure warnings to be treated as errors during testing. @@ -762,16 +713,3 @@ def saveme(module_name: str, registry_name: str): map(next, iterators) yield map(next, iterators) - - -@fixture -def rng(request): - """Create a random number generator for testing. - - Args: - request: Pytest request object - - Returns: - Random number generator instance - """ - return default_rng(getattr(request.config.option, "randomly_seed", None)) diff --git a/tests/test_filters.py b/tests/test_filters.py index ad36047cf..edd832cda 100644 --- a/tests/test_filters.py +++ b/tests/test_filters.py @@ -146,7 +146,7 @@ def test_similar_fuels(retro_agent, search_space, technologies): assert (actual == expected).all() -def test_currently_existing(retro_agent, search_space, technologies, agent_market, rng): +def test_currently_existing(retro_agent, search_space, technologies, agent_market): # Test with zero capacity agent_market.capacity[:] = 0 actual = currently_existing_tech( @@ -165,9 +165,9 @@ def test_currently_existing(retro_agent, search_space, technologies, agent_marke assert actual.sel(replacement=in_market).all() # Test with partial capacity - techs = rng.choice( + techs = np.random.choice( list(set(agent_market.technology.values)), - 1 + rng.choice(range(len(set(agent_market.technology.values)))), + 1 + np.random.choice(range(len(set(agent_market.technology.values)))), replace=False, ) agent_market.capacity[:] = 0 @@ -228,13 +228,13 @@ def test_init_from_tech(demand_share, technologies, agent_market): assert not space.any() -def test_init_from_asset(technologies, rng): +def test_init_from_asset(technologies): # Create test data - technology = rng.choice(technologies.technology, 5) - installed = rng.choice((2020, 2025), len(technology)) + technology = np.random.choice(technologies.technology, 5) + installed = np.random.choice((2020, 2025), len(technology)) year = np.arange(2020, 2040, 5) capacity = xr.DataArray( - rng.choice([0, 0, 1, 10], (len(technology), len(year))), + np.random.choice([0, 0, 1, 10], (len(technology), len(year))), coords={ "technology": ("asset", technology), "installed": ("asset", installed), @@ -253,7 +253,7 @@ def test_init_from_asset(technologies, rng): assert set(space.asset.asset.values) == set(capacity.technology.values) -def test_init_from_asset_no_assets(technologies, rng): +def test_init_from_asset_no_assets(technologies): agent = namedtuple("DummyAgent", ["assets"])( xr.Dataset(dict(capacity=xr.DataArray(0))) ) diff --git a/tests/test_readers.py b/tests/test_readers.py index ab566cc5e..7847b7987 100644 --- a/tests/test_readers.py +++ b/tests/test_readers.py @@ -1,3 +1,4 @@ +from collections.abc import Mapping from itertools import chain, permutations from pathlib import Path from unittest.mock import patch @@ -16,6 +17,35 @@ ] +@fixture +def settings(tmpdir) -> dict: + """Generate settings for testing. + + Args: + tmpdir: Temporary directory path + + Returns: + Dictionary with test settings + """ + import toml + + from muse.readers import DEFAULT_SETTINGS_PATH + from muse.readers.toml import format_paths + + def drop_optionals(settings: dict) -> None: + """Remove optional settings from dictionary.""" + for k, v in list(settings.items()): + if v == "OPTIONAL": + settings.pop(k) + elif isinstance(v, Mapping): + drop_optionals(v) + + settings = toml.load(DEFAULT_SETTINGS_PATH) + drop_optionals(settings) + settings = format_paths(settings, cwd=tmpdir, path=tmpdir, muse_sectors=tmpdir) + return settings + + @fixture def user_data_files(settings: dict) -> None: """Creates test files related to user data.""" From fa8d52e2a7c3e843047b58c7cf4a2eba5e57a6aa Mon Sep 17 00:00:00 2001 From: Tom Bland Date: Mon, 9 Jun 2025 17:15:09 +0100 Subject: [PATCH 06/12] Combine fixtures in test_costs --- tests/test_costs.py | 362 ++++++++++++++++++++++++++------------------ 1 file changed, 215 insertions(+), 147 deletions(-) diff --git a/tests/test_costs.py b/tests/test_costs.py index f66237b04..621e11d16 100644 --- a/tests/test_costs.py +++ b/tests/test_costs.py @@ -18,301 +18,369 @@ supply_cost, variable_costs, ) -from muse.quantities import production_amplitude -from muse.timeslices import broadcast_timeslice +from muse.quantities import capacity_to_service_demand, production_amplitude +from muse.timeslices import broadcast_timeslice, distribute_timeslice +from muse.utilities import broadcast_over_assets YEAR = 2030 @fixture -def _capacity(_technologies, demand_share): - """Capacity for each asset.""" - from muse.quantities import capacity_to_service_demand - - return capacity_to_service_demand(technologies=_technologies, demand=demand_share) - - -@fixture -def _technologies(technologies, demand_share): - """Technology parameters for each asset.""" - from muse.utilities import broadcast_over_assets - - return broadcast_over_assets(technologies.sel(year=YEAR), demand_share) - - -@fixture -def _prices(market, demand_share): - """Prices relevant to each asset.""" - from muse.utilities import broadcast_over_assets - - prices = market.prices.sel(year=YEAR) - return broadcast_over_assets(prices, demand_share, installed_as_year=False) +def cost_data(technologies, market, demand_share): + """Creates the complete dataset needed for cost calculations. + + The transformation follows these steps: + 1. Extract year-specific data from technologies and market + 2. Transform data to asset level + 3. Calculate capacity for each asset + 4. Calculate production and consumption data + + Returns: + dict: Contains all necessary data for cost calculations: + - technologies: Technology parameters for each asset + - prices: Prices relevant to each asset + - capacity: Capacity for each asset + - production: Production data for each asset + - consumption: Consumption data for each asset + """ + # Step 1: Extract year-specific data + tech_year = technologies.sel(year=YEAR) + prices_year = market.prices.sel(year=YEAR) + + # Step 2: Transform to asset level + tech_assets = broadcast_over_assets(tech_year, demand_share) + prices_assets = broadcast_over_assets( + prices_year, demand_share, installed_as_year=False + ) + # Step 3: Calculate capacity + capacity = capacity_to_service_demand(technologies=tech_assets, demand=demand_share) -@fixture -def _production(_technologies, _capacity): - """Production data for each asset.""" - from muse.timeslices import broadcast_timeslice, distribute_timeslice - - return ( - broadcast_timeslice(_capacity) - * distribute_timeslice(_technologies.fixed_outputs) - * broadcast_timeslice(_technologies.utilization_factor) + # Step 4: Calculate production and consumption + production = ( + broadcast_timeslice(capacity) + * distribute_timeslice(tech_assets.fixed_outputs) + * broadcast_timeslice(tech_assets.utilization_factor) ) - -@fixture -def _consumption(_technologies, _capacity): - """Consumption data for each asset.""" - from muse.timeslices import broadcast_timeslice, distribute_timeslice - - return ( - broadcast_timeslice(_capacity) - * distribute_timeslice(_technologies.fixed_inputs) - * broadcast_timeslice(_technologies.utilization_factor) + consumption = ( + broadcast_timeslice(capacity) + * distribute_timeslice(tech_assets.fixed_inputs) + * broadcast_timeslice(tech_assets.utilization_factor) ) + return { + "technologies": tech_assets, + "prices": prices_assets, + "capacity": capacity, + "production": production, + "consumption": consumption, + } + -def test_fixtures(_technologies, _prices, _capacity, _production, _consumption): +def test_fixtures(cost_data): """Validate fixture dimensions.""" - assert set(_technologies.dims) == {"asset", "commodity"} - assert set(_prices.dims) == {"asset", "commodity", "timeslice"} - assert set(_capacity.dims) == {"asset"} - assert set(_production.dims) == {"asset", "commodity", "timeslice"} - assert set(_consumption.dims) == {"asset", "commodity", "timeslice"} + assert set(cost_data["technologies"].dims) == {"asset", "commodity"} + assert set(cost_data["prices"].dims) == {"asset", "commodity", "timeslice"} + assert set(cost_data["capacity"].dims) == {"asset"} + assert set(cost_data["production"].dims) == {"asset", "commodity", "timeslice"} + assert set(cost_data["consumption"].dims) == {"asset", "commodity", "timeslice"} -def test_capital_costs(_technologies, _capacity): - result = capital_costs(_technologies, _capacity) +def test_capital_costs(cost_data): + result = capital_costs(cost_data["technologies"], cost_data["capacity"]) assert set(result.dims) == {"asset"} -def test_environmental_costs(_technologies, _prices, _production): - result = environmental_costs(_technologies, _prices, _production) +def test_environmental_costs(cost_data): + result = environmental_costs( + cost_data["technologies"], cost_data["prices"], cost_data["production"] + ) assert set(result.dims) == {"asset", "timeslice"} -def test_fuel_costs(_technologies, _prices, _consumption): - result = fuel_costs(_technologies, _prices, _consumption) +def test_fuel_costs(cost_data): + result = fuel_costs( + cost_data["technologies"], cost_data["prices"], cost_data["consumption"] + ) assert set(result.dims) == {"asset", "timeslice"} -def test_material_costs(_technologies, _prices, _consumption): - result = material_costs(_technologies, _prices, _consumption) +def test_material_costs(cost_data): + result = material_costs( + cost_data["technologies"], cost_data["prices"], cost_data["consumption"] + ) assert set(result.dims) == {"asset", "timeslice"} -def test_fixed_costs(_technologies, _capacity): - result = fixed_costs(_technologies, _capacity) +def test_fixed_costs(cost_data): + result = fixed_costs(cost_data["technologies"], cost_data["capacity"]) assert set(result.dims) == {"asset"} -def test_variable_costs(_technologies, _production): - result = variable_costs(_technologies, _production) +def test_variable_costs(cost_data): + result = variable_costs(cost_data["technologies"], cost_data["production"]) assert set(result.dims) == {"asset"} -def test_running_costs(_technologies, _prices, _capacity, _production, _consumption): - result = running_costs(_technologies, _prices, _capacity, _production, _consumption) +def test_running_costs(cost_data): + result = running_costs( + cost_data["technologies"], + cost_data["prices"], + cost_data["capacity"], + cost_data["production"], + cost_data["consumption"], + ) assert set(result.dims) == {"asset", "timeslice"} -def test_net_present_value( - _technologies, _prices, _capacity, _production, _consumption -): +def test_net_present_value(cost_data): result = net_present_value( - _technologies, _prices, _capacity, _production, _consumption + cost_data["technologies"], + cost_data["prices"], + cost_data["capacity"], + cost_data["production"], + cost_data["consumption"], ) assert set(result.dims) == {"asset", "timeslice"} -def test_net_present_cost(_technologies, _prices, _capacity, _production, _consumption): +def test_net_present_cost(cost_data): result = net_present_cost( - _technologies, _prices, _capacity, _production, _consumption + cost_data["technologies"], + cost_data["prices"], + cost_data["capacity"], + cost_data["production"], + cost_data["consumption"], ) assert set(result.dims) == {"asset", "timeslice"} -def test_equivalent_annual_cost( - _technologies, _prices, _capacity, _production, _consumption -): +def test_equivalent_annual_cost(cost_data): result = equivalent_annual_cost( - _technologies, _prices, _capacity, _production, _consumption + cost_data["technologies"], + cost_data["prices"], + cost_data["capacity"], + cost_data["production"], + cost_data["consumption"], ) assert set(result.dims) == {"asset", "timeslice"} @mark.parametrize("method", ["annual", "lifetime"]) -def test_levelized_cost_of_energy( - _technologies, _prices, _capacity, _production, _consumption, method -): +def test_levelized_cost_of_energy(cost_data, method): result = levelized_cost_of_energy( - _technologies, _prices, _capacity, _production, _consumption, method=method + cost_data["technologies"], + cost_data["prices"], + cost_data["capacity"], + cost_data["production"], + cost_data["consumption"], + method=method, ) assert set(result.dims) == {"asset", "timeslice"} -def test_supply_cost(_technologies, _prices, _capacity, _production, _consumption): +def test_supply_cost(cost_data): lcoe = levelized_cost_of_energy( - _technologies, _prices, _capacity, _production, _consumption, method="annual" + cost_data["technologies"], + cost_data["prices"], + cost_data["capacity"], + cost_data["production"], + cost_data["consumption"], + method="annual", ) - result = supply_cost(_production, lcoe) + result = supply_cost(cost_data["production"], lcoe) assert set(result.dims) == {"commodity", "region", "timeslice"} -def test_capital_recovery_factor(_technologies): - result = capital_recovery_factor(_technologies) - assert set(result.dims) == set(_technologies.interest_rate.dims) +def test_capital_recovery_factor(cost_data): + result = capital_recovery_factor(cost_data["technologies"]) + assert set(result.dims) == set(cost_data["technologies"].interest_rate.dims) # Test zero interest rates - _technologies["interest_rate"] = 0 - result = capital_recovery_factor(_technologies) + cost_data["technologies"]["interest_rate"] = 0 + result = capital_recovery_factor(cost_data["technologies"]) assert isfinite(result).all() -def test_annual_to_lifetime(_technologies, _prices, _consumption): - _fuel_costs = fuel_costs(_technologies, _prices, _consumption) - _fuel_costs_lifetime = annual_to_lifetime(_fuel_costs, _technologies) +def test_annual_to_lifetime(cost_data): + _fuel_costs = fuel_costs( + cost_data["technologies"], cost_data["prices"], cost_data["consumption"] + ) + _fuel_costs_lifetime = annual_to_lifetime(_fuel_costs, cost_data["technologies"]) assert set(_fuel_costs.dims) == set(_fuel_costs_lifetime.dims) assert (_fuel_costs_lifetime > _fuel_costs).all() @mark.parametrize("method", ["annual", "lifetime"]) -def test_lcoe_flow_scaling( - _technologies, _prices, _capacity, _production, _consumption, method -): +def test_lcoe_flow_scaling(cost_data, method): """Test LCOE independence of input/output flow scaling.""" - _technologies["var_exp"] = 1 + cost_data["technologies"]["var_exp"] = 1 # Original LCOE lcoe1 = levelized_cost_of_energy( - _technologies, _prices, _capacity, _production, _consumption, method=method + cost_data["technologies"], + cost_data["prices"], + cost_data["capacity"], + cost_data["production"], + cost_data["consumption"], + method=method, ) # Scale inputs/outputs and var_par by 2 - _technologies_scaled = _technologies.copy() - _technologies_scaled["fixed_inputs"] *= 2 - _technologies_scaled["flexible_inputs"] *= 2 - _technologies_scaled["fixed_outputs"] *= 2 - _technologies_scaled["var_par"] *= 2 + technologies_scaled = cost_data["technologies"].copy() + technologies_scaled["fixed_inputs"] *= 2 + technologies_scaled["flexible_inputs"] *= 2 + technologies_scaled["fixed_outputs"] *= 2 + technologies_scaled["var_par"] *= 2 lcoe2 = levelized_cost_of_energy( - _technologies_scaled, - _prices, - _capacity, - _production, - _consumption, + technologies_scaled, + cost_data["prices"], + cost_data["capacity"], + cost_data["production"], + cost_data["consumption"], method=method, ) assert isclose(lcoe1, lcoe2).all() @mark.parametrize("method", ["annual", "lifetime"]) -def test_lcoe_prod_scaling( - _technologies, _prices, _capacity, _production, _consumption, method -): +def test_lcoe_prod_scaling(cost_data, method): """Test LCOE independence of production scaling with linear costs.""" - _technologies["var_exp"] = 1 - _technologies["cap_exp"] = 1 - _technologies["fix_exp"] = 1 + cost_data["technologies"]["var_exp"] = 1 + cost_data["technologies"]["cap_exp"] = 1 + cost_data["technologies"]["fix_exp"] = 1 lcoe1 = levelized_cost_of_energy( - _technologies, _prices, _capacity, _production, _consumption, method=method + cost_data["technologies"], + cost_data["prices"], + cost_data["capacity"], + cost_data["production"], + cost_data["consumption"], + method=method, ) lcoe2 = levelized_cost_of_energy( - _technologies, - _prices, - _capacity * 2, - _production * 2, - _consumption * 2, + cost_data["technologies"], + cost_data["prices"], + cost_data["capacity"] * 2, + cost_data["production"] * 2, + cost_data["consumption"] * 2, method=method, ) assert isclose(lcoe1, lcoe2).all() @mark.parametrize("method", ["annual", "lifetime"]) -def test_lcoe_equal_prices( - _technologies, _prices, _capacity, _production, _consumption, method -): +def test_lcoe_equal_prices(cost_data, method): """Test LCOE behavior with uniform prices across timeslices.""" lcoe1 = levelized_cost_of_energy( - _technologies, _prices, _capacity, _production, _consumption, method=method + cost_data["technologies"], + cost_data["prices"], + cost_data["capacity"], + cost_data["production"], + cost_data["consumption"], + method=method, ) with raises(AssertionError): assert_allclose(lcoe1, broadcast_timeslice(lcoe1.isel(timeslice=0))) # Test with uniform prices - _prices = broadcast_timeslice(_prices.mean("timeslice")) + prices_uniform = broadcast_timeslice(cost_data["prices"].mean("timeslice")) lcoe2 = levelized_cost_of_energy( - _technologies, _prices, _capacity, _production, _consumption, method=method + cost_data["technologies"], + prices_uniform, + cost_data["capacity"], + cost_data["production"], + cost_data["consumption"], + method=method, ) assert_allclose(lcoe2, broadcast_timeslice(lcoe2.isel(timeslice=0))) -def test_npv_equal_prices(_technologies, _prices, _capacity, _production, _consumption): +def test_npv_equal_prices(cost_data): """Test NPV linearity with production under uniform prices.""" npv1 = net_present_value( - _technologies, _prices, _capacity, _production, _consumption + cost_data["technologies"], + cost_data["prices"], + cost_data["capacity"], + cost_data["production"], + cost_data["consumption"], + ) + tech_activity = production_amplitude( + cost_data["production"], cost_data["technologies"] ) - tech_activity = production_amplitude(_production, _technologies) npv1_scaled = npv1 / tech_activity with raises(AssertionError): assert_allclose(npv1_scaled, broadcast_timeslice(npv1_scaled.isel(timeslice=0))) # Test with uniform prices - _prices = broadcast_timeslice(_prices.mean("timeslice")) + prices_uniform = broadcast_timeslice(cost_data["prices"].mean("timeslice")) npv2 = net_present_value( - _technologies, _prices, _capacity, _production, _consumption + cost_data["technologies"], + prices_uniform, + cost_data["capacity"], + cost_data["production"], + cost_data["consumption"], ) npv2_scaled = npv2 / tech_activity assert_allclose(npv2_scaled, broadcast_timeslice(npv2_scaled.isel(timeslice=0))) @mark.parametrize("method", ["annual", "lifetime"]) -def test_lcoe_zero_production( - _technologies, _prices, _capacity, _production, _consumption, method -): +def test_lcoe_zero_production(cost_data, method): """Test LCOE behavior with zero production.""" lcoe1 = levelized_cost_of_energy( - _technologies, _prices, _capacity, _production, _consumption, method=method + cost_data["technologies"], + cost_data["prices"], + cost_data["capacity"], + cost_data["production"], + cost_data["consumption"], + method=method, ) assert not (lcoe1.isel(timeslice=0) == 0).all() # Test with zero production in first timeslice - _production.isel(timeslice=0)[:] = 0 - _consumption.isel(timeslice=0)[:] = 0 + production_zero = cost_data["production"].copy() + consumption_zero = cost_data["consumption"].copy() + production_zero.isel(timeslice=0)[:] = 0 + consumption_zero.isel(timeslice=0)[:] = 0 + lcoe2 = levelized_cost_of_energy( - _technologies, _prices, _capacity, _production, _consumption, method=method + cost_data["technologies"], + cost_data["prices"], + cost_data["capacity"], + production_zero, + consumption_zero, + method=method, ) assert (lcoe2.isel(timeslice=0) == 0).all() @mark.parametrize("method", ["annual", "lifetime"]) -def test_lcoe_aggregate( - _technologies, _prices, _capacity, _production, _consumption, method -): +def test_lcoe_aggregate(cost_data, method): """Test LCOE aggregation over timeslices.""" result = levelized_cost_of_energy( - _technologies, - _prices, - _capacity, - _production, - _consumption, + cost_data["technologies"], + cost_data["prices"], + cost_data["capacity"], + cost_data["production"], + cost_data["consumption"], method=method, aggregate_timeslices=True, ) assert set(result.dims) == {"asset"} -def test_npv_aggregate(_technologies, _prices, _capacity, _production, _consumption): +def test_npv_aggregate(cost_data): """Test NPV aggregation over timeslices.""" result = net_present_value( - _technologies, - _prices, - _capacity, - _production, - _consumption, + cost_data["technologies"], + cost_data["prices"], + cost_data["capacity"], + cost_data["production"], + cost_data["consumption"], aggregate_timeslices=True, ) assert set(result.dims) == {"asset"} From 0e3f40d6886c9ef93da4ce29262e680d8fea7380 Mon Sep 17 00:00:00 2001 From: Tom Bland Date: Mon, 9 Jun 2025 17:17:27 +0100 Subject: [PATCH 07/12] Combine fixtures in test_objectives --- tests/test_objectives.py | 183 ++++++++++++++++++++++++++------------- 1 file changed, 122 insertions(+), 61 deletions(-) diff --git a/tests/test_objectives.py b/tests/test_objectives.py index 06a42fef0..d9eec568a 100644 --- a/tests/test_objectives.py +++ b/tests/test_objectives.py @@ -1,34 +1,64 @@ +from numpy.random import rand from pytest import fixture, mark YEAR = 2030 @fixture -def _demand(demand_share): - return demand_share +def objective_data(technologies, market, demand_share): + """Creates the complete dataset needed for objective calculations. + + The transformation follows these steps: + 1. Extract year-specific data from technologies and market + 2. Transform technology data to asset level with replacement dimension + 3. Transform price data to asset level + 4. Add any additional variables needed for specific objectives + + Returns: + dict: Contains all necessary data for objective calculations: + - technologies: Technology parameters with replacement dimension + - prices: Prices relevant to each asset + - demand: Demand share data + """ + from muse.utilities import broadcast_over_assets + # Step 1: Extract year-specific data + tech_year = technologies.sel(year=YEAR).rename(technology="replacement") + prices_year = market.prices.sel(year=YEAR) -@fixture -def _technologies(technologies, demand_share): - from muse.utilities import broadcast_over_assets + # Step 2 & 3: Transform to asset level + tech_assets = broadcast_over_assets(tech_year, demand_share) + prices_assets = broadcast_over_assets( + prices_year, demand_share, installed_as_year=False + ) - techs = technologies.sel(year=YEAR).rename(technology="replacement") - return broadcast_over_assets(techs, demand_share) + # Step 4: Add computed variables needed by some objectives + tech_assets["comfort"] = _add_var(tech_assets, "replacement") + tech_assets["efficiency"] = _add_var(tech_assets, "replacement") + tech_assets["scaling_size"] = _add_var(tech_assets, "replacement") + return { + "technologies": tech_assets, + "prices": prices_assets, + "demand": demand_share, + } -@fixture -def _prices(market, demand_share): - from muse.utilities import broadcast_over_assets - prices = market.prices.sel(year=YEAR) - return broadcast_over_assets(prices, demand_share, installed_as_year=False) +def _add_var(coordinates, *dims, factor=100.0): + """Helper function to add random variables with specified dimensions.""" + shape = tuple(len(coordinates[u]) for u in dims) + return dims, (rand(*shape) * factor).astype(type(factor)) -def test_fixtures(_technologies, _demand, _prices): - """Validating that the fixtures have appropriate dimensions.""" - assert set(_technologies.dims) == {"asset", "commodity", "replacement"} - assert set(_demand.dims) == {"asset", "commodity", "timeslice"} - assert set(_prices.dims) == {"asset", "commodity", "timeslice"} +def test_fixtures(objective_data): + """Validating that the fixture data has appropriate dimensions.""" + assert set(objective_data["technologies"].dims) == { + "asset", + "commodity", + "replacement", + } + assert set(objective_data["demand"].dims) == {"asset", "commodity", "timeslice"} + assert set(objective_data["prices"].dims) == {"asset", "commodity", "timeslice"} @mark.usefixtures("save_registries") @@ -51,7 +81,7 @@ def b_objective(*args, **kwargs): @mark.usefixtures("save_registries") -def test_computing_objectives(_technologies, _demand, _prices): +def test_computing_objectives(objective_data): from muse.objectives import factory, register_objective @register_objective @@ -78,20 +108,27 @@ def second(technologies, demand, assets=None, *args, **kwargs): # Test first objective with/without switch objectives = factory("first")( - technologies=_technologies, demand=_demand, prices=_prices, switch=True + technologies=objective_data["technologies"], + demand=objective_data["demand"], + prices=objective_data["prices"], + switch=True, ) assert set(objectives.data_vars) == {"first"} assert (objectives.first == 1).all() + objectives = factory("first")( - technologies=_technologies, demand=_demand, prices=_prices, switch=False + technologies=objective_data["technologies"], + demand=objective_data["demand"], + prices=objective_data["prices"], + switch=False, ) assert (objectives.first == 2).all() # Test multiple objectives objectives = factory(["first", "second"])( - technologies=_technologies, - demand=_demand, - prices=_prices, + technologies=objective_data["technologies"], + demand=objective_data["demand"], + prices=objective_data["prices"], switch=False, assets=0, ) @@ -103,109 +140,133 @@ def second(technologies, demand, assets=None, *args, **kwargs): assert (objectives.second.isel(asset=1) == 5).all() -def test_comfort(_technologies, _demand): +def test_comfort(objective_data): from muse.objectives import comfort - _technologies["comfort"] = add_var(_technologies, "replacement") - result = comfort(_technologies, _demand) + result = comfort(objective_data["technologies"], objective_data["demand"]) assert set(result.dims) == {"replacement", "asset"} -def test_efficiency(_technologies, _demand): +def test_efficiency(objective_data): from muse.objectives import efficiency - _technologies["efficiency"] = add_var(_technologies, "replacement") - result = efficiency(_technologies, _demand) + result = efficiency(objective_data["technologies"], objective_data["demand"]) assert set(result.dims) == {"replacement", "asset"} -def test_capacity_to_service_demand(_technologies, _demand): +def test_capacity_to_service_demand(objective_data): from muse.objectives import capacity_to_service_demand - result = capacity_to_service_demand(_technologies, _demand) + result = capacity_to_service_demand( + objective_data["technologies"], objective_data["demand"] + ) assert set(result.dims) == {"replacement", "asset"} -def test_capacity_in_use(_technologies, _demand): +def test_capacity_in_use(objective_data): from muse.objectives import capacity_in_use - result = capacity_in_use(_technologies, _demand) + result = capacity_in_use(objective_data["technologies"], objective_data["demand"]) assert set(result.dims) == {"replacement", "asset"} -def test_consumption(_technologies, _demand, _prices): +def test_consumption(objective_data): from muse.objectives import consumption - result = consumption(_technologies, _demand, _prices) + result = consumption( + objective_data["technologies"], + objective_data["demand"], + objective_data["prices"], + ) assert set(result.dims) == {"replacement", "asset", "timeslice"} -def test_fixed_costs(_technologies, _demand): +def test_fixed_costs(objective_data): from muse.objectives import fixed_costs - result = fixed_costs(_technologies, _demand) + result = fixed_costs(objective_data["technologies"], objective_data["demand"]) assert set(result.dims) == {"replacement", "asset"} -def test_capital_costs(_technologies, _demand): +def test_capital_costs(objective_data): from muse.objectives import capital_costs - _technologies["scaling_size"] = add_var(_technologies, "replacement") - result = capital_costs(_technologies, _demand) + result = capital_costs(objective_data["technologies"], objective_data["demand"]) assert set(result.dims) == {"replacement", "asset"} -def test_emission_cost(_technologies, _demand, _prices): +def test_emission_cost(objective_data): from muse.objectives import emission_cost - result = emission_cost(_technologies, _demand, _prices) + result = emission_cost( + objective_data["technologies"], + objective_data["demand"], + objective_data["prices"], + ) assert set(result.dims) == {"replacement", "asset", "timeslice"} -def test_fuel_consumption_cost(_technologies, _demand, _prices): +def test_fuel_consumption_cost(objective_data): from muse.objectives import fuel_consumption_cost - result = fuel_consumption_cost(_technologies, _demand, _prices) + result = fuel_consumption_cost( + objective_data["technologies"], + objective_data["demand"], + objective_data["prices"], + ) assert set(result.dims) == {"replacement", "asset", "timeslice"} -def test_annual_levelized_cost_of_energy(_technologies, _demand, _prices): +def test_annual_levelized_cost_of_energy(objective_data): from muse.objectives import annual_levelized_cost_of_energy - result = annual_levelized_cost_of_energy(_technologies, _demand, _prices) + result = annual_levelized_cost_of_energy( + objective_data["technologies"], + objective_data["demand"], + objective_data["prices"], + ) assert set(result.dims) == {"replacement", "asset"} -def test_lifetime_levelized_cost_of_energy(_technologies, _demand, _prices): +def test_lifetime_levelized_cost_of_energy(objective_data): from muse.objectives import lifetime_levelized_cost_of_energy - result = lifetime_levelized_cost_of_energy(_technologies, _demand, _prices) + result = lifetime_levelized_cost_of_energy( + objective_data["technologies"], + objective_data["demand"], + objective_data["prices"], + ) assert set(result.dims) == {"replacement", "asset"} -def test_net_present_value(_technologies, _demand, _prices): +def test_net_present_value(objective_data): from muse.objectives import net_present_value - result = net_present_value(_technologies, _demand, _prices) + result = net_present_value( + objective_data["technologies"], + objective_data["demand"], + objective_data["prices"], + ) assert set(result.dims) == {"replacement", "asset"} -def test_net_present_cost(_technologies, _demand, _prices): +def test_net_present_cost(objective_data): from muse.objectives import net_present_cost - result = net_present_cost(_technologies, _demand, _prices) + result = net_present_cost( + objective_data["technologies"], + objective_data["demand"], + objective_data["prices"], + ) assert set(result.dims) == {"replacement", "asset"} -def test_equivalent_annual_cost(_technologies, _demand, _prices): +def test_equivalent_annual_cost(objective_data): from muse.objectives import equivalent_annual_cost - result = equivalent_annual_cost(_technologies, _demand, _prices) + result = equivalent_annual_cost( + objective_data["technologies"], + objective_data["demand"], + objective_data["prices"], + ) assert set(result.dims) == {"replacement", "asset"} - - -def add_var(coordinates, *dims, factor=100.0): - from numpy.random import rand - - shape = tuple(len(coordinates[u]) for u in dims) - return dims, (rand(*shape) * factor).astype(type(factor)) From c251d533224fe38d12c41b676c2000a920a96994 Mon Sep 17 00:00:00 2001 From: Tom Bland Date: Mon, 9 Jun 2025 17:19:30 +0100 Subject: [PATCH 08/12] Combine fixtures in test_constraints --- tests/test_constraints.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/tests/test_constraints.py b/tests/test_constraints.py index 42bfb934d..740b19341 100644 --- a/tests/test_constraints.py +++ b/tests/test_constraints.py @@ -3,7 +3,7 @@ import numpy as np import pandas as pd import xarray as xr -from pytest import approx, fixture, raises +from pytest import approx, fixture, mark, raises from muse import examples from muse.constraints import ( @@ -65,7 +65,6 @@ def model_data(): ).sel(year=INVESTMENT_YEAR).groupby("technology").sum("asset").rename( technology="asset" ) - # Remove un-demanded commodities market_demand = market_demand.sel( commodity=(market_demand > 0).any(dim=["timeslice", "asset"]) @@ -285,6 +284,9 @@ def test_lp_constraints_matrix_b_is_scalar(lpcosts, constraints): """B is a scalar - output should be equivalent to a single row matrix.""" constraint = constraints["max_production"] + constraint = constraint_data["constraints"]["max_production"] + lpcosts = constraint_data["lp_costs"] + for attr in ["capacity", "production"]: lpconstraint = lp_constraint_matrix( xr.DataArray(1), getattr(constraint, attr), getattr(lpcosts, attr) @@ -300,6 +302,9 @@ def test_max_production_constraint_diagonal(lpcosts, constraints): """Test production side of max capacity production is diagonal.""" constraint = constraints["max_production"] + constraint = constraint_data["constraints"]["max_production"] + lpcosts = constraint_data["lp_costs"] + # Test capacity constraints result = lp_constraint_matrix(constraint.b, constraint.capacity, lpcosts.capacity) decision_dims = {f"d({x})" for x in lpcosts.capacity.dims} @@ -329,6 +334,9 @@ def test_lp_constraint(lpcosts, constraints): constraint = constraints["max_production"] + constraint = constraint_data["constraints"]["max_production"] + lpcosts = constraint_data["lp_costs"] + result = lp_constraint(constraint, lpcosts) constraint_dims = { f"c({x})" for x in set(lpcosts.production.dims).union(constraint.b.dims) @@ -533,6 +541,13 @@ def test_scipy_solver(model_data, lp_inputs, constraints): """Test the scipy solver for demand matching.""" from muse.investments import scipy_match_demand + constraints = [ + constraint_data["constraints"]["max_production"], + constraint_data["constraints"]["max_capacity_expansion"], + constraint_data["constraints"]["demand"], + constraint_data["constraints"]["demand_limiting_capacity"], + ] + solution = scipy_match_demand( costs=lp_inputs["capacity_costs"], commodities=lp_inputs["commodities"], From 5f7fd6b2e928ea30e7157d4b703678832ba149b5 Mon Sep 17 00:00:00 2001 From: Tom Bland Date: Mon, 9 Jun 2025 20:12:24 +0100 Subject: [PATCH 09/12] Rename fixtures --- tests/test_costs.py | 2 +- tests/test_objectives.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_costs.py b/tests/test_costs.py index 621e11d16..b3ba0e913 100644 --- a/tests/test_costs.py +++ b/tests/test_costs.py @@ -26,7 +26,7 @@ @fixture -def cost_data(technologies, market, demand_share): +def cost_inputs(technologies, market, demand_share): """Creates the complete dataset needed for cost calculations. The transformation follows these steps: diff --git a/tests/test_objectives.py b/tests/test_objectives.py index d9eec568a..b51a5d903 100644 --- a/tests/test_objectives.py +++ b/tests/test_objectives.py @@ -5,7 +5,7 @@ @fixture -def objective_data(technologies, market, demand_share): +def objective_inputs(technologies, market, demand_share): """Creates the complete dataset needed for objective calculations. The transformation follows these steps: From f4d10495b557ae1fcac0333abc24becced54f78c Mon Sep 17 00:00:00 2001 From: Tom Bland Date: Mon, 9 Jun 2025 20:20:37 +0100 Subject: [PATCH 10/12] Properly rename fixtures --- tests/test_costs.py | 270 +++++++++++++++++++-------------------- tests/test_objectives.py | 118 ++++++++--------- 2 files changed, 195 insertions(+), 193 deletions(-) diff --git a/tests/test_costs.py b/tests/test_costs.py index b3ba0e913..8b7b54785 100644 --- a/tests/test_costs.py +++ b/tests/test_costs.py @@ -78,157 +78,157 @@ def cost_inputs(technologies, market, demand_share): } -def test_fixtures(cost_data): +def test_fixtures(cost_inputs): """Validate fixture dimensions.""" - assert set(cost_data["technologies"].dims) == {"asset", "commodity"} - assert set(cost_data["prices"].dims) == {"asset", "commodity", "timeslice"} - assert set(cost_data["capacity"].dims) == {"asset"} - assert set(cost_data["production"].dims) == {"asset", "commodity", "timeslice"} - assert set(cost_data["consumption"].dims) == {"asset", "commodity", "timeslice"} + assert set(cost_inputs["technologies"].dims) == {"asset", "commodity"} + assert set(cost_inputs["prices"].dims) == {"asset", "commodity", "timeslice"} + assert set(cost_inputs["capacity"].dims) == {"asset"} + assert set(cost_inputs["production"].dims) == {"asset", "commodity", "timeslice"} + assert set(cost_inputs["consumption"].dims) == {"asset", "commodity", "timeslice"} -def test_capital_costs(cost_data): - result = capital_costs(cost_data["technologies"], cost_data["capacity"]) +def test_capital_costs(cost_inputs): + result = capital_costs(cost_inputs["technologies"], cost_inputs["capacity"]) assert set(result.dims) == {"asset"} -def test_environmental_costs(cost_data): +def test_environmental_costs(cost_inputs): result = environmental_costs( - cost_data["technologies"], cost_data["prices"], cost_data["production"] + cost_inputs["technologies"], cost_inputs["prices"], cost_inputs["production"] ) assert set(result.dims) == {"asset", "timeslice"} -def test_fuel_costs(cost_data): +def test_fuel_costs(cost_inputs): result = fuel_costs( - cost_data["technologies"], cost_data["prices"], cost_data["consumption"] + cost_inputs["technologies"], cost_inputs["prices"], cost_inputs["consumption"] ) assert set(result.dims) == {"asset", "timeslice"} -def test_material_costs(cost_data): +def test_material_costs(cost_inputs): result = material_costs( - cost_data["technologies"], cost_data["prices"], cost_data["consumption"] + cost_inputs["technologies"], cost_inputs["prices"], cost_inputs["consumption"] ) assert set(result.dims) == {"asset", "timeslice"} -def test_fixed_costs(cost_data): - result = fixed_costs(cost_data["technologies"], cost_data["capacity"]) +def test_fixed_costs(cost_inputs): + result = fixed_costs(cost_inputs["technologies"], cost_inputs["capacity"]) assert set(result.dims) == {"asset"} -def test_variable_costs(cost_data): - result = variable_costs(cost_data["technologies"], cost_data["production"]) +def test_variable_costs(cost_inputs): + result = variable_costs(cost_inputs["technologies"], cost_inputs["production"]) assert set(result.dims) == {"asset"} -def test_running_costs(cost_data): +def test_running_costs(cost_inputs): result = running_costs( - cost_data["technologies"], - cost_data["prices"], - cost_data["capacity"], - cost_data["production"], - cost_data["consumption"], + cost_inputs["technologies"], + cost_inputs["prices"], + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], ) assert set(result.dims) == {"asset", "timeslice"} -def test_net_present_value(cost_data): +def test_net_present_value(cost_inputs): result = net_present_value( - cost_data["technologies"], - cost_data["prices"], - cost_data["capacity"], - cost_data["production"], - cost_data["consumption"], + cost_inputs["technologies"], + cost_inputs["prices"], + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], ) assert set(result.dims) == {"asset", "timeslice"} -def test_net_present_cost(cost_data): +def test_net_present_cost(cost_inputs): result = net_present_cost( - cost_data["technologies"], - cost_data["prices"], - cost_data["capacity"], - cost_data["production"], - cost_data["consumption"], + cost_inputs["technologies"], + cost_inputs["prices"], + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], ) assert set(result.dims) == {"asset", "timeslice"} -def test_equivalent_annual_cost(cost_data): +def test_equivalent_annual_cost(cost_inputs): result = equivalent_annual_cost( - cost_data["technologies"], - cost_data["prices"], - cost_data["capacity"], - cost_data["production"], - cost_data["consumption"], + cost_inputs["technologies"], + cost_inputs["prices"], + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], ) assert set(result.dims) == {"asset", "timeslice"} @mark.parametrize("method", ["annual", "lifetime"]) -def test_levelized_cost_of_energy(cost_data, method): +def test_levelized_cost_of_energy(cost_inputs, method): result = levelized_cost_of_energy( - cost_data["technologies"], - cost_data["prices"], - cost_data["capacity"], - cost_data["production"], - cost_data["consumption"], + cost_inputs["technologies"], + cost_inputs["prices"], + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], method=method, ) assert set(result.dims) == {"asset", "timeslice"} -def test_supply_cost(cost_data): +def test_supply_cost(cost_inputs): lcoe = levelized_cost_of_energy( - cost_data["technologies"], - cost_data["prices"], - cost_data["capacity"], - cost_data["production"], - cost_data["consumption"], + cost_inputs["technologies"], + cost_inputs["prices"], + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], method="annual", ) - result = supply_cost(cost_data["production"], lcoe) + result = supply_cost(cost_inputs["production"], lcoe) assert set(result.dims) == {"commodity", "region", "timeslice"} -def test_capital_recovery_factor(cost_data): - result = capital_recovery_factor(cost_data["technologies"]) - assert set(result.dims) == set(cost_data["technologies"].interest_rate.dims) +def test_capital_recovery_factor(cost_inputs): + result = capital_recovery_factor(cost_inputs["technologies"]) + assert set(result.dims) == set(cost_inputs["technologies"].interest_rate.dims) # Test zero interest rates - cost_data["technologies"]["interest_rate"] = 0 - result = capital_recovery_factor(cost_data["technologies"]) + cost_inputs["technologies"]["interest_rate"] = 0 + result = capital_recovery_factor(cost_inputs["technologies"]) assert isfinite(result).all() -def test_annual_to_lifetime(cost_data): +def test_annual_to_lifetime(cost_inputs): _fuel_costs = fuel_costs( - cost_data["technologies"], cost_data["prices"], cost_data["consumption"] + cost_inputs["technologies"], cost_inputs["prices"], cost_inputs["consumption"] ) - _fuel_costs_lifetime = annual_to_lifetime(_fuel_costs, cost_data["technologies"]) + _fuel_costs_lifetime = annual_to_lifetime(_fuel_costs, cost_inputs["technologies"]) assert set(_fuel_costs.dims) == set(_fuel_costs_lifetime.dims) assert (_fuel_costs_lifetime > _fuel_costs).all() @mark.parametrize("method", ["annual", "lifetime"]) -def test_lcoe_flow_scaling(cost_data, method): +def test_lcoe_flow_scaling(cost_inputs, method): """Test LCOE independence of input/output flow scaling.""" - cost_data["technologies"]["var_exp"] = 1 + cost_inputs["technologies"]["var_exp"] = 1 # Original LCOE lcoe1 = levelized_cost_of_energy( - cost_data["technologies"], - cost_data["prices"], - cost_data["capacity"], - cost_data["production"], - cost_data["consumption"], + cost_inputs["technologies"], + cost_inputs["prices"], + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], method=method, ) # Scale inputs/outputs and var_par by 2 - technologies_scaled = cost_data["technologies"].copy() + technologies_scaled = cost_inputs["technologies"].copy() technologies_scaled["fixed_inputs"] *= 2 technologies_scaled["flexible_inputs"] *= 2 technologies_scaled["fixed_outputs"] *= 2 @@ -236,79 +236,79 @@ def test_lcoe_flow_scaling(cost_data, method): lcoe2 = levelized_cost_of_energy( technologies_scaled, - cost_data["prices"], - cost_data["capacity"], - cost_data["production"], - cost_data["consumption"], + cost_inputs["prices"], + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], method=method, ) assert isclose(lcoe1, lcoe2).all() @mark.parametrize("method", ["annual", "lifetime"]) -def test_lcoe_prod_scaling(cost_data, method): +def test_lcoe_prod_scaling(cost_inputs, method): """Test LCOE independence of production scaling with linear costs.""" - cost_data["technologies"]["var_exp"] = 1 - cost_data["technologies"]["cap_exp"] = 1 - cost_data["technologies"]["fix_exp"] = 1 + cost_inputs["technologies"]["var_exp"] = 1 + cost_inputs["technologies"]["cap_exp"] = 1 + cost_inputs["technologies"]["fix_exp"] = 1 lcoe1 = levelized_cost_of_energy( - cost_data["technologies"], - cost_data["prices"], - cost_data["capacity"], - cost_data["production"], - cost_data["consumption"], + cost_inputs["technologies"], + cost_inputs["prices"], + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], method=method, ) lcoe2 = levelized_cost_of_energy( - cost_data["technologies"], - cost_data["prices"], - cost_data["capacity"] * 2, - cost_data["production"] * 2, - cost_data["consumption"] * 2, + cost_inputs["technologies"], + cost_inputs["prices"], + cost_inputs["capacity"] * 2, + cost_inputs["production"] * 2, + cost_inputs["consumption"] * 2, method=method, ) assert isclose(lcoe1, lcoe2).all() @mark.parametrize("method", ["annual", "lifetime"]) -def test_lcoe_equal_prices(cost_data, method): +def test_lcoe_equal_prices(cost_inputs, method): """Test LCOE behavior with uniform prices across timeslices.""" lcoe1 = levelized_cost_of_energy( - cost_data["technologies"], - cost_data["prices"], - cost_data["capacity"], - cost_data["production"], - cost_data["consumption"], + cost_inputs["technologies"], + cost_inputs["prices"], + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], method=method, ) with raises(AssertionError): assert_allclose(lcoe1, broadcast_timeslice(lcoe1.isel(timeslice=0))) # Test with uniform prices - prices_uniform = broadcast_timeslice(cost_data["prices"].mean("timeslice")) + prices_uniform = broadcast_timeslice(cost_inputs["prices"].mean("timeslice")) lcoe2 = levelized_cost_of_energy( - cost_data["technologies"], + cost_inputs["technologies"], prices_uniform, - cost_data["capacity"], - cost_data["production"], - cost_data["consumption"], + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], method=method, ) assert_allclose(lcoe2, broadcast_timeslice(lcoe2.isel(timeslice=0))) -def test_npv_equal_prices(cost_data): +def test_npv_equal_prices(cost_inputs): """Test NPV linearity with production under uniform prices.""" npv1 = net_present_value( - cost_data["technologies"], - cost_data["prices"], - cost_data["capacity"], - cost_data["production"], - cost_data["consumption"], + cost_inputs["technologies"], + cost_inputs["prices"], + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], ) tech_activity = production_amplitude( - cost_data["production"], cost_data["technologies"] + cost_inputs["production"], cost_inputs["technologies"] ) npv1_scaled = npv1 / tech_activity @@ -316,41 +316,41 @@ def test_npv_equal_prices(cost_data): assert_allclose(npv1_scaled, broadcast_timeslice(npv1_scaled.isel(timeslice=0))) # Test with uniform prices - prices_uniform = broadcast_timeslice(cost_data["prices"].mean("timeslice")) + prices_uniform = broadcast_timeslice(cost_inputs["prices"].mean("timeslice")) npv2 = net_present_value( - cost_data["technologies"], + cost_inputs["technologies"], prices_uniform, - cost_data["capacity"], - cost_data["production"], - cost_data["consumption"], + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], ) npv2_scaled = npv2 / tech_activity assert_allclose(npv2_scaled, broadcast_timeslice(npv2_scaled.isel(timeslice=0))) @mark.parametrize("method", ["annual", "lifetime"]) -def test_lcoe_zero_production(cost_data, method): +def test_lcoe_zero_production(cost_inputs, method): """Test LCOE behavior with zero production.""" lcoe1 = levelized_cost_of_energy( - cost_data["technologies"], - cost_data["prices"], - cost_data["capacity"], - cost_data["production"], - cost_data["consumption"], + cost_inputs["technologies"], + cost_inputs["prices"], + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], method=method, ) assert not (lcoe1.isel(timeslice=0) == 0).all() # Test with zero production in first timeslice - production_zero = cost_data["production"].copy() - consumption_zero = cost_data["consumption"].copy() + production_zero = cost_inputs["production"].copy() + consumption_zero = cost_inputs["consumption"].copy() production_zero.isel(timeslice=0)[:] = 0 consumption_zero.isel(timeslice=0)[:] = 0 lcoe2 = levelized_cost_of_energy( - cost_data["technologies"], - cost_data["prices"], - cost_data["capacity"], + cost_inputs["technologies"], + cost_inputs["prices"], + cost_inputs["capacity"], production_zero, consumption_zero, method=method, @@ -359,28 +359,28 @@ def test_lcoe_zero_production(cost_data, method): @mark.parametrize("method", ["annual", "lifetime"]) -def test_lcoe_aggregate(cost_data, method): +def test_lcoe_aggregate(cost_inputs, method): """Test LCOE aggregation over timeslices.""" result = levelized_cost_of_energy( - cost_data["technologies"], - cost_data["prices"], - cost_data["capacity"], - cost_data["production"], - cost_data["consumption"], + cost_inputs["technologies"], + cost_inputs["prices"], + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], method=method, aggregate_timeslices=True, ) assert set(result.dims) == {"asset"} -def test_npv_aggregate(cost_data): +def test_npv_aggregate(cost_inputs): """Test NPV aggregation over timeslices.""" result = net_present_value( - cost_data["technologies"], - cost_data["prices"], - cost_data["capacity"], - cost_data["production"], - cost_data["consumption"], + cost_inputs["technologies"], + cost_inputs["prices"], + cost_inputs["capacity"], + cost_inputs["production"], + cost_inputs["consumption"], aggregate_timeslices=True, ) assert set(result.dims) == {"asset"} diff --git a/tests/test_objectives.py b/tests/test_objectives.py index b51a5d903..78461b643 100644 --- a/tests/test_objectives.py +++ b/tests/test_objectives.py @@ -50,15 +50,15 @@ def _add_var(coordinates, *dims, factor=100.0): return dims, (rand(*shape) * factor).astype(type(factor)) -def test_fixtures(objective_data): +def test_fixtures(objective_inputs): """Validating that the fixture data has appropriate dimensions.""" - assert set(objective_data["technologies"].dims) == { + assert set(objective_inputs["technologies"].dims) == { "asset", "commodity", "replacement", } - assert set(objective_data["demand"].dims) == {"asset", "commodity", "timeslice"} - assert set(objective_data["prices"].dims) == {"asset", "commodity", "timeslice"} + assert set(objective_inputs["demand"].dims) == {"asset", "commodity", "timeslice"} + assert set(objective_inputs["prices"].dims) == {"asset", "commodity", "timeslice"} @mark.usefixtures("save_registries") @@ -81,7 +81,7 @@ def b_objective(*args, **kwargs): @mark.usefixtures("save_registries") -def test_computing_objectives(objective_data): +def test_computing_objectives(objective_inputs): from muse.objectives import factory, register_objective @register_objective @@ -108,27 +108,27 @@ def second(technologies, demand, assets=None, *args, **kwargs): # Test first objective with/without switch objectives = factory("first")( - technologies=objective_data["technologies"], - demand=objective_data["demand"], - prices=objective_data["prices"], + technologies=objective_inputs["technologies"], + demand=objective_inputs["demand"], + prices=objective_inputs["prices"], switch=True, ) assert set(objectives.data_vars) == {"first"} assert (objectives.first == 1).all() objectives = factory("first")( - technologies=objective_data["technologies"], - demand=objective_data["demand"], - prices=objective_data["prices"], + technologies=objective_inputs["technologies"], + demand=objective_inputs["demand"], + prices=objective_inputs["prices"], switch=False, ) assert (objectives.first == 2).all() # Test multiple objectives objectives = factory(["first", "second"])( - technologies=objective_data["technologies"], - demand=objective_data["demand"], - prices=objective_data["prices"], + technologies=objective_inputs["technologies"], + demand=objective_inputs["demand"], + prices=objective_inputs["prices"], switch=False, assets=0, ) @@ -140,133 +140,135 @@ def second(technologies, demand, assets=None, *args, **kwargs): assert (objectives.second.isel(asset=1) == 5).all() -def test_comfort(objective_data): +def test_comfort(objective_inputs): from muse.objectives import comfort - result = comfort(objective_data["technologies"], objective_data["demand"]) + result = comfort(objective_inputs["technologies"], objective_inputs["demand"]) assert set(result.dims) == {"replacement", "asset"} -def test_efficiency(objective_data): +def test_efficiency(objective_inputs): from muse.objectives import efficiency - result = efficiency(objective_data["technologies"], objective_data["demand"]) + result = efficiency(objective_inputs["technologies"], objective_inputs["demand"]) assert set(result.dims) == {"replacement", "asset"} -def test_capacity_to_service_demand(objective_data): +def test_capacity_to_service_demand(objective_inputs): from muse.objectives import capacity_to_service_demand result = capacity_to_service_demand( - objective_data["technologies"], objective_data["demand"] + objective_inputs["technologies"], objective_inputs["demand"] ) assert set(result.dims) == {"replacement", "asset"} -def test_capacity_in_use(objective_data): +def test_capacity_in_use(objective_inputs): from muse.objectives import capacity_in_use - result = capacity_in_use(objective_data["technologies"], objective_data["demand"]) + result = capacity_in_use( + objective_inputs["technologies"], objective_inputs["demand"] + ) assert set(result.dims) == {"replacement", "asset"} -def test_consumption(objective_data): +def test_consumption(objective_inputs): from muse.objectives import consumption result = consumption( - objective_data["technologies"], - objective_data["demand"], - objective_data["prices"], + objective_inputs["technologies"], + objective_inputs["demand"], + objective_inputs["prices"], ) assert set(result.dims) == {"replacement", "asset", "timeslice"} -def test_fixed_costs(objective_data): +def test_fixed_costs(objective_inputs): from muse.objectives import fixed_costs - result = fixed_costs(objective_data["technologies"], objective_data["demand"]) + result = fixed_costs(objective_inputs["technologies"], objective_inputs["demand"]) assert set(result.dims) == {"replacement", "asset"} -def test_capital_costs(objective_data): +def test_capital_costs(objective_inputs): from muse.objectives import capital_costs - result = capital_costs(objective_data["technologies"], objective_data["demand"]) + result = capital_costs(objective_inputs["technologies"], objective_inputs["demand"]) assert set(result.dims) == {"replacement", "asset"} -def test_emission_cost(objective_data): +def test_emission_cost(objective_inputs): from muse.objectives import emission_cost result = emission_cost( - objective_data["technologies"], - objective_data["demand"], - objective_data["prices"], + objective_inputs["technologies"], + objective_inputs["demand"], + objective_inputs["prices"], ) assert set(result.dims) == {"replacement", "asset", "timeslice"} -def test_fuel_consumption_cost(objective_data): +def test_fuel_consumption_cost(objective_inputs): from muse.objectives import fuel_consumption_cost result = fuel_consumption_cost( - objective_data["technologies"], - objective_data["demand"], - objective_data["prices"], + objective_inputs["technologies"], + objective_inputs["demand"], + objective_inputs["prices"], ) assert set(result.dims) == {"replacement", "asset", "timeslice"} -def test_annual_levelized_cost_of_energy(objective_data): +def test_annual_levelized_cost_of_energy(objective_inputs): from muse.objectives import annual_levelized_cost_of_energy result = annual_levelized_cost_of_energy( - objective_data["technologies"], - objective_data["demand"], - objective_data["prices"], + objective_inputs["technologies"], + objective_inputs["demand"], + objective_inputs["prices"], ) assert set(result.dims) == {"replacement", "asset"} -def test_lifetime_levelized_cost_of_energy(objective_data): +def test_lifetime_levelized_cost_of_energy(objective_inputs): from muse.objectives import lifetime_levelized_cost_of_energy result = lifetime_levelized_cost_of_energy( - objective_data["technologies"], - objective_data["demand"], - objective_data["prices"], + objective_inputs["technologies"], + objective_inputs["demand"], + objective_inputs["prices"], ) assert set(result.dims) == {"replacement", "asset"} -def test_net_present_value(objective_data): +def test_net_present_value(objective_inputs): from muse.objectives import net_present_value result = net_present_value( - objective_data["technologies"], - objective_data["demand"], - objective_data["prices"], + objective_inputs["technologies"], + objective_inputs["demand"], + objective_inputs["prices"], ) assert set(result.dims) == {"replacement", "asset"} -def test_net_present_cost(objective_data): +def test_net_present_cost(objective_inputs): from muse.objectives import net_present_cost result = net_present_cost( - objective_data["technologies"], - objective_data["demand"], - objective_data["prices"], + objective_inputs["technologies"], + objective_inputs["demand"], + objective_inputs["prices"], ) assert set(result.dims) == {"replacement", "asset"} -def test_equivalent_annual_cost(objective_data): +def test_equivalent_annual_cost(objective_inputs): from muse.objectives import equivalent_annual_cost result = equivalent_annual_cost( - objective_data["technologies"], - objective_data["demand"], - objective_data["prices"], + objective_inputs["technologies"], + objective_inputs["demand"], + objective_inputs["prices"], ) assert set(result.dims) == {"replacement", "asset"} From d89ea09bc30ed48248828c568ca31578eb8ec5eb Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 10 Jun 2025 14:13:25 +0000 Subject: [PATCH 11/12] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/test_constraints.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_constraints.py b/tests/test_constraints.py index 740b19341..0a89ee4b7 100644 --- a/tests/test_constraints.py +++ b/tests/test_constraints.py @@ -3,7 +3,7 @@ import numpy as np import pandas as pd import xarray as xr -from pytest import approx, fixture, mark, raises +from pytest import approx, fixture, raises from muse import examples from muse.constraints import ( From 37553711691db8f355517e8cb8551155ca048b3f Mon Sep 17 00:00:00 2001 From: Tom Bland Date: Tue, 10 Jun 2025 15:14:45 +0100 Subject: [PATCH 12/12] Revert changes to test_constraints --- tests/test_constraints.py | 19 ++----------------- 1 file changed, 2 insertions(+), 17 deletions(-) diff --git a/tests/test_constraints.py b/tests/test_constraints.py index 740b19341..42bfb934d 100644 --- a/tests/test_constraints.py +++ b/tests/test_constraints.py @@ -3,7 +3,7 @@ import numpy as np import pandas as pd import xarray as xr -from pytest import approx, fixture, mark, raises +from pytest import approx, fixture, raises from muse import examples from muse.constraints import ( @@ -65,6 +65,7 @@ def model_data(): ).sel(year=INVESTMENT_YEAR).groupby("technology").sum("asset").rename( technology="asset" ) + # Remove un-demanded commodities market_demand = market_demand.sel( commodity=(market_demand > 0).any(dim=["timeslice", "asset"]) @@ -284,9 +285,6 @@ def test_lp_constraints_matrix_b_is_scalar(lpcosts, constraints): """B is a scalar - output should be equivalent to a single row matrix.""" constraint = constraints["max_production"] - constraint = constraint_data["constraints"]["max_production"] - lpcosts = constraint_data["lp_costs"] - for attr in ["capacity", "production"]: lpconstraint = lp_constraint_matrix( xr.DataArray(1), getattr(constraint, attr), getattr(lpcosts, attr) @@ -302,9 +300,6 @@ def test_max_production_constraint_diagonal(lpcosts, constraints): """Test production side of max capacity production is diagonal.""" constraint = constraints["max_production"] - constraint = constraint_data["constraints"]["max_production"] - lpcosts = constraint_data["lp_costs"] - # Test capacity constraints result = lp_constraint_matrix(constraint.b, constraint.capacity, lpcosts.capacity) decision_dims = {f"d({x})" for x in lpcosts.capacity.dims} @@ -334,9 +329,6 @@ def test_lp_constraint(lpcosts, constraints): constraint = constraints["max_production"] - constraint = constraint_data["constraints"]["max_production"] - lpcosts = constraint_data["lp_costs"] - result = lp_constraint(constraint, lpcosts) constraint_dims = { f"c({x})" for x in set(lpcosts.production.dims).union(constraint.b.dims) @@ -541,13 +533,6 @@ def test_scipy_solver(model_data, lp_inputs, constraints): """Test the scipy solver for demand matching.""" from muse.investments import scipy_match_demand - constraints = [ - constraint_data["constraints"]["max_production"], - constraint_data["constraints"]["max_capacity_expansion"], - constraint_data["constraints"]["demand"], - constraint_data["constraints"]["demand_limiting_capacity"], - ] - solution = scipy_match_demand( costs=lp_inputs["capacity_costs"], commodities=lp_inputs["commodities"],