Skip to content

climate_ref.database #

Database adapter layer

This module provides a database adapter layer that abstracts the database connection and migrations. This makes it easier to switch between database backends and to optionally run migrations when the database is opened.

The Database class is the main entry point for interacting with the database. It provides a session object that can be used to interact with the database and run queries.

Database #

Manage the database connection and migrations

The database migrations are optionally run after the connection to the database is established.

Source code in packages/climate-ref/src/climate_ref/database.py
class Database:
    """
    Manage the database connection and migrations

    The database migrations are optionally run after the connection to the database is established.

    A `Database` can be used as a context manager: leaving the ``with`` block
    calls `close`, which closes the session and disposes of the engine.
    """

    def __init__(self, url: str, *, connect_args: dict[str, Any] | None = None) -> None:
        """
        Create the SQLAlchemy engine and session for a database URL

        Parameters
        ----------
        url
            SQLAlchemy database URL. Stored unchanged as ``self.url``.
        connect_args
            Optional DBAPI connection arguments forwarded to `sqlalchemy.create_engine`.
            ``None`` or an empty dict means no ``connect_args`` are passed.
        """
        # Log the URL with any password masked so that credentials embedded in,
        # e.g., a PostgreSQL URL never end up in log output.
        display_url = sqlalchemy.engine.make_url(url).render_as_string(hide_password=True)
        logger.info(f"Connecting to database at {display_url}")
        self.url = url
        engine_kwargs: dict[str, Any] = {}
        if connect_args:
            engine_kwargs["connect_args"] = connect_args
        self._engine = sqlalchemy.create_engine(self.url, **engine_kwargs)
        # TODO: Set autobegin=False
        self.session = Session(self._engine)

    def __enter__(self) -> "Database":
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        # Always release resources; exceptions from the with-block still propagate
        # because this returns None.
        self.close()

    def close(self) -> None:
        """
        Close the database connection

        This closes the session and disposes of the engine, releasing all connections.
        The engine is disposed even if closing the session raises.
        """
        try:
            self.session.close()
        finally:
            self._engine.dispose()

    def alembic_config(self, config: "Config") -> AlembicConfig:
        """
        Get the Alembic configuration object for the database

        The configuration is loaded from the ``alembic.ini`` bundled with the
        ``climate_ref`` package, and two attributes are attached to it:

        - ``connection``: this database's SQLAlchemy engine (the engine itself,
          not an already-open connection)
        - ``ref_config``: the REF configuration

        Parameters
        ----------
        config
            REF Configuration

        Raises
        ------
        FileNotFoundError
            If the bundled ``alembic.ini`` cannot be found

        Returns
        -------
        :
            The Alembic configuration object that can be used with alembic commands
        """
        alembic_config_filename = importlib.resources.files("climate_ref") / "alembic.ini"
        if not alembic_config_filename.is_file():  # pragma: no cover
            raise FileNotFoundError(f"{alembic_config_filename} not found")

        alembic_config = AlembicConfig(str(alembic_config_filename))
        alembic_config.attributes["connection"] = self._engine
        alembic_config.attributes["ref_config"] = config

        return alembic_config

    def migration_status(self, config: "Config") -> dict[str, Any]:
        """
        Report the current migration state of the database.

        Returns a dict with ``current`` (the current revision or ``None``),
        ``head`` (the latest available revision), and ``state`` (a
        `MigrationState`).

        This is the canonical way for consumers of the library to check whether
        the database schema matches what the installed ``climate_ref`` expects.
        Prefer it over re-deriving Alembic plumbing in downstream code.

        Any recorded revision that is neither removed nor equal to ``head`` is
        reported as `MigrationState.BEHIND`.

        Parameters
        ----------
        config
            REF Configuration, used to build the Alembic config.

        Returns
        -------
        :
            A dict with keys ``current``, ``head``, and ``state``.
        """
        alembic_cfg = self.alembic_config(config)
        script = ScriptDirectory.from_config(alembic_cfg)
        # NOTE(review): Alembic's get_current_head() raises if the migration scripts
        # have multiple heads, whereas `migrate` upgrades to "heads"; this assumes a
        # single head.
        head_rev = script.get_current_head()

        with self._engine.connect() as connection:
            current_rev = _get_database_revision(connection)

        if current_rev in _REMOVED_REVISIONS:
            state = MigrationState.REMOVED
        elif current_rev is None:
            state = MigrationState.UNMANAGED
        elif current_rev == head_rev:
            state = MigrationState.UP_TO_DATE
        else:
            state = MigrationState.BEHIND

        return {"current": current_rev, "head": head_rev, "state": state}

    def migrate(self, config: "Config", skip_backup: bool = False) -> None:
        """
        Migrate the database to the latest revision

        Parameters
        ----------
        config
            REF Configuration

            This is passed to alembic. ``config.db.max_backups`` is forwarded to
            the backup helper.
        skip_backup
            If True, skip creating a backup before running migrations.
            Useful for read-only commands that don't modify the database.
            A backup is only attempted when a SQLite file path can be derived
            from the database URL.

        Raises
        ------
        ValueError
            If the database is at a revision that has been removed; such a
            database has to be deleted and recreated.
        """
        # Check if the database revision is one of the removed revisions
        # If it is, then we need to delete the database and start again
        with self._engine.connect() as connection:
            current_rev = _get_database_revision(connection)
            logger.debug(f"Current database revision: {current_rev}")
            if current_rev in _REMOVED_REVISIONS:
                raise ValueError(
                    f"Database revision {current_rev!r} has been removed in "
                    "https://github.com/Climate-REF/climate-ref/pull/271. "
                    "Please delete your database and start again."
                )

        # Create backup before running migrations (unless skipped)
        db_path = _get_sqlite_path(self.url)
        if not skip_backup and db_path is not None:
            _create_backup(db_path, config.db.max_backups)

        alembic.command.upgrade(self.alembic_config(config), "heads")

    @staticmethod
    def from_config(
        config: "Config",
        run_migrations: bool = True,
        skip_backup: bool = False,
        *,
        read_only: bool = False,
    ) -> "Database":
        """
        Create a Database instance from a Config instance

        The `REF_DATABASE_URL` environment variable will take preference,
         and override the database URL specified in the config.

        If migrating or registering the CV dimensions fails, the newly created
        database is closed before the exception propagates, so no engine or
        session is leaked.

        Parameters
        ----------
        config
            The Config instance that includes information about where the database is located
        run_migrations
            If True, run any outstanding database migrations.
            Forced to False when ``read_only=True``.
        skip_backup
            If True, skip creating a backup before running migrations.
            Useful for read-only commands that don't modify the database.
        read_only
            If True, open the database in read-only mode and skip migrations.

            SQLite URLs are rewritten to URI form with ``mode=ro&immutable=1``.
            For other backends, callers must configure the connecting role as
            read-only themselves.

        Returns
        -------
        :
            A new Database instance
        """
        database_url: str = config.db.database_url
        connect_args: dict[str, Any] = {}

        if read_only:
            database_url, connect_args = _make_readonly_sqlite_url(database_url)
            run_migrations = False

        database_url = validate_database_url(database_url)

        cv = CV.load_from_file(config.paths.dimensions_cv)
        db = Database(database_url, connect_args=connect_args or None)

        try:
            if run_migrations:
                # Run any outstanding migrations
                # This also adds any diagnostic value columns to the DB if they don't exist
                db.migrate(config, skip_backup=skip_backup)
            # Register the CV dimensions with the MetricValue model
            # This will add new columns to the db if the CVs have changed
            MetricValue.register_cv_dimensions(cv)

            # Register the CV dimensions with the ExecutionOutput model
            # This enables dimension-based filtering of outputs
            ExecutionOutput.register_cv_dimensions(cv)
        except BaseException:
            # The caller never receives `db` on failure, so release its session
            # and engine here before re-raising.
            db.close()
            raise

        return db

    def update_or_create(
        self, model: type[Table], defaults: dict[str, Any] | None = None, **kwargs: Any
    ) -> tuple[Table, ModelState | None]:
        """
        Update an existing instance or create a new one

        Safe under concurrent writers: creation goes through `get_or_create`,
        which wraps the INSERT in a SAVEPOINT so a racing writer's UNIQUE conflict
        resolves to fetching the winner's row. Field updates are then applied on top.

        Note: last-writer-wins for the update step. If two callers update the same
        pre-existing row concurrently, the later-committing transaction's values win.
        This race is inherent to update semantics and is not addressed here.

        This doesn't commit the outer transaction,
        so you will need to call `session.commit()` after this method
        or use a transaction context manager.

        Parameters
        ----------
        model
            The model to update or create
        defaults
            Default values to use when creating a new instance, or values to update on existing instance
        kwargs
            The filter parameters to use when querying for an instance

        Returns
        -------
        :
            A tuple of the instance and a state: ``ModelState.CREATED`` if a row
            was inserted, ``ModelState.UPDATED`` if at least one field of an
            existing row changed, otherwise ``None``.
        """
        instance, state = self.get_or_create(model, defaults=defaults, **kwargs)
        if state is ModelState.CREATED:
            return instance, state

        # Row pre-existed (or a racing writer's row was fetched).
        # Apply caller-provided defaults as updates on top.
        if defaults:
            for key, value in defaults.items():
                if _values_differ(getattr(instance, key), value):
                    logger.debug(f"Updating {model.__name__} {key} to {value}")
                    setattr(instance, key, value)
                    state = ModelState.UPDATED
        return instance, state

    def get_or_create(
        self, model: type[Table], defaults: dict[str, Any] | None = None, **kwargs: Any
    ) -> tuple[Table, ModelState | None]:
        """
        Get or create an instance of a model

        To support concurrent writers, a SAVEPOINT is wrapped around the INSERT operation.
        This allows for a UNIQUE-constraint violation from a racing transaction to be caught
        and the row inserted by the winner to be re-fetched instead of crashing.
        The surrounding transaction (if any) is left intact.

        This doesn't commit the outer transaction,
        so you will need to call `session.commit()` after this method
        or use a transaction context manager.

        Parameters
        ----------
        model
            The model to get or create
        defaults
            Default values to use when creating a new instance
        kwargs
            The filter parameters to use when querying for an instance

        Raises
        ------
        IntegrityError
            If the INSERT fails and no matching row exists afterwards
            (i.e. the failure was not caused by a racing writer)

        Returns
        -------
        :
            A tuple of the instance and ``ModelState.CREATED`` if it was inserted
            by this call, otherwise ``None``
        """
        instance = self.session.query(model).filter_by(**kwargs).first()
        # Compare against None explicitly: a model instance's truthiness is not
        # a reliable "row found" signal.
        if instance is not None:
            return instance, None

        params = {**kwargs, **(defaults or {})}
        try:
            # Avoiding dialect-specific on_conflict_do_nothing syntax to maintain compatibility
            with self.session.begin_nested():
                instance = model(**params)
                self.session.add(instance)
            return instance, ModelState.CREATED
        except IntegrityError:
            # Could be a racing INSERT on the same unique key, OR a genuine integrity
            # bug (NOT NULL violation, missing FK, failed CHECK).  Distinguish by
            # re-running the lookup: a racing winner will now be visible; a real bug
            # leaves the SELECT empty, in which case we re-raise the original error.
            instance = self.session.query(model).filter_by(**kwargs).first()
            if instance is None:
                raise
            return instance, None

