Skip to content

climate_ref_ilamb.standard #

ILAMBStandard #

Bases: Diagnostic

Apply the standard ILAMB analysis with respect to a given reference dataset.

Source code in packages/climate-ref-ilamb/src/climate_ref_ilamb/standard.py
class ILAMBStandard(Diagnostic):
    """
    Apply the standard ILAMB analysis with respect to a given reference dataset.
    """

    def __init__(
        self,
        registry_file: str,
        metric_name: str,
        sources: dict[str, str | dict[str, str]],
        **ilamb_kwargs: Any,
    ):
        # Setup the diagnostic
        self.variable_id = ilamb_kwargs.get("analysis_variable", next(iter(sources.keys())))
        if "sources" not in ilamb_kwargs:  # pragma: no cover
            ilamb_kwargs["sources"] = sources
        if "relationships" not in ilamb_kwargs:
            ilamb_kwargs["relationships"] = {}
        if ilamb_kwargs["relationships"]:
            transforms = list(ilamb_kwargs.get("transforms", []))
            transform_names = [
                next(iter(transform.keys())) if isinstance(transform, dict) else transform
                for transform in transforms
            ]
            if "climate_ref_relationship_time" not in transform_names:
                transforms.append({"climate_ref_relationship_time": {"variable_id": self.variable_id}})
            ilamb_kwargs["transforms"] = transforms

        # Allow per-diagnostic override of the test source_id
        # (not all variables are available from CanESM5)
        test_source_id = ilamb_kwargs.pop("test_source_id", "CanESM5")

        self.ilamb_kwargs = ilamb_kwargs

        # REF stuff
        self.name = metric_name
        self.slug = self.name.lower().replace(" ", "-")

        # Collect all variable IDs used by this diagnostic
        all_variable_ids = (
            self.variable_id,
            *ilamb_kwargs.get("alternate_vars", []),
            *ilamb_kwargs.get("related_vars", []),
            *ilamb_kwargs.get("relationships", {}).keys(),
        )

        # Variables that the primary diagnostic requires (not relationship vars)
        primary_variable_ids = (
            self.variable_id,
            *ilamb_kwargs.get("alternate_vars", []),
            *ilamb_kwargs.get("related_vars", []),
        )

        # Look up CMIP7 branded variable names
        branded_variables = _get_branded_variable(all_variable_ids, registry_file)

        # Determine realm/region for CMIP7 filter based on registry
        is_land = registry_file in ("ilamb", "ilamb-test")

        relationship_variable_ids = tuple(ilamb_kwargs.get("relationships", {}).keys())

        # Create the data requirement for the dataset under test
        cmip6_requirement = _build_cmip_data_requirement(
            source_type=SourceDatasetType.CMIP6,
            filters={
                "variable_id": all_variable_ids,
                "frequency": "mon",
                "experiment_id": ("historical", "land-hist"),
                "table_id": (
                    "AERmonZ",
                    "Amon",
                    "CFmon",
                    "Emon",
                    "EmonZ",
                    "LImon",
                    "Lmon",
                    "Omon",
                    "SImon",
                ),
            },
            group_by=("experiment_id", "source_id", "member_id", "grid_label"),
            primary_variable_ids=primary_variable_ids,
            relationship_variable_ids=relationship_variable_ids,
            is_land=is_land,
        )

        cmip7_requirement = _build_cmip_data_requirement(
            source_type=SourceDatasetType.CMIP7,
            filters={
                "branded_variable": branded_variables,
                "frequency": "mon",
                "experiment_id": ("historical", "land-hist"),
                "region": "glb",
            },
            group_by=("experiment_id", "source_id", "variant_label", "grid_label"),
            primary_variable_ids=primary_variable_ids,
            relationship_variable_ids=relationship_variable_ids,
            is_land=is_land,
        )

        # obs4MIPs data requirement, normally ilamb3 expects the `sources` to
        # resolve to keys in one of its data registries. If instead we find a
        # dictionary, then assume that these keys are meant to be keywords in a
        # REF data requirement.
        # obs_source key is used to determine whether to fetch from ESGF
        # or use the pre-fetched obs4REF registry.
        filters: dict[str, tuple[str, ...]] = {}
        obs_source = None
        for _, source in sources.items():
            if isinstance(source, dict):
                obs_source = source.pop("obs_source", None)
                for key, val in source.items():
                    if key not in filters:
                        filters[key] = ()
                    filters[key] += (val,)
        obs4mips_requirement = (
            DataRequirement(
                source_type=SourceDatasetType.obs4MIPs,
                filters=(FacetFilter(facets=filters),),
                group_by=tuple(filters.keys()),
            )
            if filters
            else None
        )

        data_requirements: Sequence[Sequence[DataRequirement]]
        if obs4mips_requirement is None:
            data_requirements = (
                (cmip6_requirement,),
                (cmip7_requirement,),
            )
        else:
            data_requirements = (
                (cmip6_requirement, obs4mips_requirement),
                (cmip7_requirement, obs4mips_requirement),
            )
        self.data_requirements = data_requirements

        self.facets = (
            "experiment_id",
            "source_id",
            "member_id",
            "grid_label",
            "region",
            "metric",
            "statistic",
        )

        self.test_data_spec = _build_test_data_spec(
            all_variable_ids=all_variable_ids,
            registry_file=registry_file,
            test_source_id=test_source_id,
            is_land=is_land,
            obs_filters=filters,
            obs_source=obs_source,
        )

        # Setup ILAMB data and options
        self.registry_file = registry_file
        self.registry = dataset_registry_manager[self.registry_file]
        self.ilamb_data = registry_to_collection(
            dataset_registry_manager[self.registry_file],
        )

    def execute(self, definition: ExecutionDefinition) -> None:
        """
        Run the ILAMB standard analysis.
        """
        _set_ilamb3_options(self.registry, self.registry_file)
        # Temporary hack of the ilamb3 inputs while we still need to refer to
        # data not yet available in obs4{MIPs,REF}. This logic allows for
        # DataRequirement filters to be added as a 'source' in the ilamb
        # configure file. If a dictionary instead of a string was found, we
        # populate an obs4MIPs requirement.
        if SourceDatasetType.obs4MIPs in definition.datasets:
            # ilamb3 will expect the reference dataset dataframe to have a `key`
            # column that uniquely describes each dataset. Create one using the
            # `instance_id` and an integer and then modify the ilamb3 configure
            # so that it finds the proper data.
            ref_datasets = definition.datasets[SourceDatasetType.obs4MIPs].datasets
            ref_datasets = ref_datasets.reset_index()
            ref_datasets["key"] = ref_datasets["instance_id"] + ref_datasets.index.astype(str)
            for instance_id, df in ref_datasets.groupby("instance_id"):
                variable_id = df["variable_id"].unique()[0]
                self.ilamb_kwargs["sources"][variable_id] = f"{instance_id}*"
        else:
            # If the data is not ingested yet but in a registry, we concat the
            # obs4REF registries to the ilamb one so that any key may be used
            # from either. Eventually (?) this can be removed.
            ref_datasets = pd.concat(
                [
                    self.ilamb_data.datasets.set_index(self.ilamb_data.slug_column),
                    registry_to_collection(dataset_registry_manager["obs4ref"]).datasets.set_index("key"),
                ]
            )
        cmip_source = _get_cmip_source_type(definition.datasets)
        model_datasets = definition.datasets[cmip_source].datasets

        # CMIP7 uses variant_label instead of member_id; ilamb3 expects member_id
        if cmip_source == SourceDatasetType.CMIP7 and "member_id" not in model_datasets.columns:
            model_datasets = model_datasets.copy()
            model_datasets["member_id"] = model_datasets["variant_label"]

        # When both a 3D primary variable (e.g. thetao) and its 2D alternate
        # (e.g. tos) are present, drop the primary so ilamb3 uses the alternate.
        # This avoids two problems with merging datasets of different time ranges:
        # 1) time_bnds gets NaN-filled for the shorter variable's missing steps,
        #    corrupting the time frequency calculation in ilamb3
        # 2) The 3D variable may not cover the reference period (e.g. thetao
        #    covers 1850-1860 but WOA2023 reference covers 2005-2014)
        # The alternate surface variable is equivalent after select_depth and
        # typically has full temporal coverage.
        alternate_vars = self.ilamb_kwargs.get("alternate_vars", [])
        if alternate_vars:
            available_alternates = [v for v in alternate_vars if v in model_datasets["variable_id"].values]
            if available_alternates and self.variable_id in model_datasets["variable_id"].values:
                model_datasets = model_datasets[model_datasets["variable_id"] != self.variable_id]

        # In ilamb3, this is run with all the models that we know about to
        # create different colors. For REF this will at least make the model
        # line color not be black
        run.set_model_colors(model_datasets)

        # Run ILAMB in a single-threaded mode to avoid issues with multithreading (#394)
        with dask.config.set(scheduler="synchronous"):
            run.run_single_block(
                self.slug,
                ref_datasets,
                model_datasets,
                definition.output_directory,
                **self.ilamb_kwargs,
            )

    def build_execution_result(self, definition: ExecutionDefinition) -> ExecutionResult:
        """
        Build the diagnostic result after running ILAMB.

        Parameters
        ----------
        definition
            The definition of the diagnostic execution

        Returns
        -------
            An execution result object
        """
        _set_ilamb3_options(self.registry, self.registry_file)
        # In ILAMB, scalars are saved in CSV files in the output directory. To
        # be compatible with the REF system we will need to add the metadata
        # that is associated with the execution group, called the selector.
        df = _load_csv_and_merge(definition.output_directory)
        selectors = _get_selectors(definition.datasets)

        # TODO: Fix reference data once we are using the obs4MIPs dataset
        dataset_source = self.name.split("-")[1] if "-" in self.name else "None"
        common_dimensions = {**selectors, "reference_source_id": dataset_source}
        for key, value in common_dimensions.items():
            df[key] = value
        metric_bundle = CMECMetric.model_validate(_build_cmec_bundle(df))

        # Add each png file plot to the output
        output_bundle = CMECOutput.create_template()
        for plotfile in definition.output_directory.glob("*.png"):
            relative_path = str(definition.as_relative_path(plotfile))
            caption, figure_dimensions = _caption_from_filename(plotfile, common_dimensions)

            output_bundle[OutputCV.PLOTS.value][relative_path] = {
                OutputCV.FILENAME.value: relative_path,
                OutputCV.LONG_NAME.value: caption,
                OutputCV.DESCRIPTION.value: "",
                OutputCV.DIMENSIONS.value: figure_dimensions,
            }

        # Add the html page to the output
        index_html = definition.to_output_path("index.html")
        if index_html.exists():
            relative_path = str(definition.as_relative_path(index_html))
            output_bundle[OutputCV.HTML.value][relative_path] = {
                OutputCV.FILENAME.value: relative_path,
                OutputCV.LONG_NAME.value: "Results page",
                OutputCV.DESCRIPTION.value: "Page displaying scalars and plots from the ILAMB execution.",
                OutputCV.DIMENSIONS.value: common_dimensions,
            }
            output_bundle[OutputCV.INDEX.value] = relative_path

        # Add series to the output based on the time traces we find in the
        # output files
        series = []
        for ncfile in definition.output_directory.glob("*.nc"):
            time_coder = xr.coders.CFDatetimeCoder(use_cftime=True)
            ds = xr.open_dataset(ncfile, decode_times=time_coder)
            for name, da in ds.items():
                # Only create series for 1d DataArray's with these dimensions
                if not (da.ndim == 1 and set(da.dims).intersection(["time", "month"])):
                    continue
                # Convert dimension values
                attrs = {
                    "units": da.attrs.get("units", ""),
                    "long_name": da.attrs.get("long_name", str(name)),
                    "standard_name": da.attrs.get("standard_name", ""),
                }
                str_name = str(name)
                index_name = str(da.dims[0])
                index = ds[index_name].values.tolist()
                if hasattr(index[0], "isoformat"):
                    index = [v.isoformat() for v in index]
                if hasattr(index[0], "calendar"):
                    attrs["calendar"] = index[0].calendar

                # Parse out some dimensions
                if ncfile.stem == "Reference":
                    dimensions = {
                        "source_id": "Reference",
                        "metric": str_name,
                    }
                else:
                    dimensions = {"metric": str_name, **common_dimensions}

                # Split the metric into metric and region if possible
                if "_" in str_name:
                    metric_name, region_name = str_name.split("_", maxsplit=1)
                    dimensions["metric"] = metric_name
                    dimensions["region"] = region_name
                else:
                    dimensions["region"] = "None"

                series.append(
                    SeriesMetricValue(
                        dimensions=dimensions,
                        values=da.values.tolist(),
                        index=index,
                        index_name=index_name,
                        attributes=attrs,
                    )
                )

        return ExecutionResult.build_from_output_bundle(
            definition, cmec_output_bundle=output_bundle, cmec_metric_bundle=metric_bundle, series=series
        )

