class ILAMBStandard(Diagnostic):
"""
Apply the standard ILAMB analysis with respect to a given reference dataset.
"""
def __init__(
self,
registry_file: str,
metric_name: str,
sources: dict[str, str | dict[str, str]],
**ilamb_kwargs: Any,
):
# Setup the diagnostic
self.variable_id = ilamb_kwargs.get("analysis_variable", next(iter(sources.keys())))
if "sources" not in ilamb_kwargs: # pragma: no cover
ilamb_kwargs["sources"] = sources
if "relationships" not in ilamb_kwargs:
ilamb_kwargs["relationships"] = {}
if ilamb_kwargs["relationships"]:
transforms = list(ilamb_kwargs.get("transforms", []))
transform_names = [
next(iter(transform.keys())) if isinstance(transform, dict) else transform
for transform in transforms
]
if "climate_ref_relationship_time" not in transform_names:
transforms.append({"climate_ref_relationship_time": {"variable_id": self.variable_id}})
ilamb_kwargs["transforms"] = transforms
# Allow per-diagnostic override of the test source_id
# (not all variables are available from CanESM5)
test_source_id = ilamb_kwargs.pop("test_source_id", "CanESM5")
self.ilamb_kwargs = ilamb_kwargs
# REF stuff
self.name = metric_name
self.slug = self.name.lower().replace(" ", "-")
# Collect all variable IDs used by this diagnostic
all_variable_ids = (
self.variable_id,
*ilamb_kwargs.get("alternate_vars", []),
*ilamb_kwargs.get("related_vars", []),
*ilamb_kwargs.get("relationships", {}).keys(),
)
# Variables that the primary diagnostic requires (not relationship vars)
primary_variable_ids = (
self.variable_id,
*ilamb_kwargs.get("alternate_vars", []),
*ilamb_kwargs.get("related_vars", []),
)
# Look up CMIP7 branded variable names
branded_variables = _get_branded_variable(all_variable_ids, registry_file)
# Determine realm/region for CMIP7 filter based on registry
is_land = registry_file in ("ilamb", "ilamb-test")
relationship_variable_ids = tuple(ilamb_kwargs.get("relationships", {}).keys())
# Create the data requirement for the dataset under test
cmip6_requirement = _build_cmip_data_requirement(
source_type=SourceDatasetType.CMIP6,
filters={
"variable_id": all_variable_ids,
"frequency": "mon",
"experiment_id": ("historical", "land-hist"),
"table_id": (
"AERmonZ",
"Amon",
"CFmon",
"Emon",
"EmonZ",
"LImon",
"Lmon",
"Omon",
"SImon",
),
},
group_by=("experiment_id", "source_id", "member_id", "grid_label"),
primary_variable_ids=primary_variable_ids,
relationship_variable_ids=relationship_variable_ids,
is_land=is_land,
)
cmip7_requirement = _build_cmip_data_requirement(
source_type=SourceDatasetType.CMIP7,
filters={
"branded_variable": branded_variables,
"frequency": "mon",
"experiment_id": ("historical", "land-hist"),
"region": "glb",
},
group_by=("experiment_id", "source_id", "variant_label", "grid_label"),
primary_variable_ids=primary_variable_ids,
relationship_variable_ids=relationship_variable_ids,
is_land=is_land,
)
# obs4MIPs data requirement, normally ilamb3 expects the `sources` to
# resolve to keys in one of its data registries. If instead we find a
# dictionary, then assume that these keys are meant to be keywords in a
# REF data requirement.
# obs_source key is used to determine whether to fetch from ESGF
# or use the pre-fetched obs4REF registry.
filters: dict[str, tuple[str, ...]] = {}
obs_source = None
for _, source in sources.items():
if isinstance(source, dict):
obs_source = source.pop("obs_source", None)
for key, val in source.items():
if key not in filters:
filters[key] = ()
filters[key] += (val,)
obs4mips_requirement = (
DataRequirement(
source_type=SourceDatasetType.obs4MIPs,
filters=(FacetFilter(facets=filters),),
group_by=tuple(filters.keys()),
)
if filters
else None
)
data_requirements: Sequence[Sequence[DataRequirement]]
if obs4mips_requirement is None:
data_requirements = (
(cmip6_requirement,),
(cmip7_requirement,),
)
else:
data_requirements = (
(cmip6_requirement, obs4mips_requirement),
(cmip7_requirement, obs4mips_requirement),
)
self.data_requirements = data_requirements
self.facets = (
"experiment_id",
"source_id",
"member_id",
"grid_label",
"region",
"metric",
"statistic",
)
self.test_data_spec = _build_test_data_spec(
all_variable_ids=all_variable_ids,
registry_file=registry_file,
test_source_id=test_source_id,
is_land=is_land,
obs_filters=filters,
obs_source=obs_source,
)
# Setup ILAMB data and options
self.registry_file = registry_file
self.registry = dataset_registry_manager[self.registry_file]
self.ilamb_data = registry_to_collection(
dataset_registry_manager[self.registry_file],
)
def execute(self, definition: ExecutionDefinition) -> None:
"""
Run the ILAMB standard analysis.
"""
_set_ilamb3_options(self.registry, self.registry_file)
# Temporary hack of the ilamb3 inputs while we still need to refer to
# data not yet available in obs4{MIPs,REF}. This logic allows for
# DataRequirement filters to be added as a 'source' in the ilamb
# configure file. If a dictionary instead of a string was found, we
# populate an obs4MIPs requirement.
if SourceDatasetType.obs4MIPs in definition.datasets:
# ilamb3 will expect the reference dataset dataframe to have a `key`
# column that uniquely describes each dataset. Create one using the
# `instance_id` and an integer and then modify the ilamb3 configure
# so that it finds the proper data.
ref_datasets = definition.datasets[SourceDatasetType.obs4MIPs].datasets
ref_datasets = ref_datasets.reset_index()
ref_datasets["key"] = ref_datasets["instance_id"] + ref_datasets.index.astype(str)
for instance_id, df in ref_datasets.groupby("instance_id"):
variable_id = df["variable_id"].unique()[0]
self.ilamb_kwargs["sources"][variable_id] = f"{instance_id}*"
else:
# If the data is not ingested yet but in a registry, we concat the
# obs4REF registries to the ilamb one so that any key may be used
# from either. Eventually (?) this can be removed.
ref_datasets = pd.concat(
[
self.ilamb_data.datasets.set_index(self.ilamb_data.slug_column),
registry_to_collection(dataset_registry_manager["obs4ref"]).datasets.set_index("key"),
]
)
cmip_source = _get_cmip_source_type(definition.datasets)
model_datasets = definition.datasets[cmip_source].datasets
# CMIP7 uses variant_label instead of member_id; ilamb3 expects member_id
if cmip_source == SourceDatasetType.CMIP7 and "member_id" not in model_datasets.columns:
model_datasets = model_datasets.copy()
model_datasets["member_id"] = model_datasets["variant_label"]
# When both a 3D primary variable (e.g. thetao) and its 2D alternate
# (e.g. tos) are present, drop the primary so ilamb3 uses the alternate.
# This avoids two problems with merging datasets of different time ranges:
# 1) time_bnds gets NaN-filled for the shorter variable's missing steps,
# corrupting the time frequency calculation in ilamb3
# 2) The 3D variable may not cover the reference period (e.g. thetao
# covers 1850-1860 but WOA2023 reference covers 2005-2014)
# The alternate surface variable is equivalent after select_depth and
# typically has full temporal coverage.
alternate_vars = self.ilamb_kwargs.get("alternate_vars", [])
if alternate_vars:
available_alternates = [v for v in alternate_vars if v in model_datasets["variable_id"].values]
if available_alternates and self.variable_id in model_datasets["variable_id"].values:
model_datasets = model_datasets[model_datasets["variable_id"] != self.variable_id]
# In ilamb3, this is run with all the models that we know about to
# create different colors. For REF this will at least make the model
# line color not be black
run.set_model_colors(model_datasets)
# Run ILAMB in a single-threaded mode to avoid issues with multithreading (#394)
with dask.config.set(scheduler="synchronous"):
run.run_single_block(
self.slug,
ref_datasets,
model_datasets,
definition.output_directory,
**self.ilamb_kwargs,
)
def build_execution_result(self, definition: ExecutionDefinition) -> ExecutionResult:
"""
Build the diagnostic result after running ILAMB.
Parameters
----------
definition
The definition of the diagnostic execution
Returns
-------
An execution result object
"""
_set_ilamb3_options(self.registry, self.registry_file)
# In ILAMB, scalars are saved in CSV files in the output directory. To
# be compatible with the REF system we will need to add the metadata
# that is associated with the execution group, called the selector.
df = _load_csv_and_merge(definition.output_directory)
selectors = _get_selectors(definition.datasets)
# TODO: Fix reference data once we are using the obs4MIPs dataset
dataset_source = self.name.split("-")[1] if "-" in self.name else "None"
common_dimensions = {**selectors, "reference_source_id": dataset_source}
for key, value in common_dimensions.items():
df[key] = value
metric_bundle = CMECMetric.model_validate(_build_cmec_bundle(df))
# Add each png file plot to the output
output_bundle = CMECOutput.create_template()
for plotfile in definition.output_directory.glob("*.png"):
relative_path = str(definition.as_relative_path(plotfile))
caption, figure_dimensions = _caption_from_filename(plotfile, common_dimensions)
output_bundle[OutputCV.PLOTS.value][relative_path] = {
OutputCV.FILENAME.value: relative_path,
OutputCV.LONG_NAME.value: caption,
OutputCV.DESCRIPTION.value: "",
OutputCV.DIMENSIONS.value: figure_dimensions,
}
# Add the html page to the output
index_html = definition.to_output_path("index.html")
if index_html.exists():
relative_path = str(definition.as_relative_path(index_html))
output_bundle[OutputCV.HTML.value][relative_path] = {
OutputCV.FILENAME.value: relative_path,
OutputCV.LONG_NAME.value: "Results page",
OutputCV.DESCRIPTION.value: "Page displaying scalars and plots from the ILAMB execution.",
OutputCV.DIMENSIONS.value: common_dimensions,
}
output_bundle[OutputCV.INDEX.value] = relative_path
# Add series to the output based on the time traces we find in the
# output files
series = []
for ncfile in definition.output_directory.glob("*.nc"):
time_coder = xr.coders.CFDatetimeCoder(use_cftime=True)
ds = xr.open_dataset(ncfile, decode_times=time_coder)
for name, da in ds.items():
# Only create series for 1d DataArray's with these dimensions
if not (da.ndim == 1 and set(da.dims).intersection(["time", "month"])):
continue
# Convert dimension values
attrs = {
"units": da.attrs.get("units", ""),
"long_name": da.attrs.get("long_name", str(name)),
"standard_name": da.attrs.get("standard_name", ""),
}
str_name = str(name)
index_name = str(da.dims[0])
index = ds[index_name].values.tolist()
if hasattr(index[0], "isoformat"):
index = [v.isoformat() for v in index]
if hasattr(index[0], "calendar"):
attrs["calendar"] = index[0].calendar
# Parse out some dimensions
if ncfile.stem == "Reference":
dimensions = {
"source_id": "Reference",
"metric": str_name,
}
else:
dimensions = {"metric": str_name, **common_dimensions}
# Split the metric into metric and region if possible
if "_" in str_name:
metric_name, region_name = str_name.split("_", maxsplit=1)
dimensions["metric"] = metric_name
dimensions["region"] = region_name
else:
dimensions["region"] = "None"
series.append(
SeriesMetricValue(
dimensions=dimensions,
values=da.values.tolist(),
index=index,
index_name=index_name,
attributes=attrs,
)
)
return ExecutionResult.build_from_output_bundle(
definition, cmec_output_bundle=output_bundle, cmec_metric_bundle=metric_bundle, series=series
)