Skip to content

climate_ref_core.constraints #

Dataset selection constraints

AddParentDataset #

Include a dataset's parent in the selection.

Source code in packages/climate-ref-core/src/climate_ref_core/constraints.py
@frozen
class AddParentDataset:
    """
    Constraint that pulls the parent of each selected dataset into the selection.

    Children for which no parent can be found in the data catalog are dropped.
    """

    parent_facet_map: dict[str, str]
    """
    Facet pairs linking a child dataset to its parent.

    Each key names a facet of the parent dataset; the associated value names the
    child facet whose value the parent must carry for that key.
    """

    def apply(self, group: pd.DataFrame, data_catalog: pd.DataFrame) -> pd.DataFrame:
        """
        Select the valid children together with the latest version of their parents.

        Parameters
        ----------
        group
            The datasets whose parents should be added.
        data_catalog
            The catalog that is searched for parents and from which the result is taken.

        Returns
        -------
        :
            The rows of ``data_catalog`` holding the retained children and their parents.
        """
        facet_columns = sorted(set(self.parent_facet_map) | set(self.parent_facet_map.values()))

        # A child missing a value for any of these facets cannot be linked to a parent.
        children = group[facet_columns].dropna(axis="index")

        keep = pd.Series(False, index=data_catalog.index)
        keep.loc[children.index] = True
        for _, child in children.groupby(facet_columns):
            child_values = {name: child[name].unique().tolist() for name in facet_columns}
            wanted = {
                catalog_facet: child_values[source_facet]
                for catalog_facet, source_facet in self.parent_facet_map.items()
            }
            is_parent = data_catalog[list(wanted)].isin(wanted).all(axis="columns")
            parents = data_catalog[is_parent]
            if not parents.empty:
                # Only the most recent version of the parent joins the selection.
                newest = parents["version"].max()
                parents = parents[parents["version"] == newest]
                keep.loc[parents.index] = True
                continue

            # Without a parent the child is removed from the selection again.
            logger.debug(
                f"Constraint {self} not satisfied because no parent dataset found for "
                f"{', '.join(f'{k}={v}' for k, v in child_values.items())}"
            )
            keep.loc[child.index] = False

        return data_catalog[keep]

    @classmethod
    def from_defaults(
        cls,
        source_type: SourceDatasetType,
    ) -> Self:
        """
        Build the constraint from the default facet mapping of a source type.

        Parameters
        ----------
        source_type:
            The source type whose default parent facet mapping is used.

        Returns
        -------
        :
            A constraint to include a dataset's parent in the selection.

        """
        cmip6_facets = {
            "source_id": "parent_source_id",
            "experiment_id": "parent_experiment_id",
            "variant_label": "parent_variant_label",
            "table_id": "table_id",
            "variable_id": "variable_id",
            "grid_label": "grid_label",
        }
        cmip7_facets = {
            "source_id": "parent_source_id",
            "experiment_id": "parent_experiment_id",
            "variant_label": "parent_variant_label",
            "variable_id": "variable_id",
            "grid_label": "grid_label",
        }
        defaults = {
            SourceDatasetType.CMIP6: cmip6_facets,
            SourceDatasetType.CMIP7: cmip7_facets,
        }
        return cls(parent_facet_map=defaults[source_type])

parent_facet_map instance-attribute #

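Mapping from each facet of the parent dataset to the child facet that holds the matching value (for example, `source_id` → `parent_source_id`).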

apply(group, data_catalog) #

Include a dataset's parent in the selection.

Source code in packages/climate-ref-core/src/climate_ref_core/constraints.py
def apply(self, group: pd.DataFrame, data_catalog: pd.DataFrame) -> pd.DataFrame:
    """
    Select the valid children together with the latest version of their parents.

    Parameters
    ----------
    group
        The datasets whose parents should be added.
    data_catalog
        The catalog that is searched for parents and from which the result is taken.

    Returns
    -------
    :
        The rows of ``data_catalog`` holding the retained children and their parents.
    """
    facet_columns = sorted(set(self.parent_facet_map) | set(self.parent_facet_map.values()))

    # A child missing a value for any of these facets cannot be linked to a parent.
    children = group[facet_columns].dropna(axis="index")

    keep = pd.Series(False, index=data_catalog.index)
    keep.loc[children.index] = True
    for _, child in children.groupby(facet_columns):
        child_values = {name: child[name].unique().tolist() for name in facet_columns}
        wanted = {
            catalog_facet: child_values[source_facet]
            for catalog_facet, source_facet in self.parent_facet_map.items()
        }
        is_parent = data_catalog[list(wanted)].isin(wanted).all(axis="columns")
        parents = data_catalog[is_parent]
        if not parents.empty:
            # Only the most recent version of the parent joins the selection.
            newest = parents["version"].max()
            parents = parents[parents["version"] == newest]
            keep.loc[parents.index] = True
            continue

        # Without a parent the child is removed from the selection again.
        logger.debug(
            f"Constraint {self} not satisfied because no parent dataset found for "
            f"{', '.join(f'{k}={v}' for k, v in child_values.items())}"
        )
        keep.loc[child.index] = False

    return data_catalog[keep]

from_defaults(source_type) classmethod #

Include a dataset's parent in the selection.

The constraint is created using the defaults for the source_type.

Parameters:

Name Type Description Default
source_type SourceDatasetType

The source_type of the variable to add.

required

Returns:

Type Description
Self

A constraint to include a dataset's parent in the selection.

Source code in packages/climate-ref-core/src/climate_ref_core/constraints.py
@classmethod
def from_defaults(
    cls,
    source_type: SourceDatasetType,
) -> Self:
    """
    Build the constraint from the default facet mapping of a source type.

    Parameters
    ----------
    source_type:
        The source type whose default parent facet mapping is used.

    Returns
    -------
    :
        A constraint to include a dataset's parent in the selection.

    """
    cmip6_facets = {
        "source_id": "parent_source_id",
        "experiment_id": "parent_experiment_id",
        "variant_label": "parent_variant_label",
        "table_id": "table_id",
        "variable_id": "variable_id",
        "grid_label": "grid_label",
    }
    cmip7_facets = {
        "source_id": "parent_source_id",
        "experiment_id": "parent_experiment_id",
        "variant_label": "parent_variant_label",
        "variable_id": "variable_id",
        "grid_label": "grid_label",
    }
    defaults = {
        SourceDatasetType.CMIP6: cmip6_facets,
        SourceDatasetType.CMIP7: cmip7_facets,
    }
    return cls(parent_facet_map=defaults[source_type])

AddSupplementaryDataset #

Include e.g. a cell measure or ancillary variable in the selection.

Source code in packages/climate-ref-core/src/climate_ref_core/constraints.py
@frozen
class AddSupplementaryDataset:
    """
    Include e.g. a cell measure or ancillary variable in the selection.

    Candidate supplementary rows are looked up in the data catalog and, for each
    distinct dataset in the group, only the best matching candidate is appended.
    """

    supplementary_facets: Mapping[str, str | tuple[str, ...]]
    """
    Facets describing the supplementary dataset.
    """

    matching_facets: tuple[str, ...]
    """
    Facets that must match with datasets in the selection.
    """

    optional_matching_facets: tuple[str, ...]
    """
    Select only the best matching datasets based on similarity with these facets.
    """

    def apply(
        self,
        group: pd.DataFrame,
        data_catalog: pd.DataFrame,
    ) -> pd.DataFrame:
        """
        Add a supplementary dataset to the group.

        Parameters
        ----------
        group
            The datasets to which supplementary datasets should be added.
        data_catalog
            The catalog that is searched for supplementary datasets.

        Returns
        -------
        :
            ``group`` itself when no supplementary dataset is selected, otherwise
            the concatenation of ``group`` and the selected supplementary rows,
            each with their all-NA columns dropped.
        """
        # Normalise every configured facet value to a tuple of allowed values.
        supplementary_facets: defaultdict[str, tuple[str, ...]] = defaultdict(tuple)
        for facet, values in self.supplementary_facets.items():
            supplementary_facets[facet] = values if isinstance(values, tuple) else (values,)

        # For the mandatory matching facets, also allow every value present in the group.
        for facet in self.matching_facets:
            values = tuple(group[facet].unique())
            supplementary_facets[facet] += values

        # Candidate rows: every filtered facet holds one of its allowed values.
        mask = data_catalog[list(supplementary_facets)].isin(supplementary_facets).all(axis="columns")
        supplementary_group = data_catalog[mask]
        if not supplementary_group.empty:
            # Save the original index and reset to a unique RangeIndex.
            # The data catalog index can contain duplicate labels (e.g. multiple
            # file entries for the same dataset) which causes pandas ``|=`` to
            # fail with "cannot reindex on an axis with duplicate labels".
            original_index = supplementary_group.index.copy()
            supplementary_group = supplementary_group.reset_index(drop=True)

            matching_facets = list(self.matching_facets)
            facets = matching_facets + list(self.optional_matching_facets)
            # One row per distinct combination of (mandatory + optional) facet values in the group.
            datasets = group[facets].drop_duplicates()
            select = pd.Series(False, index=supplementary_group.index)
            for i in range(len(datasets)):
                dataset = datasets.iloc[i]
                # Restrict the supplementary datasets to those that match the main dataset.
                supplementaries = supplementary_group[
                    (supplementary_group[matching_facets] == dataset[matching_facets]).all(axis="columns")
                ]
                if not supplementaries.empty:
                    # Select the best matching supplementary dataset based on the optional matching facets.
                    # The score is the number of facets whose value equals that of the main dataset.
                    scores = (supplementaries[facets] == dataset).sum(axis="columns")
                    supplementaries = supplementaries[scores == scores.max()]
                    if "version" in supplementaries.columns:
                        # Select the latest version if there are multiple matches
                        supplementaries = supplementaries[
                            supplementaries["version"] == supplementaries["version"].max()
                        ]
                    # Select only the first group if there are still multiple matches
                    first_supplementary_dataset = supplementaries[facets].drop_duplicates().iloc[0]
                    select |= (supplementaries[facets] == first_supplementary_dataset).all(axis="columns")

            supplementary_group = supplementary_group[select]
            # Restore the original index so downstream concatenation is consistent
            supplementary_group.index = original_index[list(select)]

        if supplementary_group.empty:
            return group
        # Drop all-NA columns before concat as the default behaviour will change in pandas 3
        return pd.concat(
            [
                group.dropna(axis="columns", how="all"),
                supplementary_group.dropna(axis="columns", how="all"),
            ]
        )

    @classmethod
    def from_defaults(
        cls,
        variable: str,
        source_type: SourceDatasetType,
    ) -> Self:
        """
        Include e.g. a cell measure or ancillary variable in the selection.

        The constraint is created using the defaults for the source_type.

        Parameters
        ----------
        variable:
            The name of the variable to add.
        source_type:
            The source_type of the variable to add.

        Returns
        -------
        :
            A constraint to include a supplementary variable.

        Raises
        ------
        KeyError
            If no defaults are defined for ``source_type``.

        """
        # Default mandatory and optional matching facets for each supported source type.
        kwargs: dict[SourceDatasetType, dict[str, tuple[str, ...]]] = {
            SourceDatasetType.CMIP6: {
                "matching_facets": (
                    "source_id",
                    "grid_label",
                ),
                "optional_matching_facets": (
                    "table_id",
                    "experiment_id",
                    "member_id",
                    "version",
                ),
            },
            SourceDatasetType.CMIP7: {
                "matching_facets": (
                    "source_id",
                    "grid_label",
                ),
                "optional_matching_facets": (
                    "experiment_id",
                    "variant_label",  # CMIP7 uses variant_label instead of member_id
                ),
            },
        }
        # Name of the facet that identifies the variable for each source type.
        variable_facet: dict[SourceDatasetType, str] = {
            SourceDatasetType.CMIP6: "variable_id",
            SourceDatasetType.CMIP7: "variable_id",
        }

        supplementary_facets = {variable_facet[source_type]: variable}
        source_kwargs = kwargs[source_type]
        return cls(
            supplementary_facets,
            matching_facets=source_kwargs["matching_facets"],
            optional_matching_facets=source_kwargs["optional_matching_facets"],
        )

matching_facets instance-attribute #

Facets that must match with datasets in the selection.

optional_matching_facets instance-attribute #

Select only the best matching datasets based on similarity with these facets.

supplementary_facets instance-attribute #

Facets describing the supplementary dataset.

apply(group, data_catalog) #

Add a supplementary dataset to the group.

Source code in packages/climate-ref-core/src/climate_ref_core/constraints.py
def apply(
    self,
    group: pd.DataFrame,
    data_catalog: pd.DataFrame,
) -> pd.DataFrame:
    """
    Add a supplementary dataset to the group.

    Parameters
    ----------
    group
        The datasets to which supplementary datasets should be added.
    data_catalog
        The catalog that is searched for supplementary datasets.

    Returns
    -------
    :
        ``group`` itself when no supplementary dataset is selected, otherwise
        the concatenation of ``group`` and the selected supplementary rows,
        each with their all-NA columns dropped.
    """
    # Normalise every configured facet value to a tuple of allowed values.
    supplementary_facets: defaultdict[str, tuple[str, ...]] = defaultdict(tuple)
    for facet, values in self.supplementary_facets.items():
        supplementary_facets[facet] = values if isinstance(values, tuple) else (values,)

    # For the mandatory matching facets, also allow every value present in the group.
    for facet in self.matching_facets:
        values = tuple(group[facet].unique())
        supplementary_facets[facet] += values

    # Candidate rows: every filtered facet holds one of its allowed values.
    mask = data_catalog[list(supplementary_facets)].isin(supplementary_facets).all(axis="columns")
    supplementary_group = data_catalog[mask]
    if not supplementary_group.empty:
        # Save the original index and reset to a unique RangeIndex.
        # The data catalog index can contain duplicate labels (e.g. multiple
        # file entries for the same dataset) which causes pandas ``|=`` to
        # fail with "cannot reindex on an axis with duplicate labels".
        original_index = supplementary_group.index.copy()
        supplementary_group = supplementary_group.reset_index(drop=True)

        matching_facets = list(self.matching_facets)
        facets = matching_facets + list(self.optional_matching_facets)
        # One row per distinct combination of (mandatory + optional) facet values in the group.
        datasets = group[facets].drop_duplicates()
        select = pd.Series(False, index=supplementary_group.index)
        for i in range(len(datasets)):
            dataset = datasets.iloc[i]
            # Restrict the supplementary datasets to those that match the main dataset.
            supplementaries = supplementary_group[
                (supplementary_group[matching_facets] == dataset[matching_facets]).all(axis="columns")
            ]
            if not supplementaries.empty:
                # Select the best matching supplementary dataset based on the optional matching facets.
                # The score is the number of facets whose value equals that of the main dataset.
                scores = (supplementaries[facets] == dataset).sum(axis="columns")
                supplementaries = supplementaries[scores == scores.max()]
                if "version" in supplementaries.columns:
                    # Select the latest version if there are multiple matches
                    supplementaries = supplementaries[
                        supplementaries["version"] == supplementaries["version"].max()
                    ]
                # Select only the first group if there are still multiple matches
                first_supplementary_dataset = supplementaries[facets].drop_duplicates().iloc[0]
                select |= (supplementaries[facets] == first_supplementary_dataset).all(axis="columns")

        supplementary_group = supplementary_group[select]
        # Restore the original index so downstream concatenation is consistent
        supplementary_group.index = original_index[list(select)]

    if supplementary_group.empty:
        return group
    # Drop all-NA columns before concat as the default behaviour will change in pandas 3
    return pd.concat(
        [
            group.dropna(axis="columns", how="all"),
            supplementary_group.dropna(axis="columns", how="all"),
        ]
    )

from_defaults(variable, source_type) classmethod #

Include e.g. a cell measure or ancillary variable in the selection.

The constraint is created using the defaults for the source_type.

Parameters:

Name Type Description Default
variable str

The name of the variable to add.

required
source_type SourceDatasetType

The source_type of the variable to add.

required

Returns:

Type Description
Self

A constraint to include a supplementary variable.

Source code in packages/climate-ref-core/src/climate_ref_core/constraints.py
@classmethod
def from_defaults(
    cls,
    variable: str,
    source_type: SourceDatasetType,
) -> Self:
    """
    Create a supplementary-dataset constraint from the defaults of a source type.

    Parameters
    ----------
    variable:
        The name of the supplementary variable to add.
    source_type:
        The source type whose default matching facets are used.

    Returns
    -------
    :
        A constraint to include a supplementary variable.

    """
    # Name of the facet that identifies the variable, per source type.
    variable_facet_by_source: dict[SourceDatasetType, str] = {
        SourceDatasetType.CMIP6: "variable_id",
        SourceDatasetType.CMIP7: "variable_id",
    }
    # Mandatory and optional matching facets, per source type.
    defaults_by_source: dict[SourceDatasetType, dict[str, tuple[str, ...]]] = {
        SourceDatasetType.CMIP6: {
            "matching_facets": ("source_id", "grid_label"),
            "optional_matching_facets": ("table_id", "experiment_id", "member_id", "version"),
        },
        SourceDatasetType.CMIP7: {
            "matching_facets": ("source_id", "grid_label"),
            # CMIP7 uses variant_label instead of member_id
            "optional_matching_facets": ("experiment_id", "variant_label"),
        },
    }

    facet_name = variable_facet_by_source[source_type]
    defaults = defaults_by_source[source_type]
    required = defaults["matching_facets"]
    optional = defaults["optional_matching_facets"]
    return cls(
        {facet_name: variable},
        matching_facets=required,
        optional_matching_facets=optional,
    )

GroupConstraint #

Bases: Protocol

An operation to perform on a group of datasets resulting in a new group of datasets.

This is applied to a group of datasets representing the inputs to a potential diagnostic execution.

If the operation results in an empty group, the constraint is considered not satisfied. The group must satisfy all constraints to be processed.

Warning:

Operations should not mutate the input group, but instead return a new group.
Mutating the input group may result in unexpected behaviour.
Source code in packages/climate-ref-core/src/climate_ref_core/constraints.py
@runtime_checkable
class GroupConstraint(Protocol):
    """
    An operation to perform on a group of datasets resulting in a new group of datasets.

    This is applied to a group of datasets representing the inputs to a potential diagnostic execution.

    If the operation results in an empty group, the constraint is considered not satisfied.
    The group must satisfy all constraints to be processed.

    !!! warning

        Operations should not mutate the input group, but instead return a new group.
        Mutating the input group may result in unexpected behaviour.
    """

    def apply(self, group: pd.DataFrame, data_catalog: pd.DataFrame) -> pd.DataFrame:
        """
        Perform an operation on the group of datasets.

        A new group of datasets should be returned if modifications are required,
        and the input group should not be modified. If no modifications are required,
        return the input group unchanged.
        If this operation fails, a ConstraintNotSatisfied exception should be raised.

        Parameters
        ----------
        group
            A group of datasets that is being validated.
        data_catalog
            The data catalog of datasets

        Raises
        ------
        ConstraintNotSatisfied
            The operation was not successful

        Returns
        -------
        :
            The updated group of datasets
        """
        # Interface stub: the behaviour is supplied by the classes implementing this protocol.
        ...

apply(group, data_catalog) #

Perform an operation on the group of datasets.

A new group of datasets should be returned if modifications are required, and the input group should not be modified. If no modifications are required, return the input group unchanged. If this operation fails, a ConstraintNotSatisfied exception should be raised.

Parameters:

Name Type Description Default
group DataFrame

A group of datasets that is being validated.

required
data_catalog DataFrame

The data catalog of datasets

required

Raises:

Type Description
ConstraintNotSatisfied

The operation was not successful

Returns:

Type Description
DataFrame

The updated group of datasets

Source code in packages/climate-ref-core/src/climate_ref_core/constraints.py
def apply(self, group: pd.DataFrame, data_catalog: pd.DataFrame) -> pd.DataFrame:
    """
    Perform an operation on the group of datasets.

    A new group of datasets should be returned if modifications are required,
    and the input group should not be modified. If no modifications are required,
    return the input group unchanged.
    If this operation fails, a ConstraintNotSatisfied exception should be raised.

    Parameters
    ----------
    group
        A group of datasets that is being validated.
    data_catalog
        The data catalog of datasets.

    Raises
    ------
    ConstraintNotSatisfied
        The operation was not successful.

    Returns
    -------
    :
        The updated group of datasets.
    """
    # Interface stub: the behaviour is supplied by the classes implementing this protocol.
    ...

IgnoreFacets #

A constraint that ignores certain facet values.

Datasets with these facet values are removed from the selection.

Source code in packages/climate-ref-core/src/climate_ref_core/constraints.py
@frozen
class IgnoreFacets:
    """
    A constraint that ignores certain facet values.

    Datasets with these facet values are removed from the selection.
    """

    facets: dict[str, str | tuple[str, ...]] = field(converter=_to_tuple_dict)
    """The facet values to ignore."""

    def apply(self, group: pd.DataFrame, data_catalog: pd.DataFrame) -> pd.DataFrame:
        """
        Filter out datasets with the ignored facets.

        Parameters
        ----------
        group
            The datasets to filter.
        data_catalog
            The data catalog of datasets. It is not used by this constraint.

        Returns
        -------
        :
            The rows of ``group`` that are not ignored.
        """
        # A row is ignored only when every configured facet holds one of its ignored values.
        mask = group[list(self.facets)].isin(self.facets).all(axis="columns")
        if mask.any():
            logger.debug(f"Ignoring files {', '.join(group.loc[mask, 'path'])} because of {self}")
        return group[~mask]

facets = field(converter=_to_tuple_dict) class-attribute instance-attribute #

The facet values to ignore.

apply(group, data_catalog) #

Filter out datasets with the ignored facets.

Source code in packages/climate-ref-core/src/climate_ref_core/constraints.py
def apply(self, group: pd.DataFrame, data_catalog: pd.DataFrame) -> pd.DataFrame:
    """
    Filter out datasets with the ignored facets.

    Parameters
    ----------
    group
        The datasets to filter.
    data_catalog
        The data catalog of datasets. It is not used by this constraint.

    Returns
    -------
    :
        The rows of ``group`` that are not ignored.
    """
    # A row is ignored only when every configured facet holds one of its ignored values.
    mask = group[list(self.facets)].isin(self.facets).all(axis="columns")
    if mask.any():
        logger.debug(f"Ignoring files {', '.join(group.loc[mask, 'path'])} because of {self}")
    return group[~mask]

PartialDateTime #

A partial datetime object that can be used to compare datetimes.

Only the specified fields are used for comparison.

Source code in packages/climate-ref-core/src/climate_ref_core/constraints.py
@frozen
@total_ordering
class PartialDateTime:  # noqa: PLW1641
    """
    A datetime with optional fields that can be compared against datetime-like objects.

    Fields left as ``None`` take no part in comparisons.
    """

    year: int | None = None
    month: int | None = None
    day: int | None = None
    hour: int | None = None
    minute: int | None = None
    second: int | None = None

    @property
    def _attrs(self) -> dict[str, int]:
        """The fields that have a value, keyed by field name."""
        populated: dict[str, int] = {}
        for name in self.__slots__:  # type: ignore[attr-defined]
            if name.startswith("_"):
                continue
            value = getattr(self, name)
            if value is not None:
                populated[name] = value
        return populated

    def __repr__(self) -> str:
        shown = ", ".join(f"{name}={value}" for name, value in self._attrs.items())
        return f"{self.__class__.__name__}({shown})"

    def _first_difference(self, other: object) -> tuple[int, int] | None:
        """
        Return ``(own, other)`` for the first populated field that differs, or ``None``.

        Duck typing is used so that both datetime.datetime and cftime.datetime work.
        A ``TypeError`` is raised when ``other`` provides no value for a populated field.
        """
        for attr, value in self._attrs.items():
            other_value = getattr(other, attr, None)
            if other_value is None:
                msg = (
                    f"Cannot compare PartialDateTime: object {other} of type "
                    f"{type(other)} has no attribute '{attr}'"
                )
                raise TypeError(msg)
            if value != other_value:
                return value, other_value
        return None

    def __eq__(self, other: object) -> bool:
        # Equal when no populated field differs from the other object.
        return self._first_difference(other) is None

    def __lt__(self, other: object) -> bool:
        # The first differing field, in declaration order, decides the ordering.
        difference = self._first_difference(other)
        if difference is None:
            return False
        own, theirs = difference
        return own < theirs

RequireContiguousTimerange #

A constraint that requires datasets to have a contiguous timerange.

Source code in packages/climate-ref-core/src/climate_ref_core/constraints.py
@frozen
class RequireContiguousTimerange:
    """
    A constraint that only keeps datasets whose files cover time without gaps.
    """

    group_by: tuple[str, ...]
    """
    The fields to group the datasets by. Groups that are not contiguous in time
    are removed.
    """

    def apply(self, group: pd.DataFrame, data_catalog: pd.DataFrame) -> pd.DataFrame:
        """
        Drop every subgroup whose files leave a gap in time or mix calendars.

        Parameters
        ----------
        group
            The datasets to check.
        data_catalog
            The data catalog of datasets. It is not used by this constraint.

        Returns
        -------
        :
            The rows of ``group`` that belong to subgroups passing the check.
        """
        # Largest tolerated distance between the end of one file and the start of
        # the next: a 31-day month plus one hour to allow for rounding errors.
        tolerance = pd.Timedelta(days=31, hours=1)

        keep = pd.Series(True, index=group.index)

        # Rows lacking a start or end time are never checked, so they stay selected.
        timed_rows = group.dropna(subset=["start_time", "end_time"])
        for _, files in timed_rows.groupby(list(self.group_by)):
            mixed_calendars = "calendar" in files.columns and files["calendar"].nunique() > 1
            if mixed_calendars:
                logger.debug(
                    f"Constraint {self} not satisfied because subgroup contains multiple calendars: "
                    f"{', '.join(files['path'])}"
                )
                keep.loc[files.index] = False
                continue
            if len(files) < 2:  # noqa: PLR2004
                # A single file cannot have a gap.
                continue

            ordered = files.sort_values("start_time", kind="stable")
            starts = ordered["start_time"]
            ends = ordered["end_time"]
            try:
                gaps = starts.values[1:] - ends.values[:-1]  # type: ignore[operator]
            except TypeError:
                # Cross-calendar cftime values cannot be subtracted directly
                # (e.g. historical vs scenario datasets using different calendars),
                # so the comparison falls back to their string representation.
                gaps = pd.to_timedelta(
                    pd.to_datetime(starts.astype(str)).values[1:]  # type: ignore
                    - pd.to_datetime(ends.astype(str)).values[:-1]
                )
            too_large = gaps > tolerance
            if not too_large.any():
                continue

            ordered_paths = ordered["path"]
            for position in np.flatnonzero(too_large):
                logger.debug(
                    f"Constraint {self} not satisfied because gap larger "
                    f"than {tolerance} found between "
                    f"{ordered_paths.iloc[position]} and {ordered_paths.iloc[position + 1]}"
                )
            keep.loc[files.index] = False

        return group[keep]

group_by instance-attribute #

The fields to group the datasets by. Groups that are not contiguous in time are removed.

apply(group, data_catalog) #

Check that all subgroups of the group have a contiguous timerange.

Source code in packages/climate-ref-core/src/climate_ref_core/constraints.py
def apply(self, group: pd.DataFrame, data_catalog: pd.DataFrame) -> pd.DataFrame:
    """
    Drop every subgroup whose files leave a gap in time or mix calendars.

    Parameters
    ----------
    group
        The datasets to check.
    data_catalog
        The data catalog of datasets. It is not used by this constraint.

    Returns
    -------
    :
        The rows of ``group`` that belong to subgroups passing the check.
    """
    # Largest tolerated distance between the end of one file and the start of
    # the next: a 31-day month plus one hour to allow for rounding errors.
    tolerance = pd.Timedelta(days=31, hours=1)

    keep = pd.Series(True, index=group.index)

    # Rows lacking a start or end time are never checked, so they stay selected.
    timed_rows = group.dropna(subset=["start_time", "end_time"])
    for _, files in timed_rows.groupby(list(self.group_by)):
        mixed_calendars = "calendar" in files.columns and files["calendar"].nunique() > 1
        if mixed_calendars:
            logger.debug(
                f"Constraint {self} not satisfied because subgroup contains multiple calendars: "
                f"{', '.join(files['path'])}"
            )
            keep.loc[files.index] = False
            continue
        if len(files) < 2:  # noqa: PLR2004
            # A single file cannot have a gap.
            continue

        ordered = files.sort_values("start_time", kind="stable")
        starts = ordered["start_time"]
        ends = ordered["end_time"]
        try:
            gaps = starts.values[1:] - ends.values[:-1]  # type: ignore[operator]
        except TypeError:
            # Cross-calendar cftime values cannot be subtracted directly
            # (e.g. historical vs scenario datasets using different calendars),
            # so the comparison falls back to their string representation.
            gaps = pd.to_timedelta(
                pd.to_datetime(starts.astype(str)).values[1:]  # type: ignore
                - pd.to_datetime(ends.astype(str)).values[:-1]
            )
        too_large = gaps > tolerance
        if not too_large.any():
            continue

        ordered_paths = ordered["path"]
        for position in np.flatnonzero(too_large):
            logger.debug(
                f"Constraint {self} not satisfied because gap larger "
                f"than {tolerance} found between "
                f"{ordered_paths.iloc[position]} and {ordered_paths.iloc[position + 1]}"
            )
        keep.loc[files.index] = False

    return group[keep]

RequireFacets #

A constraint that requires datasets to have certain facet values.

Source code in packages/climate-ref-core/src/climate_ref_core/constraints.py
@frozen
class RequireFacets:
    """
    A constraint that keeps only groups of datasets providing certain facet values.
    """

    dimension: str
    """The name of the facet to filter on."""

    required_facets: tuple[str, ...] = field(converter=_to_tuple)
    """The required facet values."""

    operator: Literal["all", "any"] = "all"
    """Whether all or any of the required facets must be present."""

    group_by: tuple[str, ...] | None = field(converter=_to_tuple, default=None)
    """
    The facets to group the datasets by.

    Each group created by `group_by` is checked on its own: depending on
    `operator`, it must provide all or at least one of the required facet
    values in the given dimension, otherwise the whole group is removed.
    If `group_by` is not set, the selection is checked as a single group.

    For example, if there are multiple models and variables in the selection,
    `group_by` can be used to make sure that only those models are selected that
    provide all required variables.
    """

    def apply(self, group: pd.DataFrame, data_catalog: pd.DataFrame) -> pd.DataFrame:
        """
        Drop every group of datasets that lacks the required facet values.

        Parameters
        ----------
        group
            The datasets to filter.
        data_catalog
            The data catalog of all datasets; not used by this constraint.

        Returns
        -------
        :
            The rows of ``group`` that belong to groups satisfying the constraint.
        """
        require_all = self.operator == "all"
        if self.group_by:
            candidates = [frame for _, frame in group.groupby(list(self.group_by))]
        else:
            # Without grouping facets the whole selection is one candidate.
            candidates = [group]

        keep = pd.Series(True, index=group.index)
        for candidate in candidates:
            # One flag per required value: is it present in this candidate?
            found = [value in candidate[self.dimension].values for value in self.required_facets]
            if all(found) if require_all else any(found):
                continue

            listed_paths = ", ".join(sorted(candidate["path"]))
            if require_all:
                missing_values = [
                    f"'{value}'" for value, present in zip(self.required_facets, found) if not present
                ]
                logger.debug(
                    f"Constraint {self} not satisfied because required facet values "
                    f"{', '.join(missing_values)} not found for group "
                    f"{listed_paths}"
                )
            else:
                logger.debug(
                    f"Constraint {self} not satisfied because none of the required facet values "
                    f"were found for group {listed_paths}"
                )
            keep.loc[candidate.index] = False
        return group[keep]

dimension instance-attribute #

The name of the facet to filter on.

group_by = field(converter=_to_tuple, default=None) class-attribute instance-attribute #

The facets to group the datasets by.

Each group created by group_by is checked separately: depending on operator, it must provide all ("all") or at least one ("any") of the required facet values in the given dimension, otherwise the whole group is removed.

For example, if there are multiple models and variables in the selection, group_by can be used to make sure that only those models are selected that provide all required variables.

operator = 'all' class-attribute instance-attribute #

Whether all or any of the required facets must be present.

required_facets = field(converter=_to_tuple) class-attribute instance-attribute #

The required facet values.

apply(group, data_catalog) #

Filter out groups of datasets that do not provide the required facets

Source code in packages/climate-ref-core/src/climate_ref_core/constraints.py
def apply(self, group: pd.DataFrame, data_catalog: pd.DataFrame) -> pd.DataFrame:
    """
    Drop every group of datasets that lacks the required facet values.

    Parameters
    ----------
    group
        The datasets to filter.
    data_catalog
        The data catalog of all datasets; not used by this constraint.

    Returns
    -------
    :
        The rows of ``group`` that belong to groups satisfying the constraint.
    """
    require_all = self.operator == "all"
    if self.group_by:
        candidates = [frame for _, frame in group.groupby(list(self.group_by))]
    else:
        # Without grouping facets the whole selection is one candidate.
        candidates = [group]

    keep = pd.Series(True, index=group.index)
    for candidate in candidates:
        # One flag per required value: is it present in this candidate?
        found = [value in candidate[self.dimension].values for value in self.required_facets]
        if all(found) if require_all else any(found):
            continue

        listed_paths = ", ".join(sorted(candidate["path"]))
        if require_all:
            missing_values = [
                f"'{value}'" for value, present in zip(self.required_facets, found) if not present
            ]
            logger.debug(
                f"Constraint {self} not satisfied because required facet values "
                f"{', '.join(missing_values)} not found for group "
                f"{listed_paths}"
            )
        else:
            logger.debug(
                f"Constraint {self} not satisfied because none of the required facet values "
                f"were found for group {listed_paths}"
            )
        keep.loc[candidate.index] = False
    return group[keep]

RequireOverlappingTimerange #

A constraint that requires datasets to have an overlapping timerange.

Source code in packages/climate-ref-core/src/climate_ref_core/constraints.py
@frozen
class RequireOverlappingTimerange:
    """
    A constraint that requires the timeranges of grouped datasets to overlap.
    """

    group_by: tuple[str, ...]
    """
    The fields to group the datasets by. There must be overlap in time between
    the groups to fulfill the constraint.
    """

    def apply(self, group: pd.DataFrame, data_catalog: pd.DataFrame) -> pd.DataFrame:
        """
        Keep the group only if all of its subgroups share a common period in time.

        Rows without a start or end time are ignored for the check. If fewer
        than two rows carry time information the group is returned unchanged.

        Parameters
        ----------
        group
            The datasets to check.
        data_catalog
            The data catalog of all datasets; not used by this constraint.

        Returns
        -------
        :
            ``group`` itself if the subgroups overlap, otherwise an empty selection.
        """
        timed = group.dropna(subset=["start_time", "end_time"])
        if len(timed) < 2:  # noqa: PLR2004
            return group

        per_subgroup = timed.groupby(list(self.group_by))
        earliest_starts = per_subgroup["start_time"].min()
        latest_ends = per_subgroup["end_time"].max()
        try:
            # Overlap exists when the last subgroup to start begins before
            # the first subgroup to finish ends.
            overlaps = earliest_starts.max() < latest_ends.min()
        except TypeError:
            # Cross-calendar cftime comparison: fall back to string representation
            overlaps = earliest_starts.apply(str).max() < latest_ends.apply(str).min()

        if overlaps:
            return group

        logger.debug(
            f"Constraint {self} not satisfied because no overlapping timerange "
            f"found for groups in {', '.join(group['path'])}"
        )
        return group.loc[[]]

group_by instance-attribute #

The fields to group the datasets by. There must be overlap in time between the groups to fulfill the constraint.

apply(group, data_catalog) #

Check that all subgroups of the group have an overlapping timerange.

Source code in packages/climate-ref-core/src/climate_ref_core/constraints.py
def apply(self, group: pd.DataFrame, data_catalog: pd.DataFrame) -> pd.DataFrame:
    """
    Keep the group only if all of its subgroups share a common period in time.

    Rows without a start or end time are ignored for the check. If fewer
    than two rows carry time information the group is returned unchanged.

    Parameters
    ----------
    group
        The datasets to check.
    data_catalog
        The data catalog of all datasets; not used by this constraint.

    Returns
    -------
    :
        ``group`` itself if the subgroups overlap, otherwise an empty selection.
    """
    timed = group.dropna(subset=["start_time", "end_time"])
    if len(timed) < 2:  # noqa: PLR2004
        return group

    per_subgroup = timed.groupby(list(self.group_by))
    earliest_starts = per_subgroup["start_time"].min()
    latest_ends = per_subgroup["end_time"].max()
    try:
        # Overlap exists when the last subgroup to start begins before
        # the first subgroup to finish ends.
        overlaps = earliest_starts.max() < latest_ends.min()
    except TypeError:
        # Cross-calendar cftime comparison: fall back to string representation
        overlaps = earliest_starts.apply(str).max() < latest_ends.apply(str).min()

    if overlaps:
        return group

    logger.debug(
        f"Constraint {self} not satisfied because no overlapping timerange "
        f"found for groups in {', '.join(group['path'])}"
    )
    return group.loc[[]]

RequireTimerange #

A constraint that requires datasets to have a specific timerange.

Specify the start and/or end of the required timerange using a precision that matches the frequency of the datasets.

For example, to ensure that datasets at monthly frequency cover the period from 2000 to 2010, use start=PartialDateTime(year=2000, month=1) and end=PartialDateTime(year=2010, month=12).

Source code in packages/climate-ref-core/src/climate_ref_core/constraints.py
@frozen
class RequireTimerange:
    """
    A constraint that requires datasets to cover a specific timerange.

    Specify the start and/or end of the required timerange using a precision
    that matches the frequency of the datasets.

    For example, to ensure that datasets at monthly frequency cover the period
    from 2000 to 2010, use start=PartialDateTime(year=2000, month=1) and
    end=PartialDateTime(year=2010, month=12).
    """

    group_by: tuple[str, ...]
    """
    The fields to group the datasets by. Groups that do not cover the timerange
    will be removed.
    """

    start: PartialDateTime | None = None
    """
    The start time of the required timerange. If None, no start time is required.
    """

    end: PartialDateTime | None = None
    """
    The end time of the required timerange. If None, no end time is required.
    """

    def apply(self, group: pd.DataFrame, data_catalog: pd.DataFrame) -> pd.DataFrame:
        """
        Remove subgroups that do not cover the required timerange without gaps.

        A subgroup is kept when its earliest start time is not after `start`,
        its latest end time is not before `end`, and it passes
        `RequireContiguousTimerange`. Rows without a start or end time are
        not checked and are always kept.

        Parameters
        ----------
        group
            The datasets to filter.
        data_catalog
            The data catalog of all datasets, forwarded to the contiguity check.

        Returns
        -------
        :
            The rows of ``group`` that were not removed.
        """
        keep = pd.Series(True, index=group.index)
        timed = group.dropna(subset=["start_time", "end_time"])
        for _, frame in timed.groupby(list(self.group_by)):
            first_start = frame["start_time"].min()
            last_end = frame["end_time"].max()

            starts_late = self.start is not None and first_start > self.start
            if starts_late:
                logger.debug(
                    f"Constraint {self} not satisfied because start time {first_start} "
                    f"is after required start time for {', '.join(frame['path'])}"
                )
            ends_early = self.end is not None and last_end < self.end
            if ends_early:
                logger.debug(
                    f"Constraint {self} not satisfied because end time {last_end} "
                    f"is before required end time for {', '.join(frame['path'])}"
                )
            if starts_late or ends_early:
                keep.loc[frame.index] = False
                continue

            # The period is covered at both ends; now make sure there are no
            # gaps in between by reusing the contiguity constraint.
            contiguous = RequireContiguousTimerange(group_by=self.group_by).apply(frame, data_catalog)
            if len(contiguous) != len(frame):
                keep.loc[frame.index] = False
        return group[keep]

end = None class-attribute instance-attribute #

The end time of the required timerange. If None, no end time is required.

group_by instance-attribute #

The fields to group the datasets by. Groups that do not cover the timerange will be removed.

start = None class-attribute instance-attribute #

The start time of the required timerange. If None, no start time is required.

apply(group, data_catalog) #

Check that all subgroups of the group cover the required timerange and are contiguous within it.

Source code in packages/climate-ref-core/src/climate_ref_core/constraints.py
def apply(self, group: pd.DataFrame, data_catalog: pd.DataFrame) -> pd.DataFrame:
    """
    Remove subgroups that do not cover the required timerange without gaps.

    A subgroup is kept when its earliest start time is not after `start`,
    its latest end time is not before `end`, and it passes
    `RequireContiguousTimerange`. Rows without a start or end time are
    not checked and are always kept.

    Parameters
    ----------
    group
        The datasets to filter.
    data_catalog
        The data catalog of all datasets, forwarded to the contiguity check.

    Returns
    -------
    :
        The rows of ``group`` that were not removed.
    """
    keep = pd.Series(True, index=group.index)
    timed = group.dropna(subset=["start_time", "end_time"])
    for _, frame in timed.groupby(list(self.group_by)):
        first_start = frame["start_time"].min()
        last_end = frame["end_time"].max()

        starts_late = self.start is not None and first_start > self.start
        if starts_late:
            logger.debug(
                f"Constraint {self} not satisfied because start time {first_start} "
                f"is after required start time for {', '.join(frame['path'])}"
            )
        ends_early = self.end is not None and last_end < self.end
        if ends_early:
            logger.debug(
                f"Constraint {self} not satisfied because end time {last_end} "
                f"is before required end time for {', '.join(frame['path'])}"
            )
        if starts_late or ends_early:
            keep.loc[frame.index] = False
            continue

        # The period is covered at both ends; now make sure there are no
        # gaps in between by reusing the contiguity constraint.
        contiguous = RequireContiguousTimerange(group_by=self.group_by).apply(frame, data_catalog)
        if len(contiguous) != len(frame):
            keep.loc[frame.index] = False
    return group[keep]

apply_constraint(dataframe, constraint, data_catalog) #

Apply a constraint to a group of datasets

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dataframe` | `DataFrame` | The group of datasets to apply the constraint to. | required |
| `constraint` | `GroupConstraint` | The constraint to apply. | required |
| `data_catalog` | `DataFrame` | The data catalog of all datasets. | required |

Returns:

| Type | Description |
| --- | --- |
| `DataFrame \| None` | The updated group of datasets or None if the constraint was not satisfied |

Source code in packages/climate-ref-core/src/climate_ref_core/constraints.py
def apply_constraint(
    dataframe: pd.DataFrame,
    constraint: GroupConstraint,
    data_catalog: pd.DataFrame,
) -> pd.DataFrame | None:
    """
    Apply a constraint to a group of datasets

    Parameters
    ----------
    dataframe:
        The group of datasets to apply the constraint to.
    constraint
        The constraint to apply.
    data_catalog
        The data catalog of all datasets.

    Returns
    -------
    :
        The updated group of datasets or None if the constraint was not satisfied
    """
    updated_group = constraint.apply(dataframe, data_catalog)
    if updated_group.empty:
        logger.debug(
            "Constraint {} not satisfied for {} rows",
            constraint,
            len(dataframe),
        )
        return None

    return updated_group