Skip to content

climate_ref_core.esgf.registry #

Registry-based dataset request implementation.

This module provides request classes for fetching datasets from pooch registries (e.g., pmp-climatology) rather than ESGF.

RegistryRequest #

Request for data from a pooch registry (e.g., pmp-climatology).

These data are fetched from a pooch registry rather than ESGF. This is useful for pre-processed datasets like PMP climatologies that are hosted externally but not on ESGF.

Parameters:

- `slug` (`str`, required): Unique identifier for this request.
- `registry_name` (`str`, required): Name of the registry to fetch from (e.g., `"pmp-climatology"`).
- `facets` (`dict[str, str | tuple[str, ...]]`, required): Facets to filter datasets (e.g., `{"variable_id": "psl", "source_id": "ERA-5"}`).
- `source_type` (`str`, default `'PMPClimatology'`): Type of dataset source.
- `time_span` (`tuple[str, str] | None`, default `None`): Optional time range filter (not used for registry filtering, but required for protocol).
Example
request = RegistryRequest(
    slug="era5-psl",
    registry_name="pmp-climatology",
    facets={"variable_id": "psl", "source_id": "ERA-5"},
)
df = request.fetch_datasets()
Source code in packages/climate-ref-core/src/climate_ref_core/esgf/registry.py
class RegistryRequest:
    """
    Request for data from a pooch registry (e.g., pmp-climatology).

    These data are fetched from a pooch registry rather than ESGF.
    This is useful for pre-processed datasets like PMP climatologies
    that are hosted externally but not on ESGF.

    Parameters
    ----------
    slug
        Unique identifier for this request
    registry_name
        Name of the registry to fetch from (e.g., "pmp-climatology")
    facets
        Facets to filter datasets (e.g., {"variable_id": "psl", "source_id": "ERA-5"})
    source_type
        Type of dataset source (default: "PMPClimatology")
    time_span
        Optional time range filter (not used for registry filtering, but required for protocol)

    Example
    -------
    ```python
    request = RegistryRequest(
        slug="era5-psl",
        registry_name="pmp-climatology",
        facets={"variable_id": "psl", "source_id": "ERA-5"},
    )
    df = request.fetch_datasets()
    ```
    """

    def __init__(
        self,
        slug: str,
        registry_name: str,
        facets: dict[str, str | tuple[str, ...]],
        source_type: str = "PMPClimatology",
        time_span: tuple[str, str] | None = None,
    ) -> None:
        self.slug = slug
        self.registry_name = registry_name
        self.facets = facets
        self.source_type = source_type
        self.time_span = time_span

    def __repr__(self) -> str:
        return (
            f"RegistryRequest(slug={self.slug!r}, registry_name={self.registry_name!r}, "
            f"facets={self.facets!r}, source_type={self.source_type!r}, time_span={self.time_span!r})"
        )

    def _get_parser(self) -> Callable[[str], dict[str, Any]]:
        """Get the appropriate parser function based on registry name."""
        if self.registry_name == "pmp-climatology":
            return _parse_pmp_climatology_key
        elif self.registry_name == "obs4ref":
            return _parse_obs4ref_key
        else:
            # Default to obs4ref parser as fallback
            logger.warning(f"Unknown registry '{self.registry_name}', using obs4ref parser")
            return _parse_obs4ref_key

    def fetch_datasets(self) -> pd.DataFrame:
        """
        Fetch matching datasets from the registry.

        Returns
        -------
            DataFrame containing dataset metadata and file paths.
            Each row represents one file, with columns for metadata
            and a 'files' column containing a list with the file path.
        """
        logger.info(f"Fetching from registry '{self.registry_name}' for request: {self.slug}")

        try:
            registry = dataset_registry_manager[self.registry_name]
        except KeyError:
            raise ValueError(
                f"Registry '{self.registry_name}' not found. "
                f"Available registries: {list(dataset_registry_manager.keys())}"
            )

        parser = self._get_parser()
        matching_rows: list[dict[str, Any]] = []

        for key in registry.registry.keys():
            # Parse metadata from the registry key
            metadata = parser(key)
            if not metadata:
                continue

            # Check if it matches the requested facets
            if not _matches_facets(metadata, self.facets):
                continue

            # Fetch the file (downloads if not cached)
            try:
                file_path = registry.fetch(key)
                logger.debug(f"Fetched: {key} -> {file_path}")
            except Exception as e:
                logger.warning(f"Failed to fetch {key}: {e}")
                continue

            # Build row compatible with ESGFFetcher expectations
            row = {
                **metadata,
                "files": [file_path],
                "path": file_path,
            }
            matching_rows.append(row)

        if not matching_rows:
            logger.warning(f"No datasets found matching facets: {self.facets}")
            return pd.DataFrame()

        result = pd.DataFrame(matching_rows)

        # Filter to only the latest version for each unique dataset
        # Datasets are identified by source_id, variable_id, and grid_label
        if "version" in result.columns:
            group_by_cols = ["source_id", "variable_id", "grid_label"]
            # Only group by columns that exist in the DataFrame
            group_by_cols = [col for col in group_by_cols if col in result.columns]
            if group_by_cols:
                max_version = result.groupby(group_by_cols, sort=False)["version"].transform("max")
                result = result[result["version"] == max_version]

        logger.info(f"Found {len(result)} datasets matching request: {self.slug}")

        return result

