climate_ref.solver
#
Solver to determine which diagnostics need to be calculated
This module provides a solver to determine which diagnostics need to be calculated.
DiagnosticExecution
#
Class to hold information about the execution of a diagnostic
This is a temporary class used by the solver to hold information about an execution that might be required.
Source code in packages/climate-ref/src/climate_ref/solver.py
dataset_key
property
#
Key used to uniquely identify the execution group
This key is unique to an execution group and is built from the unique set of metadata (selectors) that defines the group. It combines the selectors from each source dataset type into a single key and should remain stable when datasets are added or removed.
selectors
property
#
Collection of selectors used to identify the datasets
These are the key, value pairs that were selected during the initial group-by, for each data requirement.
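As a rough illustration of how selectors can be combined into a stable key, the sketch below joins the selector values per source dataset type. The function name make_dataset_key, the separators, and the example facet values are assumptions made for illustration; the real key format may differ.

```python
from typing import Mapping, Sequence

# Hypothetical shape: for each source dataset type, the (facet, value) pairs
# chosen by the group-by. This is not the library's actual type.
Selectors = Mapping[str, Sequence[tuple[str, str]]]


def make_dataset_key(selectors: Selectors) -> str:
    """Join selector values into one key (illustrative format only)."""
    parts = []
    # Sorting by source type keeps the key independent of insertion order.
    for source_type in sorted(selectors):
        values = "_".join(value for _, value in selectors[source_type])
        parts.append(f"{source_type}_{values}")
    return "__".join(parts)


selectors = {
    "obs4mips": (("source_id", "HadCRUT5"),),
    "cmip6": (("source_id", "ACCESS-ESM1-5"), ("variable_id", "tas")),
}
key = make_dataset_key(selectors)
```

Because the key depends only on the selectors and not on the individual files in the group, adding or removing a dataset that shares the same selectors leaves the key unchanged.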
build_execution_definition(output_root)
#
Build the execution definition for the current diagnostic execution.
The returned definition uses a placeholder fragment for the output directory. solve_required_executions rewrites output_directory via attrs.evolve once the new Execution.id is known.
Source code in packages/climate-ref/src/climate_ref/solver.py
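The placeholder-then-rewrite pattern described for build_execution_definition can be sketched with the standard library. This sketch swaps in dataclasses.replace for attrs.evolve (both return a modified copy of an immutable object); the Definition class, the placeholder string, and the directory layout are hypothetical.

```python
from dataclasses import dataclass, replace
from pathlib import Path

PLACEHOLDER = "pending"  # hypothetical placeholder fragment


@dataclass(frozen=True)
class Definition:
    """Hypothetical stand-in for the real execution definition."""

    key: str
    output_directory: Path


def build_definition(output_root: Path, key: str) -> Definition:
    # The execution id is not known yet, so use a placeholder fragment.
    return Definition(key=key, output_directory=output_root / PLACEHOLDER)


def finalise(definition: Definition, output_root: Path, execution_id: int) -> Definition:
    # Return a copy with the real output directory; the original is unchanged.
    return replace(definition, output_directory=output_root / str(execution_id))


root = Path("/tmp/ref-output")
draft = build_definition(root, "cmip6_tas")
final = finalise(draft, root, 42)
```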
ExecutionSolver
#
A solver to determine which executions need to be calculated.
Source code in packages/climate-ref/src/climate_ref/solver.py
build_from_db(config, db)
staticmethod
#
Initialise the solver using information from the database
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| db | Database | Database instance | required |
Returns:
| Type | Description |
|---|---|
| ExecutionSolver | A new ExecutionSolver instance |
Source code in packages/climate-ref/src/climate_ref/solver.py
solve(filters=None)
#
Solve which executions need to be calculated for a dataset
The solving scheme is iterative: in each iteration we find all diagnostics that can be solved and calculate them. After each iteration we check whether any diagnostics remain to be solved.
Yields:
| Type | Description |
|---|---|
| DiagnosticExecution | A class containing the information related to the execution of a diagnostic |
Source code in packages/climate-ref/src/climate_ref/solver.py
SolveFilterOptions
#
Options to filter the diagnostics that are solved
Source code in packages/climate-ref/src/climate_ref/solver.py
dataset = None
class-attribute
instance-attribute
#
Filter datasets by facet values before solving.
Keys are facet names (e.g. source_id, variable_id) and values are
lists of allowed values. Different facets are ANDed together; multiple
values for the same facet are ORed.
diagnostic = None
class-attribute
instance-attribute
#
Check if the diagnostic slug contains any of the provided values
provider = None
class-attribute
instance-attribute
#
Check if the provider slug contains any of the provided values
apply_dataset_filters(data_catalog, dataset_filters)
#
Filter data catalogs by facet values
Each facet filter is applied independently to each data catalog. Different facets are ANDed together; multiple values for the same facet are ORed. Facets that do not exist as columns in a given catalog are skipped for that catalog.
When a DataCatalog is provided, the returned value preserves the DataCatalog wrapper (with adapter and database references) so that downstream finalisation still works.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data_catalog | Mapping[SourceDatasetType, DataCatalog \| DataFrame] | Data catalogs keyed by source dataset type | required |
| dataset_filters | dict[str, list[str]] | Mapping of facet names to lists of allowed values | required |
Returns:
| Type | Description |
|---|---|
| dict[SourceDatasetType, DataCatalog \| DataFrame] | Filtered data catalogs |
Source code in packages/climate-ref/src/climate_ref/solver.py
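The filtering rules above (AND across facets, OR within a facet, skip facets a catalog does not have) can be sketched in plain Python. Here each catalog is modelled as a list of dicts rather than a DataFrame or DataCatalog, and filter_catalog is a hypothetical helper, not part of climate_ref.

```python
from typing import Mapping

Row = dict[str, str]


def filter_catalog(rows: list[Row], dataset_filters: Mapping[str, list[str]]) -> list[Row]:
    """Keep rows that satisfy every applicable facet filter."""
    columns = set().union(*(row.keys() for row in rows))
    # Facets that are not columns of this catalog are skipped.
    active = {
        facet: set(values)
        for facet, values in dataset_filters.items()
        if facet in columns
    }
    # AND across facets; OR across the allowed values of one facet.
    return [
        row
        for row in rows
        if all(row.get(facet) in allowed for facet, allowed in active.items())
    ]


cmip6 = [
    {"source_id": "A", "variable_id": "tas"},
    {"source_id": "A", "variable_id": "pr"},
    {"source_id": "B", "variable_id": "tas"},
    {"source_id": "C", "variable_id": "tas"},
]
obs = [{"variable_id": "tas"}, {"variable_id": "pr"}]
filters = {"source_id": ["A", "B"], "variable_id": ["tas"]}

filtered_cmip6 = filter_catalog(cmip6, filters)
filtered_obs = filter_catalog(obs, filters)
```

In this example the obs catalog has no source_id column, so only the variable_id filter applies to it.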
extract_covered_datasets(data_catalog, requirement)
#
Determine the different diagnostic executions that should be performed with the current data catalog
Source code in packages/climate-ref/src/climate_ref/solver.py
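Since the selectors are described as the key, value pairs chosen during an initial group-by, one way to picture this step is a plain group-by over catalog rows. The sketch below is an assumption about the general shape only; group_rows and the example facets are hypothetical, and the real function also applies the constraints of the data requirement.

```python
from collections import defaultdict

Row = dict[str, str]
Selector = tuple[tuple[str, str], ...]


def group_rows(rows: list[Row], group_by: tuple[str, ...]) -> dict[Selector, list[Row]]:
    """Group rows by the values of the group_by facets."""
    groups: dict[Selector, list[Row]] = defaultdict(list)
    for row in rows:
        # The selector records which facet values define this group.
        selector = tuple((facet, row[facet]) for facet in group_by)
        groups[selector].append(row)
    return dict(groups)


rows = [
    {"source_id": "A", "variable_id": "tas", "member_id": "r1"},
    {"source_id": "A", "variable_id": "tas", "member_id": "r2"},
    {"source_id": "B", "variable_id": "tas", "member_id": "r1"},
]
groups = group_rows(rows, ("source_id", "variable_id"))
```

Each resulting group would correspond to one candidate execution, with the selector identifying it.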
fail_stale_in_progress_executions(db, *, stale_after_seconds=DEFAULT_STALE_EXECUTION_AGE_SECONDS)
#
Mark abandoned in-progress executions as failed so the next solve can retry them.
An execution is considered abandoned when it has successful=None and was
created longer ago than stale_after_seconds. This commonly happens when
a worker is killed (OOM, walltime, segfault) before its result-handling
callback ran, or when the join loop crashed mid-flight.
The execution group's dirty flag is left untouched so the existing
retry logic (ExecutionGroup.should_run) picks the work back up.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| db | Database | The database to inspect | required |
| stale_after_seconds | int | Minimum age in seconds before an in-progress execution is considered abandoned. Defaults to 6 hours, matching the Celery and LocalExecutor per-task time limits. | DEFAULT_STALE_EXECUTION_AGE_SECONDS |
Returns:
| Type | Description |
|---|---|
| int | The number of executions that were marked failed. |
Source code in packages/climate-ref/src/climate_ref/solver.py
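The staleness rule can be sketched without a database: an execution is abandoned when successful is None and it was created before a cutoff. ExecutionRecord and fail_stale below are hypothetical stand-ins; only the rule itself and the 6 hour default come from the description above.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional

SIX_HOURS = 6 * 60 * 60  # the documented default is 6 hours


@dataclass
class ExecutionRecord:
    """Hypothetical stand-in for a row in the executions table."""

    created_at: datetime
    successful: Optional[bool] = None


def fail_stale(
    executions: list[ExecutionRecord],
    now: datetime,
    stale_after_seconds: int = SIX_HOURS,
) -> int:
    """Mark old in-progress executions as failed and return how many changed."""
    cutoff = now - timedelta(seconds=stale_after_seconds)
    count = 0
    for execution in executions:
        if execution.successful is None and execution.created_at < cutoff:
            execution.successful = False
            count += 1
    return count


now = datetime(2024, 1, 1, 12, tzinfo=timezone.utc)
records = [
    ExecutionRecord(created_at=now - timedelta(hours=7)),  # abandoned
    ExecutionRecord(created_at=now - timedelta(hours=1)),  # still running
    ExecutionRecord(created_at=now - timedelta(hours=7), successful=True),
]
n_failed = fail_stale(records, now)
```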
matches_filter(diagnostic, filters)
#
Check if a diagnostic matches the provided filters
Each filter is optional, and a diagnostic matches only if it satisfies all of the provided filters, i.e. the filters are ANDed together.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| diagnostic | Diagnostic | Diagnostic to check against the filters | required |
| filters | SolveFilterOptions \| None | Collection of filters to apply to the diagnostic. If no filters are provided, the diagnostic is considered to match. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the diagnostic matches the filters, False otherwise |
Source code in packages/climate-ref/src/climate_ref/solver.py
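A minimal sketch of this matching logic follows, assuming the slug checks are case-sensitive substring tests as the "contains any of the provided values" wording suggests. Filters and matches are hypothetical stand-ins that take plain slugs instead of a Diagnostic object.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Filters:
    """Hypothetical stand-in for the diagnostic and provider filter options."""

    diagnostic: Optional[list[str]] = None
    provider: Optional[list[str]] = None


def matches(diagnostic_slug: str, provider_slug: str, filters: Optional[Filters]) -> bool:
    """Return True when every provided filter is satisfied."""
    if filters is None:
        return True  # no filters means everything matches
    if filters.diagnostic is not None and not any(
        value in diagnostic_slug for value in filters.diagnostic
    ):
        return False
    if filters.provider is not None and not any(
        value in provider_slug for value in filters.provider
    ):
        return False
    return True


no_filter = matches("global-mean-timeseries", "example", None)
both_ok = matches(
    "global-mean-timeseries", "example", Filters(diagnostic=["mean", "enso"], provider=["exam"])
)
provider_fails = matches(
    "global-mean-timeseries", "example", Filters(diagnostic=["mean"], provider=["other"])
)
```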
solve_executions(data_catalog, diagnostic, provider)
#
Calculate the diagnostic executions that need to be performed for a given diagnostic
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data_catalog | Mapping[SourceDatasetType, DataFrame \| DataCatalog] | Data catalogs for each source dataset type | required |
| diagnostic | Diagnostic | Diagnostic of interest | required |
| provider | DiagnosticProvider | Provider of the diagnostic | required |
Returns:
| Type | Description |
|---|---|
| Generator[DiagnosticExecution, None, None] | A generator that yields the diagnostic executions that need to be performed |
Source code in packages/climate-ref/src/climate_ref/solver.py
solve_required_executions(db, dry_run=False, execute=True, solver=None, config=None, timeout=60, one_per_provider=False, one_per_diagnostic=False, filters=None, limit=None, rerun_failed=False)
#
Solve for executions that require recalculation
This may trigger a number of additional calculations depending on what data has been ingested since the last solve.
Raises:
| Type | Description |
|---|---|
| TimeoutError | If the execution isn't completed within the specified timeout |
Source code in packages/climate-ref/src/climate_ref/solver.py