Skip to content

climate_ref_core.testing #

Test infrastructure for diagnostic testing.

This module provides: - TestCase and TestDataSpecification for defining test scenarios - YAML serialization for dataset catalogs (with paths stored separately) - RegressionValidator for validating pre-stored outputs - Utilities for CMEC bundle validation

RegressionValidator #

Validate diagnostic outputs from pre-stored regression data.

Loads regression outputs and validates CMEC bundles without running the diagnostic. Suitable for fast CI validation.

The regression data is expected at: test_data_dir/{diagnostic}/{test_case}/regression/

Source code in packages/climate-ref-core/src/climate_ref_core/testing.py
@frozen
class RegressionValidator:
    """
    Validate diagnostic outputs from pre-stored regression data.

    Loads regression outputs and validates CMEC bundles without
    running the diagnostic. Suitable for fast CI validation.

    The regression data is expected at:
    test_data_dir/{diagnostic}/{test_case}/regression/
    """

    diagnostic: Diagnostic
    test_case_name: str
    test_data_dir: Path

    @property
    def paths(self) -> TestCasePaths:
        """Get paths for this test case."""
        return TestCasePaths.from_test_data_dir(self.test_data_dir, self.diagnostic.slug, self.test_case_name)

    def has_regression_data(self) -> bool:
        """Check if regression data exists for this test case."""
        regression_path = self.paths.regression
        return regression_path.exists() and (regression_path / "diagnostic.json").exists()

    def load_regression_definition(self, tmp_dir: Path) -> ExecutionDefinition:
        """
        Load regression data and create an ExecutionDefinition.

        Copies regression data to tmp_dir and replaces path placeholders.
        """
        regression_path = self.paths.regression
        catalog_path = self.paths.catalog

        if not catalog_path.exists():
            raise FileNotFoundError(
                f"No catalog file at {catalog_path} for test case datasets. Run `ref test-cases fetch` first."
            )
        if not regression_path.exists():
            raise FileNotFoundError(
                f"No regression data at {regression_path}. Run 'ref test-cases run --force-regen' first."
            )

        output_dir = tmp_dir / "output"
        output_dir.mkdir(parents=True, exist_ok=True)
        shutil.copytree(regression_path, output_dir, dirs_exist_ok=True)

        # Replace placeholders with actual paths
        for pattern in ("*.json", "*.txt", "*.yaml", "*.yml"):
            for file in output_dir.rglob(pattern):
                content = file.read_text()
                content = content.replace("<OUTPUT_DIR>", str(output_dir))
                content = content.replace("<TEST_DATA_DIR>", str(self.test_data_dir))
                file.write_text(content)

        # Load datasets from catalog
        datasets: ExecutionDatasetCollection = load_datasets_from_yaml(catalog_path)

        return ExecutionDefinition(
            diagnostic=self.diagnostic,
            key=f"test-{self.test_case_name}",
            datasets=datasets,
            output_directory=output_dir,
            root_directory=tmp_dir,
        )

    def validate(self, definition: ExecutionDefinition) -> None:
        """Validate CMEC bundles and series in the regression output."""
        result = self.diagnostic.build_execution_result(definition)
        result.to_output_path("out.log").touch()  # Log file not tracked in regression
        validate_cmec_bundles(self.diagnostic, result)
        validate_series_regression(
            expected_path=self.paths.regression / "series.json",
            actual_path=definition.output_directory / "series.json",
            slug=self.diagnostic.slug,
            replacements={
                str(definition.output_directory): "<OUTPUT_DIR>",
                str(self.test_data_dir): "<TEST_DATA_DIR>",
            },
        )

paths property #

Get paths for this test case.

has_regression_data() #

Check if regression data exists for this test case.

Source code in packages/climate-ref-core/src/climate_ref_core/testing.py
def has_regression_data(self) -> bool:
    """Check if regression data exists for this test case."""
    regression_path = self.paths.regression
    return regression_path.exists() and (regression_path / "diagnostic.json").exists()

