Skip to content

climate_ref.datasets.cmip7 #

CMIP7 Dataset Adapter

Adapter for parsing and registering CMIP7 datasets based on CMIP7 Global Attributes v1.0.

CMIP7DatasetAdapter #

Bases: FinaliseableDatasetAdapterMixin, DatasetAdapter

Adapter for CMIP7 datasets

Based on CMIP7 Global Attributes v1.0 (DOI: 10.5281/zenodo.17250297).

Source code in packages/climate-ref/src/climate_ref/datasets/cmip7.py
class CMIP7DatasetAdapter(FinaliseableDatasetAdapterMixin, DatasetAdapter):
    """
    Adapter for CMIP7 datasets

    Based on CMIP7 Global Attributes v1.0 (DOI: 10.5281/zenodo.17250297).
    """

    dataset_cls = CMIP7Dataset
    # Column holding the unique dataset identifier
    slug_column = "instance_id"

    columns_requiring_finalisation = frozenset(
        {
            # Optional information
            "realm",
            "nominal_resolution",
            "license_id",
            "external_variables",
            # Parent info
            "branch_time_in_child",
            "branch_time_in_parent",
            "parent_activity_id",
            "parent_experiment_id",
            "parent_mip_era",
            "parent_source_id",
            "parent_time_units",
            "parent_variant_label",
            # Variable metadata
            "standard_name",
            "long_name",
            "units",
            # Time metadata
            "time_units",
            "calendar",
        }
    )

    dataset_specific_metadata = (
        # Core DRS attributes
        "activity_id",
        "institution_id",
        "source_id",
        "experiment_id",
        "variant_label",
        "variable_id",
        "grid_label",
        "frequency",
        "region",
        "branding_suffix",
        "version",
        # Additional mandatory attributes
        "mip_era",
        "realm",
        "nominal_resolution",
        "license_id",
        # Conditionally required attributes
        "external_variables",
        # Parent info
        "branch_time_in_child",
        "branch_time_in_parent",
        "parent_activity_id",
        "parent_experiment_id",
        "parent_mip_era",
        "parent_source_id",
        "parent_time_units",
        "parent_variant_label",
        # Variable metadata
        "standard_name",
        "long_name",
        "units",
        # Time metadata
        "time_units",
        "calendar",
        # Finalisation status
        "finalised",
        # Unique identifier
        slug_column,
    )

    file_specific_metadata = ("start_time", "end_time", "path", "tracking_id")

    # Not stored in the DB; reconstructed by _add_derived_columns on every load.
    derived_metadata = ("branded_variable",)

    version_metadata = "version"

    # CMIP7 DRS directory structure (MIP-DRS7 spec):
    #   <drs_specs>/<mip_era>/<activity_id>/<institution_id>/.../<grid_label>/<version>
    # The leading drs_specs and mip_era are fixed values ("MIP-DRS7" and "CMIP7")
    # and are omitted here. They are added as the "CMIP7." prefix when building instance_id.
    dataset_id_metadata = (
        "activity_id",
        "institution_id",
        "source_id",
        "experiment_id",
        "variant_label",
        "region",
        "frequency",
        "variable_id",
        "branding_suffix",
        "grid_label",
    )

    def __init__(self, n_jobs: int = 1, config: Config | None = None) -> None:
        """
        Create the adapter.

        Parameters
        ----------
        n_jobs
            Forwarded as ``n_jobs`` to the catalog builders when scanning files
        config
            Configuration used to select the parser.
            ``Config.default()`` is used when ``None`` is given.
        """
        self.n_jobs = n_jobs
        # Explicit None check: only a missing config falls back to the default,
        # a supplied config is always respected regardless of its truthiness.
        self.config = config if config is not None else Config.default()

    def get_complete_parser(self) -> DatasetParsingFunction:
        """
        Return the complete parser that opens files to extract full CMIP7 metadata.

        Returns
        -------
        :
            Complete CMIP7 parsing function
        """
        return parse_cmip7_complete

    @staticmethod
    def _clean_branch_time_columns(datasets: pd.DataFrame) -> pd.DataFrame:
        """
        Run ``clean_branch_time`` over whichever branch time columns are present.

        Columns that are absent are left alone. The DataFrame is modified in place
        and also returned.
        """
        for column in ("branch_time_in_child", "branch_time_in_parent"):
            if column in datasets.columns:
                datasets[column] = clean_branch_time(datasets[column])
        return datasets

    def _post_finalise_fixes(self, datasets: pd.DataFrame) -> pd.DataFrame:
        """
        Apply CMIP7-specific fixes after finalisation.

        Cleans branch time values that may be stored as strings with units suffixes.

        Parameters
        ----------
        datasets
            DataFrame with finalised metadata

        Returns
        -------
        :
            DataFrame with fixes applied
        """
        return self._clean_branch_time_columns(datasets)

    def get_parsing_function(self) -> DatasetParsingFunction:
        """
        Get the parsing function for CMIP7 datasets based on configuration

        The parsing function used is determined by the `cmip7_parser` configuration value:
        - "drs": Use the DRS parser (default)
        - "complete": Use the complete parser that extracts all available metadata

        Any value other than "complete" selects the DRS parser.

        Returns
        -------
        :
            The appropriate parsing function based on configuration
        """
        parser_type = self.config.cmip7_parser
        if parser_type == "complete":
            logger.info("Using complete CMIP7 parser")
            return parse_cmip7_complete
        else:
            logger.info(f"Using DRS CMIP7 parser (config value: {parser_type})")
            return parse_cmip7_drs

    def _enrich_parsed_catalog(self, datasets: pd.DataFrame) -> pd.DataFrame:
        """
        Apply CMIP7-specific post-parse enrichment to a raw catalog.

        Shared between :meth:`find_local_datasets` (whole-tree) and
        :meth:`iter_local_datasets` (streaming) so per-chunk processing is
        identical to a single-pass build.

        Parameters
        ----------
        datasets
            Raw catalog produced by the parsing function

        Returns
        -------
        :
            Catalog with parsed times, cleaned branch times, ``instance_id``,
            any missing metadata columns filled with ``pd.NA``, and the derived columns
        """
        # Convert the start_time and end_time columns to cftime objects
        cal = datasets["calendar"] if "calendar" in datasets.columns else "standard"
        for time_column in ("start_time", "end_time"):
            if time_column in datasets.columns:
                datasets[time_column] = parse_cftime_dates(datasets[time_column], cal)

        # Clean branch times
        datasets = self._clean_branch_time_columns(datasets)

        # Build instance_id following CMIP7 DRS format
        # CMIP7.<activity_id>.<institution_id>.<source_id>.<experiment_id>.<variant_label>.
        # <region>.<frequency>.<variable_id>.<branding_suffix>.<grid_label>.<version>
        drs_items = [
            *self.dataset_id_metadata,
            self.version_metadata,
        ]
        datasets = build_instance_id(datasets, drs_items, prefix="CMIP7")

        # Add in any missing metadata columns.
        # Walk the declared tuples in order (instead of iterating a set difference)
        # so the backfilled columns are appended in the same order on every run.
        existing_columns = set(datasets.columns)
        for column in (*self.dataset_specific_metadata, *self.file_specific_metadata):
            if column not in existing_columns:
                datasets[column] = pd.NA
                existing_columns.add(column)

        # Add branded_variable for the raw catalog (before DB ingestion)
        datasets = self._add_derived_columns(datasets)

        return datasets

    def _add_derived_columns(self, catalog: pd.DataFrame) -> pd.DataFrame:
        """
        Add the derived ``branded_variable`` column (``{variable_id}_{branding_suffix}``).

        ``branded_variable`` is not stored in the database as it is derived from
        ``variable_id`` and ``branding_suffix``.
        Both inputs are mandatory CMIP7 DRS facets,
        so a catalog missing the columns or carrying null values is malformed and an exception is raised.

        Parameters
        ----------
        catalog
            Catalog containing ``variable_id`` and ``branding_suffix`` columns

        Returns
        -------
        :
            Catalog with the ``branded_variable`` column added

        Raises
        ------
        ValueError
            If either input column is missing, or if either contains a null value
        """
        catalog = super()._add_derived_columns(catalog)

        required = ("variable_id", "branding_suffix")
        missing = [column for column in required if column not in catalog.columns]
        if missing:
            raise ValueError(
                f"Cannot derive 'branded_variable': catalog is missing required column(s) {missing}"
            )

        # An empty catalog has nothing to validate; still add the column so the schema matches
        if catalog.empty:
            catalog["branded_variable"] = pd.Series(dtype="object")
            return catalog

        invalid = catalog["variable_id"].isna() | catalog["branding_suffix"].isna()
        if invalid.any():
            raise ValueError(
                "Cannot derive 'branded_variable': "
                f"'variable_id'/'branding_suffix' is null for {int(invalid.sum())} row(s)"
            )

        catalog["branded_variable"] = (
            catalog["variable_id"].astype(str) + "_" + catalog["branding_suffix"].astype(str)
        )
        return catalog

    def find_local_datasets(self, file_or_directory: Path) -> pd.DataFrame:
        """
        Generate a data catalog from the specified file or directory.

        Each dataset may contain multiple files, which are represented as rows in the data catalog.
        Each dataset has a unique identifier, which is in `slug_column`.

        Parameters
        ----------
        file_or_directory
            File or directory containing the datasets

        Returns
        -------
        :
            Data catalog containing the metadata for the dataset
        """
        parsing_function = self.get_parsing_function()

        datasets = build_catalog(
            paths=[str(file_or_directory)],
            parsing_func=parsing_function,
            include_patterns=["*.nc"],
            depth=10,
            n_jobs=self.n_jobs,
        )

        return self._enrich_parsed_catalog(datasets)

    def iter_local_datasets(
        self, file_or_directory: Path, chunk_size: int = 10_000
    ) -> Iterator[pd.DataFrame]:
        """
        Stream the data catalog in chunks to bound peak memory.

        Discovery walks the tree once, but parsing and DataFrame construction
        happen ``chunk_size`` files at a time. Chunks flush at directory
        boundaries so files belonging to the same dataset (which share a DRS
        version directory) stay together in a single chunk.

        Parameters
        ----------
        file_or_directory
            Root of the CMIP7 archive (or a single file) to ingest.
        chunk_size
            Soft target for the number of files per chunk. Increasing this
            trades higher peak memory for fewer per-chunk overheads.

        Yields
        ------
        :
            Catalog DataFrames, each containing metadata for one chunk of files.
            Empty chunks are skipped.
        """
        parsing_function = self.get_parsing_function()

        for raw_chunk in iter_built_catalogs(
            paths=[str(file_or_directory)],
            parsing_func=parsing_function,
            include_patterns=["*.nc"],
            depth=10,
            n_jobs=self.n_jobs,
            chunk_size=chunk_size,
        ):
            enriched = self._enrich_parsed_catalog(raw_chunk)
            if enriched.empty:
                continue
            yield enriched

