Skip to content

climate_ref.executor.result_handling #

Execute diagnostics in different environments

We support running diagnostics in different environments, such as locally, in a separate process, or in a container. These environments are represented by climate_ref.executor.Executor classes.

The simplest executor is the LocalExecutor, which runs the diagnostic in the same process. This is useful for local testing and debugging.

ExecutionFuture #

A container linking a submitted future to its execution metadata.

Source code in packages/climate-ref/src/climate_ref/executor/result_handling.py
@define
class ExecutionFuture:
    """
    A container linking a submitted future to its execution metadata.

    Each instance pairs one pending ``Future`` with the ``ExecutionDefinition``
    it belongs to, plus optional bookkeeping fields.
    """

    # Handle whose result is typed as an ``ExecutionResult``
    future: Future[ExecutionResult]
    # The execution definition associated with ``future``
    definition: ExecutionDefinition
    # Optional integer identifier, ``None`` by default
    # NOTE(review): presumably the primary key of the ``Execution`` row -- confirm against callers
    execution_id: int | None = None
    # Defaults to 0.0
    # NOTE(review): presumably a timestamp taken at submission time -- confirm clock source and units
    submitted_at: float = 0.0

handle_execution_result(config, database, execution, result, *, update_dirty=True) #

Handle the result of a diagnostic execution

This will update the diagnostic execution result with the output of the diagnostic execution. The output will be copied from the scratch directory to the results directory.

Parameters:

Name Type Description Default
config Config

The configuration to use

required
database Database

The active database session to use

required
execution Execution

The diagnostic execution result DB object to update

required
result ExecutionResult

The result of the diagnostic execution, either successful or failed

required
update_dirty bool

Whether to update the execution group's dirty flag. Set to False for reingest, which should not alter the pending-work state.

True
Source code in packages/climate-ref/src/climate_ref/executor/result_handling.py
def handle_execution_result(
    config: "Config",
    database: Database,
    execution: Execution,
    result: "ExecutionResult",
    *,
    update_dirty: bool = True,
) -> None:
    """
    Persist the outcome of a diagnostic execution

    The execution log is always copied from the scratch directory to the results directory.
    For a successful result, the metric bundle (plus the output bundle and series file
    when they are present) is copied as well, the values are ingested into the database
    and the execution is marked as successful.
    For a failed result, the execution is marked as failed and nothing else is copied.

    Parameters
    ----------
    config
        The configuration to use
    database
        The active database session to use
    execution
        The diagnostic execution result DB object to update
    result
        The result of the diagnostic execution, either successful or failed
    update_dirty
        Whether to update the execution group's dirty flag.
        Set to False for reingest which should not alter pending-work state.
    """

    def copy_to_results(filename: "pathlib.Path | str") -> None:
        # Every copy shares the same scratch -> results mapping for this execution
        _copy_file_to_results(
            config.paths.scratch,
            config.paths.results,
            execution.output_fragment,
            filename,
        )

    # The log is copied regardless of the outcome
    try:
        copy_to_results(EXECUTION_LOG_FILENAME)
    except FileNotFoundError:
        logger.error(
            f"Could not find log file {EXECUTION_LOG_FILENAME} in scratch directory: {config.paths.scratch}. "
            f"This is likely a system error (will be retried on next solve)."
        )
        execution.mark_failed()
        # A missing log file suggests the process was killed before writing output,
        # so the dirty flag is left untouched to allow a retry
        return

    if not result.successful or result.metric_bundle_filename is None:
        execution.mark_failed()
        if not result.retryable:
            logger.error(f"{execution} failed due to a diagnostic error")
            if update_dirty:
                execution.execution_group.dirty = False
            return
        # Retryable failures leave the dirty flag untouched so the execution is retried on next solve
        logger.error(f"{execution} failed due to a system error (will be retried on next solve)")
        return

    logger.info(f"{execution} successful")

    copy_to_results(result.metric_bundle_filename)

    output_bundle = result.output_bundle_filename
    if output_bundle:
        copy_to_results(output_bundle)
        _copy_output_bundle_files(
            config,
            execution,
            result.to_output_path(output_bundle),
        )

    series_file = result.series_filename
    if series_file:
        copy_to_results(series_file)

    # Ingest outputs and metrics into the database via the shared ingestion path.
    # The savepoint confines a failed ingestion; the error is logged and the
    # execution is still marked as successful below.
    cv = CV.load_from_file(config.paths.dimensions_cv)
    scratch_output_dir = config.paths.scratch / execution.output_fragment
    try:
        with database.session.begin_nested():
            ingest_execution_result(
                database,
                execution,
                result,
                cv,
                output_base_path=scratch_output_dir,
            )
    except Exception:
        logger.exception("Something went wrong when ingesting execution result")

    # TODO: This should check if the result is the most recent for the execution,
    # if so then update the dirty fields
    # i.e. if there are outstanding executions don't mark as clean
    if update_dirty:
        execution.execution_group.dirty = False

    # Finally, mark the execution as successful
    metric_bundle_path = result.as_relative_path(result.metric_bundle_filename)
    execution.mark_successful(metric_bundle_path)