build_execution_result(definition) #

Build the diagnostic result after running ILAMB.

Parameters:

Name Type Description Default
definition ExecutionDefinition

The definition of the diagnostic execution

required

Returns:

Type Description
An execution result object
Source code in packages/climate-ref-ilamb/src/climate_ref_ilamb/standard.py
def build_execution_result(self, definition: ExecutionDefinition) -> ExecutionResult:
    """
    Build the diagnostic result after running ILAMB.

    Parameters
    ----------
    definition
        The definition of the diagnostic execution

    Returns
    -------
        An execution result object
    """
    _set_ilamb3_options(self.registry, self.registry_file)
    # In ILAMB, scalars are saved in CSV files in the output directory. To
    # be compatible with the REF system we will need to add the metadata
    # that is associated with the execution group, called the selector.
    df = _load_csv_and_merge(definition.output_directory)
    selectors = _get_selectors(definition.datasets)

    # TODO: Fix reference data once we are using the obs4MIPs dataset
    dataset_source = self.name.split("-")[1] if "-" in self.name else "None"
    common_dimensions = {**selectors, "reference_source_id": dataset_source}
    for key, value in common_dimensions.items():
        df[key] = value
    metric_bundle = CMECMetric.model_validate(_build_cmec_bundle(df))

    # Add each png file plot to the output
    output_bundle = CMECOutput.create_template()
    for plotfile in definition.output_directory.glob("*.png"):
        relative_path = str(definition.as_relative_path(plotfile))
        caption, figure_dimensions = _caption_from_filename(plotfile, common_dimensions)

        output_bundle[OutputCV.PLOTS.value][relative_path] = {
            OutputCV.FILENAME.value: relative_path,
            OutputCV.LONG_NAME.value: caption,
            OutputCV.DESCRIPTION.value: "",
            OutputCV.DIMENSIONS.value: figure_dimensions,
        }

    # Add the html page to the output
    index_html = definition.to_output_path("index.html")
    if index_html.exists():
        relative_path = str(definition.as_relative_path(index_html))
        output_bundle[OutputCV.HTML.value][relative_path] = {
            OutputCV.FILENAME.value: relative_path,
            OutputCV.LONG_NAME.value: "Results page",
            OutputCV.DESCRIPTION.value: "Page displaying scalars and plots from the ILAMB execution.",
            OutputCV.DIMENSIONS.value: common_dimensions,
        }
        output_bundle[OutputCV.INDEX.value] = relative_path

    # Add series to the output based on the time traces we find in the
    # output files
    series = []
    for ncfile in definition.output_directory.glob("*.nc"):
        time_coder = xr.coders.CFDatetimeCoder(use_cftime=True)
        ds = xr.open_dataset(ncfile, decode_times=time_coder)
        for name, da in ds.items():
            # Only create series for 1d DataArray's with these dimensions
            if not (da.ndim == 1 and set(da.dims).intersection(["time", "month"])):
                continue
            # Convert dimension values
            attrs = {
                "units": da.attrs.get("units", ""),
                "long_name": da.attrs.get("long_name", str(name)),
                "standard_name": da.attrs.get("standard_name", ""),
            }
            str_name = str(name)
            index_name = str(da.dims[0])
            index = ds[index_name].values.tolist()
            if hasattr(index[0], "isoformat"):
                index = [v.isoformat() for v in index]
            if hasattr(index[0], "calendar"):
                attrs["calendar"] = index[0].calendar

            # Parse out some dimensions
            if ncfile.stem == "Reference":
                dimensions = {
                    "source_id": "Reference",
                    "metric": str_name,
                }
            else:
                dimensions = {"metric": str_name, **common_dimensions}

            # Split the metric into metric and region if possible
            if "_" in str_name:
                metric_name, region_name = str_name.split("_", maxsplit=1)
                dimensions["metric"] = metric_name
                dimensions["region"] = region_name
            else:
                dimensions["region"] = "None"

            series.append(
                SeriesMetricValue(
                    dimensions=dimensions,
                    values=da.values.tolist(),
                    index=index,
                    index_name=index_name,
                    attributes=attrs,
                )
            )

    return ExecutionResult.build_from_output_bundle(
        definition, cmec_output_bundle=output_bundle, cmec_metric_bundle=metric_bundle, series=series
    )

