Skip to content

climate_ref.solve_helpers #

Helpers for understanding and regression-testing the solver's behavior.

This module provides functions to:

- Generate parquet catalogs from local dataset directories
- Load parquet catalogs for solver testing
- Run the solver on catalogs and format results
- Produce regression-friendly output for pytest-regressions

format_solve_results_json(results) #

Serialize solve results to JSON.

Parameters:

Name Type Description Default
results list[dict[str, Any]]

Results from `solve_to_results`

required

Returns:

Type Description
str

JSON string of the results list

Source code in packages/climate-ref/src/climate_ref/solve_helpers.py
def format_solve_results_json(results: list[dict[str, Any]]) -> str:
    """
    Render solve results as a pretty-printed JSON document.

    Each result is shallow-copied and its ``selectors`` mapping is rewritten so
    that every ``(dimension, value)`` pair becomes a two-element list. The
    records passed in are not modified.

    Parameters
    ----------
    results
        Results from :func:`solve_to_results`

    Returns
    -------
    :
        JSON string of the results list (two-space indent, keys sorted)
    """

    def _pairs_as_lists(selectors):
        # Each selector entry is unpacked as a (dimension, value) pair and
        # re-emitted as a list so that the JSON encoder accepts it.
        return {group: [[dim, val] for dim, val in pairs] for group, pairs in selectors.items()}

    payload = [{**record, "selectors": _pairs_as_lists(record["selectors"])} for record in results]
    return json.dumps(payload, indent=2, sort_keys=True)

format_solve_results_table(results) #

Format solve results as a human-readable grouped text table.

Groups by provider, then diagnostic, showing dataset_key and matched instance_ids per source type.

Parameters:

Name Type Description Default
results list[dict[str, Any]]

Results from `solve_to_results`

required

Returns:

Type Description
str

Human-readable text representation

Source code in packages/climate-ref/src/climate_ref/solve_helpers.py
def format_solve_results_table(results: list[dict[str, Any]]) -> str:
    """
    Render solve results as a grouped, human-readable text listing.

    A ``## provider`` heading is emitted whenever the provider changes from the
    previous record, and a ``### diagnostic`` heading whenever the diagnostic
    changes (or the provider changed). Under each heading the record's
    ``dataset_key`` is listed, followed by its instance ids grouped by source
    type in sorted source-type order. A summary line closes the output.

    Headings are driven by consecutive records, so the input is expected to be
    ordered by provider and then diagnostic.

    Parameters
    ----------
    results
        Results from :func:`solve_to_results`

    Returns
    -------
    :
        Human-readable text representation
    """
    if not results:
        return "No executions found."

    out: list[str] = []
    seen_providers = set()
    seen_diagnostics = set()
    last_provider = None
    last_diagnostic = None

    for record in results:
        provider = record["provider"]
        diagnostic = record["diagnostic"]
        seen_providers.add(provider)
        # Diagnostics are counted per provider, hence the composite key
        seen_diagnostics.add((provider, diagnostic))

        if provider != last_provider:
            # A new provider always forces a fresh diagnostic heading
            last_provider, last_diagnostic = provider, None
            out.append(f"\n## {provider}")

        if diagnostic != last_diagnostic:
            last_diagnostic = diagnostic
            out.append(f"\n  ### {diagnostic}")

        out.append(f"    {record['dataset_key']}")
        out.extend(
            f"      [{source_type}] {iid}"
            for source_type, instance_ids in sorted(record["datasets"].items())
            for iid in instance_ids
        )

    summary = (
        f"Summary: {len(results)} executions, "
        f"{len(seen_diagnostics)} diagnostics, "
        f"{len(seen_providers)} providers"
    )
    out.extend(["", summary])
    return "\n".join(out)

generate_catalog(source_type, directories, strip_path_prefix=None) #

Scan directories using the appropriate DatasetAdapter and concatenate results.

Parameters:

Name Type Description Default
source_type str

Dataset source type (e.g. "cmip6", "obs4mips")

required
directories list[Path]

List of directories to scan for datasets

required
strip_path_prefix str | None

If provided, replace this prefix in path columns with {data_dir} for portability

None

Returns:

Type Description
DataFrame

DataFrame containing dataset metadata from all directories