ingest_execution_result(database, execution, result, cv, *, output_base_path) #

Ingest a successful execution result into the database.

Registers output entries and ingests scalar and series metric values.

Parameters:

Name Type Description Default
database Database

The active database session to use

required
execution Execution

The execution record to associate results with

required
result ExecutionResult

The successful execution result

required
cv CV

The controlled vocabulary to validate metrics against

required
output_base_path Path

Primary base directory for resolving output filenames

required
Notes

Callers are responsible for:

  • File copying (scratch -> results)
  • Transaction boundaries
  • Marking the execution as successful (execution.mark_successful())
  • Setting the dirty flag on the execution group
Source code in packages/climate-ref/src/climate_ref/executor/result_handling.py
def ingest_execution_result(
    database: Database,
    execution: Execution,
    result: "ExecutionResult",
    cv: CV,
    *,
    output_base_path: pathlib.Path,
) -> None:
    """
    Ingest a successful execution result into the database.

    When an output bundle is present, its plots, data and HTML entries are registered
    (in that order). Series values are ingested when a series file is present,
    and scalar values are always ingested last.

    Parameters
    ----------
    database
        The active database session to use
    execution
        The execution record to associate results with
    result
        The successful execution result
    cv
        The controlled vocabulary to validate metrics against
    output_base_path
        Primary base directory for resolving output filenames

    Notes
    -----
    Callers are responsible for:

    * File copying (scratch -> results)
    * Transaction boundaries
    * Marking the execution as successful (``execution.mark_successful()``)
    * Setting the dirty flag on the execution group
    """
    bundle_filename = result.output_bundle_filename
    if bundle_filename:
        bundle = CMECOutput.load_from_json(result.to_output_path(bundle_filename))
        # Bundle attribute name -> output type; insertion order fixes the registration order
        output_types_by_attr = {
            "plots": ResultOutputType.Plot,
            "data": ResultOutputType.Data,
            "html": ResultOutputType.HTML,
        }
        for attr_name, kind in output_types_by_attr.items():
            register_execution_outputs(
                database,
                execution,
                getattr(bundle, attr_name),
                output_type=kind,
                base_path=output_base_path,
            )

    shared_kwargs = {
        "database": database,
        "result": result,
        "execution": execution,
        "cv": cv,
    }

    if result.series_filename:
        ingest_series_values(**shared_kwargs)

    ingest_scalar_values(**shared_kwargs)

ingest_scalar_values(database, result, execution, cv) #

Load, validate, and bulk-insert scalar metric values.

Parameters:

Name Type Description Default
database Database

The active database session to use

required
result ExecutionResult

The execution result containing the metric bundle filename

required
execution Execution

The execution record to associate values with

required
cv CV

The controlled vocabulary to validate against

required
Notes

