Skip to content

climate_ref_core.esgf #

ESGF dataset fetching

This module provides classes for searching and fetching datasets from ESGF (Earth System Grid Federation) and other data registries.

CMIP6Request #

Bases: IntakeESGFMixin

Represents a CMIP6 dataset request.

These data are fetched from ESGF based on the provided facets.

Source code in packages/climate-ref-core/src/climate_ref_core/esgf/cmip6.py
class CMIP6Request(IntakeESGFMixin):
    """
    A request for CMIP6 datasets.

    The datasets are looked up on ESGF using the facets supplied when the
    request is constructed.
    """

    source_type = "CMIP6"

    # Facet names that make up a CMIP6 directory path
    cmip6_path_items = (
        "mip_era",
        "activity_drs",
        "institution_id",
        "source_id",
        "experiment_id",
        "member_id",
        "table_id",
        "variable_id",
        "grid_label",
    )

    # Facet names that make up a CMIP6 file name
    cmip6_filename_paths = (
        "variable_id",
        "table_id",
        "source_id",
        "experiment_id",
        "member_id",
        "grid_label",
    )

    # Every facet name the two templates above are allowed to reference
    available_facets = (
        "mip_era",
        "activity_drs",
        "institution_id",
        "source_id",
        "experiment_id",
        "member_id",
        "table_id",
        "variable_id",
        "grid_label",
        "version",
        "data_node",
    )

    def __init__(
        self,
        slug: str,
        facets: dict[str, Any],
        remove_ensembles: bool = False,
        time_span: tuple[str, str] | None = None,
    ):
        """
        Build a CMIP6 request.

        Parameters
        ----------
        slug
            Unique identifier for this request
        facets
            ESGF search facets (e.g., source_id, variable_id, experiment_id)
        remove_ensembles
            If True, keep only one ensemble member per model
        time_span
            Optional time range filter (start, end) in YYYY-MM format

        Raises
        ------
        ValueError
            If a name listed in ``cmip6_path_items`` or ``cmip6_filename_paths``
            is missing from ``available_facets``. The supplied ``facets`` are
            not validated here.
        """
        self.time_span = time_span
        self.remove_ensembles = remove_ensembles
        self.facets = facets
        self.slug = slug

        # Consistency check of the class-level templates against the known facets
        for path_item in self.cmip6_path_items:
            if path_item in self.available_facets:
                continue
            raise ValueError(f"Path item {path_item!r} not in available facets")
        for filename_item in self.cmip6_filename_paths:
            if filename_item in self.available_facets:
                continue
            raise ValueError(f"Filename path {filename_item!r} not in available facets")

    def __repr__(self) -> str:
        slug, facets = self.slug, self.facets
        return f"CMIP6Request(slug={slug!r}, facets={facets!r})"

__init__(slug, facets, remove_ensembles=False, time_span=None) #

Initialize a CMIP6 request.

Parameters:

Name Type Description Default
slug str

Unique identifier for this request

required
facets dict[str, Any]

ESGF search facets (e.g., source_id, variable_id, experiment_id)

required
remove_ensembles bool

If True, keep only one ensemble member per model

False
time_span tuple[str, str] | None

Optional time range filter (start, end) in YYYY-MM format

None
Source code in packages/climate-ref-core/src/climate_ref_core/esgf/cmip6.py
def __init__(
    self,
    slug: str,
    facets: dict[str, Any],
    remove_ensembles: bool = False,
    time_span: tuple[str, str] | None = None,
):
    """
    Build a CMIP6 request.

    Parameters
    ----------
    slug
        Unique identifier for this request
    facets
        ESGF search facets (e.g., source_id, variable_id, experiment_id)
    remove_ensembles
        If True, keep only one ensemble member per model
    time_span
        Optional time range filter (start, end) in YYYY-MM format

    Raises
    ------
    ValueError
        If a name listed in ``cmip6_path_items`` or ``cmip6_filename_paths``
        is missing from ``available_facets``. The supplied ``facets`` are
        not validated here.
    """
    self.time_span = time_span
    self.remove_ensembles = remove_ensembles
    self.facets = facets
    self.slug = slug

    # Consistency check of the class-level templates against the known facets
    for path_item in self.cmip6_path_items:
        if path_item in self.available_facets:
            continue
        raise ValueError(f"Path item {path_item!r} not in available facets")
    for filename_item in self.cmip6_filename_paths:
        if filename_item in self.available_facets:
            continue
        raise ValueError(f"Filename path {filename_item!r} not in available facets")

CMIP7Request #

Represents a CMIP7 dataset request.

Since CMIP7 data is not yet available on ESGF, this class fetches CMIP6 data and converts it to CMIP7 format using convert_cmip6_dataset().

The facets use CMIP7 naming conventions (e.g., variant_label instead of member_id).

Source code in packages/climate-ref-core/src/climate_ref_core/esgf/cmip7.py
class CMIP7Request:
    """
    Represents a CMIP7 dataset request.

    Since CMIP7 data is not yet available on ESGF, this class fetches
    CMIP6 data and converts it to CMIP7 format using convert_cmip6_dataset().

    The facets use CMIP7 naming conventions (e.g., variant_label instead of member_id).
    """

    source_type = "CMIP7"

    # Map CMIP7 facets to CMIP6 facets
    facet_mapping: ClassVar[dict[str, str]] = {
        "variant_label": "member_id",
    }

    # CMIP7-only facets that should not be passed to CMIP6 ESGF searches
    cmip7_only_facets: ClassVar[set[str]] = {
        "branded_variable",
        "region",
    }

    available_facets = (
        "activity_id",
        "institution_id",
        "source_id",
        "experiment_id",
        "variant_label",  # CMIP7 name for member_id
        "variable_id",
        "grid_label",
        "frequency",
        "table_id",  # Used for mapping to CMIP6
        "version",
        "region",
        "branded_variable",
    )

    def __init__(
        self,
        slug: str,
        facets: dict[str, Any],
        remove_ensembles: bool = False,
        time_span: tuple[str, str] | None = None,
    ):
        """
        Initialize a CMIP7 request.

        Parameters
        ----------
        slug
            Unique identifier for this request
        facets
            CMIP7 search facets (e.g., source_id, variable_id, variant_label)
        remove_ensembles
            If True, keep only one ensemble member per model
        time_span
            Optional time range filter (start, end) in YYYY-MM format
        """
        self.slug = slug
        self.facets = facets
        self.remove_ensembles = remove_ensembles
        self.time_span = time_span

        # Create corresponding CMIP6 facets
        self._cmip6_facets = self._convert_to_cmip6_facets(facets)

    def _convert_to_cmip6_facets(self, cmip7_facets: dict[str, Any]) -> dict[str, Any]:
        """Convert CMIP7 facets to CMIP6 facets for fetching."""
        cmip6_facets = {}
        for key, value in cmip7_facets.items():
            # Skip CMIP7-only facets that don't exist in CMIP6 ESGF
            if key in self.cmip7_only_facets:
                continue
            # Map CMIP7 facet names to CMIP6
            cmip6_key = self.facet_mapping.get(key, key)
            cmip6_facets[cmip6_key] = value
        return cmip6_facets

    def _convert_to_cmip7_metadata(self, cmip6_row: dict[str, Any]) -> dict[str, Any]:
        """Convert a subset of CMIP6 metadata to CMIP7 format.

        This is the single location for DReq enrichment: it updates
        ``variable_id`` and adds ``region``, ``branding_suffix``, and
        ``branded_variable`` from the Data Request when available.
        """
        cmip7_row = dict(cmip6_row)

        # Map member_id to variant_label
        if "member_id" in cmip7_row:
            cmip7_row["variant_label"] = cmip7_row.pop("member_id")

        # Add CMIP7-specific metadata
        cmip7_row["mip_era"] = "CMIP7"

        # CMIP6 activity_id can contain multiple activities separated by spaces
        # (e.g. "C4MIP CDRMIP"). Use only the first activity for CMIP7.
        if "activity_id" in cmip7_row and " " in str(cmip7_row["activity_id"]):
            cmip7_row["activity_id"] = str(cmip7_row["activity_id"]).split()[0]

        # Map table_id to frequency if not present
        if "frequency" not in cmip7_row and "table_id" in cmip7_row:
            cmip7_row["frequency"] = get_frequency_from_table(cmip7_row["table_id"])

        # Enrich with DReq metadata
        table_id = cmip7_row.get("table_id")
        variable_id = cmip7_row.get("variable_id")
        if table_id and variable_id:
            try:
                entry = get_dreq_entry(table_id, variable_id)
                cmip7_row["region"] = entry.region
                cmip7_row["variable_id"] = entry.variable_id
                cmip7_row["branding_suffix"] = entry.branding_suffix
                cmip7_row["branded_variable"] = entry.branded_variable
            except KeyError:
                logger.error(
                    f"No DReq entry for {table_id}.{variable_id}, region/branding_suffix will not be set"
                )

        return cmip7_row

    def fetch_datasets(self) -> pd.DataFrame:
        """
        Fetch CMIP6 datasets and convert them to CMIP7 format.

        Returns
        -------
        pd.DataFrame
            DataFrame containing CMIP7 dataset metadata and file paths.
        """
        # Create a CMIP6 request with converted facets
        cmip6_request = CMIP6Request(
            slug=f"{self.slug}-cmip6-source",
            facets=self._cmip6_facets,
            remove_ensembles=self.remove_ensembles,
            time_span=self.time_span,
        )

        # Fetch CMIP6 datasets
        cmip6_df = cmip6_request.fetch_datasets()

        if cmip6_df.empty:
            return cmip6_df

        # Convert each file and update metadata
        # The returned DataFrame does not need to have the complete set of CMIP7 metadata
        # Datasets will be re-read during DB ingestion during the tests
        converted_rows = []
        for _, row in cmip6_df.iterrows():
            row_dict: dict[str, Any] = {str(k): v for k, v in row.to_dict().items()}

            # Build CMIP7 facets for this row (includes DReq enrichment)
            cmip7_row = self._convert_to_cmip7_metadata(row_dict)

            # Get file paths and convert them
            files = row_dict.get("files", [])
            converted_files = []
            for file_path in files:
                cmip6_path = Path(file_path)
                if cmip6_path.exists():
                    try:
                        cmip7_path = _convert_file_to_cmip7(cmip6_path, cmip7_row)
                        converted_files.append(str(cmip7_path))
                    except Exception as e:
                        logger.exception(f"Failed to convert {cmip6_path.name}: {e}")
                        continue
                else:
                    logger.warning(f"CMIP6 file not found: {file_path}")

            if converted_files:
                cmip7_row["files"] = converted_files
                converted_rows.append(cmip7_row)

        if not converted_rows:
            logger.warning(f"No files converted for request: {self.slug}")
            return pd.DataFrame()

        return pd.DataFrame(converted_rows)

    def __repr__(self) -> str:
        return f"CMIP7Request(slug={self.slug!r}, facets={self.facets!r})"

__init__(slug, facets, remove_ensembles=False, time_span=None) #

Initialize a CMIP7 request.

Parameters:

Name Type Description Default
slug str

Unique identifier for this request

required
facets dict[str, Any]

CMIP7 search facets (e.g., source_id, variable_id, variant_label)

required
remove_ensembles bool

If True, keep only one ensemble member per model

False
time_span tuple[str, str] | None

Optional time range filter (start, end) in YYYY-MM format

None
Source code in packages/climate-ref-core/src/climate_ref_core/esgf/cmip7.py
def __init__(
    self,
    slug: str,
    facets: dict[str, Any],
    remove_ensembles: bool = False,
    time_span: tuple[str, str] | None = None,
):
    """
    Build a CMIP7 request.

    Parameters
    ----------
    slug
        Unique identifier for this request
    facets
        CMIP7 search facets (e.g., source_id, variable_id, variant_label)
    remove_ensembles
        If True, keep only one ensemble member per model
    time_span
        Optional time range filter (start, end) in YYYY-MM format
    """
    self.time_span = time_span
    self.remove_ensembles = remove_ensembles
    self.facets = facets
    self.slug = slug

    # Pre-compute the CMIP6 view of the facets used for the actual search
    self._cmip6_facets = self._convert_to_cmip6_facets(facets)

fetch_datasets() #

Fetch CMIP6 datasets and convert them to CMIP7 format.

Returns:

Type Description
DataFrame

DataFrame containing CMIP7 dataset metadata and file paths.

Source code in packages/climate-ref-core/src/climate_ref_core/esgf/cmip7.py
def fetch_datasets(self) -> pd.DataFrame:
    """
    Fetch the CMIP6 source datasets and convert them to CMIP7 format.

    Returns
    -------
    pd.DataFrame
        One row per dataset with CMIP7 metadata and a ``files`` list of
        converted file paths. Datasets for which no file could be
        converted are omitted. If the CMIP6 search result is empty it is
        returned unchanged; if nothing was converted a new empty
        DataFrame is returned.
    """
    source_request = CMIP6Request(
        slug=f"{self.slug}-cmip6-source",
        facets=self._cmip6_facets,
        remove_ensembles=self.remove_ensembles,
        time_span=self.time_span,
    )
    source_df = source_request.fetch_datasets()

    if source_df.empty:
        return source_df

    # The returned DataFrame does not need to have the complete set of CMIP7 metadata
    # Datasets will be re-read during DB ingestion during the tests
    cmip7_rows = []
    for _, series in source_df.iterrows():
        source_row: dict[str, Any] = {str(name): value for name, value in series.to_dict().items()}

        # CMIP7 metadata for this dataset (includes DReq enrichment)
        cmip7_row = self._convert_to_cmip7_metadata(source_row)

        produced = []
        for file_path in source_row.get("files", []):
            cmip6_path = Path(file_path)
            if not cmip6_path.exists():
                logger.warning(f"CMIP6 file not found: {file_path}")
                continue
            # A failed conversion is logged and skipped so the other files still go through
            try:
                cmip7_path = _convert_file_to_cmip7(cmip6_path, cmip7_row)
                produced.append(str(cmip7_path))
            except Exception as e:
                logger.exception(f"Failed to convert {cmip6_path.name}: {e}")

        if produced:
            cmip7_row["files"] = produced
            cmip7_rows.append(cmip7_row)

    if cmip7_rows:
        return pd.DataFrame(cmip7_rows)

    logger.warning(f"No files converted for request: {self.slug}")
    return pd.DataFrame()

ESGFFetcher #

Fetches datasets from ESGF and returns metadata with file paths.

Uses intake-esgf to search and download datasets. Files that cannot be found locally are stored in intake-esgf's cache directory.

Source code in packages/climate-ref-core/src/climate_ref_core/esgf/fetcher.py
class ESGFFetcher:
    """
    Resolves dataset requests into per-file metadata tables.

    Uses intake-esgf to search and download datasets.
    Files that cannot be found locally are stored in intake-esgf's cache directory.
    """

    def fetch_request(self, request: ESGFRequest) -> pd.DataFrame:
        """
        Fetch the datasets for one request and expand them to one row per file.

        Parameters
        ----------
        request
            The ESGF request specifying what to fetch

        Returns
        -------
        pd.DataFrame
            One row per file that exists on disk. Each row repeats the
            metadata of its dataset, adds a 'path' column holding the file
            path as a string and a 'source_type' column taken from the
            request. An empty DataFrame is returned when the request yields
            no datasets or none of the listed files exist.

            This format is not identical to the DataCatalog, but it is broadly compatible.
        """
        logger.info(f"Fetching datasets for request: {request.slug}")

        # Delegate the actual search to the request itself
        catalog = request.fetch_datasets()

        if catalog.empty:
            logger.warning(f"No datasets found for request: {request.slug}")
            return pd.DataFrame()

        logger.info(f"Found {len(catalog)} datasets for request: {request.slug}")

        # Flatten the per-dataset 'files' lists into individual rows
        file_rows = []
        for _, dataset in catalog.iterrows():
            dataset_files = dataset.get("files", [])
            if not dataset_files:
                logger.warning(f"No files for dataset: {dataset.get('key', 'unknown')}")
                continue

            for file_path in dataset_files:
                if Path(file_path).exists():
                    file_rows.append({**dataset.to_dict(), "path": str(file_path)})
                else:
                    logger.warning(f"File not found (may need to download from ESGF): {file_path}")

        if not file_rows:
            logger.warning(f"No files found for request: {request.slug}")
            return pd.DataFrame()

        expanded = pd.DataFrame(file_rows)
        expanded["source_type"] = request.source_type

        logger.info(f"Fetched {len(expanded)} files for request: {request.slug}")
        return expanded

    def fetch_for_test_case(
        self,
        requests: tuple[ESGFRequest, ...] | None,
    ) -> pd.DataFrame:
        """
        Fetch the data for every request of a test case.

        Parameters
        ----------
        requests
            The ESGF requests from the test case

        Returns
        -------
        pd.DataFrame
            The non-empty per-request results concatenated in request order
            with a fresh index, or an empty DataFrame when there are no
            requests or no request produced any rows.
        """
        if not requests:
            return pd.DataFrame()

        # Requests are fetched one after another, in the order given
        frames = [frame for frame in map(self.fetch_request, requests) if not frame.empty]
        if frames:
            return pd.concat(frames, ignore_index=True)
        return pd.DataFrame()

    def list_requests_for_diagnostic(self, diagnostic: Diagnostic) -> list[tuple[str, ESGFRequest]]:
        """
        Collect the ESGF requests of every test case of a diagnostic.

        Parameters
        ----------
        diagnostic
            The diagnostic to list requests for

        Returns
        -------
        list[tuple[str, ESGFRequest]]
            List of (test_case_name, request) tuples; empty when the
            diagnostic has no test data specification
        """
        spec = diagnostic.test_data_spec
        if spec is None:
            return []

        pairs: list[tuple[str, ESGFRequest]] = []
        for case in spec.test_cases:
            # Test cases without requests contribute nothing
            pairs.extend((case.name, case_request) for case_request in case.requests or ())
        return pairs

fetch_for_test_case(requests) #

Fetch all data for a test case's requests.

Parameters:

Name Type Description Default
requests tuple[ESGFRequest, ...] | None

The ESGF requests from the test case

required

Returns:

Type Description
DataFrame

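Combined DataFrame containing the rows fetched for every request, concatenated in request order. Each row carries the source_type of the request it came from; the rows are not otherwise grouped.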

Source code in packages/climate-ref-core/src/climate_ref_core/esgf/fetcher.py
def fetch_for_test_case(
    self,
    requests: tuple[ESGFRequest, ...] | None,
) -> pd.DataFrame:
    """
    Fetch the data for every request of a test case.

    Parameters
    ----------
    requests
        The ESGF requests from the test case

    Returns
    -------
    pd.DataFrame
        The non-empty per-request results concatenated in request order
        with a fresh index, or an empty DataFrame when there are no
        requests or no request produced any rows.
    """
    if not requests:
        return pd.DataFrame()

    # Requests are fetched one after another, in the order given
    frames = [frame for frame in map(self.fetch_request, requests) if not frame.empty]
    if frames:
        return pd.concat(frames, ignore_index=True)
    return pd.DataFrame()

fetch_request(request) #

Fetch datasets for a single ESGF request.

Parameters:

Name Type Description Default
request ESGFRequest

The ESGF request specifying what to fetch

required

Returns:

Type Description
DataFrame

DataFrame containing dataset metadata and file paths. Each row represents one file, with a 'path' column pointing to the file (either in intake-esgf's cache or one of the root data locations).

This format is not identical to the DataCatalog, but it is broadly compatible.

Source code in packages/climate-ref-core/src/climate_ref_core/esgf/fetcher.py
def fetch_request(self, request: ESGFRequest) -> pd.DataFrame:
    """
    Fetch the datasets for one request and expand them to one row per file.

    Parameters
    ----------
    request
        The ESGF request specifying what to fetch

    Returns
    -------
    pd.DataFrame
        One row per file that exists on disk. Each row repeats the
        metadata of its dataset, adds a 'path' column holding the file
        path as a string and a 'source_type' column taken from the
        request. An empty DataFrame is returned when the request yields
        no datasets or none of the listed files exist.

        This format is not identical to the DataCatalog, but it is broadly compatible.
    """
    logger.info(f"Fetching datasets for request: {request.slug}")

    # Delegate the actual search to the request itself
    catalog = request.fetch_datasets()

    if catalog.empty:
        logger.warning(f"No datasets found for request: {request.slug}")
        return pd.DataFrame()

    logger.info(f"Found {len(catalog)} datasets for request: {request.slug}")

    # Flatten the per-dataset 'files' lists into individual rows
    file_rows = []
    for _, dataset in catalog.iterrows():
        dataset_files = dataset.get("files", [])
        if not dataset_files:
            logger.warning(f"No files for dataset: {dataset.get('key', 'unknown')}")
            continue

        for file_path in dataset_files:
            if Path(file_path).exists():
                file_rows.append({**dataset.to_dict(), "path": str(file_path)})
            else:
                logger.warning(f"File not found (may need to download from ESGF): {file_path}")

    if not file_rows:
        logger.warning(f"No files found for request: {request.slug}")
        return pd.DataFrame()

    expanded = pd.DataFrame(file_rows)
    expanded["source_type"] = request.source_type

    logger.info(f"Fetched {len(expanded)} files for request: {request.slug}")
    return expanded

list_requests_for_diagnostic(diagnostic) #

List all ESGF requests for a diagnostic across all test cases.

Parameters:

Name Type Description Default
diagnostic AbstractDiagnostic

The diagnostic to list requests for

required

Returns:

Type Description
list[tuple[str, ESGFRequest]]

List of (test_case_name, request) tuples

Source code in packages/climate-ref-core/src/climate_ref_core/esgf/fetcher.py
def list_requests_for_diagnostic(self, diagnostic: Diagnostic) -> list[tuple[str, ESGFRequest]]:
    """
    Collect the ESGF requests of every test case of a diagnostic.

    Parameters
    ----------
    diagnostic
        The diagnostic to list requests for

    Returns
    -------
    list[tuple[str, ESGFRequest]]
        List of (test_case_name, request) tuples; empty when the
        diagnostic has no test data specification
    """
    spec = diagnostic.test_data_spec
    if spec is None:
        return []

    pairs: list[tuple[str, ESGFRequest]] = []
    for case in spec.test_cases:
        # Test cases without requests contribute nothing
        pairs.extend((case.name, case_request) for case_request in case.requests or ())
    return pairs

ESGFRequest #

Bases: Protocol

Protocol for ESGF dataset requests.

Implementations provide the logic for searching ESGF and generating output paths for downloaded datasets.

Source code in packages/climate-ref-core/src/climate_ref_core/esgf/base.py
@runtime_checkable
class ESGFRequest(Protocol):
    """
    Protocol for ESGF dataset requests.

    Implementations provide the logic for searching ESGF and generating
    output paths for downloaded datasets.
    """

    slug: str
    """Unique identifier for this request."""

    source_type: str
    """Type of dataset (e.g., 'CMIP6', 'obs4MIPs')."""

    time_span: tuple[str, str] | None
    """Optional time range to filter datasets (start, end)."""

    def fetch_datasets(self) -> pd.DataFrame:
        """
        Fetch dataset metadata from ESGF.

        Returns
        -------
        pd.DataFrame
            DataFrame containing dataset metadata and file paths.
            Must contain at minimum:
            - key: A unique identifier for the dataset
            - files: A list of files for the dataset
        """
        ...

slug instance-attribute #

Unique identifier for this request.

source_type instance-attribute #

Type of dataset (e.g., 'CMIP6', 'obs4MIPs').

time_span instance-attribute #

Optional time range to filter datasets (start, end).

fetch_datasets() #

Fetch dataset metadata from ESGF.

Returns:

Type Description
DataFrame

DataFrame containing dataset metadata and file paths. Must contain at minimum:

- key: A unique identifier for the dataset
- files: A list of files for the dataset

Source code in packages/climate-ref-core/src/climate_ref_core/esgf/base.py
def fetch_datasets(self) -> pd.DataFrame:
    """
    Look up the datasets described by this request.

    Returns
    -------
    pd.DataFrame
        Dataset metadata together with the files of each dataset.
        The following columns are required as a minimum:
        - key: A unique identifier for the dataset
        - files: A list of files for the dataset
    """
    ...

IntakeESGFMixin #

Mixin that fetches datasets from ESGF using intake-esgf.

Subclasses must define:

- facets: dict[str, str | tuple[str, ...]]
- remove_ensembles: bool
- time_span: tuple[str, str] | None

Source code in packages/climate-ref-core/src/climate_ref_core/esgf/base.py
class IntakeESGFMixin:
    """
    Mixin that fetches datasets from ESGF using intake-esgf.

    Subclasses must define:
    - facets: dict[str, str | tuple[str, ...]]
    - remove_ensembles: bool
    - time_span: tuple[str, str] | None
    """

    facets: dict[str, str | tuple[str, ...]]
    remove_ensembles: bool
    time_span: tuple[str, str] | None

    def fetch_datasets(self) -> pd.DataFrame:
        """Fetch dataset metadata from ESGF."""
        facets: dict[str, Any] = dict(self.facets)
        if self.time_span:
            facets["file_start"] = self.time_span[0]
            facets["file_end"] = self.time_span[1]

        # Convert tuples to lists for intake-esgf compatibility
        for key, value in facets.items():
            if isinstance(value, tuple):
                facets[key] = list(value)

        cat = ESGFCatalog()  # type: ignore[no-untyped-call]
        cat.search(**facets)

        if self.remove_ensembles:
            cat.remove_ensembles()

        path_dict = cat.to_path_dict(prefer_streaming=False, minimal_keys=False, quiet=True)
        if cat.df is None or cat.df.empty:
            raise ValueError("No datasets found for the given ESGF request")
        merged_df = cat.df.merge(pd.Series(path_dict, name="files"), left_on="key", right_index=True)

        if self.time_span:
            merged_df["time_start"] = self.time_span[0]
            merged_df["time_end"] = self.time_span[1]

        return _deduplicate_datasets(merged_df)

fetch_datasets() #

Fetch dataset metadata from ESGF.

Source code in packages/climate-ref-core/src/climate_ref_core/esgf/base.py
def fetch_datasets(self) -> pd.DataFrame:
    """
    Search ESGF with this request's facets and return the matching datasets.

    ``self.facets`` is copied before the search, so it is not modified.

    Raises
    ------
    ValueError
        If the catalog holds no datasets after the search.
    """
    search_facets: dict[str, Any] = dict(self.facets)
    if self.time_span:
        search_facets["file_start"] = self.time_span[0]
        search_facets["file_end"] = self.time_span[1]

    # intake-esgf compatibility: tuple-valued facets are passed as lists
    search_facets = {
        name: list(value) if isinstance(value, tuple) else value
        for name, value in search_facets.items()
    }

    cat = ESGFCatalog()  # type: ignore[no-untyped-call]
    cat.search(**search_facets)

    if self.remove_ensembles:
        cat.remove_ensembles()

    path_dict = cat.to_path_dict(prefer_streaming=False, minimal_keys=False, quiet=True)
    if cat.df is None or cat.df.empty:
        raise ValueError("No datasets found for the given ESGF request")

    # Attach the file paths to the catalog rows as a 'files' column, matched on 'key'
    files_by_key = pd.Series(path_dict, name="files")
    datasets = cat.df.merge(files_by_key, left_on="key", right_index=True)

    if self.time_span:
        # Every row is stamped with the requested time span
        datasets["time_start"] = self.time_span[0]
        datasets["time_end"] = self.time_span[1]

    return _deduplicate_datasets(datasets)

Obs4MIPsRequest #

Bases: IntakeESGFMixin

Represents an Obs4MIPs dataset request.

These data are fetched from ESGF based on the provided facets.

Source code in packages/climate-ref-core/src/climate_ref_core/esgf/obs4mips.py
class Obs4MIPsRequest(IntakeESGFMixin):
    """
    A request for Obs4MIPs datasets.

    The datasets are looked up on ESGF using the facets supplied when the
    request is constructed.
    """

    source_type = "obs4MIPs"

    # Facet names that make up an obs4MIPs directory path
    obs4mips_path_items = (
        "activity_id",
        "institution_id",
        "source_id",
        "variable_id",
        "grid_label",
    )

    # Facet names that make up an obs4MIPs file name
    obs4mips_filename_paths = (
        "variable_id",
        "source_id",
        "grid_label",
    )

    # Every facet name the two templates above are allowed to reference.
    # NOTE(review): the CMIP6/CMIP7 request classes call the equivalent
    # attribute ``available_facets`` - confirm whether the differing name is intended.
    avail_facets = (
        "activity_id",
        "institution_id",
        "source_id",
        "frequency",
        "variable_id",
        "grid_label",
        "version",
        "data_node",
        "project",
    )

    def __init__(
        self,
        slug: str,
        facets: dict[str, Any],
        remove_ensembles: bool = False,
        time_span: tuple[str, str] | None = None,
    ):
        """
        Build an Obs4MIPs request.

        Parameters
        ----------
        slug
            Unique identifier for this request
        facets
            ESGF search facets (e.g., source_id, variable_id). A ``project``
            facet of "obs4MIPs" is added unless the caller supplies one.
        remove_ensembles
            If True, keep only one ensemble member (typically not relevant for obs)
        time_span
            Optional time range filter (start, end) in YYYY-MM format

        Raises
        ------
        ValueError
            If a name listed in ``obs4mips_path_items`` or
            ``obs4mips_filename_paths`` is missing from ``avail_facets``.
        """
        self.time_span = time_span
        self.remove_ensembles = remove_ensembles
        # The default project comes first so a caller-supplied value overrides it
        self.facets = {"project": "obs4MIPs", **facets}
        self.slug = slug

        # Consistency check of the class-level templates against the known facets
        for path_item in self.obs4mips_path_items:
            if path_item in self.avail_facets:
                continue
            raise ValueError(f"Path item {path_item!r} not in available facets")
        for filename_item in self.obs4mips_filename_paths:
            if filename_item in self.avail_facets:
                continue
            raise ValueError(f"Filename path {filename_item!r} not in available facets")

    def fetch_datasets(self) -> pd.DataFrame:
        """Fetch dataset metadata from ESGF, defaulting the project facet to obs4MIPs."""
        # Only fills in the project when it is absent; an existing value is kept
        self.facets.setdefault("project", "obs4MIPs")

        return super().fetch_datasets()

    def __repr__(self) -> str:
        slug, facets = self.slug, self.facets
        return f"Obs4MIPsRequest(slug={slug!r}, facets={facets!r})"

__init__(slug, facets, remove_ensembles=False, time_span=None) #

Initialize an Obs4MIPs request.

Parameters:

Name Type Description Default
slug str

Unique identifier for this request

required
facets dict[str, Any]

ESGF search facets (e.g., source_id, variable_id)

required
remove_ensembles bool

If True, keep only one ensemble member (typically not relevant for obs)

False
time_span tuple[str, str] | None

Optional time range filter (start, end) in YYYY-MM format

None
Source code in packages/climate-ref-core/src/climate_ref_core/esgf/obs4mips.py
def __init__(
    self,
    slug: str,
    facets: dict[str, Any],
    remove_ensembles: bool = False,
    time_span: tuple[str, str] | None = None,
):
    """
    Build an Obs4MIPs request.

    Parameters
    ----------
    slug
        Unique identifier for this request
    facets
        ESGF search facets (e.g., source_id, variable_id). A ``project``
        facet of "obs4MIPs" is added unless the caller supplies one.
    remove_ensembles
        If True, keep only one ensemble member (typically not relevant for obs)
    time_span
        Optional time range filter (start, end) in YYYY-MM format

    Raises
    ------
    ValueError
        If a name listed in ``obs4mips_path_items`` or
        ``obs4mips_filename_paths`` is missing from ``avail_facets``.
    """
    self.time_span = time_span
    self.remove_ensembles = remove_ensembles
    # The default project comes first so a caller-supplied value overrides it
    self.facets = {"project": "obs4MIPs", **facets}
    self.slug = slug

    # Consistency check of the class-level templates against the known facets
    for path_item in self.obs4mips_path_items:
        if path_item in self.avail_facets:
            continue
        raise ValueError(f"Path item {path_item!r} not in available facets")
    for filename_item in self.obs4mips_filename_paths:
        if filename_item in self.avail_facets:
            continue
        raise ValueError(f"Filename path {filename_item!r} not in available facets")

fetch_datasets() #

Fetch dataset metadata from ESGF with project=obs4MIPs.

Source code in packages/climate-ref-core/src/climate_ref_core/esgf/obs4mips.py
def fetch_datasets(self) -> pd.DataFrame:
    """
    Fetch dataset metadata from ESGF with project=obs4MIPs.

    A ``project`` facet that is already present is left untouched; otherwise it
    is filled in with ``"obs4MIPs"`` before delegating to the parent class.
    """
    # Only supply the default when no project has been chosen yet
    self.facets.setdefault("project", "obs4MIPs")

    return super().fetch_datasets()

RegistryRequest #

Request for data from a pooch registry (e.g., pmp-climatology).

These data are fetched from a pooch registry rather than ESGF. This is useful for pre-processed datasets like PMP climatologies that are hosted externally but not on ESGF.

Parameters:

Name Type Description Default
slug str

Unique identifier for this request

required
registry_name str

Name of the registry to fetch from (e.g., "pmp-climatology")

required
facets dict[str, str | tuple[str, ...]]

Facets to filter datasets (e.g., {"variable_id": "psl", "source_id": "ERA-5"})

required
source_type str

Type of dataset source (default: "PMPClimatology")

'PMPClimatology'
time_span tuple[str, str] | None

Optional time range filter (not used for registry filtering, but required for protocol)

None
Example
request = RegistryRequest(
    slug="era5-psl",
    registry_name="pmp-climatology",
    facets={"variable_id": "psl", "source_id": "ERA-5"},
)
df = request.fetch_datasets()
Source code in packages/climate-ref-core/src/climate_ref_core/esgf/registry.py
class RegistryRequest:
    """
    Request for data from a pooch registry (e.g., pmp-climatology).

    These data are fetched from a pooch registry rather than ESGF.
    This is useful for pre-processed datasets like PMP climatologies
    that are hosted externally but not on ESGF.

    Parameters
    ----------
    slug
        Unique identifier for this request
    registry_name
        Name of the registry to fetch from (e.g., "pmp-climatology")
    facets
        Facets to filter datasets (e.g., {"variable_id": "psl", "source_id": "ERA-5"})
    source_type
        Type of dataset source (default: "PMPClimatology")
    time_span
        Optional time range filter (not used for registry filtering, but required for protocol)

    Example
    -------
    ```python
    request = RegistryRequest(
        slug="era5-psl",
        registry_name="pmp-climatology",
        facets={"variable_id": "psl", "source_id": "ERA-5"},
    )
    df = request.fetch_datasets()
    ```
    """

    def __init__(
        self,
        slug: str,
        registry_name: str,
        facets: dict[str, str | tuple[str, ...]],
        source_type: str = "PMPClimatology",
        time_span: tuple[str, str] | None = None,
    ) -> None:
        self.slug = slug
        self.registry_name = registry_name
        self.facets = facets
        self.source_type = source_type
        self.time_span = time_span

    def __repr__(self) -> str:
        return (
            f"RegistryRequest(slug={self.slug!r}, registry_name={self.registry_name!r}, "
            f"facets={self.facets!r}, source_type={self.source_type!r}, time_span={self.time_span!r})"
        )

    def _get_parser(self) -> Callable[[str], dict[str, Any]]:
        """Get the appropriate parser function based on registry name."""
        if self.registry_name == "pmp-climatology":
            return _parse_pmp_climatology_key
        elif self.registry_name == "obs4ref":
            return _parse_obs4ref_key
        else:
            # Default to obs4ref parser as fallback
            logger.warning(f"Unknown registry '{self.registry_name}', using obs4ref parser")
            return _parse_obs4ref_key

    def fetch_datasets(self) -> pd.DataFrame:
        """
        Fetch matching datasets from the registry.

        Returns
        -------
            DataFrame containing dataset metadata and file paths.
            Each row represents one file, with columns for metadata
            and a 'files' column containing a list with the file path.
        """
        logger.info(f"Fetching from registry '{self.registry_name}' for request: {self.slug}")

        try:
            registry = dataset_registry_manager[self.registry_name]
        except KeyError:
            raise ValueError(
                f"Registry '{self.registry_name}' not found. "
                f"Available registries: {list(dataset_registry_manager.keys())}"
            )

        parser = self._get_parser()
        matching_rows: list[dict[str, Any]] = []

        for key in registry.registry.keys():
            # Parse metadata from the registry key
            metadata = parser(key)
            if not metadata:
                continue

            # Check if it matches the requested facets
            if not _matches_facets(metadata, self.facets):
                continue

            # Fetch the file (downloads if not cached)
            try:
                file_path = registry.fetch(key)
                logger.debug(f"Fetched: {key} -> {file_path}")
            except Exception as e:
                logger.warning(f"Failed to fetch {key}: {e}")
                continue

            # Build row compatible with ESGFFetcher expectations
            row = {
                **metadata,
                "files": [file_path],
                "path": file_path,
            }
            matching_rows.append(row)

        if not matching_rows:
            logger.warning(f"No datasets found matching facets: {self.facets}")
            return pd.DataFrame()

        result = pd.DataFrame(matching_rows)

        # Filter to only the latest version for each unique dataset
        # Datasets are identified by source_id, variable_id, and grid_label
        if "version" in result.columns:
            group_by_cols = ["source_id", "variable_id", "grid_label"]
            # Only group by columns that exist in the DataFrame
            group_by_cols = [col for col in group_by_cols if col in result.columns]
            if group_by_cols:
                max_version = result.groupby(group_by_cols, sort=False)["version"].transform("max")
                result = result[result["version"] == max_version]

        logger.info(f"Found {len(result)} datasets matching request: {self.slug}")

        return result

fetch_datasets() #

Fetch matching datasets from the registry.

Returns:

Type Description
DataFrame containing dataset metadata and file paths.

Each row represents one file, with columns for metadata and a 'files' column containing a list with the file path.

Source code in packages/climate-ref-core/src/climate_ref_core/esgf/registry.py
def fetch_datasets(self) -> pd.DataFrame:
    """
    Fetch matching datasets from the registry.

    Registry keys whose parsed metadata is empty or does not match the
    requested facets are skipped. A key whose fetch fails is logged and
    skipped rather than aborting the whole request.

    Returns
    -------
    pd.DataFrame
        DataFrame containing dataset metadata and file paths.
        Each row represents one file, with columns for metadata
        and a 'files' column containing a list with the file path.
        An empty DataFrame is returned when nothing matches.

    Raises
    ------
    ValueError
        If ``registry_name`` is not a known registry.
    """
    logger.info(f"Fetching from registry '{self.registry_name}' for request: {self.slug}")

    try:
        registry = dataset_registry_manager[self.registry_name]
    except KeyError as err:
        # Chain explicitly so the traceback shows the lookup failure as the
        # direct cause instead of an error raised while handling another one
        raise ValueError(
            f"Registry '{self.registry_name}' not found. "
            f"Available registries: {list(dataset_registry_manager.keys())}"
        ) from err

    parser = self._get_parser()
    matching_rows: list[dict[str, Any]] = []

    for key in registry.registry:
        # Parse metadata from the registry key
        metadata = parser(key)
        if not metadata:
            continue

        # Check if it matches the requested facets
        if not _matches_facets(metadata, self.facets):
            continue

        # Fetch the file (downloads if not cached)
        try:
            file_path = registry.fetch(key)
            logger.debug(f"Fetched: {key} -> {file_path}")
        except Exception as e:
            # Deliberately best-effort: one bad file must not fail the request
            logger.warning(f"Failed to fetch {key}: {e}")
            continue

        # Build row compatible with ESGFFetcher expectations
        row = {
            **metadata,
            "files": [file_path],
            "path": file_path,
        }
        matching_rows.append(row)

    if not matching_rows:
        logger.warning(f"No datasets found matching facets: {self.facets}")
        return pd.DataFrame()

    result = pd.DataFrame(matching_rows)

    # Filter to only the latest version for each unique dataset
    # Datasets are identified by source_id, variable_id, and grid_label
    if "version" in result.columns:
        group_by_cols = ["source_id", "variable_id", "grid_label"]
        # Only group by columns that exist in the DataFrame
        group_by_cols = [col for col in group_by_cols if col in result.columns]
        if group_by_cols:
            max_version = result.groupby(group_by_cols, sort=False)["version"].transform("max")
            result = result[result["version"] == max_version]

    logger.info(f"Found {len(result)} datasets matching request: {self.slug}")

    return result

sub-packages#

Sub-package Description
base Base classes and protocols for ESGF data requests.
cmip6 CMIP6 dataset request implementation.
cmip7 CMIP7 dataset request implementation.
fetcher ESGF dataset fetcher for downloading test data.
obs4mips Obs4MIPs dataset request implementation.
registry Registry-based dataset request implementation.