alembic_config(config) #

Get the Alembic configuration object for the database

The returned configuration carries the database engine (stored under the `connection` attribute) and the REF configuration (stored under `ref_config`).

Returns:

Type Description
Config

The Alembic configuration object that can be used with alembic commands

Source code in packages/climate-ref/src/climate_ref/database.py
def alembic_config(self, config: "Config") -> AlembicConfig:
    """
    Get the Alembic configuration object for the database

    The configuration is loaded from the ``alembic.ini`` bundled with the
    ``climate_ref`` package, and two attributes are attached to it:

    - ``connection``: this database's SQLAlchemy engine (the engine itself,
      not an already-open connection)
    - ``ref_config``: the REF configuration

    Parameters
    ----------
    config
        REF Configuration

    Raises
    ------
    FileNotFoundError
        If the bundled ``alembic.ini`` cannot be found

    Returns
    -------
    :
        The Alembic configuration object that can be used with alembic commands
    """
    alembic_config_filename = importlib.resources.files("climate_ref") / "alembic.ini"
    if not alembic_config_filename.is_file():  # pragma: no cover
        raise FileNotFoundError(f"{alembic_config_filename} not found")

    alembic_config = AlembicConfig(str(alembic_config_filename))
    alembic_config.attributes["connection"] = self._engine
    alembic_config.attributes["ref_config"] = config

    return alembic_config