Callers are responsible for transaction boundaries; this function does not open a nested transaction or catch exceptions.

Source code in packages/climate-ref/src/climate_ref/executor/result_handling.py
def ingest_scalar_values(
    database: Database,
    result: "ExecutionResult",
    execution: Execution,
    cv: CV,
) -> None:
    """
    Load, validate, and bulk-insert scalar metric values.

    The metric bundle is read from the path resolved by ``result.to_output_path``.
    A bundle that fails validation against the controlled vocabulary is logged
    as a warning and still ingested.

    Parameters
    ----------
    database
        The active database session to use
    result
        The execution result containing the metric bundle filename
    execution
        The execution record to associate values with
    cv
        The controlled vocabulary to validate against

    Notes
    -----
    Callers are responsible for transaction boundaries; this function does not
    open a nested transaction or catch exceptions.
    """
    bundle_path = result.to_output_path(result.metric_bundle_filename)
    metric_bundle = CMECMetric.load_from_json(bundle_path)

    try:
        cv.validate_metrics(metric_bundle)
    except (ResultValidationError, AssertionError):
        # TODO: Remove once we have settled on a controlled vocabulary
        logger.warning(
            "Diagnostic scalar values do not conform with the controlled vocabulary", exc_info=True
        )

    # One row per metric result; the dimensions are flattened into the row itself
    rows = [
        {
            "execution_id": execution.id,
            "value": entry.value,
            "attributes": entry.attributes,
            **entry.dimensions,
        }
        for entry in metric_bundle.iter_results()
    ]

    logger.debug(f"Ingesting {len(rows)} scalar values for execution {execution.id}")

    if not rows:
        # Nothing to insert, so no statement is executed
        return
    database.session.execute(insert(ScalarMetricValue), rows)

ingest_series_values(database, result, execution, cv) #

Load, validate, and bulk-insert series metric values.

Parameters:

Name Type Description Default
database Database

The active database session to use

required
result ExecutionResult

The execution result containing the series filename

required
execution Execution

The execution record to associate values with

required
cv CV

The controlled vocabulary to validate against

required
Notes

Callers are responsible for transaction boundaries; this function does not open a nested transaction or catch exceptions.

Source code in packages/climate-ref/src/climate_ref/executor/result_handling.py
def ingest_series_values(
    database: Database,
    result: "ExecutionResult",
    execution: Execution,
    cv: CV,
) -> None:
    """
    Load, validate, and bulk-insert series metric values.

    The series file is read from the path resolved by ``result.to_output_path``.
    Series that fail validation against the controlled vocabulary are logged
    as a warning and still ingested.

    Parameters
    ----------
    database
        The active database session to use
    result
        The execution result containing the series filename
    execution
        The execution record to associate values with
    cv
        The controlled vocabulary to validate against

    Notes
    -----
    Callers are responsible for transaction boundaries; this function does not
    open a nested transaction or catch exceptions.
    """
    assert result.series_filename, "Series filename must be set in the result"

    all_series = TSeries.load_from_json(result.to_output_path(result.series_filename))

    try:
        cv.validate_metrics(all_series)
    except (ResultValidationError, AssertionError):
        # TODO: Remove once we have settled on a controlled vocabulary
        logger.warning(
            "Diagnostic series values do not conform with the controlled vocabulary", exc_info=True
        )

    # One row per series; the dimensions are flattened into the row itself
    rows = [
        {
            "execution_id": execution.id,
            "values": series.values,
            "attributes": series.attributes,
            "index": series.index,
            "index_name": series.index_name,
            **series.dimensions,
        }
        for series in all_series
    ]

    logger.debug(f"Ingesting {len(rows)} series values for execution {execution.id}")

    if not rows:
        # Nothing to insert, so no statement is executed
        return
    database.session.execute(insert(SeriesMetricValue), rows)

mark_execution_failed(database, config, definition, execution_id, *, retryable) #

Persist a failed result for an outstanding execution.

