Skip to content

climate_ref.cli.datasets #

View and ingest input datasets

The metadata from these datasets is stored in the database so that the executions required for a given diagnostic can be determined without having to re-parse the datasets.

fetch_data(ctx, registry, output_directory=None, force_cleanup=False, symlink=False, verify=True) #

Fetch REF-specific datasets

These datasets have been verified to have open licenses and are in the process of being added to Obs4MIPs.

Source code in packages/climate-ref/src/climate_ref/cli/datasets.py
@app.command(name="fetch-data")
def fetch_data(  # noqa: PLR0913
    ctx: typer.Context,
    registry: Annotated[
        str,
        typer.Option(help="Name of the data registry to use"),
    ],
    output_directory: Annotated[
        Path | None,
        typer.Option(help="Output directory where files will be saved"),
    ] = None,
    force_cleanup: Annotated[
        bool,
        typer.Option(help="If True, remove any existing files"),
    ] = False,
    symlink: Annotated[
        bool,
        typer.Option(help="If True, symlink files into the output directory, otherwise perform a copy"),
    ] = False,
    verify: Annotated[
        bool,
        typer.Option(help="Verify the checksums of the fetched files"),
    ] = True,
) -> None:
    """
    Fetch REF-specific datasets

    These datasets have been verified to have open licenses
    and are in the process of being added to Obs4MIPs.

    The registry name is checked before any existing output directory is removed,
    so an unknown registry never results in files being deleted.
    """
    from climate_ref.provider_registry import ProviderRegistry

    config = ctx.obj.config
    db = ctx.obj.database

    # Setup the provider registry to register any dataset registries in the configured providers
    ProviderRegistry.build_from_config(config, db)

    # Resolve the registry before touching the filesystem:
    # a mistyped registry name must not trigger the destructive cleanup below.
    try:
        _registry = dataset_registry_manager[registry]
    except KeyError:
        logger.error(f"Registry {registry} not found")
        logger.error(f"Available registries: {', '.join(dataset_registry_manager.keys())}")
        # The KeyError carries no extra information beyond what has just been logged
        raise typer.Exit(code=1) from None

    if output_directory is not None and force_cleanup and output_directory.exists():
        logger.warning(f"Removing existing directory {output_directory}")
        shutil.rmtree(output_directory)

    fetch_all_files(
        _registry,
        registry,
        output_directory,
        symlink=symlink,
        verify=verify,
    )

ingest(ctx, file_or_directory, source_type, solve=False, dry_run=False, n_jobs=None, skip_invalid=True, chunk_size=None) #

Ingest a directory of datasets into the database

Each dataset will be loaded and validated using the specified dataset adapter. This will extract metadata from the datasets and store it in the database.

A table of the datasets that were found is printed to the console before they are ingested.