close() #

Close the database connection

This closes the session and disposes of the engine, releasing all connections.

Source code in packages/climate-ref/src/climate_ref/database.py
def close(self) -> None:
    """
    Release every resource held by this database wrapper.

    The ORM session is closed first. Afterwards the engine is disposed, which
    releases its pooled connections. Disposal runs even if closing the session
    raises, and that exception is still propagated.
    """
    try:
        active_session = self.session
        active_session.close()
    finally:
        # Unconditional, so that a failing session close cannot leak pooled connections.
        pool_owner = self._engine
        pool_owner.dispose()

from_config(config, run_migrations=True, skip_backup=False, *, read_only=False) staticmethod #

Create a Database instance from a Config instance

The REF_DATABASE_URL environment variable will take preference, and override the database URL specified in the config.

Parameters:

Name Type Description Default
config Config

The Config instance that includes information about where the database is located

required
run_migrations bool

If True, run any outstanding database migrations. Forced to False when read_only=True.

True
skip_backup bool

If True, skip creating a backup before running migrations. Useful for read-only commands that don't modify the database.

False
read_only bool

If True, open the database in read-only mode and skip migrations.

SQLite URLs are rewritten to URI form with mode=ro&immutable=1. For other backends, callers must configure the connecting role as read-only themselves.

