Skip to content

climate_ref.data_catalog #

DataCatalog wrapper for lazy loading and finalisation of dataset catalogs.

This module provides a wrapper around pandas DataFrames that supports:

- Lazy loading of dataset catalogs from the database
- Lazy finalisation of unfinalised datasets at solve time

DataCatalog #

Wrapper around a dataset catalog DataFrame that supports lazy loading and lazy finalisation.

This replaces the raw pd.DataFrame in the solver's data_catalog dict, enabling two-phase ingestion where datasets are first bootstrapped from DRS metadata and only finalised when needed.

This lowers the amount of file I/O needed.

Source code in packages/climate-ref/src/climate_ref/data_catalog.py
@define
class DataCatalog:
    """
    Wrapper around a dataset catalog DataFrame that supports lazy loading and lazy finalisation.

    This replaces the raw pd.DataFrame in the solver's data_catalog dict,
    enabling two-phase ingestion where datasets are first bootstrapped from
    DRS metadata and only finalised when needed.

    This lowers the amount of file I/O needed.
    """

    # Database the catalog is loaded from; ``None`` when created via ``from_frame``
    database: Database | None
    # Adapter used to load (and, if supported, finalise) the catalog; ``None`` when created via ``from_frame``
    adapter: DatasetAdapter | None
    # Cached catalog: ``None`` until the first ``to_frame()`` call, and reset to ``None`` by ``finalise()``
    _df: pd.DataFrame | None = None

    @property
    def columns_requiring_finalisation(self) -> frozenset[str]:
        """
        Columns that require finalisation before they can be used for filtering or grouping.

        Delegates to the adapter's ``columns_requiring_finalisation`` attribute.
        Returns an empty frozenset when no adapter is set.
        """
        if self.adapter is None:
            return frozenset()
        return self.adapter.columns_requiring_finalisation

    @staticmethod
    def from_frame(df: pd.DataFrame) -> DataCatalog:
        """
        Create a DataCatalog from an existing DataFrame, bypassing lazy loading.

        This is useful for testing or when the catalog is already loaded.
        The resulting catalog has neither a database nor an adapter,
        so ``finalise`` returns its input unchanged.

        Parameters
        ----------
        df
            The DataFrame to use as the catalog

        Returns
        -------
        :
            A DataCatalog instance with the given DataFrame as its catalog
        """
        return DataCatalog(database=None, adapter=None, df=df)

    def to_frame(self) -> pd.DataFrame:
        """
        Get the catalog as a DataFrame, lazily loading from DB on first access.

        The loaded DataFrame is cached until ``finalise`` invalidates it.

        Returns
        -------
        :
            The cached catalog, loaded via the adapter if no cached copy exists

        Raises
        ------
        RefException
            If the catalog is not cached and the adapter or database is missing
        """
        if self._df is None:
            if self.adapter is None or self.database is None:
                raise RefException("Cannot load catalog: adapter and database must be provided")

            self._df = self.adapter.load_catalog(self.database)
        return self._df

    def finalise(self, subset: pd.DataFrame) -> pd.DataFrame:
        """
        Finalise unfinalised datasets in the given subset.

        If the adapter supports finalisation (implements FinaliseableDatasetAdapterMixin),
        the subset is passed to the adapter's ``finalise_datasets``.
        The cached catalog is then invalidated (not updated in place),
        so the next ``to_frame()`` call reloads it from the database.

        Parameters
        ----------
        subset
            DataFrame subset to finalise (typically after filter+group_by)

        Returns
        -------
        :
            The subset with any unfinalised datasets now finalised.

            The input is returned unchanged if the adapter does not support finalisation,
            if the subset has no ``finalised`` column, or if no row has ``finalised == False``.

        Raises
        ------
        RefException
            If the adapter supports finalisation but no database was provided
        """
        if not isinstance(self.adapter, FinaliseableDatasetAdapterMixin):
            return subset

        if self.database is None:  # type: ignore[unreachable]
            raise RefException("Cannot finalise datasets: database must be provided")

        if "finalised" not in subset.columns:
            return subset

        # Build the mask once; it drives both the early exit and the log message.
        unfinalised = subset["finalised"] == False  # noqa: E712
        if not unfinalised.any():
            return subset

        logger.info(f"Finalising {unfinalised.sum()} unfinalised datasets")
        result = self.adapter.finalise_datasets(self.database, subset)

        # Invalidate the cached DataFrame so the next to_frame() call
        # reloads from DB with correct finalised metadata.
        # In-place cache updates are unreliable because _apply_fixes()
        # can change the DataFrame's index structure.
        # Note: this invalidation does NOT affect the current iteration in
        # extract_covered_datasets (which operates on a local catalog_df copy).
        # It ensures the *next* DataRequirement processed against this
        # DataCatalog gets fresh data from the DB.
        self._df = None

        return result