load_regression_definition(tmp_dir) #

Load regression data and create an ExecutionDefinition.

Copies regression data to tmp_dir and replaces path placeholders.

Source code in packages/climate-ref-core/src/climate_ref_core/testing.py
def load_regression_definition(self, tmp_dir: Path) -> ExecutionDefinition:
    """
    Load regression data and create an ExecutionDefinition.

    Copies regression data to tmp_dir and replaces path placeholders.
    """
    regression_path = self.paths.regression
    catalog_path = self.paths.catalog

    if not catalog_path.exists():
        raise FileNotFoundError(
            f"No catalog file at {catalog_path} for test case datasets. Run `ref test-cases fetch` first."
        )
    if not regression_path.exists():
        raise FileNotFoundError(
            f"No regression data at {regression_path}. Run 'ref test-cases run --force-regen' first."
        )

    output_dir = tmp_dir / "output"
    output_dir.mkdir(parents=True, exist_ok=True)
    shutil.copytree(regression_path, output_dir, dirs_exist_ok=True)

    # Replace placeholders with actual paths
    for pattern in ("*.json", "*.txt", "*.yaml", "*.yml"):
        for file in output_dir.rglob(pattern):
            content = file.read_text()
            content = content.replace("<OUTPUT_DIR>", str(output_dir))
            content = content.replace("<TEST_DATA_DIR>", str(self.test_data_dir))
            file.write_text(content)

    # Load datasets from catalog
    datasets: ExecutionDatasetCollection = load_datasets_from_yaml(catalog_path)

    return ExecutionDefinition(
        diagnostic=self.diagnostic,
        key=f"test-{self.test_case_name}",
        datasets=datasets,
        output_directory=output_dir,
        root_directory=tmp_dir,
    )

validate(definition) #

Validate CMEC bundles and series in the regression output.

Source code in packages/climate-ref-core/src/climate_ref_core/testing.py
def validate(self, definition: ExecutionDefinition) -> None:
    """Validate CMEC bundles and series in the regression output."""
    result = self.diagnostic.build_execution_result(definition)
    result.to_output_path("out.log").touch()  # Log file not tracked in regression
    validate_cmec_bundles(self.diagnostic, result)
    validate_series_regression(
        expected_path=self.paths.regression / "series.json",
        actual_path=definition.output_directory / "series.json",
        slug=self.diagnostic.slug,
        replacements={
            str(definition.output_directory): "<OUTPUT_DIR>",
            str(self.test_data_dir): "<TEST_DATA_DIR>",
        },
    )

TestCase #

A single test case for a diagnostic.

Test cases define scenarios for testing, with data resolved via: - requests: ESGF requests to fetch data (use ref test-cases fetch) - datasets_file: Path to a pre-built catalog YAML file

Source code in packages/climate-ref-core/src/climate_ref_core/testing.py
@frozen
class TestCase:
    """
    A single test case for a diagnostic.

    Test cases define scenarios for testing, with data resolved via:
    - `requests`: ESGF requests to fetch data (use `ref test-cases fetch`)
    - `datasets_file`: Path to a pre-built catalog YAML file
    """

    name: str
    """Name of the test case (e.g., 'default', 'short-timeseries')."""

    description: str
    """Human-readable description of what this test case covers."""

    requests: tuple[ESGFRequest, ...] | None = None
    """Optional ESGF requests to fetch data for this test case."""

    datasets_file: str | None = None
    """Path to YAML file with dataset specification (relative to package)."""

datasets_file = None class-attribute instance-attribute #

Path to YAML file with dataset specification (relative to package).

description instance-attribute #

Human-readable description of what this test case covers.

name instance-attribute #

Name of the test case (e.g., 'default', 'short-timeseries').

requests = None class-attribute instance-attribute #

Optional ESGF requests to fetch data for this test case.

TestCasePaths #

Path resolver for test case data.

Provides access to all paths within a test case directory: - catalog.yaml: Dataset metadata (tracked in git) - catalog.paths.yaml: Local file paths (gitignored) - regression/: Regression outputs (tracked in git)