Source code in packages/climate-ref/src/climate_ref/solve_helpers.py
def generate_catalog(
    source_type: str,
    directories: list[Path],
    strip_path_prefix: str | None = None,
) -> pd.DataFrame:
    """
    Build one catalog by scanning several directories with a DatasetAdapter.

    The adapter for ``source_type`` is asked for the local datasets in each
    directory. Directories that raise ``FileNotFoundError``, ``OSError`` or
    ``ValueError`` are logged at debug level and skipped, and empty scan
    results are dropped. The remaining frames are concatenated with a fresh
    index.

    Parameters
    ----------
    source_type
        Dataset source type (e.g. "cmip6", "obs4mips")
    directories
        List of directories to scan for datasets
    strip_path_prefix
        If provided, replace this prefix in path columns with ``{data_dir}``
        for portability

    Returns
    -------
    :
        DataFrame containing dataset metadata from all directories; an empty
        DataFrame if no directory yielded any rows
    """
    adapter = get_dataset_adapter(source_type)

    found = []
    for directory in directories:
        try:
            datasets = adapter.find_local_datasets(directory)
        except (FileNotFoundError, OSError, ValueError) as exc:
            logger.debug(f"Skipping directory {directory}: {exc}")
        else:
            if len(datasets) > 0:
                found.append(datasets)

    if not found:
        return pd.DataFrame()

    catalog = pd.concat(found, ignore_index=True)

    # Nothing to rewrite without a prefix or a "path" column
    if not strip_path_prefix or "path" not in catalog.columns:
        return catalog

    def _portable(path: str) -> str:
        # Only paths that actually start with the prefix are rewritten
        if not path.startswith(strip_path_prefix):
            return path
        return "{data_dir}" + path[len(strip_path_prefix) :]

    catalog["path"] = catalog["path"].astype(str).apply(_portable)
    return catalog

load_solve_catalog(catalog_dir) #

Load parquet catalog files from a directory.

Looks for cmip6_catalog.parquet, cmip7_catalog.parquet, obs4mips_catalog.parquet, and pmp_climatology_catalog.parquet.

Parameters:

Name Type Description Default
catalog_dir Path

Directory containing parquet catalog files

required

Returns:

Type Description
dict[SourceDatasetType, DataFrame] | None

Mapping of source type to catalog DataFrame, or None if no catalogs found

Source code in packages/climate-ref/src/climate_ref/solve_helpers.py
def load_solve_catalog(catalog_dir: Path) -> dict[SourceDatasetType, pd.DataFrame] | None:
    """
    Load parquet catalog files from a directory.

    Looks for ``cmip6_catalog.parquet``, ``cmip7_catalog.parquet``,
    ``obs4mips_catalog.parquet``, and ``pmp_climatology_catalog.parquet``.
    Files that are missing are skipped.

    For every file found, the ``start_time`` and ``end_time`` columns are each
    parsed back into cftime objects if that column is present, and the
    catalog is then reduced to the latest dataset versions by the adapter for
    that source type.

    Parameters
    ----------
    catalog_dir
        Directory containing parquet catalog files

    Returns
    -------
    :
        Mapping of source type to catalog DataFrame, or None if no catalogs found
    """
    if not catalog_dir.exists():
        return None

    catalog_files = {
        SourceDatasetType.CMIP6: "cmip6_catalog.parquet",
        SourceDatasetType.CMIP7: "cmip7_catalog.parquet",
        SourceDatasetType.obs4MIPs: "obs4mips_catalog.parquet",
        SourceDatasetType.PMPClimatology: "pmp_climatology_catalog.parquet",
    }

    result: dict[SourceDatasetType, pd.DataFrame] = {}
    for source_type, filename in catalog_files.items():
        path = catalog_dir / filename
        if not path.exists():
            continue

        catalog = pd.read_parquet(path)

        # Convert start_time/end_time strings back to cftime objects.
        # Each column is checked on its own (mirroring write_catalog_parquet),
        # so a catalog carrying only one of them neither raises KeyError nor
        # leaves the other column unparsed.
        # Use the per-row calendar column when available, else "standard".
        cal = catalog["calendar"] if "calendar" in catalog.columns else "standard"
        for col in ("start_time", "end_time"):
            if col in catalog.columns:
                catalog[col] = parse_cftime_dates(catalog[col], cal)

        # Apply the same version deduplication as DatasetAdapter.load_catalog()
        adapter = get_dataset_adapter(source_type.value)
        result[source_type] = adapter.filter_latest_versions(catalog)

    return result if result else None

solve_results_for_regression(results) #

Convert solve results to the dict format used by data_regression.check().

Produces {dataset_key: {source_type: [instance_id, ...]}} for use with data_regression.check().

When called with results filtered to a single diagnostic (recommended), dataset_key is unique and no data is lost. If results span multiple diagnostics, duplicate dataset_key values will overwrite earlier entries.

Parameters:

Name Type Description Default
results list[dict[str, Any]]

Results from `solve_to_results`, ideally filtered to one diagnostic

required

Returns:

Type Description
dict[str, dict[str, list[str]]]

Dict keyed by dataset_key with source_type -> instance_id list values