Source code in packages/climate-ref/src/climate_ref/cli/datasets.py
@app.command()
def ingest(  # noqa
    ctx: typer.Context,
    file_or_directory: list[Path],
    source_type: Annotated[SourceDatasetType, typer.Option(help="Type of source dataset")],
    solve: Annotated[bool, typer.Option(help="Solve for new diagnostic executions after ingestion")] = False,
    dry_run: Annotated[bool, typer.Option(help="Do not ingest datasets into the database")] = False,
    n_jobs: Annotated[int | None, typer.Option(help="Number of jobs to run in parallel")] = None,
    skip_invalid: Annotated[
        bool, typer.Option(help="Ignore (but log) any datasets that don't pass validation")
    ] = True,
    chunk_size: Annotated[
        int | None,
        typer.Option(
            help=(
                "Stream the catalog in chunks of this many files instead of loading the whole "
                "directory at once. Bounds peak memory for large archives. Only supported by "
                "adapters that implement iter_local_datasets (currently CMIP6 and CMIP7)."
            )
        ),
    ] = None,
) -> None:
    """
    Ingest a directory of datasets into the database

    Each dataset will be loaded and validated using the specified dataset adapter.
    This will extract metadata from the datasets and store it in the database.

    A table of the datasets that were found is printed to the console before they are ingested
    (when streaming with --chunk-size, only the first non-empty chunk is shown).

    Inputs that do not exist, contain no .nc files, or raise an error are reported
    and the command exits with a non-zero status once all inputs have been processed.
    """
    from climate_ref.datasets import IngestionStats, ingest_datasets

    # Reject an unusable chunk size before any other work is done
    if chunk_size is not None and chunk_size < 1:
        raise typer.BadParameter(f"chunk_size must be >= 1, got {chunk_size}", param_hint="--chunk-size")

    config = ctx.obj.config
    db = ctx.obj.database
    console = ctx.obj.console

    # Only forward n_jobs when it was explicitly provided
    kwargs = {} if n_jobs is None else {"n_jobs": n_jobs}

    # Create a data catalog from the specified file or directory
    adapter = get_dataset_adapter(source_type.value, **kwargs)

    def log_unsaved_datasets(instance_ids: Iterable[str]) -> None:
        # Dry-run reporting shared by the streaming and whole-catalog paths:
        # log every dataset slug that has no matching row in the database yet.
        for instance_id in instance_ids:
            with db.session.begin():
                dataset = (
                    db.session.query(Dataset).filter_by(slug=instance_id, dataset_type=source_type).first()
                )
                if not dataset:
                    logger.info(f"Would save dataset {instance_id} to the database")

    failed_dirs: list[Path] = []

    # Streaming needs both an explicit chunk size and an adapter that can iterate in chunks
    streaming = chunk_size is not None and hasattr(adapter, "iter_local_datasets")
    if chunk_size is not None and not streaming:
        logger.warning(
            f"Adapter for {source_type.value} does not support streaming ingest; "
            "falling back to whole-catalog mode."
        )

    for requested_path in file_or_directory:
        _dir = Path(requested_path).expanduser()
        logger.info(f"Ingesting {_dir}")

        if not _dir.exists():
            logger.error(f"File or directory {_dir} does not exist")
            failed_dirs.append(_dir)
            continue

        # TODO: This assumes that all datasets are nc files.
        # This is true for CMIP6 and obs4MIPs but may not be true for other dataset types in the future.
        if next(_dir.rglob("*.nc"), None) is None:
            logger.error(f"No .nc files found in {_dir}")
            failed_dirs.append(_dir)
            continue

        if streaming:
            stats = IngestionStats()
            preview_printed = False
            total_files = 0
            total_datasets = 0
            try:
                for raw_chunk in adapter.iter_local_datasets(_dir, chunk_size=chunk_size):  # type: ignore[attr-defined]
                    validated_chunk = adapter.validate_data_catalog(raw_chunk, skip_invalid=skip_invalid)
                    if validated_chunk.empty:
                        continue
                    if not preview_printed:
                        # Only the first non-empty chunk is shown as a preview
                        pretty_print_df(adapter.pretty_subset(validated_chunk), console=console)
                        preview_printed = True
                    total_files += len(validated_chunk)
                    # Unique slugs are counted per chunk, so the total is reported as approximate
                    total_datasets += validated_chunk[adapter.slug_column].nunique()

                    if dry_run:
                        log_unsaved_datasets(validated_chunk[adapter.slug_column].unique())
                    else:
                        # Pass skip_invalid exactly as the whole-catalog path does
                        stats += ingest_datasets(
                            adapter, None, db, data_catalog=validated_chunk, skip_invalid=skip_invalid
                        )
                    # Drop the references to this chunk before the next one is requested
                    del raw_chunk, validated_chunk
            except Exception as e:
                logger.exception(f"Error ingesting datasets from {_dir}: {e}")
                failed_dirs.append(_dir)
                continue

            if total_files == 0:
                logger.warning(f"No valid datasets found in {_dir}")
                continue

            logger.info(
                f"Streamed {total_files} files across approximately {total_datasets} datasets from {_dir}"
            )
            if not dry_run:
                stats.log_summary()
            continue

        try:
            data_catalog = adapter.find_local_datasets(_dir)
            data_catalog = adapter.validate_data_catalog(data_catalog, skip_invalid=skip_invalid)
        except Exception as e:
            logger.exception(f"Error ingesting datasets from {_dir}: {e}")
            failed_dirs.append(_dir)
            continue

        if data_catalog.empty:
            logger.warning(f"No valid datasets found in {_dir}")
            continue

        logger.info(
            f"Found {len(data_catalog)} files for {len(data_catalog[adapter.slug_column].unique())} datasets"
        )
        pretty_print_df(adapter.pretty_subset(data_catalog), console=console)

        if dry_run:
            # In dry_run mode, just report what would be saved
            log_unsaved_datasets(data_catalog[adapter.slug_column].unique())
        else:
            # Use shared ingestion logic with pre-validated catalog
            stats = ingest_datasets(adapter, None, db, data_catalog=data_catalog, skip_invalid=skip_invalid)
            stats.log_summary()

    if solve:
        from climate_ref.solver import solve_required_executions

        solve_required_executions(
            config=config,
            db=db,
            dry_run=dry_run,
        )

    # Failures are reported only after every input has been attempted
    if failed_dirs:
        logger.error(
            f"Ingestion failed for {len(failed_dirs)} of {len(file_or_directory)} input(s): "
            f"{', '.join(str(p) for p in failed_dirs)}"
        )
        raise typer.Exit(code=1)

