Skip to content

climate_ref.executor.reingest #

Reingest existing execution results without re-running diagnostics.

This module provides functionality to re-run build_execution_result() and re-ingest the results into the database for executions that have already completed. This is useful when new series definitions or metadata extraction logic have been added and you want to apply them to existing outputs without re-executing the diagnostics.

Reingest always creates a new Execution record under the same ExecutionGroup with its own output directory, leaving the original execution untouched. Results are treated as immutable.

get_executions_for_reingest(database, *, execution_group_ids=None, provider_filters=None, diagnostic_filters=None, include_failed=False) #

Query executions eligible for reingest.

Always selects the oldest (original) execution per group so that reingest uses the execution whose scratch directory actually exists. Reingested executions only have results directories, not scratch.

Parameters:

Name Type Description Default
database Database

Database instance

required
execution_group_ids Sequence[int] | None

If provided, only include these execution group IDs

None
provider_filters list[str] | None

Filter by provider slug (substring, case-insensitive)

None
diagnostic_filters list[str] | None

Filter by diagnostic slug (substring, case-insensitive)

None
include_failed bool

If True, also include failed executions

False

Returns:

Type Description
list[tuple[ExecutionGroup, Execution]]

List of (ExecutionGroup, oldest Execution) tuples

Source code in packages/climate-ref/src/climate_ref/executor/reingest.py
def get_executions_for_reingest(
    database: "Database",
    *,
    execution_group_ids: Sequence[int] | None = None,
    provider_filters: list[str] | None = None,
    diagnostic_filters: list[str] | None = None,
    include_failed: bool = False,
) -> list[tuple[ExecutionGroup, Execution]]:
    """
    Collect the executions that are candidates for reingest.

    Each matching execution group contributes at most one entry, and that
    entry is always the **first** element of ``ExecutionGroup.executions``
    (the original run). The original run is the one whose scratch directory
    is expected to exist; executions created by an earlier reingest only
    have a results directory.

    Parameters
    ----------
    database
        Database instance whose session is used for the query
    execution_group_ids
        If non-empty, restrict the result to these execution group IDs
    provider_filters
        Filter by provider slug (substring, case-insensitive)
    diagnostic_filters
        Filter by diagnostic slug (substring, case-insensitive)
    include_failed
        If True, groups whose executions failed are also returned

    Returns
    -------
    :
        List of (ExecutionGroup, oldest Execution) tuples
    """
    # Without ``include_failed`` the query is limited to successful entries;
    # ``None`` disables the success filter altogether.
    if include_failed:
        success_filter = None
    else:
        success_filter = True

    # Reuse the shared filtered query to find the matching execution groups
    candidates = get_execution_group_and_latest_filtered(
        database.session,
        diagnostic_filters=diagnostic_filters,
        provider_filters=provider_filters,
        successful=success_filter,
        include_superseded=True,
    )

    # Optional narrowing to an explicit set of group IDs
    if execution_group_ids:
        wanted_ids = set(execution_group_ids)
        candidates = [(group, latest) for group, latest in candidates if group.id in wanted_ids]

    # Walk the candidates once, keeping only the first occurrence of each group.
    # ExecutionGroup.executions is ordered by created_at ascending,
    # so index 0 is the original execution whose scratch directory exists.
    visited_group_ids: set[int] = set()
    selected: list[tuple[ExecutionGroup, Execution]] = []
    for group, latest in candidates:
        if latest is None:
            # Group has never been executed, so there is nothing to reingest
            continue
        if group.id in visited_group_ids:
            continue
        visited_group_ids.add(group.id)

        original = group.executions[0]
        # The original run itself must have succeeded unless failures are allowed
        if include_failed or original.successful:
            selected.append((group, original))
    return selected

reconstruct_execution_definition(config, execution, diagnostic, output_fragment=None) #

Reconstruct an ExecutionDefinition from database state.

This rebuilds the definition that was originally used to produce the execution, using the execution's stored datasets, output fragment, and the live diagnostic object from the provider registry.

Parameters:

Name Type Description Default
config Config

Application configuration (provides paths.scratch)

required
execution Execution

The database Execution record to reconstruct from

required
diagnostic Diagnostic

The live Diagnostic instance resolved from the provider registry

required
output_fragment str | None

If provided, use this fragment instead of the execution's own fragment for the output directory. Used during reingest to point at the new scratch location after copying.

None

Returns:

Type Description
ExecutionDefinition

A reconstructed ExecutionDefinition pointing at the scratch directory