execute(definition) #

Run the ILAMB standard analysis.

Source code in packages/climate-ref-ilamb/src/climate_ref_ilamb/standard.py
def execute(self, definition: ExecutionDefinition) -> None:
    """
    Run the ILAMB standard analysis.
    """
    _set_ilamb3_options(self.registry, self.registry_file)
    # Temporary hack of the ilamb3 inputs while we still need to refer to
    # data not yet available in obs4{MIPs,REF}. This logic allows for
    # DataRequirement filters to be added as a 'source' in the ilamb
    # configure file. If a dictionary instead of a string was found, we
    # populate an obs4MIPs requirement.
    if SourceDatasetType.obs4MIPs in definition.datasets:
        # ilamb3 will expect the reference dataset dataframe to have a `key`
        # column that uniquely describes each dataset. Create one using the
        # `instance_id` and an integer and then modify the ilamb3 configure
        # so that it finds the proper data.
        ref_datasets = definition.datasets[SourceDatasetType.obs4MIPs].datasets
        ref_datasets = ref_datasets.reset_index()
        ref_datasets["key"] = ref_datasets["instance_id"] + ref_datasets.index.astype(str)
        for instance_id, df in ref_datasets.groupby("instance_id"):
            variable_id = df["variable_id"].unique()[0]
            self.ilamb_kwargs["sources"][variable_id] = f"{instance_id}*"
    else:
        # If the data is not ingested yet but in a registry, we concat the
        # obs4REF registries to the ilamb one so that any key may be used
        # from either. Eventually (?) this can be removed.
        ref_datasets = pd.concat(
            [
                self.ilamb_data.datasets.set_index(self.ilamb_data.slug_column),
                registry_to_collection(dataset_registry_manager["obs4ref"]).datasets.set_index("key"),
            ]
        )
    cmip_source = _get_cmip_source_type(definition.datasets)
    model_datasets = definition.datasets[cmip_source].datasets

    # CMIP7 uses variant_label instead of member_id; ilamb3 expects member_id
    if cmip_source == SourceDatasetType.CMIP7 and "member_id" not in model_datasets.columns:
        model_datasets = model_datasets.copy()
        model_datasets["member_id"] = model_datasets["variant_label"]

    # When both a 3D primary variable (e.g. thetao) and its 2D alternate
    # (e.g. tos) are present, drop the primary so ilamb3 uses the alternate.
    # This avoids two problems with merging datasets of different time ranges:
    # 1) time_bnds gets NaN-filled for the shorter variable's missing steps,
    #    corrupting the time frequency calculation in ilamb3
    # 2) The 3D variable may not cover the reference period (e.g. thetao
    #    covers 1850-1860 but WOA2023 reference covers 2005-2014)
    # The alternate surface variable is equivalent after select_depth and
    # typically has full temporal coverage.
    alternate_vars = self.ilamb_kwargs.get("alternate_vars", [])
    if alternate_vars:
        available_alternates = [v for v in alternate_vars if v in model_datasets["variable_id"].values]
        if available_alternates and self.variable_id in model_datasets["variable_id"].values:
            model_datasets = model_datasets[model_datasets["variable_id"] != self.variable_id]

    # In ilamb3, this is run with all the models that we know about to
    # create different colors. For REF this will at least make the model
    # line color not be black
    run.set_model_colors(model_datasets)

    # Run ILAMB in a single-threaded mode to avoid issues with multithreading (#394)
    with dask.config.set(scheduler="synchronous"):
        run.run_single_block(
            self.slug,
            ref_datasets,
            model_datasets,
            definition.output_directory,
            **self.ilamb_kwargs,
        )

