Skip to content

climate_ref_core.esgf.base #

Base classes and protocols for ESGF data requests.

This module provides the infrastructure for fetching datasets from ESGF using the intake-esgf package.

ESGFRequest #

Bases: Protocol

Protocol for ESGF dataset requests.

Implementations provide the logic for searching ESGF and generating output paths for downloaded datasets.

Source code in packages/climate-ref-core/src/climate_ref_core/esgf/base.py
@runtime_checkable
class ESGFRequest(Protocol):
    """
    Protocol for ESGF dataset requests.

    Implementations provide the logic for searching ESGF and generating
    output paths for downloaded datasets.

    The protocol is decorated with ``runtime_checkable``, so ``isinstance``
    can be used to test whether an object provides these members. Such a
    check only verifies that the members exist, not their types or
    signatures.
    """

    slug: str
    """Unique identifier for this request."""

    source_type: str
    """Type of dataset (e.g., 'CMIP6', 'obs4MIPs')."""

    time_span: tuple[str, str] | None
    """Optional time range to filter datasets (start, end)."""

    def fetch_datasets(self) -> pd.DataFrame:
        """
        Fetch dataset metadata from ESGF.

        This is a protocol stub; implementations supply the body.

        Returns
        -------
        pd.DataFrame
            DataFrame containing dataset metadata and file paths.
            Must contain at minimum the following columns:

            - ``key``: A unique identifier for the dataset
            - ``files``: A list of files for the dataset
        """
        ...

slug instance-attribute #

Unique identifier for this request.

source_type instance-attribute #

Type of dataset (e.g., 'CMIP6', 'obs4MIPs').

time_span instance-attribute #

Optional time range to filter datasets (start, end).

fetch_datasets() #

Fetch dataset metadata from ESGF.

Returns:

Type Description
DataFrame

DataFrame containing dataset metadata and file paths. Must contain at minimum two columns: `key` (a unique identifier for the dataset) and `files` (a list of files for the dataset).

Source code in packages/climate-ref-core/src/climate_ref_core/esgf/base.py
def fetch_datasets(self) -> pd.DataFrame:
    """
    Fetch dataset metadata from ESGF.

    This is the stub declared on the ``ESGFRequest`` protocol;
    implementations supply the body.

    Returns
    -------
    pd.DataFrame
        DataFrame containing dataset metadata and file paths.
        Must contain at minimum the following columns:

        - ``key``: A unique identifier for the dataset
        - ``files``: A list of files for the dataset
    """
    ...

IntakeESGFMixin #

Mixin that fetches datasets from ESGF using intake-esgf.

Subclasses must define three attributes: `facets` (`dict[str, str | tuple[str, ...]]`), `remove_ensembles` (`bool`), and `time_span` (`tuple[str, str] | None`).

Source code in packages/climate-ref-core/src/climate_ref_core/esgf/base.py
class IntakeESGFMixin:
    """
    Mixin that fetches datasets from ESGF using intake-esgf.

    Subclasses must define:
    - facets: dict[str, str | tuple[str, ...]]
    - remove_ensembles: bool
    - time_span: tuple[str, str] | None
    """

    # Search facets, forwarded to ``ESGFCatalog.search`` as keyword arguments.
    facets: dict[str, str | tuple[str, ...]]
    # When true, ``remove_ensembles()`` is called on the catalog after searching.
    remove_ensembles: bool
    # Optional (start, end) pair, sent as the ``file_start``/``file_end`` facets.
    time_span: tuple[str, str] | None

    def fetch_datasets(self) -> pd.DataFrame:
        """
        Fetch dataset metadata from ESGF.

        Returns
        -------
        pd.DataFrame
            The catalog's search results joined on ``key`` with a ``files``
            column built from ``to_path_dict``, then passed through
            ``_deduplicate_datasets``. When ``time_span`` is set, constant
            ``time_start`` and ``time_end`` columns are added as well.

        Raises
        ------
        ValueError
            If the search produced no datasets.
        """
        # Work on a copy so the facets declared on the subclass are never mutated.
        facets: dict[str, Any] = dict(self.facets)
        if self.time_span:
            facets["file_start"] = self.time_span[0]
            facets["file_end"] = self.time_span[1]

        # Convert tuples to lists for intake-esgf compatibility
        facets = {
            key: list(value) if isinstance(value, tuple) else value
            for key, value in facets.items()
        }

        cat = ESGFCatalog()  # type: ignore[no-untyped-call]
        cat.search(**facets)

        if self.remove_ensembles:
            cat.remove_ensembles()

        # Guard before resolving file paths: there is nothing to resolve for an
        # empty result, and callers get this explicit error instead of whatever
        # ``to_path_dict`` does with an empty catalog.
        if cat.df is None or cat.df.empty:
            raise ValueError("No datasets found for the given ESGF request")

        path_dict = cat.to_path_dict(prefer_streaming=False, minimal_keys=False, quiet=True)
        # Default inner join: only datasets that have an entry in path_dict are kept.
        merged_df = cat.df.merge(pd.Series(path_dict, name="files"), left_on="key", right_index=True)

        if self.time_span:
            merged_df["time_start"] = self.time_span[0]
            merged_df["time_end"] = self.time_span[1]

        return _deduplicate_datasets(merged_df)

fetch_datasets() #

Fetch dataset metadata from ESGF.

Source code in packages/climate-ref-core/src/climate_ref_core/esgf/base.py
def fetch_datasets(self) -> pd.DataFrame:
    """
    Fetch dataset metadata from ESGF.

    Returns
    -------
    pd.DataFrame
        The catalog's search results joined on ``key`` with a ``files``
        column built from ``to_path_dict``, then passed through
        ``_deduplicate_datasets``. When ``time_span`` is set, constant
        ``time_start`` and ``time_end`` columns are added as well.

    Raises
    ------
    ValueError
        If the search produced no datasets.
    """
    # Work on a copy so the facets declared on the subclass are never mutated.
    facets: dict[str, Any] = dict(self.facets)
    if self.time_span:
        facets["file_start"] = self.time_span[0]
        facets["file_end"] = self.time_span[1]

    # Convert tuples to lists for intake-esgf compatibility
    facets = {
        key: list(value) if isinstance(value, tuple) else value
        for key, value in facets.items()
    }

    cat = ESGFCatalog()  # type: ignore[no-untyped-call]
    cat.search(**facets)

    if self.remove_ensembles:
        cat.remove_ensembles()

    # Guard before resolving file paths: there is nothing to resolve for an
    # empty result, and callers get this explicit error instead of whatever
    # ``to_path_dict`` does with an empty catalog.
    if cat.df is None or cat.df.empty:
        raise ValueError("No datasets found for the given ESGF request")

    path_dict = cat.to_path_dict(prefer_streaming=False, minimal_keys=False, quiet=True)
    # Default inner join: only datasets that have an entry in path_dict are kept.
    merged_df = cat.df.merge(pd.Series(path_dict, name="files"), left_on="key", right_index=True)

    if self.time_span:
        merged_df["time_start"] = self.time_span[0]
        merged_df["time_end"] = self.time_span[1]

    return _deduplicate_datasets(merged_df)