Source code in packages/climate-ref/src/climate_ref/executor/reingest.py
def reconstruct_execution_definition(
    config: "Config",
    execution: Execution,
    diagnostic: "Diagnostic",
    output_fragment: str | None = None,
) -> ExecutionDefinition:
    """
    Rebuild an ``ExecutionDefinition`` for an execution stored in the database.

    The definition is assembled from the datasets linked to the execution,
    the selectors and key of its execution group, and the live diagnostic
    object supplied by the caller. The output directory is located under the
    configured scratch root.

    Parameters
    ----------
    config
        Application configuration (provides ``paths.scratch``)
    execution
        The database ``Execution`` record to reconstruct from
    diagnostic
        The live ``Diagnostic`` instance resolved from the provider registry
    output_fragment
        Fragment to use for the output directory instead of the execution's
        own ``output_fragment``. Used during reingest to point at the new
        scratch location after copying.

    Returns
    -------
    :
        A reconstructed ``ExecutionDefinition`` pointing at the scratch directory
    """
    group = execution.execution_group

    # Bucket the execution's linked datasets by their source type
    grouped: dict[SourceDatasetType, list[Any]] = defaultdict(list)
    for linked in execution.datasets:
        grouped[linked.dataset_type].append(linked)

    collections: dict[SourceDatasetType | str, DatasetCollection] = {}
    for source_type, members in grouped.items():
        slug_column = get_slug_column(source_type)

        # One record per file, indexed by the id of the dataset that owns it
        row_index = []
        row_records = []
        for member in members:
            # Attributes shared by every file of this (polymorphic) dataset
            shared_attrs = _extract_dataset_attributes(member)
            for file in member.files:
                record = {
                    **shared_attrs,
                    "path": file.path,
                    "start_time": file.start_time,
                    "end_time": file.end_time,
                }
                # Only some file models carry a tracking id
                if hasattr(file, "tracking_id"):
                    record["tracking_id"] = file.tracking_id
                row_index.append(member.id)
                row_records.append(record)

        frame = pd.DataFrame(row_records, index=row_index) if row_records else pd.DataFrame()

        # Selectors are stored on the group keyed by the source type's value;
        # convert the stored pairs back into a tuple of tuples
        stored_pairs = group.selectors.get(source_type.value, [])
        selector = tuple(tuple(pair) for pair in stored_pairs)

        collections[source_type] = DatasetCollection(
            datasets=frame,
            slug_column=slug_column,
            selector=selector,
        )

    # An explicit fragment wins over the one recorded on the execution
    if output_fragment is None:
        fragment = execution.output_fragment
    else:
        fragment = output_fragment

    scratch_root = config.paths.scratch

    return ExecutionDefinition(
        diagnostic=diagnostic,
        key=group.key,
        datasets=ExecutionDatasetCollection(collections),
        root_directory=scratch_root,
        output_directory=scratch_root / fragment,
    )

reingest_execution(config, database, execution, provider_registry) #

Reingest an existing execution.

Re-runs build_execution_result() against the scratch directory (which contains the raw outputs from the original diagnostic run), creates a new Execution record with a unique output fragment, copies results to the new location, and ingests metrics into the database.

The original execution is left untouched.

Parameters:

Name Type Description Default
config Config

Application configuration

required
database Database

Database instance

required
execution Execution

The Execution record to reingest

required
provider_registry ProviderRegistry

Registry of active providers (used to resolve the live diagnostic)

required

Returns:

Type Description
bool

True if reingest was successful; False if the execution was skipped, the rebuilt result was unsuccessful, or an error occurred