list_(ctx, source_type=SourceDatasetType.CMIP6.value, column=None, include_files=typer.Option(False, help='Include files in the output'), limit=typer.Option(100, help='Limit the number of datasets (or files when using --include-files) to display to this number.'), dataset_filter=None) #

List the datasets that have been ingested

The data catalog is sorted by the date that the dataset was ingested (first = newest).

Source code in packages/climate-ref/src/climate_ref/cli/datasets.py
@app.command(name="list")
def list_(  # noqa: PLR0913
    ctx: typer.Context,
    source_type: Annotated[
        SourceDatasetType, typer.Option(help="Type of source dataset")
    ] = SourceDatasetType.CMIP6.value,  # type: ignore
    column: Annotated[list[str] | None, typer.Option()] = None,
    include_files: bool = typer.Option(False, help="Include files in the output"),
    limit: int = typer.Option(
        100,
        help=(
            "Limit the number of datasets (or files when using --include-files) to display to this number."
        ),
    ),
    dataset_filter: Annotated[
        list[str] | None,
        typer.Option(
            help="Filter datasets by facet values using key=value syntax. "
            "For example, --dataset-filter source_id=ACCESS-CM2 --dataset-filter variable_id=tas. "
            "Multiple values for the same facet are ORed (include any match), "
            "different facets are ANDed (must match all). "
            "Multiple values can be provided"
        ),
    ] = None,
) -> None:
    """
    List the datasets that have been ingested

    The data catalog is sorted by the date that the dataset was ingested (first = newest).
    When --column is given, only those columns are shown and the rows are sorted by them.
    """
    database = ctx.obj.database

    adapter = get_dataset_adapter(source_type.value)
    # NOTE(review): limit is handed to load_catalog before the filters below run, so the filters
    # may only see the first `limit` rows - confirm against the adapter's load_catalog.
    data_catalog = adapter.load_catalog(database, include_files=include_files, limit=limit)

    if dataset_filter:
        try:
            parsed_filters = parse_facet_filters(dataset_filter)
        except ValueError as e:
            # Chain the original error so the parsing failure stays visible in tracebacks
            raise typer.BadParameter(str(e), param_hint="--dataset-filter") from e

        for facet in parsed_filters:
            if facet not in data_catalog.columns:
                logger.error(
                    f"Filter facet '{facet}' not found in data catalog. "
                    f"Choose from: {', '.join(sorted(data_catalog.columns))}"
                )
                raise typer.Exit(code=1)

        filtered = apply_dataset_filters({source_type: data_catalog}, parsed_filters)
        data_catalog = filtered[source_type]  # type: ignore[assignment]  # input is DataFrame

    if column:
        # Drop repeated --column values (keeping first-seen order):
        # selecting the same label twice makes sort_values fail on a non-unique column label.
        columns = list(dict.fromkeys(column))
        missing = set(columns) - set(data_catalog.columns)
        if missing:

            def format_(columns: Iterable[str]) -> str:
                return ", ".join(f"'{c}'" for c in sorted(columns))

            logger.error(
                f"Column{'s' if len(missing) > 1 else ''} "
                f"{format_(missing)} not found in data catalog. "
                f"Choose from: {format_(data_catalog.columns)}"
            )
            raise typer.Exit(code=1)
        data_catalog = data_catalog[columns].sort_values(by=columns)

    pretty_print_df(data_catalog, console=ctx.obj.console)