Source code in packages/climate-ref/src/climate_ref/solve_helpers.py
def solve_results_for_regression(
    results: list[dict[str, Any]],
) -> dict[str, dict[str, list[str]]]:
    """
    Reshape solve results into the mapping checked by ``data_regression.check()``.

    The output has the form ``{dataset_key: {source_type: [instance_id, ...]}}``.
    Each value is the record's own ``datasets`` mapping, not a copy.

    ``dataset_key`` is only unique within a single diagnostic, so filtering the
    results to one diagnostic first is recommended. When results span multiple
    diagnostics, a later record with the same ``dataset_key`` replaces the
    earlier one.

    Parameters
    ----------
    results
        Results from :func:`solve_to_results`, ideally filtered to one diagnostic

    Returns
    -------
    :
        Dict keyed by ``dataset_key`` with source_type -> instance_id list values
    """
    # Later duplicates win, matching plain dict assignment semantics
    return {record["dataset_key"]: record["datasets"] for record in results}

solve_to_results(data_catalog, providers, filters=None) #

Run the solver on a data catalog and collect results into a sorted list of dicts.

Parameters:

Name Type Description Default
data_catalog dict[SourceDatasetType, DataFrame]

Mapping of source type to catalog DataFrame

required
providers list[DiagnosticProvider]

List of diagnostic providers to solve for

required
filters SolveFilterOptions | None

Optional filters to restrict which diagnostics are solved

None

Returns:

Type Description
list[dict[str, Any]]

Sorted list of result dicts, each with keys: provider, diagnostic, dataset_key, selectors, datasets

Source code in packages/climate-ref/src/climate_ref/solve_helpers.py
def solve_to_results(
    data_catalog: dict[SourceDatasetType, pd.DataFrame],
    providers: list[DiagnosticProvider],
    filters: SolveFilterOptions | None = None,
) -> list[dict[str, Any]]:
    """
    Solve for executions over a data catalog and return them as sorted plain dicts.

    Each catalog frame is wrapped in a ``DataCatalog`` and handed to an
    ``ExecutionSolver`` together with a registry built from ``providers``.
    Every execution the solver yields becomes one record listing, per source
    type, the sorted unique instance ids of the datasets it uses.

    Parameters
    ----------
    data_catalog
        Mapping of source type to catalog DataFrame
    providers
        List of diagnostic providers to solve for
    filters
        Optional filters to restrict which diagnostics are solved

    Returns
    -------
    :
        List of result dicts sorted by provider, diagnostic and dataset key,
        each with keys: ``provider``, ``diagnostic``, ``dataset_key``,
        ``selectors``, ``datasets``
    """
    registry = ProviderRegistry(providers=providers)
    typed_catalog = {
        SourceDatasetType(kind): DataCatalog.from_frame(frame) for kind, frame in data_catalog.items()
    }
    solver = ExecutionSolver(provider_registry=registry, data_catalog=typed_catalog)

    def _as_record(execution) -> dict[str, Any]:
        # Reduce each dataset collection to its sorted, de-duplicated instance ids
        matched: dict[str, list[str]] = {}
        for source_type, collection in execution.datasets.items():
            matched[str(source_type.value)] = sorted(collection.instance_id.unique().tolist())

        return {
            "provider": execution.provider.slug,
            "diagnostic": execution.diagnostic.slug,
            "dataset_key": execution.dataset_key,
            "selectors": execution.selectors,
            "datasets": matched,
        }

    records = [_as_record(execution) for execution in solver.solve(filters=filters)]
    return sorted(records, key=lambda rec: (rec["provider"], rec["diagnostic"], rec["dataset_key"]))

write_catalog_parquet(catalog, output_path) #

Write a catalog DataFrame to parquet.

cftime.datetime objects in start_time/end_time are converted to strings before writing because pyarrow cannot serialize them.

Parameters:

Name Type Description Default
catalog DataFrame

DataFrame to write

required
output_path Path

Path for the output parquet file

required
Source code in packages/climate-ref/src/climate_ref/solve_helpers.py
def write_catalog_parquet(catalog: pd.DataFrame, output_path: Path) -> None:
    """
    Persist a catalog DataFrame as a parquet file.

    The parent directory of ``output_path`` is created if needed. The write
    works on a copy of ``catalog``, in which non-string, non-``None`` values
    of ``start_time``/``end_time`` (e.g. cftime.datetime objects) are turned
    into strings because pyarrow cannot serialize them. The caller's
    DataFrame is not modified.

    Parameters
    ----------
    catalog
        DataFrame to write
    output_path
        Path for the output parquet file
    """

    def _to_text(value):
        # Strings and None pass through untouched; everything else is stringified
        if value is None or isinstance(value, str):
            return value
        return str(value)

    output_path.parent.mkdir(parents=True, exist_ok=True)

    frame = catalog.copy()
    time_columns = [name for name in ("start_time", "end_time") if name in frame.columns]
    for name in time_columns:
        frame[name] = frame[name].apply(_to_text)

    frame.to_parquet(output_path, index=False)