False

Returns:

Type Description
Database

A new Database instance

Source code in packages/climate-ref/src/climate_ref/database.py
@staticmethod
def from_config(
    config: "Config",
    run_migrations: bool = True,
    skip_backup: bool = False,
    *,
    read_only: bool = False,
) -> "Database":
    """
    Create a Database instance from a Config instance

    The `REF_DATABASE_URL` environment variable will take preference,
     and override the database URL specified in the config.

    If migrating or registering the CV dimensions fails, the newly created
    database is closed before the exception propagates, so no engine or
    session is leaked.

    Parameters
    ----------
    config
        The Config instance that includes information about where the database is located
    run_migrations
        If True, run any outstanding database migrations.
        Forced to False when ``read_only=True``.
    skip_backup
        If True, skip creating a backup before running migrations.
        Useful for read-only commands that don't modify the database.
    read_only
        If True, open the database in read-only mode and skip migrations.

        SQLite URLs are rewritten to URI form with ``mode=ro&immutable=1``.
        For other backends, callers must configure the connecting role as
        read-only themselves.

    Returns
    -------
    :
        A new Database instance
    """
    database_url: str = config.db.database_url
    connect_args: dict[str, Any] = {}

    if read_only:
        database_url, connect_args = _make_readonly_sqlite_url(database_url)
        run_migrations = False

    database_url = validate_database_url(database_url)

    cv = CV.load_from_file(config.paths.dimensions_cv)
    db = Database(database_url, connect_args=connect_args or None)

    try:
        if run_migrations:
            # Run any outstanding migrations
            # This also adds any diagnostic value columns to the DB if they don't exist
            db.migrate(config, skip_backup=skip_backup)
        # Register the CV dimensions with the MetricValue model
        # This will add new columns to the db if the CVs have changed
        MetricValue.register_cv_dimensions(cv)

        # Register the CV dimensions with the ExecutionOutput model
        # This enables dimension-based filtering of outputs
        ExecutionOutput.register_cv_dimensions(cv)
    except BaseException:
        # The caller never receives `db` on failure, so release its session
        # and engine here before re-raising.
        db.close()
        raise

    return db

get_or_create(model, defaults=None, **kwargs) #

Get or create an instance of a model

To support concurrent writers, a SAVEPOINT is wrapped around the INSERT operation. This allows for a UNIQUE-constraint violation from a racing transaction to be caught and the row inserted by the winner to be re-fetched instead of crashing. The surrounding transaction (if any) is left intact.