Used when an executor abandons a future (per-task timeout, overall timeout, worker crash) so the corresponding Execution row never stays stuck in successful=None state.

Source code in packages/climate-ref/src/climate_ref/executor/result_handling.py
def mark_execution_failed(
    database: Database,
    config: "Config",
    definition: ExecutionDefinition,
    execution_id: int | None,
    *,
    retryable: bool,
) -> None:
    """Persist a failed result for an outstanding execution.

    Used when an executor abandons a future (per-task timeout, overall timeout,
    worker crash) so the corresponding ``Execution`` row never stays stuck in
    ``successful=None`` state.

    A failure result is built from ``definition`` and handed to ``process_result``
    inside a session transaction. Any exception raised along the way is logged
    and not propagated.
    """
    try:
        failure = ExecutionResult.build_from_failure(definition, retryable=retryable)
        with database.session.begin():
            # A falsy execution_id means there is no row to look up
            if execution_id:
                execution = database.session.get(Execution, execution_id)
            else:
                execution = None
            process_result(config, database, failure, execution)
    except Exception:
        logger.exception(f"Failed to record failure for {definition.execution_slug()!r}")

process_result(config, database, result, execution) #

Process the result of a diagnostic execution, persisting outcome to the DB.

Source code in packages/climate-ref/src/climate_ref/executor/result_handling.py
def process_result(
    config: "Config",
    database: Database,
    result: ExecutionResult,
    execution: Execution | None,
) -> None:
    """Process the result of a diagnostic execution, persisting outcome to the DB.

    An unsuccessful result is logged as an error, with a hint on how to inspect
    the execution when an execution record is available. The result is handed to
    ``handle_execution_result`` only when an execution record was supplied.
    """
    if not result.successful:
        # The inspection hint needs the execution group, so it is empty without a record
        info_msg = (
            (
                f"\nAdditional information about this execution can be viewed using: "
                f"ref executions inspect {execution.execution_group_id}"
            )
            if execution is not None
            else ""
        )
        logger.error(f"Error running {result.definition.execution_slug()}. {info_msg}")

    if not execution:
        # Nothing to persist without an execution record
        return
    handle_execution_result(config, database, execution, result)

register_execution_outputs(database, execution, outputs, output_type, *, base_path) #

Register output entries in the database.

Each entry in outputs is resolved relative to base_path.

Parameters:

Name Type Description Default
database Database

The active database session to use

required
execution Execution

The execution record to associate outputs with

required
outputs dict[str, OutputDict] | None

Mapping of short name to OutputDict (may be None)

required
output_type ResultOutputType

The type of output being registered

required
base_path Path

Base directory for resolving relative filenames

required
Notes

Callers are responsible for transaction boundaries.

Source code in packages/climate-ref/src/climate_ref/executor/result_handling.py
def register_execution_outputs(
    database: Database,
    execution: Execution,
    outputs: "dict[str, OutputDict] | None",
    output_type: ResultOutputType,
    *,
    base_path: pathlib.Path,
) -> None:
    """
    Register output entries in the database.

    One ``ExecutionOutput`` is added to the session per entry in ``outputs``,
    with its filename made relative to ``base_path``.
    Nothing is added when ``outputs`` is empty or None.

    Parameters
    ----------
    database
        The active database session to use
    execution
        The execution record to associate outputs with
    outputs
        Mapping of short name to ``OutputDict`` (may be None)
    output_type
        The type of output being registered
    base_path
        Base directory for resolving relative filenames

    Notes
    -----
    Callers are responsible for transaction boundaries.
    """
    if not outputs:
        return

    for short_name, info in outputs.items():
        relative_filename = ensure_relative_path(info.filename, base_path)
        record = ExecutionOutput.build(
            execution_id=execution.id,
            output_type=output_type,
            filename=str(relative_filename),
            description=info.description,
            short_name=short_name,
            long_name=info.long_name,
            # Missing dimensions are stored as an empty mapping
            dimensions=info.dimensions or {},
        )
        database.session.add(record)