Can be constructed from: - A diagnostic + test case name (auto-resolves provider's test-data dir) - An explicit test_data_dir + diagnostic slug + test case name

Source code in packages/climate-ref-core/src/climate_ref_core/testing.py
@frozen
class TestCasePaths:
    """
    Path resolver for test case data.

    Provides access to all paths within a test case directory:
    - catalog.yaml: Dataset metadata (tracked in git)
    - catalog.paths.yaml: Local file paths (gitignored)
    - regression/: Regression outputs (tracked in git)

    Can be constructed from:
    - A diagnostic + test case name (auto-resolves provider's test-data dir)
    - An explicit test_data_dir + diagnostic slug + test case name
    """

    root: Path
    """The test case directory (test_data_dir / diagnostic_slug / test_case_name)."""

    @classmethod
    def from_diagnostic(cls, diagnostic: Diagnostic, test_case: str) -> TestCasePaths | None:
        """
        Create from a diagnostic, auto-resolving the provider's test-data directory.

        Returns None if the provider's test-data directory cannot be determined
        (e.g., not a development checkout).

        Parameters
        ----------
        diagnostic
            The diagnostic to get paths for
        test_case
            Test case name (e.g., 'default')
        """
        test_data_dir = _get_provider_test_data_dir(diagnostic)
        if test_data_dir is None:
            return None
        return cls(root=test_data_dir / diagnostic.slug / test_case)

    @classmethod
    def from_test_data_dir(
        cls,
        test_data_dir: Path,
        diagnostic_slug: str,
        test_case: str,
    ) -> TestCasePaths:
        """
        Create from an explicit test data directory.

        Use this when you have a test_data_dir fixture (in tests) or
        know the base path explicitly.

        Parameters
        ----------
        test_data_dir
            Base test data directory (e.g., from test fixture)
        diagnostic_slug
            The diagnostic slug
        test_case
            Test case name (e.g., 'default')
        """
        return cls(root=test_data_dir / diagnostic_slug / test_case)

    @property
    def catalog(self) -> Path:
        """Path to catalog.yaml."""
        return self.root / "catalog.yaml"

    @property
    def catalog_paths(self) -> Path:
        """Path to catalog.paths.yaml (gitignored, contains local file paths)."""
        return self.root / "catalog.paths.yaml"

    @property
    def regression(self) -> Path:
        """Path to regression/ directory."""
        return self.root / "regression"

    @property
    def regression_catalog_hash(self) -> Path:
        """Path to catalog hash file in regression directory."""
        return self.regression / ".catalog_hash"

    @property
    def test_data_dir(self) -> Path:
        """Path to the test-data directory (parent of diagnostic slug dir)."""
        return self.root.parent.parent

    def exists(self) -> bool:
        """Check if the test case directory exists."""
        return self.root.exists()

    def create(self) -> None:
        """Create the test case directory if it doesn't exist."""
        self.root.mkdir(parents=True, exist_ok=True)

catalog property #

Path to catalog.yaml.

catalog_paths property #

Path to catalog.paths.yaml (gitignored, contains local file paths).

regression property #

Path to regression/ directory.

regression_catalog_hash property #

Path to catalog hash file in regression directory.

root instance-attribute #

The test case directory (test_data_dir / diagnostic_slug / test_case_name).

test_data_dir property #

Path to the test-data directory (parent of diagnostic slug dir).

create() #

Create the test case directory if it doesn't exist.

Source code in packages/climate-ref-core/src/climate_ref_core/testing.py
def create(self) -> None:
    """Create the test case directory if it doesn't exist."""
    self.root.mkdir(parents=True, exist_ok=True)

exists() #

Check if the test case directory exists.

Source code in packages/climate-ref-core/src/climate_ref_core/testing.py
def exists(self) -> bool:
    """Check if the test case directory exists."""
    return self.root.exists()

from_diagnostic(diagnostic, test_case) classmethod #

Create from a diagnostic, auto-resolving the provider's test-data directory.

Returns None if the provider's test-data directory cannot be determined (e.g., not a development checkout).

Parameters:

Name Type Description Default
diagnostic Diagnostic

The diagnostic to get paths for

required
test_case str

Test case name (e.g., 'default')

required
Source code in packages/climate-ref-core/src/climate_ref_core/testing.py
@classmethod
def from_diagnostic(cls, diagnostic: Diagnostic, test_case: str) -> TestCasePaths | None:
    """
    Create from a diagnostic, auto-resolving the provider's test-data directory.

    Returns None if the provider's test-data directory cannot be determined
    (e.g., not a development checkout).

    Parameters
    ----------
    diagnostic
        The diagnostic to get paths for
    test_case
        Test case name (e.g., 'default')
    """
    test_data_dir = _get_provider_test_data_dir(diagnostic)
    if test_data_dir is None:
        return None
    return cls(root=test_data_dir / diagnostic.slug / test_case)

from_test_data_dir(test_data_dir, diagnostic_slug, test_case) classmethod #

Create from an explicit test data directory.

Use this when you have a test_data_dir fixture (in tests) or know the base path explicitly.

Parameters:

Name Type Description Default
test_data_dir Path

Base test data directory (e.g., from test fixture)

required
diagnostic_slug str

The diagnostic slug

required
test_case str

Test case name (e.g., 'default')

required
Source code in packages/climate-ref-core/src/climate_ref_core/testing.py
@classmethod
def from_test_data_dir(
    cls,
    test_data_dir: Path,
    diagnostic_slug: str,
    test_case: str,
) -> TestCasePaths:
    """
    Create from an explicit test data directory.

    Use this when you have a test_data_dir fixture (in tests) or
    know the base path explicitly.

    Parameters
    ----------
    test_data_dir
        Base test data directory (e.g., from test fixture)
    diagnostic_slug
        The diagnostic slug
    test_case
        Test case name (e.g., 'default')
    """
    return cls(root=test_data_dir / diagnostic_slug / test_case)

TestDataSpecification #

Test data specification for a diagnostic.

Contains multiple named test cases for testing different input datasets.

Source code in packages/climate-ref-core/src/climate_ref_core/testing.py
@frozen
class TestDataSpecification:
    """
    Test data specification for a diagnostic.

    Contains multiple named test cases for testing different input datasets.
    """

    test_cases: tuple[TestCase, ...] = field(factory=tuple)
    """Collection of test cases for this diagnostic."""

    def get_case(self, name: str) -> TestCase:
        """
        Get a test case by name.

        Parameters
        ----------
        name
            Name of the test case to retrieve

        Returns
        -------
        TestCase
            The matching test case

        Raises
        ------
        StopIteration
            If no test case with that name exists
        """
        return next(tc for tc in self.test_cases if tc.name == name)

    def has_case(self, name: str) -> bool:
        """
        Check if a test case with the given name exists.

        Parameters
        ----------
        name
            Name of the test case to check

        Returns
        -------
        bool
            True if the test case exists
        """
        return any(tc.name == name for tc in self.test_cases)

    @property
    def case_names(self) -> list[str]:
        """Get names of all test cases."""
        return [tc.name for tc in self.test_cases]

case_names property #

Get names of all test cases.

test_cases = field(factory=tuple) class-attribute instance-attribute #

Collection of test cases for this diagnostic.

get_case(name) #

Get a test case by name.

Parameters:

Name Type Description Default
name str

Name of the test case to retrieve

required

Returns:

Type Description
TestCase

The matching test case

Raises:

Type Description
StopIteration

If no test case with that name exists

Source code in packages/climate-ref-core/src/climate_ref_core/testing.py
def get_case(self, name: str) -> TestCase:
    """
    Get a test case by name.

    Parameters
    ----------
    name
        Name of the test case to retrieve

    Returns
    -------
    TestCase
        The matching test case

    Raises
    ------
    StopIteration
        If no test case with that name exists
    """
    return next(tc for tc in self.test_cases if tc.name == name)

has_case(name) #

Check if a test case with the given name exists.

Parameters:

Name Type Description Default
name str

Name of the test case to check

required

Returns:

Type Description
bool

True if the test case exists

Source code in packages/climate-ref-core/src/climate_ref_core/testing.py
def has_case(self, name: str) -> bool:
    """
    Check if a test case with the given name exists.

    Parameters
    ----------
    name
        Name of the test case to check

    Returns
    -------
    bool
        True if the test case exists
    """
    return any(tc.name == name for tc in self.test_cases)

catalog_changed_since_regression(paths) #

Check if the catalog has changed since regression data was generated.

Returns True if: - No regression data exists (new test case) - No stored catalog hash exists (legacy regression data) - The catalog hash differs from the stored one

Parameters:

Name Type Description Default
paths TestCasePaths

TestCasePaths for the test case

required

Returns:

Type Description
bool

True if regression should be regenerated, False otherwise

Source code in packages/climate-ref-core/src/climate_ref_core/testing.py
def catalog_changed_since_regression(paths: TestCasePaths) -> bool:
    """
    Check if the catalog has changed since regression data was generated.

    Returns True if:
    - No regression data exists (new test case)
    - No stored catalog hash exists (legacy regression data)
    - The catalog hash differs from the stored one

    Parameters
    ----------
    paths
        TestCasePaths for the test case

    Returns
    -------
    :
        True if regression should be regenerated, False otherwise
    """
    if not paths.regression.exists():
        return True  # No regression data, needs to run
    if not paths.regression_catalog_hash.exists():
        return True  # No stored hash, needs to run
    if not paths.catalog.exists():
        return True  # No catalog file, needs to run

    stored_hash = paths.regression_catalog_hash.read_text().strip()
    current_hash = get_catalog_hash(paths.catalog)

    return stored_hash != current_hash

collect_test_case_params(provider) #

Collect all diagnostic/test_case pairs from a provider for parameterized testing.

Returns a list of pytest.param objects with (diagnostic, test_case_name) tuples, each with an id of "{diagnostic.slug}/{test_case.name}".

Parameters:

Name Type Description Default
provider DiagnosticProvider

The diagnostic provider to collect test cases from

required

Returns:

Type Description
list[ParameterSet]

List of pytest.param objects for use with @pytest.mark.parametrize

Example
from climate_ref_core.testing import collect_test_case_params
from my_provider import provider

test_case_params = collect_test_case_params(provider)


@pytest.mark.parametrize("diagnostic,test_case_name", test_case_params)
def test_my_test(diagnostic, test_case_name): ...
Source code in packages/climate-ref-core/src/climate_ref_core/testing.py
def collect_test_case_params(provider: DiagnosticProvider) -> list[ParameterSet]:
    """
    Collect all diagnostic/test_case pairs from a provider for parameterized testing.

    Returns a list of pytest.param objects with (diagnostic, test_case_name) tuples,
    each with an id of "{diagnostic.slug}/{test_case.name}".

    Parameters
    ----------
    provider
        The diagnostic provider to collect test cases from

    Returns
    -------
    :
        List of pytest.param objects for use with @pytest.mark.parametrize

    Example
    -------
    ```python
    from climate_ref_core.testing import collect_test_case_params
    from my_provider import provider

    test_case_params = collect_test_case_params(provider)


    @pytest.mark.parametrize("diagnostic,test_case_name", test_case_params)
    def test_my_test(diagnostic, test_case_name): ...
    ```
    """
    import pytest  # noqa: PLC0415

    params: list[ParameterSet] = []
    for diagnostic in provider.diagnostics():
        if diagnostic.test_data_spec is None:
            continue
        for test_case in diagnostic.test_data_spec.test_cases:
            params.append(
                pytest.param(
                    diagnostic,
                    test_case.name,
                    id=f"{diagnostic.slug}/{test_case.name}",
                )
            )
    return params

get_catalog_hash(path) #

Get the hash stored in an existing catalog file.

Parameters:

Name Type Description Default
path Path

Path to the catalog YAML file

required

Returns:

Type Description
str | None

The hash string if found, None if file doesn't exist or has no hash

Source code in packages/climate-ref-core/src/climate_ref_core/testing.py
def get_catalog_hash(path: Path) -> str | None:
    """
    Get the hash stored in an existing catalog file.

    Parameters
    ----------
    path
        Path to the catalog YAML file

    Returns
    -------
    :
        The hash string if found, None if file doesn't exist or has no hash
    """
    if not path.exists():
        return None
    with open(path) as f:
        data = yaml.safe_load(f)
    if data is None:
        return None
    hash_value = data.get("_metadata", {}).get("hash")
    return str(hash_value) if hash_value is not None else None

load_datasets_from_yaml(path) #

Load ExecutionDatasetCollection from a YAML file.

The YAML file structure:

cmip6:
  slug_column: instance_id
  selector:
    source_id: ACCESS-ESM1-5
  datasets:
    - instance_id: CMIP6.CMIP...
      variable_id: tas
      filename: tas_Amon_ACCESS-ESM1-5_historical_r1i1p1f1_gn_185001-201412.nc
      # ... other metadata

Paths are loaded from a separate .paths.yaml file if it exists, allowing the main catalog to be version-controlled while paths remain user-specific. Multi-file datasets have multiple rows with paths keyed by {instance_id}::{filename}.

Source code in packages/climate-ref-core/src/climate_ref_core/testing.py
def load_datasets_from_yaml(path: Path) -> ExecutionDatasetCollection:
    """
    Load ExecutionDatasetCollection from a YAML file.

    The YAML file structure:

    ```yaml
    cmip6:
      slug_column: instance_id
      selector:
        source_id: ACCESS-ESM1-5
      datasets:
        - instance_id: CMIP6.CMIP...
          variable_id: tas
          filename: tas_Amon_ACCESS-ESM1-5_historical_r1i1p1f1_gn_185001-201412.nc
          # ... other metadata
    ```

    Paths are loaded from a separate `.paths.yaml` file if it exists,
    allowing the main catalog to be version-controlled while paths
    remain user-specific. Multi-file datasets have multiple rows with
    paths keyed by `{instance_id}::{filename}`.
    """
    with open(path) as f:
        data = yaml.safe_load(f)

    # Load paths from separate file if it exists
    paths_file = _get_paths_file(path)
    paths_map: dict[str, str] = {}
    if paths_file.exists():
        with open(paths_file) as f:
            paths_map = yaml.safe_load(f) or {}

    collections: dict[SourceDatasetType | str, DatasetCollection] = {}

    for source_type_str, source_data in data.items():
        if source_type_str == "_metadata":
            continue  # Skip metadata section
        source_type = SourceDatasetType(source_type_str)
        selector_dict = source_data.get("selector", {})
        selector: Selector = tuple(sorted(selector_dict.items()))
        datasets_list = source_data.get("datasets", [])
        slug_column = source_data.get("slug_column", "instance_id")

        # Merge paths from paths file using composite key
        for dataset in datasets_list:
            instance_id = dataset.get(slug_column)
            filename = dataset.get("filename")
            if instance_id and filename:
                # Try composite key first (new format for multi-file datasets)
                composite_key = f"{instance_id}::{filename}"
                if composite_key in paths_map:
                    dataset["path"] = paths_map[composite_key]
                elif instance_id in paths_map:
                    # Fall back to simple key for backward compatibility
                    dataset["path"] = paths_map[instance_id]
            elif instance_id and instance_id in paths_map:
                # Legacy format without filename
                dataset["path"] = paths_map[instance_id]

        collections[source_type] = DatasetCollection(
            datasets=pd.DataFrame(datasets_list),
            slug_column=slug_column,
            selector=selector,
        )

    return ExecutionDatasetCollection(collections)

save_datasets_to_yaml(datasets, path, *, force=False) #

Save ExecutionDatasetCollection to a YAML file.

Paths are saved to a separate .paths.yaml file to allow the main catalog to be version-controlled while paths remain user-specific.

Multi-file datasets (e.g., time-chunked data) are stored as multiple rows, one per file. Paths are keyed by {instance_id}::{filename} to support multiple files per dataset.

By default, the catalog is only written if the content has changed (detected via hash comparison). Use force=True to always write.

Parameters:

Name Type Description Default
datasets ExecutionDatasetCollection

The datasets to save

required
path Path

Path to write the YAML file

required
force bool

If True, always write the catalog even if unchanged

False

Returns:

Type Description
bool

True if the catalog was written, False if skipped (unchanged)

Source code in packages/climate-ref-core/src/climate_ref_core/testing.py
def save_datasets_to_yaml(
    datasets: ExecutionDatasetCollection,
    path: Path,
    *,
    force: bool = False,
) -> bool:
    """
    Save ExecutionDatasetCollection to a YAML file.

    Paths are saved to a separate `.paths.yaml` file to allow the main
    catalog to be version-controlled while paths remain user-specific.

    Multi-file datasets (e.g., time-chunked data) are stored as multiple rows,
    one per file. Paths are keyed by `{instance_id}::{filename}` to support
    multiple files per dataset.

    By default, the catalog is only written if the content has changed
    (detected via hash comparison). Use `force=True` to always write.

    Parameters
    ----------
    datasets
        The datasets to save
    path
        Path to write the YAML file
    force
        If True, always write the catalog even if unchanged

    Returns
    -------
    :
        True if the catalog was written, False if skipped (unchanged)
    """
    # Compute the hash first to check if we need to write
    new_hash = datasets.hash

    if not force:
        existing_hash = get_catalog_hash(path)
        if existing_hash == new_hash:
            logger.info(f"Catalog unchanged, skipping write: {path}")
            return False

    data: dict[str, Any] = {
        "_metadata": {"hash": new_hash},
    }
    paths_map: dict[str, str] = {}

    for source_type, collection in datasets.items():
        slug_column = collection.slug_column
        datasets_records = collection.datasets.to_dict(orient="records")

        # Extract paths to separate map, keeping all rows (including multi-file datasets)
        filtered_records = []
        for record in datasets_records:
            instance_id = record.get(slug_column)
            if instance_id and "path" in record:  # pragma: no branch
                file_path = record.pop("path")
                filename = Path(file_path).name
                # Store filename in record for matching when loading
                record["filename"] = filename
                # Use composite key to support multiple files per instance_id
                paths_map[f"{instance_id}::{filename}"] = file_path
                # Sanitize and sort fields within each record alphabetically
                sanitized_record = {k: _sanitize_for_yaml(v) for k, v in record.items()}
                sorted_record = dict(sorted(sanitized_record.items()))
                filtered_records.append(sorted_record)

        # Sort records by instance_id, then by filename for stability
        filtered_records.sort(key=lambda r: (r.get(slug_column, ""), r.get("filename", "")))

        data[source_type.value] = {
            "slug_column": slug_column,
            "selector": dict(collection.selector),
            "datasets": filtered_records,
        }

    path.parent.mkdir(parents=True, exist_ok=True)

    with open(path, "w") as f:
        yaml.dump(data, f, default_flow_style=False, sort_keys=False)

    paths_file = _get_paths_file(path)
    with open(paths_file, "w") as f:
        yaml.dump(paths_map, f, default_flow_style=False, sort_keys=False)
    logger.info(f"Saved catalog to {path} (paths: {paths_file})")
    return True

validate_cmec_bundles(diagnostic, result) #

Validate CMEC bundles in an execution result.

Performs structural validation of the metric and output bundles.

Raises:

Type Description
AssertionError

If the result is not successful or bundles are invalid

Source code in packages/climate-ref-core/src/climate_ref_core/testing.py
def validate_cmec_bundles(diagnostic: Diagnostic, result: ExecutionResult) -> None:
    """
    Validate CMEC bundles in an execution result.

    Performs structural validation of the metric and output bundles.

    Raises
    ------
    AssertionError
        If the result is not successful or bundles are invalid
    """
    # TODO: Add content regression checks for the CMEC bundles (diagnostic.json /
    # output.json), mirroring `validate_series_regression`. These bundles are only
    # structurally validated here, so a diagnostic can change the metric/output
    # values without any regression test failing (see issue #703 for the series
    # equivalent). A content comparison must sanitise the regenerated bundle first
    # via `ExecutionRegression.output_replacements`, since the committed bundles
    # store `<OUTPUT_DIR>` / `<TEST_DATA_DIR>` placeholders.
    assert result.successful, f"Execution failed: {result}"

    # Validate metric bundle
    metric_bundle = CMECMetric.load_from_json(result.to_output_path(result.metric_bundle_filename))
    CMECMetric.model_validate(metric_bundle)

    # Check dimensions are a subset of diagnostic facets
    # Different data requirements may have different group_by fields,
    # so bundle dimensions vary per execution but must all be recognized facets
    bundle_dimensions = set(metric_bundle.DIMENSIONS.root["json_structure"])
    assert bundle_dimensions.issubset(set(diagnostic.facets)), (
        f"Bundle dimensions {bundle_dimensions} are not a subset of diagnostic facets {diagnostic.facets}"
    )

    # Validate output bundle
    CMECOutput.load_from_json(result.to_output_path(result.output_bundle_filename))

validate_series_regression(expected_path, actual_path, *, slug, replacements=None) #

Assert that regenerated series match the committed regression series.

If expected_path does not exist (legacy regression data without a stored series), the check is skipped.

Parameters:

Name Type Description Default
expected_path Path

Path to the committed series.json regression artifact. This file stores sanitized placeholders (e.g. <OUTPUT_DIR>).

required
actual_path Path

Path to the freshly regenerated series.json in the output directory. This file contains expanded absolute paths.

required
slug str

The diagnostic slug, used for error messages.

required
replacements dict[str, str] | None

Optional real path -> placeholder mapping applied to the regenerated series before comparison, mirroring the sanitisation used when the regression data was written.

None

Raises:

Type Description
AssertionError

If the regenerated series differ from the committed series.

Source code in packages/climate-ref-core/src/climate_ref_core/testing.py
def validate_series_regression(
    expected_path: Path,
    actual_path: Path,
    *,
    slug: str,
    replacements: dict[str, str] | None = None,
) -> None:
    """
    Assert that regenerated series match the committed regression series.

    If ``expected_path`` does not exist (legacy regression data without a stored series),
    the check is skipped.

    Parameters
    ----------
    expected_path
        Path to the committed ``series.json`` regression artifact. This file
        stores sanitized placeholders (e.g. ``<OUTPUT_DIR>``).
    actual_path
        Path to the freshly regenerated ``series.json`` in the output directory.
        This file contains expanded absolute paths.
    slug
        The diagnostic slug, used for error messages.
    replacements
        Optional ``real path -> placeholder`` mapping applied to the regenerated series before comparison,
        mirroring the sanitisation used when the regression data was written.

    Raises
    ------
    AssertionError
        If the regenerated series differ from the committed series.
    """
    if not expected_path.exists():
        return

    expected = SeriesMetricValue.load_from_json(expected_path)
    actual = _load_series_sanitised(actual_path, replacements or {})

    def _sorted_dump(series: list[SeriesMetricValue]) -> list[dict[str, Any]]:
        return sorted((s.model_dump(mode="json") for s in series), key=lambda s: repr(s["dimensions"]))

    assert len(actual) == len(expected), (
        f"Diagnostic {slug} produced {len(actual)} series but the committed series.json "
        f"has {len(expected)}. Regenerate the regression data with `--force-regen`."
    )
    assert _sorted_dump(actual) == _sorted_dump(expected), (
        f"Diagnostic {slug} produced series that differ from the committed series.json. "
        f"Regenerate the regression data with `--force-regen`."
    )