find_local_datasets(file_or_directory) #

Generate a data catalog from the specified file or directory.

Each dataset may contain multiple files, which are represented as rows in the data catalog. Each dataset has a unique identifier, which is in slug_column.

Parameters:

Name Type Description Default
file_or_directory Path

File or directory containing the datasets

required

Returns:

Type Description
DataFrame

Data catalog containing the metadata for the dataset

Source code in packages/climate-ref/src/climate_ref/datasets/cmip7.py
def find_local_datasets(self, file_or_directory: Path) -> pd.DataFrame:
    """
    Build the data catalog for a file or directory in a single pass.

    A dataset may span several files; each file is one row of the catalog.
    The unique identifier of each dataset is held in `slug_column`.

    Parameters
    ----------
    file_or_directory
        File or directory containing the datasets

    Returns
    -------
    :
        Data catalog containing the metadata for the dataset
    """
    # Resolve the parser first so the configured choice is logged before scanning
    parser = self.get_parsing_function()
    search_roots = [str(file_or_directory)]

    raw_catalog = build_catalog(
        paths=search_roots,
        parsing_func=parser,
        include_patterns=["*.nc"],
        depth=10,
        n_jobs=self.n_jobs,
    )
    enriched_catalog = self._enrich_parsed_catalog(raw_catalog)
    return enriched_catalog