This doesn't commit the outer transaction, so you will need to call session.commit() after this method or use a transaction context manager.

Parameters:

Name Type Description Default
model type[Table]

The model to get or create

required
defaults dict[str, Any] | None

Default values to use when creating a new instance

None
kwargs Any

The filter parameters to use when querying for an instance

{}

Returns:

Type Description
tuple[Table, ModelState | None]

A tuple containing the instance and enum indicating if the instance was created

Source code in packages/climate-ref/src/climate_ref/database.py
def get_or_create(
    self, model: type[Table], defaults: dict[str, Any] | None = None, **kwargs: Any
) -> tuple[Table, ModelState | None]:
    """
    Get or create an instance of a model

    To support concurrent writers, a SAVEPOINT is wrapped around the INSERT operation.
    This allows for a UNIQUE-constraint violation from a racing transaction to be caught
    and the row inserted by the winner to be re-fetched instead of crashing.
    The surrounding transaction (if any) is left intact.

    This doesn't commit the outer transaction,
    so you will need to call `session.commit()` after this method
    or use a transaction context manager.

    Parameters
    ----------
    model
        The model to get or create
    defaults
        Default values to use when creating a new instance
    kwargs
        The filter parameters to use when querying for an instance

    Raises
    ------
    IntegrityError
        If the INSERT fails and no matching row exists afterwards
        (i.e. the failure was not caused by a racing writer)

    Returns
    -------
    :
        A tuple of the instance and ``ModelState.CREATED`` if it was inserted
        by this call, otherwise ``None``
    """
    instance = self.session.query(model).filter_by(**kwargs).first()
    # Compare against None explicitly: a model instance's truthiness is not
    # a reliable "row found" signal.
    if instance is not None:
        return instance, None

    params = {**kwargs, **(defaults or {})}
    try:
        # Avoiding dialect-specific on_conflict_do_nothing syntax to maintain compatibility
        with self.session.begin_nested():
            instance = model(**params)
            self.session.add(instance)
        return instance, ModelState.CREATED
    except IntegrityError:
        # Could be a racing INSERT on the same unique key, OR a genuine integrity
        # bug (NOT NULL violation, missing FK, failed CHECK).  Distinguish by
        # re-running the lookup: a racing winner will now be visible; a real bug
        # leaves the SELECT empty, in which case we re-raise the original error.
        instance = self.session.query(model).filter_by(**kwargs).first()
        if instance is None:
            raise
        return instance, None

migrate(config, skip_backup=False) #

Migrate the database to the latest revision

Parameters:

Name Type Description Default
config Config

REF Configuration

This is passed to alembic

required
skip_backup bool

If True, skip creating a backup before running migrations. Useful for read-only commands that don't modify the database.

False
Source code in packages/climate-ref/src/climate_ref/database.py
def migrate(self, config: "Config", skip_backup: bool = False) -> None:
    """
    Migrate the database to the latest revision

    Parameters
    ----------
    config
        REF Configuration

        This is passed to alembic. ``config.db.max_backups`` is forwarded to
        the backup helper.
    skip_backup
        If True, skip creating a backup before running migrations.
        Useful for read-only commands that don't modify the database.
        A backup is only attempted when a SQLite file path can be derived
        from the database URL.

    Raises
    ------
    ValueError
        If the database is at a revision that has been removed; such a
        database has to be deleted and recreated.
    """
    # Check if the database revision is one of the removed revisions
    # If it is, then we need to delete the database and start again
    with self._engine.connect() as connection:
        current_rev = _get_database_revision(connection)
        logger.debug(f"Current database revision: {current_rev}")
        if current_rev in _REMOVED_REVISIONS:
            raise ValueError(
                f"Database revision {current_rev!r} has been removed in "
                "https://github.com/Climate-REF/climate-ref/pull/271. "
                "Please delete your database and start again."
            )

    # Create backup before running migrations (unless skipped)
    db_path = _get_sqlite_path(self.url)
    if not skip_backup and db_path is not None:
        _create_backup(db_path, config.db.max_backups)

    alembic.command.upgrade(self.alembic_config(config), "heads")

migration_status(config) #

Report the current migration state of the database.