format_cmec_output_bundle(dataset, dimensions, metadata_columns, value_column='value') #

Create a CMEC output bundle for the dataset.

Parameters:

Name Type Description Default
dataset DataFrame

Processed dataset

required
dimensions list[str]

The dimensions of the dataset (e.g., ["source_id", "member_id", "region"])

required
metadata_columns list[str]

The columns to be used as metadata (e.g., ["Description", "LongName"])

required
value_column str

The column containing the values

'value'

Returns:

Type Description
A CMEC output bundle ready to be written to disk
Source code in packages/climate-ref-ilamb/src/climate_ref_ilamb/standard.py
def format_cmec_output_bundle(
    dataset: pd.DataFrame,
    dimensions: list[str],
    metadata_columns: list[str],
    value_column: str = "value",
) -> dict[str, Any]:
    """
    Create a CMEC output bundle for the dataset.

    Parameters
    ----------
    dataset
        Processed dataset
    dimensions
        The dimensions of the dataset (e.g., ["source_id", "member_id", "region"])
    metadata_columns
        The columns to be used as metadata (e.g., ["Description", "LongName"])
    value_column
        The column containing the values

    Returns
    -------
        A CMEC output bundle ready to be written to disk
    """
    # Validate that all required columns exist
    required_columns = set(dimensions) | {value_column} | set(metadata_columns)
    missing_columns = required_columns - set(dataset.columns)
    if missing_columns:
        raise ValueError(f"Missing required columns: {missing_columns}")

    # Build the dimensions section
    dimensions_dict: dict[str, dict[str, dict[str, str]]] = {}

    # For each dimension, create a dictionary of unique values and their metadata
    for dim in dimensions:
        unique_values = dataset[dim].unique()
        dim_dict: dict[str, dict[str, str]] = {}

        for val in unique_values:
            # Get the row for this dimension value

            dim_dict[str(val)] = {}

            if dim == dimensions[-1]:
                # If this is the last dimension, add the value column to the metadata

                dim_dict[str(val)] = dataset[dataset[dim] == val].iloc[0][metadata_columns].to_dict()

        dimensions_dict[dim] = dim_dict

    # Build the results section - create nested structure based on dimensions
    def nest_results(df: pd.DataFrame, dims: list[str]) -> dict[str, Any] | float:
        if not dims:
            return float(df[value_column].iloc[0].item())

        current_dim = dims[0]
        remaining_dims = dims[1:]

        return {
            str(group_name): nest_results(group_df, remaining_dims)
            for group_name, group_df in df.groupby(current_dim)
        }

    results = nest_results(dataset, list(dimensions))

    return {"DIMENSIONS": {"json_structure": list(dimensions), **dimensions_dict}, "RESULTS": results}