get_complete_parser() #

Return the complete parser that opens files to extract full CMIP7 metadata.

Returns:

Type Description
DatasetParsingFunction

Complete CMIP7 parsing function

Source code in packages/climate-ref/src/climate_ref/datasets/cmip7.py
def get_complete_parser(self) -> DatasetParsingFunction:
    """
    Provide the parser that opens files to extract full CMIP7 metadata.

    Returns
    -------
    :
        Complete CMIP7 parsing function
    """
    complete_parser = parse_cmip7_complete
    return complete_parser

get_parsing_function() #

Get the parsing function for CMIP7 datasets based on configuration

The parsing function used is determined by the `cmip7_parser` configuration value:

- "drs": Use the DRS parser (default)
- "complete": Use the complete parser that extracts all available metadata

Returns:

Type Description
DatasetParsingFunction

The appropriate parsing function based on configuration

Source code in packages/climate-ref/src/climate_ref/datasets/cmip7.py
def get_parsing_function(self) -> DatasetParsingFunction:
    """
    Select the CMIP7 parsing function from the configuration

    The `cmip7_parser` configuration value decides which parser is returned:
    - "complete": the complete parser that extracts all available metadata
    - "drs" (default), or any other value: the DRS parser

    Returns
    -------
    :
        The appropriate parsing function based on configuration
    """
    parser_type = self.config.cmip7_parser
    wants_complete = parser_type == "complete"

    # Guard clause: everything that is not "complete" uses the DRS parser
    if not wants_complete:
        logger.info(f"Using DRS CMIP7 parser (config value: {parser_type})")
        return parse_cmip7_drs

    logger.info("Using complete CMIP7 parser")
    return parse_cmip7_complete

