Source code for neurocore.persistence.checkpoint_adapter

"""Adapt a NeuroCore RunStore to flowengine's CheckpointStore.

flowengine's sync ``FlowEngine.execute()`` auto-saves a ``Checkpoint`` when a
component suspends, and ``FlowEngine.resume()`` reloads it. We back that with the
same SQLite file the RunStore uses (the ``checkpoints`` table) so a single
``runs.db`` holds both durable history and transient checkpoints.

For the in-memory backend (tests), we fall back to flowengine's own
``InMemoryCheckpointStore``.
"""
from __future__ import annotations

import json

from flowengine import Checkpoint, CheckpointStore, InMemoryCheckpointStore

from neurocore.persistence.base import RunStore
from neurocore.persistence.sqlite_store import SQLiteRunStore, _dumps


class SQLiteCheckpointStore(CheckpointStore):
    """flowengine CheckpointStore backed by a SQLiteRunStore's connection.

    Every operation acquires the run store's lock before touching the shared
    connection. Writes additionally run inside the connection's context
    manager, so a statement or commit that raises is rolled back instead of
    leaving the shared connection inside a half-finished transaction.

    NOTE(review): the transaction handling assumes ``store._conn`` is a
    standard-library ``sqlite3.Connection`` (usable as a context manager)
    whose rows support access by column name — confirm against
    ``SQLiteRunStore``.
    """

    def __init__(self, store: SQLiteRunStore) -> None:
        """Keep a reference to the run store whose lock and connection are reused."""
        self._store = store

    def save(self, checkpoint: Checkpoint) -> str:
        """Insert ``checkpoint``, or overwrite the row with the same id.

        ``flow_config`` and ``context`` are serialized with the run store's
        ``_dumps`` helper; ``load`` reads them back with ``json.loads``.

        Returns:
            The ``checkpoint_id`` of the saved checkpoint.
        """
        store = self._store
        params = (
            checkpoint.checkpoint_id,
            _dumps(checkpoint.flow_config),
            _dumps(checkpoint.context),
            checkpoint.created_at,
        )
        # The lock is entered first and released last, so the commit (or the
        # rollback on error) performed by the connection's context manager
        # still happens while the lock is held.
        with store._lock, store._conn as conn:  # noqa: SLF001 — same package, shared connection
            conn.execute(
                """
                INSERT INTO checkpoints (checkpoint_id, flow_config, context, created_at)
                VALUES (?,?,?,?)
                ON CONFLICT(checkpoint_id) DO UPDATE SET
                    flow_config=excluded.flow_config,
                    context=excluded.context,
                    created_at=excluded.created_at
                """,
                params,
            )
        return checkpoint.checkpoint_id

    def load(self, checkpoint_id: str) -> Checkpoint | None:
        """Fetch the checkpoint stored under ``checkpoint_id``.

        Returns:
            The rebuilt ``Checkpoint``, or ``None`` when no row has that id.
        """
        with self._store._lock:  # noqa: SLF001
            cur = self._store._conn.execute(
                "SELECT * FROM checkpoints WHERE checkpoint_id=?", (checkpoint_id,)
            )
            row = cur.fetchone()
        # Decoding happens outside the lock; only the query needs the connection.
        if row is None:
            return None
        return Checkpoint(
            checkpoint_id=row["checkpoint_id"],
            flow_config=json.loads(row["flow_config"]),
            context=json.loads(row["context"]),
            created_at=row["created_at"],
        )

    def delete(self, checkpoint_id: str) -> None:
        """Remove the checkpoint stored under ``checkpoint_id``.

        The DELETE simply matches no rows when the id is not present.
        """
        store = self._store
        # Same lock-then-transaction ordering as ``save``: commit on success,
        # rollback on error, both while the lock is held.
        with store._lock, store._conn as conn:  # noqa: SLF001
            conn.execute(
                "DELETE FROM checkpoints WHERE checkpoint_id=?", (checkpoint_id,)
            )


def checkpoint_store_for(run_store: RunStore | None) -> CheckpointStore | None:
    """Pick the flowengine CheckpointStore that matches ``run_store``.

    * ``None`` (persistence disabled) -> ``None``.
    * ``SQLiteRunStore`` -> a new ``SQLiteCheckpointStore`` wrapping it.
    * anything else (the memory backend) -> an ``InMemoryCheckpointStore``
      that is created once and remembered on the run store itself.

    The in-memory store is memoized because suspend and resume call this
    function separately; both calls must see the same store for a
    checkpoint to survive between them.
    """
    if run_store is None:
        return None

    if isinstance(run_store, SQLiteRunStore):
        return SQLiteCheckpointStore(run_store)

    # Reuse the store attached by an earlier call, if there is one.
    memoized = getattr(run_store, "_checkpoint_store", None)
    if memoized is not None:
        return memoized

    memoized = InMemoryCheckpointStore()
    run_store._checkpoint_store = memoized  # type: ignore[attr-defined]
    return memoized