fetch_datasets() #

Fetch matching datasets from the registry.

Returns:

- `pd.DataFrame`: DataFrame containing dataset metadata and file paths. Each row represents one file, with columns for metadata and a 'files' column containing a list with the file path.

Source code in packages/climate-ref-core/src/climate_ref_core/esgf/registry.py
def fetch_datasets(self) -> pd.DataFrame:
    """
    Fetch matching datasets from the registry.

    Every registry key is parsed into metadata, compared against the
    requested facets and, on a match, fetched (downloaded if not cached).
    Files that fail to fetch are logged and skipped rather than aborting
    the whole request.

    Returns
    -------
    pd.DataFrame
        DataFrame containing dataset metadata and file paths.
        Each row represents one file, with columns for metadata,
        a 'files' column containing a list with the file path and
        a 'path' column holding the same file path.
        An empty DataFrame is returned when nothing matches.

    Raises
    ------
    ValueError
        If ``registry_name`` is not a known registry.
    """
    logger.info(f"Fetching from registry '{self.registry_name}' for request: {self.slug}")

    try:
        registry = dataset_registry_manager[self.registry_name]
    except KeyError as err:
        # Chain explicitly so the traceback shows the lookup failure as the cause
        raise ValueError(
            f"Registry '{self.registry_name}' not found. "
            f"Available registries: {list(dataset_registry_manager.keys())}"
        ) from err

    parser = self._get_parser()
    matching_rows: list[dict[str, Any]] = []

    for key in registry.registry:
        # Parse metadata from the registry key; a falsy result means
        # the parser produced nothing usable for this key
        metadata = parser(key)
        if not metadata:
            continue

        # Check if it matches the requested facets
        if not _matches_facets(metadata, self.facets):
            continue

        # Fetch the file (downloads if not cached). Best-effort: one
        # failing file must not prevent the remaining files being returned.
        try:
            file_path = registry.fetch(key)
        except Exception as e:
            logger.warning(f"Failed to fetch {key}: {e}")
            continue
        logger.debug(f"Fetched: {key} -> {file_path}")

        # Build row compatible with ESGFFetcher expectations
        matching_rows.append(
            {
                **metadata,
                "files": [file_path],
                "path": file_path,
            }
        )

    if not matching_rows:
        logger.warning(f"No datasets found matching facets: {self.facets}")
        return pd.DataFrame()

    result = pd.DataFrame(matching_rows)

    # Filter to only the latest version for each unique dataset
    # Datasets are identified by source_id, variable_id, and grid_label
    if "version" in result.columns:
        # Only group by columns that exist in the DataFrame
        group_by_cols = [
            col for col in ("source_id", "variable_id", "grid_label") if col in result.columns
        ]
        if group_by_cols:
            # dropna=False keeps rows whose grouping facet is missing (NaN).
            # By default pandas drops such groups, which would leave NaN in
            # max_version and silently discard those rows in the comparison.
            max_version = result.groupby(group_by_cols, sort=False, dropna=False)[
                "version"
            ].transform("max")
            result = result[result["version"] == max_version]

    logger.info(f"Found {len(result)} datasets matching request: {self.slug}")

    return result