Source code in packages/climate-ref/src/climate_ref/executor/reingest.py
def reingest_execution(
    config: "Config",
    database: "Database",
    execution: Execution,
    provider_registry: "ProviderRegistry",
) -> bool:
    """
    Reingest an existing execution.

    Copies the original scratch directory (which contains the raw outputs
    from the original diagnostic run) to a new scratch location, re-runs
    ``build_execution_result()`` against that copy, creates a new
    ``Execution`` record with its own output fragment under the same
    execution group, links it to the same datasets, and passes the rebuilt
    result to ``handle_execution_result`` for ingestion.

    The original execution is left untouched.

    Parameters
    ----------
    config
        Application configuration (``paths.scratch`` and ``paths.results`` are used)
    database
        Database instance
    execution
        The ``Execution`` record to reingest
    provider_registry
        Registry of active providers (used to resolve the live diagnostic)

    Returns
    -------
    :
        True if reingest was successful. False if the diagnostic/scratch
        directory could not be resolved, if the rebuilt result was
        unsuccessful or had no metric bundle, or if any exception was raised
        (exceptions are logged, not propagated).
    """
    # A ``None`` result means this execution cannot be reingested; nothing
    # has been created yet so there is nothing to clean up.
    resolved = _resolve_diagnostic_and_scratch(config, execution, provider_registry)
    if resolved is None:
        return False
    diagnostic, scratch_dir = resolved

    execution_group = execution.execution_group
    provider_slug = execution_group.diagnostic.provider.slug
    diagnostic_slug = execution_group.diagnostic.slug

    # Convert the JSON-stored selector dict (lists-of-pairs) back into the
    # mapping[str, iterable[tuple[str, str]]] shape that ``compute_group_short`` expects.
    selectors = {
        source_key: [tuple(pair) for pair in pairs] for source_key, pairs in execution_group.selectors.items()
    }

    # Initialised up front so the cleanup handlers below can tell how far the
    # attempt progressed before it failed.
    new_fragment: str | None = None
    new_scratch_dir: Path | None = None
    new_execution: Execution | None = None

    try:
        try:
            # Nested transaction: leaving this block via an exception discards
            # the new Execution row and its dataset links.
            # NOTE(review): presumably a SQLAlchemy SAVEPOINT - confirm.
            with database.session.begin_nested():
                # The real fragment is only known once assign_execution_fragment
                # has run, so the record starts out with a placeholder.
                new_execution = Execution(
                    execution_group=execution_group,
                    dataset_hash=execution.dataset_hash,
                    output_fragment=PLACEHOLDER_FRAGMENT,
                    provider_version=diagnostic.provider.version,
                )
                database.session.add(new_execution)

                new_fragment = assign_execution_fragment(
                    database.session,
                    new_execution,
                    provider_slug=provider_slug,
                    diagnostic_slug=diagnostic_slug,
                    selectors=selectors,
                    group_id=execution_group.id,
                )

                # Work on a copy so the original scratch directory is never modified.
                new_scratch_dir = config.paths.scratch / new_fragment
                new_scratch_dir.parent.mkdir(parents=True, exist_ok=True)
                shutil.copytree(scratch_dir, new_scratch_dir)

                # Datasets come from the original execution, while the output
                # directory points at the freshly copied scratch location.
                definition = reconstruct_execution_definition(
                    config, execution, diagnostic, output_fragment=new_fragment
                )

                result = diagnostic.build_execution_result(definition)

                if not result.successful or result.metric_bundle_filename is None:
                    logger.warning(
                        f"build_execution_result returned unsuccessful result "
                        f"for execution {execution.id}. Skipping."
                    )
                    # Exit the nested transaction via an exception so the
                    # records created above are discarded.
                    raise _ReingestSavepointAbort

                # Copy dataset links from the original execution
                for dataset in execution.datasets:
                    database.session.execute(
                        execution_datasets.insert().values(
                            execution_id=new_execution.id,
                            dataset_id=dataset.id,
                        )
                    )
        except _ReingestSavepointAbort:
            # Expected abort (already logged above): remove the copied scratch
            # directory and report the execution as not reingested.
            if new_scratch_dir is not None and new_scratch_dir.exists():
                shutil.rmtree(new_scratch_dir)
            return False

        # Runs after the nested transaction block has completed successfully.
        # NOTE(review): update_dirty=False presumably leaves the execution
        # group's dirty state unchanged - confirm against handle_execution_result.
        handle_execution_result(
            config,
            database,
            new_execution,
            result,
            update_dirty=False,
        )
    except Exception:
        # Catches failures from both the nested block (e.g. the copy or the
        # result build) and handle_execution_result.
        # NOTE(review): only filesystem artefacts are removed here; no explicit
        # session rollback is visible in this function despite the log
        # message - confirm the caller takes care of it.
        logger.exception(f"Ingestion failed for execution {execution.id}. Rolling back changes.")
        if new_scratch_dir is not None and new_scratch_dir.exists():
            shutil.rmtree(new_scratch_dir)
        if new_fragment is not None:
            # Remove any results directory created for the new fragment.
            new_results_dir = config.paths.results / new_fragment
            if new_results_dir.exists():
                shutil.rmtree(new_results_dir)
        return False

    logger.info(f"Successfully reingested execution {execution.id} -> new execution {new_execution.id}")
    return True