Wrapper around a dataset catalog DataFrame that supports lazy loading and lazy finalisation.
This replaces the raw pd.DataFrame in the solver's data_catalog dict,
enabling two-phase ingestion where datasets are first bootstrapped from
DRS metadata and only finalised when needed.
This lowers the amount of file I/O needed.
Source code in packages/climate-ref/src/climate_ref/data_catalog.py
| @define
class DataCatalog:
    """
    Wrapper around a dataset catalog DataFrame that supports lazy loading and lazy finalisation.

    This replaces the raw pd.DataFrame in the solver's data_catalog dict,
    enabling two-phase ingestion where datasets are first bootstrapped from
    DRS metadata and only finalised when needed.
    This lowers the amount of file I/O needed.
    """

    # Database the catalog is loaded from; ``None`` when built via ``from_frame``.
    database: Database | None
    # Adapter used to load (and, where supported, finalise) the catalog;
    # ``None`` when built via ``from_frame``.
    adapter: DatasetAdapter | None
    # Cached catalog. ``None`` until the first ``to_frame()`` call, and reset
    # to ``None`` whenever ``finalise()`` actually finalises datasets.
    _df: pd.DataFrame | None = None

    @property
    def columns_requiring_finalisation(self) -> frozenset[str]:
        """
        Columns that require finalisation before they can be used for filtering or grouping.

        Delegates to the adapter's ``columns_requiring_finalisation`` attribute.
        Returns an empty frozenset when no adapter is set.
        """
        if self.adapter is None:
            return frozenset()
        return self.adapter.columns_requiring_finalisation

    @staticmethod
    def from_frame(df: pd.DataFrame) -> DataCatalog:
        """
        Create a DataCatalog from an existing DataFrame, bypassing lazy loading.

        This is useful for testing or when the catalog is already loaded.

        Parameters
        ----------
        df
            The DataFrame to use as the catalog

        Returns
        -------
        :
            A DataCatalog instance with the given DataFrame as its catalog
            and neither a database nor an adapter attached
        """
        return DataCatalog(database=None, adapter=None, df=df)

    def to_frame(self) -> pd.DataFrame:
        """
        Get the catalog as a DataFrame, lazily loading from DB on first access.

        Returns
        -------
        :
            The cached catalog, loaded via ``adapter.load_catalog(database)``
            if no cached DataFrame is present

        Raises
        ------
        RefException
            If nothing is cached and the adapter or the database is missing
        """
        if self._df is None:
            if self.adapter is None or self.database is None:
                raise RefException("Cannot load catalog: adapter and database must be provided")
            self._df = self.adapter.load_catalog(self.database)
        return self._df

    def finalise(self, subset: pd.DataFrame) -> pd.DataFrame:
        """
        Finalise unfinalised datasets in the given subset.

        If the adapter supports finalisation (implements FinaliseableDatasetAdapterMixin),
        unfinalised datasets in the subset are finalised by opening their files.
        The cached DataFrame is then invalidated so that the next ``to_frame()``
        call reloads the catalog from the database.

        Parameters
        ----------
        subset
            DataFrame subset to finalise (typically after filter+group_by)

        Returns
        -------
        :
            The subset with any unfinalised datasets now finalised.
            The subset is returned unchanged when the adapter does not support
            finalisation, when it has no ``finalised`` column, or when no row
            has ``finalised == False``.

        Raises
        ------
        RefException
            If the adapter supports finalisation but no database is set
        """
        if not isinstance(self.adapter, FinaliseableDatasetAdapterMixin):
            return subset

        if self.database is None:  # type: ignore[unreachable]
            raise RefException("Cannot finalise datasets: database must be provided")

        if "finalised" not in subset.columns:
            return subset

        # Build the mask once and reuse it for both the check and the log message.
        # An explicit ``== False`` comparison is intentional: missing values
        # must not be counted as unfinalised.
        unfinalised = subset["finalised"] == False  # noqa: E712
        if not unfinalised.any():
            return subset

        logger.info(f"Finalising {unfinalised.sum()} unfinalised datasets")
        result = self.adapter.finalise_datasets(self.database, subset)

        # Invalidate the cached DataFrame so the next to_frame() call
        # reloads from DB with correct finalised metadata.
        # In-place cache updates are unreliable because _apply_fixes()
        # can change the DataFrame's index structure.
        # Note: this invalidation does NOT affect the current iteration in
        # extract_covered_datasets (which operates on a local catalog_df copy).
        # It ensures the *next* DataRequirement processed against this
        # DataCatalog gets fresh data from the DB.
        self._df = None
        return result