columns_requiring_finalisation property #

Columns that require finalisation before they can be used for filtering or grouping.

Delegates to the adapter's columns_requiring_finalisation attribute. Returns an empty frozenset when no adapter is set.

finalise(subset) #

Finalise unfinalised datasets in the given subset.

If the adapter supports finalisation (implements FinaliseableDatasetAdapterMixin), unfinalised datasets in the subset are finalised by opening their files. The database is updated accordingly and the cached catalog is invalidated, so the next to_frame() call reloads it from the database.

Parameters:

Name Type Description Default
subset DataFrame

DataFrame subset to finalise (typically after filter+group_by)

required

Returns:

Type Description
DataFrame

The subset with any unfinalised datasets now finalised

Source code in packages/climate-ref/src/climate_ref/data_catalog.py
def finalise(self, subset: pd.DataFrame) -> pd.DataFrame:
    """
    Finalise unfinalised datasets in the given subset.

    If the adapter supports finalisation (implements FinaliseableDatasetAdapterMixin),
    the subset is passed to the adapter's ``finalise_datasets``.
    The cached catalog is then invalidated (not updated in place),
    so the next ``to_frame()`` call reloads it from the database.

    Parameters
    ----------
    subset
        DataFrame subset to finalise (typically after filter+group_by)

    Returns
    -------
    :
        The subset with any unfinalised datasets now finalised.

        The input is returned unchanged if the adapter does not support finalisation,
        if the subset has no ``finalised`` column, or if no row has ``finalised == False``.

    Raises
    ------
    RefException
        If the adapter supports finalisation but no database was provided
    """
    if not isinstance(self.adapter, FinaliseableDatasetAdapterMixin):
        return subset

    if self.database is None:  # type: ignore[unreachable]
        raise RefException("Cannot finalise datasets: database must be provided")

    if "finalised" not in subset.columns:
        return subset

    # Build the mask once; it drives both the early exit and the log message.
    unfinalised = subset["finalised"] == False  # noqa: E712
    if not unfinalised.any():
        return subset

    logger.info(f"Finalising {unfinalised.sum()} unfinalised datasets")
    result = self.adapter.finalise_datasets(self.database, subset)

    # Invalidate the cached DataFrame so the next to_frame() call
    # reloads from DB with correct finalised metadata.
    # In-place cache updates are unreliable because _apply_fixes()
    # can change the DataFrame's index structure.
    # Note: this invalidation does NOT affect the current iteration in
    # extract_covered_datasets (which operates on a local catalog_df copy).
    # It ensures the *next* DataRequirement processed against this
    # DataCatalog gets fresh data from the DB.
    self._df = None

    return result

from_frame(df) staticmethod #

Create a DataCatalog from an existing DataFrame, bypassing lazy loading.

This is useful for testing or when the catalog is already loaded.

Parameters:

Name Type Description Default
df DataFrame

The DataFrame to use as the catalog

required

Returns:

Type Description
DataCatalog

A DataCatalog instance with the given DataFrame as its catalog

Source code in packages/climate-ref/src/climate_ref/data_catalog.py
@staticmethod
def from_frame(df: pd.DataFrame) -> DataCatalog:
    """
    Build a DataCatalog directly from an in-memory DataFrame.

    No database or adapter is attached, so the frame is used as-is and
    lazy loading is skipped. Handy in tests or when the catalog has
    already been loaded elsewhere.

    Parameters
    ----------
    df
        The DataFrame to use as the catalog

    Returns
    -------
    :
        A DataCatalog instance with the given DataFrame as its catalog
    """
    preloaded = DataCatalog(database=None, adapter=None, df=df)
    return preloaded

to_frame() #

Get the catalog as a DataFrame, lazily loading from DB on first access.

Source code in packages/climate-ref/src/climate_ref/data_catalog.py
def to_frame(self) -> pd.DataFrame:
    """
    Return the catalog DataFrame, loading it through the adapter on first use.

    A cached frame is returned as-is. Otherwise the adapter loads the
    catalog from the database and the result is cached on the instance.

    Raises
    ------
    RefException
        If nothing is cached and the adapter or database is missing
    """
    # Fast path: a cached frame already exists.
    if self._df is not None:
        return self._df

    can_load = self.adapter is not None and self.database is not None
    if not can_load:
        raise RefException("Cannot load catalog: adapter and database must be provided")

    self._df = self.adapter.load_catalog(self.database)
    return self._df