Returns a dict with `current` (the current revision or `None`), `head` (the latest available revision), and `state` (a `MigrationState`).

This is the canonical way for consumers of the library to check whether the database schema matches what the installed climate_ref expects. Prefer it over re-deriving Alembic plumbing in downstream code.

Parameters:

Name Type Description Default
config Config

REF Configuration, used to build the Alembic config.

required

Returns:

Type Description
dict[str, Any]

A dict with keys current, head, and state.

Source code in packages/climate-ref/src/climate_ref/database.py
def migration_status(self, config: "Config") -> dict[str, Any]:
    """
    Compare the database's Alembic revision with the newest revision shipped
    with the installed ``climate_ref`` package.

    Library consumers should call this rather than re-implementing the Alembic
    plumbing in downstream code.

    Parameters
    ----------
    config
        REF Configuration, used to build the Alembic config.

    Returns
    -------
    :
        A dict with three keys:

        - ``current``: the revision recorded in the database, or ``None``
        - ``head``: the latest revision available in the migration scripts
        - ``state``: a `MigrationState` classifying ``current`` against ``head``;
          any recorded revision that is neither removed nor equal to ``head``
          is reported as ``BEHIND``
    """
    migrations = ScriptDirectory.from_config(self.alembic_config(config))
    latest = migrations.get_current_head()

    with self._engine.connect() as conn:
        recorded = _get_database_revision(conn)

    def classify(revision):
        # Removed revisions take precedence over every other classification.
        if revision in _REMOVED_REVISIONS:
            return MigrationState.REMOVED
        if revision is None:
            return MigrationState.UNMANAGED
        if revision == latest:
            return MigrationState.UP_TO_DATE
        return MigrationState.BEHIND

    return {"current": recorded, "head": latest, "state": classify(recorded)}

update_or_create(model, defaults=None, **kwargs) #

Update an existing instance or create a new one

Safe under concurrent writers: creation goes through `get_or_create`, which wraps the INSERT in a SAVEPOINT so a racing writer's UNIQUE conflict resolves to fetching the winner's row. Field updates are then applied on top.

Note: last-writer-wins for the update step. If two callers update the same pre-existing row concurrently, the later-committing transaction's values win. This race is inherent to update semantics and is not addressed here.

This doesn't commit the outer transaction, so you will need to call session.commit() after this method or use a transaction context manager.

Parameters:

Name Type Description Default
model type[Table]

The model to update or create

required
defaults dict[str, Any] | None

Default values to use when creating a new instance, or values to update on existing instance

None
kwargs Any

The filter parameters to use when querying for an instance

{}

Returns:

Type Description
tuple[Table, ModelState | None]

A tuple containing the instance and a state enum indicating if the instance was created or updated

Source code in packages/climate-ref/src/climate_ref/database.py
def update_or_create(
    self, model: type[Table], defaults: dict[str, Any] | None = None, **kwargs: Any
) -> tuple[Table, ModelState | None]:
    """
    Fetch the row matching ``kwargs`` and bring it in line with ``defaults``,
    inserting it first if it does not exist yet.

    Insertion is delegated to `get_or_create`, whose SAVEPOINT-guarded INSERT
    turns a concurrent writer's UNIQUE conflict into a fetch of that writer's
    row. ``defaults`` are then written onto whichever row was obtained.

    Concurrent updates of the same pre-existing row are last-writer-wins: the
    transaction that commits later determines the stored values.

    The outer transaction is not committed; call `session.commit()` afterwards
    or wrap the call in a transaction context manager.

    Parameters
    ----------
    model
        The model to update or create
    defaults
        Values used to construct a new row, or written onto an existing row
    kwargs
        Filter parameters identifying the row

    Returns
    -------
    :
        ``(instance, state)`` where ``state`` is ``ModelState.CREATED`` for a new
        row, ``ModelState.UPDATED`` if any field of an existing row changed, and
        ``None`` if nothing changed.
    """
    obj, outcome = self.get_or_create(model, defaults=defaults, **kwargs)
    if outcome is ModelState.CREATED:
        # The defaults were already used as constructor arguments.
        return obj, outcome

    for field, new_value in (defaults or {}).items():
        current_value = getattr(obj, field)
        if not _values_differ(current_value, new_value):
            continue
        logger.debug(f"Updating {model.__name__} {field} to {new_value}")
        setattr(obj, field, new_value)
        outcome = ModelState.UPDATED
    return obj, outcome