|
columns_requiring_finalisation
property
Columns that require finalisation before they can be used for filtering or grouping.
Delegates to the adapter's columns_requiring_finalisation attribute.
Returns an empty frozenset when no adapter is set.
finalise(subset)
Finalise unfinalised datasets in the given subset.
If the adapter supports finalisation (implements FinaliseableDatasetAdapterMixin),
unfinalised datasets in the subset are finalised by opening their files.
The cached DataFrame is then invalidated so that the next to_frame() call reloads the catalog from the database.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| subset | DataFrame | DataFrame subset to finalise (typically after filter+group_by) | required |
Returns:
| Type | Description |
| --- | --- |
| DataFrame | The subset with any unfinalised datasets now finalised |
Source code in packages/climate-ref/src/climate_ref/data_catalog.py
def finalise(self, subset: pd.DataFrame) -> pd.DataFrame:
    """
    Finalise unfinalised datasets in the given subset.

    If the adapter supports finalisation (implements FinaliseableDatasetAdapterMixin),
    unfinalised datasets in the subset are finalised by opening their files.
    The cached DataFrame is then invalidated so that the next ``to_frame()``
    call reloads the catalog from the database.

    Parameters
    ----------
    subset
        DataFrame subset to finalise (typically after filter+group_by)

    Returns
    -------
    :
        The subset with any unfinalised datasets now finalised.
        The subset is returned unchanged when the adapter does not support
        finalisation, when it has no ``finalised`` column, or when no row
        has ``finalised == False``.

    Raises
    ------
    RefException
        If the adapter supports finalisation but no database is set
    """
    if not isinstance(self.adapter, FinaliseableDatasetAdapterMixin):
        return subset

    if self.database is None:  # type: ignore[unreachable]
        raise RefException("Cannot finalise datasets: database must be provided")

    if "finalised" not in subset.columns:
        return subset

    # Build the mask once and reuse it for both the check and the log message.
    # An explicit ``== False`` comparison is intentional: missing values
    # must not be counted as unfinalised.
    unfinalised = subset["finalised"] == False  # noqa: E712
    if not unfinalised.any():
        return subset

    logger.info(f"Finalising {unfinalised.sum()} unfinalised datasets")
    result = self.adapter.finalise_datasets(self.database, subset)

    # Invalidate the cached DataFrame so the next to_frame() call
    # reloads from DB with correct finalised metadata.
    # In-place cache updates are unreliable because _apply_fixes()
    # can change the DataFrame's index structure.
    # Note: this invalidation does NOT affect the current iteration in
    # extract_covered_datasets (which operates on a local catalog_df copy).
    # It ensures the *next* DataRequirement processed against this
    # DataCatalog gets fresh data from the DB.
    self._df = None
    return result
|
from_frame(df)
staticmethod
Create a DataCatalog from an existing DataFrame, bypassing lazy loading.
This is useful for testing or when the catalog is already loaded.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| df | DataFrame | The DataFrame to use as the catalog | required |
Returns:
| Type | Description |
| --- | --- |
| DataCatalog | A DataCatalog instance with the given DataFrame as its catalog |
Source code in packages/climate-ref/src/climate_ref/data_catalog.py
@staticmethod
def from_frame(df: pd.DataFrame) -> DataCatalog:
    """
    Build a DataCatalog directly from an in-memory DataFrame, skipping lazy loading.

    Handy in tests, or whenever the catalog has already been loaded.

    Parameters
    ----------
    df
        The DataFrame that becomes the catalog

    Returns
    -------
    :
        A DataCatalog holding ``df`` as its catalog
    """
    # No database or adapter is attached; the frame is supplied up front.
    preloaded = DataCatalog(database=None, adapter=None, df=df)
    return preloaded
|
to_frame()
Get the catalog as a DataFrame, lazily loading from DB on first access.
Source code in packages/climate-ref/src/climate_ref/data_catalog.py
def to_frame(self) -> pd.DataFrame:
    """
    Return the catalog as a DataFrame, loading it from the database on first access.

    Returns
    -------
    :
        The cached catalog; when nothing is cached it is first loaded via
        ``adapter.load_catalog(database)`` and stored for later calls

    Raises
    ------
    RefException
        If nothing is cached and the adapter or the database is missing
    """
    # Fast path: a catalog is already cached.
    if self._df is not None:
        return self._df

    loader, db = self.adapter, self.database
    if loader is None or db is None:
        raise RefException("Cannot load catalog: adapter and database must be provided")

    # Remember the loaded frame so later calls skip the load.
    self._df = loader.load_catalog(db)
    return self._df
|