iter_local_datasets(file_or_directory, chunk_size=10000) #

Stream the data catalog in chunks to bound peak memory.

Discovery walks the tree once, but parsing and DataFrame construction happen chunk_size files at a time. Chunks flush at directory boundaries so files belonging to the same dataset (which share a DRS version directory) stay together in a single chunk.

Parameters:

Name Type Description Default
file_or_directory Path

Root of the CMIP7 archive (or a single file) to ingest.

required
chunk_size int

Soft target for the number of files per chunk. Increasing this trades higher peak memory for fewer per-chunk overheads.

10000

Yields:

Type Description
DataFrame

Catalog DataFrames, each containing metadata for one chunk of files. Empty chunks are skipped.

Source code in packages/climate-ref/src/climate_ref/datasets/cmip7.py
def iter_local_datasets(
    self, file_or_directory: Path, chunk_size: int = 10_000
) -> Iterator[pd.DataFrame]:
    """
    Yield the data catalog chunk by chunk to bound peak memory.

    Discovery walks the tree once, but parsing and DataFrame construction
    happen ``chunk_size`` files at a time. Chunks flush at directory
    boundaries so files belonging to the same dataset (which share a DRS
    version directory) stay together in a single chunk.

    Parameters
    ----------
    file_or_directory
        Root of the CMIP7 archive (or a single file) to ingest.
    chunk_size
        Soft target for the number of files per chunk. Increasing this
        trades higher peak memory for fewer per-chunk overheads.

    Yields
    ------
    :
        Catalog DataFrames, each containing metadata for one chunk of files.
        Empty chunks are skipped.
    """
    parser = self.get_parsing_function()

    chunk_stream = iter_built_catalogs(
        paths=[str(file_or_directory)],
        parsing_func=parser,
        include_patterns=["*.nc"],
        depth=10,
        n_jobs=self.n_jobs,
        chunk_size=chunk_size,
    )
    for raw_chunk in chunk_stream:
        catalog_chunk = self._enrich_parsed_catalog(raw_chunk)
        # Only hand back chunks that actually contain rows
        if not catalog_chunk.empty:
            yield catalog_chunk