Source code for neurocore.persistence.base
"""Persistence models and the RunStore interface.
NeuroCore persists every blueprint execution as a durable *run* with an
ordered list of *step* records. This makes runs inspectable, resumable, and
replayable — the difference between a "nice framework" and a "production agent
runtime".
The ``RunStore`` interface is intentionally separate from flowengine's
``CheckpointStore``: a checkpoint is transient (deleted on resume), while a run
record is durable history. A ``RunStore`` can still *back* a ``CheckpointStore``
for flowengine's sync suspend/resume path — see
:mod:`neurocore.persistence.checkpoint_adapter`.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from datetime import UTC, datetime
from enum import StrEnum
from typing import Any
from pydantic import BaseModel, Field
[docs]
def utcnow_iso() -> str:
"""Return the current UTC time as an ISO-8601 string."""
return datetime.now(UTC).isoformat()
class RunStatus(StrEnum):
"""Lifecycle status of a run."""
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
SUSPENDED = "suspended" # awaiting human approval / external input
CANCELLED = "cancelled"
class StepStatus(StrEnum):
"""Outcome of a single step within a run."""
COMPLETED = "completed"
FAILED = "failed"
SKIPPED = "skipped"
class RunRecord(BaseModel):
"""A durable record of one blueprint execution."""
run_id: str
blueprint_name: str
blueprint_version: str = "1.0"
blueprint_path: str | None = None
# Full Blueprint.model_dump() so a run can be replayed/resumed even if the
# source file has moved or changed.
blueprint_snapshot: dict[str, Any] = Field(default_factory=dict)
flow_type: str = "sequential"
status: RunStatus = RunStatus.RUNNING
initial_data: dict[str, Any] = Field(default_factory=dict)
final_context: dict[str, Any] | None = None # FlowContext.to_dict()
error: str | None = None
suspended_at_node: str | None = None
suspension_reason: str | None = None
checkpoint_id: str | None = None # flowengine checkpoint id (sync path only)
created_at: str = Field(default_factory=utcnow_iso)
updated_at: str = Field(default_factory=utcnow_iso)
duration_ms: float | None = None
class StepRecord(BaseModel):
"""A record of one step (component invocation) within a run."""
run_id: str
step_index: int
component: str
skill_type: str | None = None
status: StepStatus = StepStatus.COMPLETED
started_at: str = Field(default_factory=utcnow_iso)
duration_ms: float | None = None
error: str | None = None
output_keys: list[str] = Field(default_factory=list)
context_snapshot: dict[str, Any] | None = None # gated by persist_step_snapshots
class RunStore(ABC):
"""Abstract durable store for run history.
Implementations: :class:`~neurocore.persistence.sqlite_store.SQLiteRunStore`
(default) and :class:`~neurocore.persistence.memory_store.InMemoryRunStore`
(tests).
"""
@abstractmethod
def save_run(self, run: RunRecord) -> str:
"""Insert or update a run by ``run_id``. Returns the run_id."""
...
@abstractmethod
def save_step(self, step: StepRecord) -> None:
"""Insert or update a step by ``(run_id, step_index)``."""
...
@abstractmethod
def load_run(self, run_id: str) -> RunRecord | None:
"""Load a run by id, or None if not found."""
...
@abstractmethod
def load_steps(self, run_id: str) -> list[StepRecord]:
"""Load a run's steps, ordered by step_index."""
...
@abstractmethod
def list_runs(
self,
*,
status: RunStatus | None = None,
blueprint: str | None = None,
limit: int = 50,
) -> list[RunRecord]:
"""List runs newest-first, optionally filtered by status/blueprint."""
...
@abstractmethod
def delete_run(self, run_id: str) -> None:
"""Delete a run and its steps."""
...
def close(self) -> None: # noqa: B027 — optional override, default no-op
"""Release any underlying resources (no-op by default)."""