Skip to content

climate_ref_pmp.diagnostics.enso #

ENSO #

Bases: CommandLineDiagnostic

Calculate the ENSO performance metrics for a dataset

Source code in packages/climate-ref-pmp/src/climate_ref_pmp/diagnostics/enso.py
class ENSO(CommandLineDiagnostic):
    """
    Calculate the ENSO performance metrics for a dataset

    One instance handles a single metrics collection
    (``ENSO_perf``, ``ENSO_tel`` or ``ENSO_proc``). The collection determines
    which model variables and which observational sources are requested.
    """

    facets = (
        "mip_id",
        "source_id",
        "member_id",
        "grid_label",
        "experiment_id",
        "metric",
        "reference_datasets",
    )

    def __init__(self, metrics_collection: str, experiments: Collection[str] = ("historical",)) -> None:
        """
        Configure the diagnostic for one ENSO metrics collection.

        Parameters
        ----------
        metrics_collection
            Name of the metrics collection. One of ``ENSO_perf``, ``ENSO_tel``
            or ``ENSO_proc``. Also used as the diagnostic name and, lower-cased,
            as its slug.
        experiments
            Experiment ids used to filter the model datasets.

        Raises
        ------
        ValueError
            If ``metrics_collection`` is not one of the supported collections.
        """
        self.name = metrics_collection
        self.slug = metrics_collection.lower()
        self.metrics_collection = metrics_collection
        self.parameter_file = "pmp_param_enso.py"
        self.obs_sources: tuple[str, ...]
        self.model_variables: tuple[str, ...]

        # Each collection needs its own set of model variables and reference sources
        if metrics_collection == "ENSO_perf":  # pragma: no cover
            self.model_variables = ("pr", "ts", "tauu")
            self.obs_sources = ("GPCP-Monthly-3-2", "TropFlux-1-0", "HadISST-1-1")
        elif metrics_collection == "ENSO_tel":
            self.model_variables = ("pr", "ts")
            self.obs_sources = ("GPCP-Monthly-3-2", "TropFlux-1-0", "HadISST-1-1")
        elif metrics_collection == "ENSO_proc":
            self.model_variables = ("ts", "tauu", "hfls", "hfss", "rlds", "rlus", "rsds", "rsus")
            self.obs_sources = (
                "GPCP-Monthly-3-2",
                "TropFlux-1-0",
                "HadISST-1-1",
                "CERES-EBAF-4-2",
            )
        else:
            raise ValueError(
                f"Unknown metrics collection: {metrics_collection}. "
                "Valid options are: ENSO_perf, ENSO_tel, ENSO_proc"
            )

        # Must come after the variables/sources are set, as the requirements depend on them
        self.data_requirements = self._get_data_requirements(experiments)

        # Test data: one case per supported model source type (CMIP6 and CMIP7),
        # each requesting the observations, the model variables and the two
        # supplementary fields (areacella, sftlf).
        self.test_data_spec = TestDataSpecification(
            test_cases=(
                TestCase(
                    name="cmip6",
                    description=f"Test {metrics_collection} with CMIP6 data",
                    requests=(
                        RegistryRequest(
                            slug=f"enso-{self.slug}-obs",
                            registry_name="obs4ref",
                            source_type="obs4MIPs",
                            facets={
                                "source_id": self.obs_sources,
                                "variable_id": self.model_variables,
                            },
                        ),
                        CMIP6Request(
                            slug=f"enso-{self.slug}-cmip6",
                            facets={
                                "source_id": "ACCESS-ESM1-5",
                                "experiment_id": "historical",
                                "variable_id": self.model_variables,
                                "member_id": "r1i1p1f1",
                                "table_id": "Amon",
                            },
                            time_span=("2000-01", "2014-12"),
                        ),
                        CMIP6Request(
                            slug=f"enso-{self.slug}-areacella",
                            facets={
                                "source_id": "ACCESS-ESM1-5",
                                "experiment_id": "historical",
                                "variable_id": "areacella",
                                "member_id": "r1i1p1f1",
                                "table_id": "fx",
                            },
                        ),
                        CMIP6Request(
                            slug=f"enso-{self.slug}-sftlf",
                            facets={
                                "source_id": "ACCESS-ESM1-5",
                                "experiment_id": "historical",
                                "variable_id": "sftlf",
                                "member_id": "r1i1p1f1",
                                "table_id": "fx",
                            },
                        ),
                    ),
                ),
                TestCase(
                    name="cmip7",
                    description=f"CMIP7 test case for {metrics_collection}",
                    requests=(
                        RegistryRequest(
                            slug=f"enso-{self.slug}-obs-cmip7",
                            registry_name="obs4ref",
                            source_type="obs4MIPs",
                            facets={
                                "source_id": self.obs_sources,
                                "variable_id": self.model_variables,
                            },
                        ),
                        CMIP7Request(
                            slug=f"enso-{self.slug}-cmip7",
                            facets={
                                "source_id": "ACCESS-ESM1-5",
                                "experiment_id": "historical",
                                "variable_id": self.model_variables,
                                "branded_variable": tuple(
                                    _BRANDED_VARIABLE_NAMES[v] for v in self.model_variables
                                ),
                                "variant_label": "r1i1p1f1",
                                "frequency": "mon",
                                "region": "glb",
                            },
                            time_span=("2000-01", "2014-12"),
                        ),
                        CMIP7Request(
                            slug=f"enso-{self.slug}-areacella-cmip7",
                            facets={
                                "source_id": "ACCESS-ESM1-5",
                                "experiment_id": "historical",
                                "variable_id": "areacella",
                                "branded_variable": "areacella_ti-u-hxy-u",
                                "variant_label": "r1i1p1f1",
                                "frequency": "fx",
                                "region": "glb",
                            },
                        ),
                        CMIP7Request(
                            slug=f"enso-{self.slug}-sftlf-cmip7",
                            facets={
                                "source_id": "ACCESS-ESM1-5",
                                "experiment_id": "historical",
                                "variable_id": "sftlf",
                                "branded_variable": "sftlf_ti-u-hxy-u",
                                "variant_label": "r1i1p1f1",
                                "frequency": "fx",
                                "region": "glb",
                            },
                        ),
                    ),
                ),
            ),
        )

    def _get_data_requirements(
        self,
        experiments: Collection[str] = ("historical",),
    ) -> tuple[tuple[DataRequirement, DataRequirement], ...]:
        """
        Build the data requirements for this metrics collection.

        Parameters
        ----------
        experiments
            Experiment ids used to filter the model datasets.

        Returns
        -------
        :
            Two alternatives, each pairing the observation requirement with a
            model requirement: the first for CMIP6, the second for CMIP7.
        """
        cmip6_filters = [
            FacetFilter(
                facets={
                    "frequency": "mon",
                    "experiment_id": tuple(experiments),
                    "variable_id": self.model_variables,
                }
            )
        ]

        # CMIP7 datasets are selected by branded variable name rather than variable_id
        cmip7_filters = [
            FacetFilter(
                facets={
                    "branded_variable": tuple(_BRANDED_VARIABLE_NAMES[v] for v in self.model_variables),
                    "experiment_id": tuple(experiments),
                    "frequency": "mon",
                    "region": "glb",
                }
            )
        ]

        obs_requirement = DataRequirement(
            source_type=SourceDatasetType.obs4MIPs,
            filters=(
                FacetFilter(facets={"source_id": self.obs_sources, "variable_id": self.model_variables}),
            ),
            group_by=("activity_id",),
        )
        cmip6_requirement = DataRequirement(
            source_type=SourceDatasetType.CMIP6,
            filters=tuple(cmip6_filters),
            group_by=("source_id", "experiment_id", "member_id", "grid_label"),
            constraints=(
                AddSupplementaryDataset.from_defaults("areacella", SourceDatasetType.CMIP6),
                AddSupplementaryDataset.from_defaults("sftlf", SourceDatasetType.CMIP6),
            ),
        )
        cmip7_requirement = DataRequirement(
            source_type=SourceDatasetType.CMIP7,
            filters=tuple(cmip7_filters),
            group_by=("source_id", "experiment_id", "variant_label", "grid_label"),
            constraints=(
                AddSupplementaryDataset.from_defaults("areacella", SourceDatasetType.CMIP7),
                AddSupplementaryDataset.from_defaults("sftlf", SourceDatasetType.CMIP7),
            ),
        )

        return (
            (obs_requirement, cmip6_requirement),
            (obs_requirement, cmip7_requirement),
        )

    def build_cmd(self, definition: ExecutionDefinition) -> Iterable[str]:
        """
        Build the command that runs the PMP ENSO driver for an execution.

        As a side effect, a JSON file listing the model and observation input
        files is written to ``definition.output_directory``; its path is handed
        to the driver through ``--input_json_path``.

        Parameters
        ----------
        definition : ExecutionDefinition
            The configuration to run the diagnostic on.

        Returns
        -------
        :
            The command line arguments used to invoke the ENSO driver script.
        """
        mc_name = self.metrics_collection

        # ------------------------------------------------
        # Get the input datasets information for the model
        # ------------------------------------------------
        model_source_type = get_model_source_type(definition)
        input_datasets = definition.datasets[model_source_type]
        source_id = input_datasets["source_id"].unique()[0]
        # CMIP7 names the ensemble member column "variant_label" instead of "member_id"
        member_id_col = "variant_label" if model_source_type == SourceDatasetType.CMIP7 else "member_id"
        member_id = input_datasets[member_id_col].unique()[0]
        experiment_id = input_datasets["experiment_id"].unique()[0]
        # Sorted so that the generated JSON has the same key order on every run
        # (iteration order of a set of strings is not stable between processes).
        variable_ids = sorted(set(input_datasets["variable_id"].unique()) - {"areacella", "sftlf"})
        mod_run = f"{source_id}_{member_id}"

        # We only need one entry for the model run
        dict_mod: dict[str, dict[str, Any]] = {mod_run: {}}

        def extract_variable(dc: DatasetCollection, variable: str) -> list[str]:
            """Return the file paths in ``dc`` whose ``variable_id`` equals ``variable``."""
            return dc.datasets[dc["variable_id"] == variable]["path"].to_list()  # type: ignore

        # The supplementary fields are the same for every variable, so look them up once
        list_areacella = extract_variable(input_datasets, "areacella")
        list_sftlf = extract_variable(input_datasets, "sftlf")

        # Collect the file paths per model variable
        for variable in variable_ids:
            list_files = extract_variable(input_datasets, variable)

            if list_files:
                dict_mod[mod_run][variable] = {
                    "path + filename": list_files,
                    "varname": variable,
                    "path + filename_area": list_areacella,
                    "areaname": "areacella",
                    "path + filename_landmask": list_sftlf,
                    "landmaskname": "sftlf",
                }

        # -------------------------------------------------------
        # Get the input datasets information for the observations
        # -------------------------------------------------------
        reference_dataset = definition.datasets[SourceDatasetType.obs4MIPs]
        reference_dataset_names = reference_dataset["source_id"].unique()

        dict_obs: dict[str, dict[str, Any]] = {}

        # Collect the file paths per observation source and per variable
        for obs_name in reference_dataset_names:
            dict_obs[obs_name] = {}
            for variable in variable_ids:
                # Get the list of files for the current variable and observation source
                list_files = reference_dataset.datasets[
                    (reference_dataset["variable_id"] == variable)
                    & (reference_dataset["source_id"] == obs_name)
                ]["path"].to_list()
                # Only record variables this source actually provides
                if list_files:
                    dict_obs[obs_name][variable] = {
                        "path + filename": list_files,
                        "varname": variable,
                    }

        # Assemble the input dictionary handed to the driver
        dict_datasets = {
            "model": dict_mod,
            "observations": dict_obs,
            "metricsCollection": mc_name,
            "experiment_id": experiment_id,
        }

        # Create JSON file for dictDatasets
        json_file = os.path.join(
            definition.output_directory, f"input_{mc_name}_{source_id}_{experiment_id}_{member_id}.json"
        )
        with open(json_file, "w", encoding="utf-8") as f:
            json.dump(dict_datasets, f, indent=4)
        logger.debug(f"JSON file created: {json_file}")

        driver_file = _get_resource("climate_ref_pmp.drivers", "enso_driver.py", use_resources=True)
        return [
            "python",
            driver_file,
            "--metrics_collection",
            mc_name,
            "--experiment_id",
            experiment_id,
            "--input_json_path",
            json_file,
            "--output_directory",
            str(definition.output_directory),
        ]

    def build_execution_result(self, definition: ExecutionDefinition) -> ExecutionResult:
        """
        Build a diagnostic result from the output of the PMP driver

        Parameters
        ----------
        definition
            Definition of the diagnostic execution

        Returns
        -------
        :
            Result of the diagnostic execution. A failure result is returned
            when exactly one CMEC JSON output file cannot be found.
        """
        model_source_type = get_model_source_type(definition)
        input_datasets = definition.datasets[model_source_type]
        source_id = input_datasets["source_id"].unique()[0]
        experiment_id = input_datasets["experiment_id"].unique()[0]
        # CMIP7 names the ensemble member column "variant_label" instead of "member_id"
        member_id_col = "variant_label" if model_source_type == SourceDatasetType.CMIP7 else "member_id"
        member_id = input_datasets[member_id_col].unique()[0]
        mc_name = self.metrics_collection
        pattern = f"{mc_name}_{source_id}_{experiment_id}_{member_id}"

        # Find the results files
        results_files = list(definition.output_directory.glob(f"{pattern}_cmec.json"))
        logger.debug(f"Results files: {results_files}")

        if len(results_files) != 1:  # pragma: no cover
            logger.warning(f"A single cmec output file not found: {results_files}")
            return ExecutionResult.build_from_failure(definition)

        # Find the other outputs. glob() yields entries in arbitrary order, so
        # sort them to keep the resulting bundle reproducible.
        output_dir = definition.output_directory
        png_files = [definition.as_relative_path(f) for f in sorted(output_dir.glob("*.png"))]
        data_files = [definition.as_relative_path(f) for f in sorted(output_dir.glob("*.nc"))]

        cmec_output, cmec_metric = process_json_result(results_files[0], png_files, data_files)

        # Swap the driver's "model"/"realization" dimensions for the facets
        # identifying this execution.
        cmec_metric_bundle = cmec_metric.remove_dimensions(
            [
                "model",
                "realization",
            ],
        ).prepend_dimensions(
            {
                "mip_id": model_source_type.value,
                "source_id": source_id,
                "member_id": member_id,
                "grid_label": input_datasets["grid_label"].unique()[0],
                "experiment_id": experiment_id,
            }
        )

        return ExecutionResult.build_from_output_bundle(
            definition,
            cmec_output_bundle=cmec_output,
            cmec_metric_bundle=cmec_metric_bundle,
        )

build_cmd(definition) #

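Build the command that runs the PMP ENSO driver for the given configuration. A JSON file describing the model and observation input files is written to the output directory as part of this step.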

Parameters:

Name Type Description Default
definition ExecutionDefinition

The configuration to run the diagnostic on.

required

Returns:

Type Description
Iterable[str]

The command line arguments used to invoke the ENSO driver script.

Source code in packages/climate-ref-pmp/src/climate_ref_pmp/diagnostics/enso.py
def build_cmd(self, definition: ExecutionDefinition) -> Iterable[str]:
    """
    Build the command that runs the PMP ENSO driver for an execution.

    As a side effect, a JSON file listing the model and observation input
    files is written to ``definition.output_directory``; its path is handed
    to the driver through ``--input_json_path``.

    Parameters
    ----------
    definition : ExecutionDefinition
        The configuration to run the diagnostic on.

    Returns
    -------
    :
        The command line arguments used to invoke the ENSO driver script.
    """
    mc_name = self.metrics_collection

    # ------------------------------------------------
    # Get the input datasets information for the model
    # ------------------------------------------------
    model_source_type = get_model_source_type(definition)
    input_datasets = definition.datasets[model_source_type]
    source_id = input_datasets["source_id"].unique()[0]
    # CMIP7 names the ensemble member column "variant_label" instead of "member_id"
    member_id_col = "variant_label" if model_source_type == SourceDatasetType.CMIP7 else "member_id"
    member_id = input_datasets[member_id_col].unique()[0]
    experiment_id = input_datasets["experiment_id"].unique()[0]
    # Sorted so that the generated JSON has the same key order on every run
    # (iteration order of a set of strings is not stable between processes).
    variable_ids = sorted(set(input_datasets["variable_id"].unique()) - {"areacella", "sftlf"})
    mod_run = f"{source_id}_{member_id}"

    # We only need one entry for the model run
    dict_mod: dict[str, dict[str, Any]] = {mod_run: {}}

    def extract_variable(dc: DatasetCollection, variable: str) -> list[str]:
        """Return the file paths in ``dc`` whose ``variable_id`` equals ``variable``."""
        return dc.datasets[dc["variable_id"] == variable]["path"].to_list()  # type: ignore

    # The supplementary fields are the same for every variable, so look them up once
    list_areacella = extract_variable(input_datasets, "areacella")
    list_sftlf = extract_variable(input_datasets, "sftlf")

    # Collect the file paths per model variable
    for variable in variable_ids:
        list_files = extract_variable(input_datasets, variable)

        if list_files:
            dict_mod[mod_run][variable] = {
                "path + filename": list_files,
                "varname": variable,
                "path + filename_area": list_areacella,
                "areaname": "areacella",
                "path + filename_landmask": list_sftlf,
                "landmaskname": "sftlf",
            }

    # -------------------------------------------------------
    # Get the input datasets information for the observations
    # -------------------------------------------------------
    reference_dataset = definition.datasets[SourceDatasetType.obs4MIPs]
    reference_dataset_names = reference_dataset["source_id"].unique()

    dict_obs: dict[str, dict[str, Any]] = {}

    # Collect the file paths per observation source and per variable
    for obs_name in reference_dataset_names:
        dict_obs[obs_name] = {}
        for variable in variable_ids:
            # Get the list of files for the current variable and observation source
            list_files = reference_dataset.datasets[
                (reference_dataset["variable_id"] == variable)
                & (reference_dataset["source_id"] == obs_name)
            ]["path"].to_list()
            # Only record variables this source actually provides
            if list_files:
                dict_obs[obs_name][variable] = {
                    "path + filename": list_files,
                    "varname": variable,
                }

    # Assemble the input dictionary handed to the driver
    dict_datasets = {
        "model": dict_mod,
        "observations": dict_obs,
        "metricsCollection": mc_name,
        "experiment_id": experiment_id,
    }

    # Create JSON file for dictDatasets
    json_file = os.path.join(
        definition.output_directory, f"input_{mc_name}_{source_id}_{experiment_id}_{member_id}.json"
    )
    with open(json_file, "w", encoding="utf-8") as f:
        json.dump(dict_datasets, f, indent=4)
    logger.debug(f"JSON file created: {json_file}")

    driver_file = _get_resource("climate_ref_pmp.drivers", "enso_driver.py", use_resources=True)
    return [
        "python",
        driver_file,
        "--metrics_collection",
        mc_name,
        "--experiment_id",
        experiment_id,
        "--input_json_path",
        json_file,
        "--output_directory",
        str(definition.output_directory),
    ]

build_execution_result(definition) #

Build a diagnostic result from the output of the PMP driver

Parameters:

Name Type Description Default
definition ExecutionDefinition

Definition of the diagnostic execution

required

Returns:

Type Description
ExecutionResult

Result of the diagnostic execution
Source code in packages/climate-ref-pmp/src/climate_ref_pmp/diagnostics/enso.py
def build_execution_result(self, definition: ExecutionDefinition) -> ExecutionResult:
    """
    Build a diagnostic result from the output of the PMP driver

    Parameters
    ----------
    definition
        Definition of the diagnostic execution

    Returns
    -------
    :
        Result of the diagnostic execution. A failure result is returned
        when exactly one CMEC JSON output file cannot be found.
    """
    model_source_type = get_model_source_type(definition)
    input_datasets = definition.datasets[model_source_type]
    source_id = input_datasets["source_id"].unique()[0]
    experiment_id = input_datasets["experiment_id"].unique()[0]
    # CMIP7 names the ensemble member column "variant_label" instead of "member_id"
    member_id_col = "variant_label" if model_source_type == SourceDatasetType.CMIP7 else "member_id"
    member_id = input_datasets[member_id_col].unique()[0]
    mc_name = self.metrics_collection
    pattern = f"{mc_name}_{source_id}_{experiment_id}_{member_id}"

    # Find the results files
    results_files = list(definition.output_directory.glob(f"{pattern}_cmec.json"))
    logger.debug(f"Results files: {results_files}")

    if len(results_files) != 1:  # pragma: no cover
        logger.warning(f"A single cmec output file not found: {results_files}")
        return ExecutionResult.build_from_failure(definition)

    # Find the other outputs. glob() yields entries in arbitrary order, so
    # sort them to keep the resulting bundle reproducible.
    output_dir = definition.output_directory
    png_files = [definition.as_relative_path(f) for f in sorted(output_dir.glob("*.png"))]
    data_files = [definition.as_relative_path(f) for f in sorted(output_dir.glob("*.nc"))]

    cmec_output, cmec_metric = process_json_result(results_files[0], png_files, data_files)

    # Swap the driver's "model"/"realization" dimensions for the facets
    # identifying this execution.
    cmec_metric_bundle = cmec_metric.remove_dimensions(
        [
            "model",
            "realization",
        ],
    ).prepend_dimensions(
        {
            "mip_id": model_source_type.value,
            "source_id": source_id,
            "member_id": member_id,
            "grid_label": input_datasets["grid_label"].unique()[0],
            "experiment_id": experiment_id,
        }
    )

    return ExecutionResult.build_from_output_bundle(
        definition,
        cmec_output_bundle=cmec_output,
        cmec_metric_bundle=cmec_metric_bundle,
    )