MigrationState #

Bases: Enum

State of the database schema relative to the expected Alembic head.

Source code in packages/climate-ref/src/climate_ref/database.py
class MigrationState(enum.Enum):
    """
    State of the database schema relative to the expected Alembic head.

    Values are assigned by `Database.migration_status`.
    """

    # The database revision equals the latest available revision (head)
    UP_TO_DATE = "up_to_date"
    # The database has a revision that is not removed but differs from head.
    # NOTE(review): a revision unknown to this install would also be reported here.
    BEHIND = "behind"
    # No database revision was found (`_get_database_revision` returned None)
    UNMANAGED = "unmanaged"
    # The database revision was removed; `Database.migrate` refuses to run and asks
    # the user to delete the database and start again
    REMOVED = "removed"

ModelState #

Bases: Enum

State of a model instance

Source code in packages/climate-ref/src/climate_ref/database.py
class ModelState(enum.Enum):
    """
    State of a model instance

    Returned alongside the instance by `Database.get_or_create` and
    `Database.update_or_create`; those methods return ``None`` instead of a
    state when nothing was created or changed.
    """

    # A new row was inserted by get_or_create (possibly via update_or_create)
    CREATED = "created"
    # update_or_create changed at least one field of an existing row
    UPDATED = "updated"
    # NOTE(review): not returned by any Database method in this module; confirm where it is used
    DELETED = "deleted"

validate_database_url(database_url) #

Validate a database URL

SQLite databases are supported, and the parent directory of the database file is created if it doesn't exist. PostgreSQL databases can also be used, but this support is currently experimental and untested.

Parameters:

Name Type Description Default
database_url str

The database URL to validate

See climate_ref.config.DbConfig.database_url for more information on the format of the URL.

required

Raises:

Type Description
ValueError

If the database scheme is not supported

Returns:

Type Description
str

The validated database URL

Source code in packages/climate-ref/src/climate_ref/database.py
def validate_database_url(database_url: str) -> str:
    """
    Validate a database URL

    We support sqlite databases, and we create the directory if it doesn't exist.
    We may aim to support PostgreSQL databases, but this is currently experimental and untested.

    PostgreSQL URLs may use SQLAlchemy's ``dialect+driver`` form
    (e.g. ``postgresql+psycopg2://...``) as well as plain ``postgresql://``.

    Parameters
    ----------
    database_url
        The database URL to validate

        See [climate_ref.config.DbConfig.database_url][climate_ref.config.DbConfig.database_url]
        for more information on the format of the URL.

    Raises
    ------
    ValueError
        If the database scheme is not supported

    Returns
    -------
    :
        The validated database URL (returned unchanged)
    """
    split_url = urlparse.urlsplit(database_url)
    # SQLAlchemy URLs may name a driver after the dialect ("postgresql+psycopg2");
    # only the dialect part identifies the backend.
    dialect = split_url.scheme.partition("+")[0]

    # SQLite still requires the plain "sqlite" scheme: the SQLite path helpers
    # have not been checked against driver-qualified URLs.
    if split_url.scheme == "sqlite":
        # URI-form SQLite URLs (``sqlite:///file:...``) are passed through
        # verbatim — the caller has supplied an explicit URI, possibly for a
        # read-only on-disk file, and we should neither treat it as in-memory
        # nor try to mkdir its (opaque) parent directory.
        if split_url.path[1:].startswith("file:"):
            logger.debug("Using URI-form SQLite URL; skipping parent directory creation")
        else:
            sqlite_path = _get_sqlite_path(database_url)
            if sqlite_path is None:
                logger.warning("Using an in-memory database")
            else:
                sqlite_path.parent.mkdir(parents=True, exist_ok=True)
    elif dialect == "postgresql":
        # We don't need to do anything special for PostgreSQL
        logger.warning("PostgreSQL support is currently experimental and untested")
    else:
        raise ValueError(f"Unsupported database scheme: {split_url.scheme}")

    return database_url