list_columns(ctx, source_type=SourceDatasetType.CMIP6.value, include_files=typer.Option(False, help='Include files in the output')) #

List the available columns in the data catalog for the given source type.

Source code in packages/climate-ref/src/climate_ref/cli/datasets.py
@app.command()
def list_columns(
    ctx: typer.Context,
    source_type: Annotated[
        SourceDatasetType, typer.Option(help="Type of source dataset")
    ] = SourceDatasetType.CMIP6.value,  # type: ignore
    include_files: bool = typer.Option(False, help="Include files in the output"),
) -> None:
    """
    Print the columns available in the data catalog for the given source type.

    The column names are written in sorted order, one per line.
    """
    db = ctx.obj.database
    catalog = get_dataset_adapter(source_type.value).load_catalog(db, include_files=include_files)

    # Sort a plain list copy of the column labels before printing them
    column_names = catalog.columns.to_list()
    column_names.sort()
    for name in column_names:
        print(name)

stats(ctx, source_type=None, group_by=None) #

Show summary statistics for datasets.

Displays counts of datasets grouped by dataset type, with finalisation status breakdown and file counts. Optionally expand by a facet using --group-by (e.g., --group-by source_id).

Source code in packages/climate-ref/src/climate_ref/cli/datasets.py
@app.command()
def stats(
    ctx: typer.Context,
    source_type: Annotated[SourceDatasetType | None, typer.Option(help="Filter by dataset type")] = None,
    group_by: Annotated[
        str | None,
        typer.Option(
            help=f"Group results by a dataset facet. Allowed values: {', '.join(_ALLOWED_GROUP_BY)}. "
            "Requires --source-type to be specified."
        ),
    ] = None,
) -> None:
    """
    Summarise the datasets held in the database.

    For each dataset type this reports the number of datasets, the number of files and
    how many datasets are finalised or unfinalised. Use --group-by together with
    --source-type to split the counts by a facet (e.g., --group-by source_id).
    """
    import pandas as pd
    from sqlalchemy import case, func

    from climate_ref.models.dataset import DatasetFile

    session = ctx.obj.database.session
    console = ctx.obj.console

    # Grouping by a facet is only possible once a single dataset type has been chosen
    if source_type is None and group_by:
        logger.error("--group-by requires --source-type to be specified.")
        raise typer.Exit(code=1)

    if group_by is not None and group_by not in _ALLOWED_GROUP_BY:
        logger.error(f"Invalid --group-by value '{group_by}'. Allowed values: {', '.join(_ALLOWED_GROUP_BY)}")
        raise typer.Exit(code=1)

    # With a source type, query the adapter's dataset class so its own columns are reachable
    model: type[Dataset] = Dataset
    facet_column = None
    if source_type is not None:
        model = get_dataset_adapter(source_type.value).dataset_cls
        facet_column = getattr(model, group_by) if group_by is not None else None

    # Build the selected and grouped columns side by side so they always stay in step
    selected = [model.dataset_type.label("dataset_type")]
    grouping = [model.dataset_type]
    if facet_column is not None:
        selected.append(facet_column.label(group_by))
        grouping.append(facet_column)
    selected.extend(
        [
            func.count(func.distinct(model.id)).label("datasets"),
            func.count(DatasetFile.id).label("files"),
            func.count(func.distinct(case((model.finalised.is_(True), model.id)))).label("finalised"),
            func.count(func.distinct(case((model.finalised.is_(False), model.id)))).label("unfinalised"),
        ]
    )

    query = (
        session.query(*selected)
        .outerjoin(DatasetFile, model.id == DatasetFile.dataset_id)
        .group_by(*grouping)
    )
    if source_type is not None:
        query = query.filter(model.dataset_type == source_type)

    results = query.all()
    if not results:
        console.print("No datasets found.")
        return

    # Turn each result row into a plain mapping, keeping the column order used for display
    records: list[dict[str, object]] = []
    for row in results:
        record: dict[str, object] = {"dataset_type": row.dataset_type.value}
        if facet_column is not None and group_by is not None:
            record[group_by] = getattr(row, group_by)
        record.update(
            datasets=row.datasets,
            files=row.files,
            finalised=row.finalised,
            unfinalised=row.unfinalised,
        )
        records.append(record)

    pretty_print_df(pd.DataFrame(records), console=console)