# Source code for neurocore.runtime.executor

"""Blueprint executor — wires skills into FlowEngine and runs them.

The executor is the bridge between NeuroCore's skill system and
FlowEngine's execution engine. It:

1. Resolves skill names from the blueprint to Skill classes via the registry
2. Merges config: neurocore.yaml skills.<name> (base) + blueprint config (overlay)
3. Creates and initializes Skill instances
4. Builds a FlowEngine FlowConfig and component dict
5. Executes via FlowEngine (sync or async)

Usage:
    from neurocore.runtime import execute_blueprint, load_and_run

    # Low-level
    result = execute_blueprint(blueprint, registry, config)

    # High-level (load + discover + execute)
    result = load_and_run(blueprint_path, project_root=Path("."))

    # Streaming
    async for event in execute_blueprint_stream(bp, registry, cfg):
        print(event.event_type, event.step_name)
"""

from __future__ import annotations

import asyncio
import random
import time
from collections import defaultdict, deque
from collections.abc import AsyncIterator
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from uuid import uuid4

from flowengine import (
    ComponentConfig,
    FlowConfig,
    FlowContext,
    FlowDefinition,
    FlowEngine,
    FlowSettings,
    StepConfig,
)

from neurocore.config.loader import load_config
from neurocore.config.schema import NeuroCoreConfig
from neurocore.errors import BlueprintError, ExecutionError
from neurocore.logging import get_logger
from neurocore.persistence import (
    RunRecord,
    RunStatus,
    RunStore,
    StepRecord,
    StepStatus,
    build_run_store,
    checkpoint_store_for,
)
from neurocore.persistence.base import utcnow_iso
from neurocore.runtime.blueprint import Blueprint, load_blueprint, validate_blueprint
from neurocore.skills.base import Skill, is_async_skill
from neurocore.skills.loader import discover_skills
from neurocore.skills.registry import SkillRegistry

log = get_logger("executor")


@dataclass
class _Tracker:
    """Per-run recorder that persists step history to a ``RunStore``.

    Every ``record_step`` call saves one ``StepRecord`` tagged with
    ``run_id``. When ``persist_snapshots`` is true the record also carries
    ``context.to_dict()``; otherwise the snapshot field is ``None``.
    """

    store: RunStore
    run_id: str
    persist_snapshots: bool = False

    def record_step(
        self,
        *,
        step_index: int,
        component: str,
        skill: Skill,
        status: StepStatus,
        started_iso: str,
        started_perf: float,
        context: FlowContext,
        error: str | None = None,
    ) -> None:
        """Build a ``StepRecord`` for one step and save it to the store.

        Args:
            step_index: Position of the step within the run.
            component: Name recorded as the step's component.
            skill: Skill that ran; its ``skill_meta`` supplies the skill
                type and the declared output keys.
            status: Outcome to record for the step.
            started_iso: Start timestamp stored verbatim as ``started_at``.
            started_perf: ``time.time()`` reading taken when the step
                started; the duration is measured against ``time.time()``
                here, so callers must use the same clock.
            context: Flow context, serialised only when snapshots are on.
            error: Optional error text stored on the record.
        """
        meta = skill.skill_meta
        skill_type = meta.name
        elapsed_ms = (time.time() - started_perf) * 1000
        output_keys = list(meta.provides or [])

        snapshot = None
        if self.persist_snapshots:
            snapshot = context.to_dict()

        step_record = StepRecord(
            run_id=self.run_id,
            step_index=step_index,
            component=component,
            skill_type=skill_type,
            status=status,
            started_at=started_iso,
            duration_ms=elapsed_ms,
            error=error,
            output_keys=output_keys,
            context_snapshot=snapshot,
        )
        self.store.save_step(step_record)


def merge_skill_config(
    neurocore_config: NeuroCoreConfig,
    skill_name: str,
    blueprint_config: dict[str, Any] | None,
) -> dict[str, Any]:
    """Merge skill configuration from neurocore.yaml and blueprint.

    Priority (highest wins):
    1. Blueprint component config (overlay)
    2. neurocore.yaml skills.<name> (base)

    The merge is shallow: a key present in the overlay replaces the whole
    base value for that key. A missing (``None``) base or overlay is treated
    as an empty mapping.

    Args:
        neurocore_config: The project's NeuroCoreConfig.
        skill_name: The skill's registered name (for neurocore.yaml lookup).
        blueprint_config: Config from the blueprint's component definition.

    Returns:
        A new merged config dict; neither input mapping is mutated.
    """
    base = neurocore_config.get_skill_config(skill_name) or {}
    overlay = blueprint_config or {}
    return {**base, **overlay}
def _create_skill_instances( blueprint: Blueprint, registry: SkillRegistry, neurocore_config: NeuroCoreConfig, ) -> tuple[dict[str, Skill], dict[str, dict[str, Any]]]: """Create and initialize Skill instances for a blueprint. Args: blueprint: Parsed blueprint. registry: SkillRegistry with discovered skills. neurocore_config: Project config for config merging. Returns: Tuple of: - Dict mapping component instance names to initialized Skill instances. - Dict mapping component instance names to their merged configs. Raises: BlueprintError: If a skill cannot be found or instantiated. """ instances: dict[str, Skill] = {} merged_configs: dict[str, dict[str, Any]] = {} for comp in blueprint.components: # Resolve skill class from registry skill_cls = registry.get(comp.type) if skill_cls is None: raise BlueprintError( f"Component '{comp.name}' references unknown skill '{comp.type}'" ) # Create instance with the component's instance name try: instance = skill_cls(name=comp.name) except Exception as e: raise BlueprintError( f"Failed to instantiate skill '{comp.type}' as '{comp.name}': {e}" ) from e # Merge config and initialize merged_config = merge_skill_config(neurocore_config, comp.type, comp.config) instance.init(merged_config) # Inject LLM provider if required if instance.skill_meta.requires_llm: from neurocore.llm.provider import build_provider instance.llm = build_provider(merged_config) if instance.llm is None: raise BlueprintError( f"Skill '{comp.name}' has requires_llm=True but no llm_provider " f"is configured. Set llm_provider in neurocore.yaml or blueprint config." 
) # Validate config config_errors = instance.validate_config() if config_errors: raise BlueprintError( f"Skill '{comp.name}' ({comp.type}) config validation failed: " + "; ".join(config_errors) ) instances[comp.name] = instance merged_configs[comp.name] = merged_config return instances, merged_configs def _build_flow_config( blueprint: Blueprint, merged_configs: dict[str, dict[str, Any]] | None = None, ) -> FlowConfig: """Convert a NeuroCore Blueprint into a FlowEngine FlowConfig. Maps NeuroCore's blueprint model to FlowEngine's config schema, using fully qualified class paths for component types. Args: blueprint: Parsed NeuroCore blueprint. merged_configs: Optional mapping of component names to their merged configs (neurocore.yaml + blueprint overlay). If provided, these are used instead of blueprint-only configs so FlowEngine initializes skills correctly. Returns: FlowEngine FlowConfig. """ # Build component configs — use the skill class path as the type # (FlowEngine needs a type path, but since we provide pre-built # components, it won't actually load from the path. We use a # placeholder path based on the skill type name.) 
fe_components = [] for comp in blueprint.components: # Use merged config if available, otherwise fall back to blueprint config config = ( merged_configs.get(comp.name, comp.config) if merged_configs else comp.config ) fe_components.append( ComponentConfig( name=comp.name, type=f"neurocore.skills.{comp.type}", config=config, ) ) # Build flow definition fe_flow_kwargs: dict[str, Any] = { "type": blueprint.flow.type, } if blueprint.flow.settings: fe_flow_kwargs["settings"] = FlowSettings(**blueprint.flow.settings) if blueprint.flow.steps: fe_flow_kwargs["steps"] = [ StepConfig( component=step.component, description=step.description, condition=step.condition, on_error=step.on_error, ) for step in blueprint.flow.steps ] if blueprint.flow.nodes: from flowengine import GraphEdgeConfig, GraphNodeConfig fe_flow_kwargs["nodes"] = [ GraphNodeConfig( id=node.id, component=node.component, description=node.description, on_error=node.on_error, ) for node in blueprint.flow.nodes ] if blueprint.flow.edges: fe_flow_kwargs["edges"] = [ GraphEdgeConfig( source=edge.source, target=edge.target, port=edge.port, condition=edge.condition, ) for edge in blueprint.flow.edges ] fe_flow = FlowDefinition(**fe_flow_kwargs) return FlowConfig( name=blueprint.name, version=blueprint.version, description=blueprint.description, components=fe_components, flow=fe_flow, ) # --------------------------------------------------------------------------- # Async helpers # --------------------------------------------------------------------------- async def _run_skill_async(skill: Skill, context: FlowContext) -> FlowContext: """Execute a skill's process(), with automatic retry and exponential backoff. 
Retry behaviour is controlled by skill.skill_meta: max_retries=0 → no retry (default, backward-compatible) max_retries=3 → up to 3 retries after the first failure retry_on=() → retry on any Exception when max_retries > 0 retry_on=(SomeError,) → retry only those exceptions retry_delay_base=1.0 → first retry after ~1s retry_delay_max=60.0 → subsequent retries never wait longer than 60s """ meta = skill.skill_meta max_retries: int = meta.max_retries retry_on: tuple[type[BaseException], ...] = meta.retry_on or (Exception,) base: float = meta.retry_delay_base max_delay: float = meta.retry_delay_max last_exc: BaseException | None = None for attempt in range(max_retries + 1): try: if is_async_skill(skill): return await skill.process(context) # type: ignore[misc, no-any-return] loop = asyncio.get_event_loop() return await loop.run_in_executor(None, skill.process, context) except BaseException as exc: if max_retries == 0: raise if not isinstance(exc, retry_on): raise last_exc = exc if attempt == max_retries: break raw_delay = base * (2 ** attempt) capped_delay = min(raw_delay, max_delay) jittered_delay = random.uniform(0, capped_delay) log.warning( "skill.retry", skill=skill.name, attempt=attempt + 1, max_retries=max_retries, delay_seconds=round(jittered_delay, 2), exception_type=type(exc).__name__, exception_msg=str(exc)[:200], ) await asyncio.sleep(jittered_delay) assert last_exc is not None # noqa: S101 raise last_exc def _get_sequential_steps(blueprint: Blueprint) -> list[Any]: """Return the ordered list of flow steps for sequential/conditional flows.""" if blueprint.flow.steps: return list(blueprint.flow.steps) return [] async def _execute_blueprint_async( blueprint: Blueprint, instances: dict[str, Skill], merged_configs: dict[str, dict[str, Any]], initial_data: dict[str, Any] | None, *, tracker: _Tracker | None = None, base_context: FlowContext | None = None, skip: set[str] | None = None, ) -> FlowContext: """Execute a sequential/conditional blueprint asynchronously. 
Records per-step history (when ``tracker`` is set), appends each completed component to ``metadata.completed_nodes``, and stops early if a skill suspends the run (e.g. a human-approval gate). ``skip`` lets a resume skip already-completed steps; ``base_context`` restores a prior run's state. """ skip = skip or set() context = base_context if base_context is not None else FlowContext() if initial_data: for k, v in initial_data.items(): context.set(k, v) steps = _get_sequential_steps(blueprint) for idx, step in enumerate(steps): if step.component in skip: continue skill = instances[step.component] started_iso, started_perf = utcnow_iso(), time.time() try: context = await _run_skill_async(skill, context) except Exception as exc: if tracker is not None: tracker.record_step( step_index=idx, component=step.component, skill=skill, status=StepStatus.FAILED, started_iso=started_iso, started_perf=started_perf, context=context, error=str(exc), ) raise if context.metadata.suspended: if tracker is not None: tracker.record_step( step_index=idx, component=step.component, skill=skill, status=StepStatus.SKIPPED, started_iso=started_iso, started_perf=started_perf, context=context, ) break if step.component not in context.metadata.completed_nodes: context.metadata.completed_nodes.append(step.component) if tracker is not None: tracker.record_step( step_index=idx, component=step.component, skill=skill, status=StepStatus.COMPLETED, started_iso=started_iso, started_perf=started_perf, context=context, ) return context async def _execute_dag_concurrent( blueprint: Blueprint, instances: dict[str, Skill], merged_configs: dict[str, dict[str, Any]], initial_data: dict[str, Any] | None, *, tracker: _Tracker | None = None, base_context: FlowContext | None = None, skip: set[str] | None = None, ) -> FlowContext: """Execute a DAG blueprint with concurrent layers. ``skip`` (node ids) lets a resume omit already-completed nodes whose outputs are restored via ``base_context``. 
Records per-node history and stops if a node suspends the run. """ skip = skip or set() edges = blueprint.flow.edges or [] nodes = blueprint.flow.nodes or [] in_degree: dict[str, int] = {n.id: 0 for n in nodes} adjacency: dict[str, list[str]] = defaultdict(list) for edge in edges: adjacency[edge.source].append(edge.target) in_degree[edge.target] = in_degree.get(edge.target, 0) + 1 # Kahn's algorithm to get layers queue: deque[str] = deque(k for k, v in in_degree.items() if v == 0) layers: list[list[str]] = [] visited = 0 while queue: layer = list(queue) layers.append(layer) visited += len(layer) queue.clear() for node_id in layer: for neighbour in adjacency[node_id]: in_degree[neighbour] -= 1 if in_degree[neighbour] == 0: queue.append(neighbour) if visited != len(nodes): raise BlueprintError("DAG contains a cycle — topological sort failed.") # Map node IDs to component names node_to_component = {n.id: n.component for n in nodes} # Execute each layer concurrently context = base_context if base_context is not None else FlowContext() if initial_data: for k, v in initial_data.items(): context.set(k, v) step_index = 0 for layer in layers: pending = [node_id for node_id in layer if node_id not in skip] if not pending: continue layer_skills = [instances[node_to_component[node_id]] for node_id in pending] started_iso, started_perf = utcnow_iso(), time.time() try: results = await asyncio.gather( *[_run_skill_async(skill, context) for skill in layer_skills] ) except Exception as exc: if tracker is not None: for node_id in pending: tracker.record_step( step_index=step_index, component=node_id, skill=instances[node_to_component[node_id]], status=StepStatus.FAILED, started_iso=started_iso, started_perf=started_perf, context=context, error=str(exc), ) step_index += 1 raise # Merge results into context (last write wins per key) for result_ctx in results: for key, value in result_ctx.data.items(): context.set(key, value) for node_id in pending: if node_id not in 
context.metadata.completed_nodes: context.metadata.completed_nodes.append(node_id) if tracker is not None: tracker.record_step( step_index=step_index, component=node_id, skill=instances[node_to_component[node_id]], status=StepStatus.COMPLETED, started_iso=started_iso, started_perf=started_perf, context=context, ) step_index += 1 if context.metadata.suspended: break return context # --------------------------------------------------------------------------- # Public API # --------------------------------------------------------------------------- def _graph_has_cycle(blueprint: Blueprint) -> bool: """True if the blueprint's graph contains a cycle (Kahn's algorithm).""" nodes = blueprint.flow.nodes or [] edges = blueprint.flow.edges or [] in_degree: dict[str, int] = {n.id: 0 for n in nodes} adjacency: dict[str, list[str]] = defaultdict(list) for edge in edges: adjacency[edge.source].append(edge.target) if edge.target in in_degree: in_degree[edge.target] += 1 queue: deque[str] = deque(k for k, v in in_degree.items() if v == 0) visited = 0 while queue: nid = queue.popleft() visited += 1 for nbr in adjacency[nid]: in_degree[nbr] -= 1 if in_degree[nbr] == 0: queue.append(nbr) return visited != len(nodes) def _graph_needs_executor(blueprint: Blueprint) -> bool: """True if a graph flow needs flowengine's GraphExecutor. Triggers when any edge declares a ``port`` or ``condition`` (conditional routing), or the graph is cyclic — features the concurrent layer executor does not implement. """ edges = blueprint.flow.edges or [] if any(e.port or e.condition for e in edges): return True return _graph_has_cycle(blueprint) def _execute_graph_via_engine( blueprint: Blueprint, instances: dict[str, Skill], merged_configs: dict[str, dict[str, Any]], initial_data: dict[str, Any] | None, *, has_async: bool, tracker: _Tracker | None, base_context: FlowContext | None, ) -> FlowContext: """Run a graph flow through flowengine's GraphExecutor (ports/conditions/cycles). 
Uses the async executor when any skill is async (awaits coroutine ``process``), else the sync executor. Records per-step history via the tracker (timings + completed_nodes are populated by GraphExecutor). """ from flowengine import GraphExecutor from flowengine.eval.evaluator import ConditionEvaluator flow_config = _build_flow_config(blueprint, merged_configs) executor = GraphExecutor( nodes=flow_config.flow.nodes or [], edges=flow_config.flow.edges or [], components=instances, # type: ignore[arg-type] # Skill is a BaseComponent settings=flow_config.flow.settings, evaluator=ConditionEvaluator(), ) context = base_context if base_context is not None else FlowContext() if initial_data: for key, value in initial_data.items(): context.set(key, value) try: if has_async: context = asyncio.run(executor.execute_async(context)) else: context = executor.execute(context) except Exception: if tracker is not None: _record_sync_steps(tracker, context, instances, failed=True) raise if tracker is not None: _record_sync_steps(tracker, context, instances) return context
def execute_blueprint(
    blueprint: Blueprint,
    registry: SkillRegistry,
    neurocore_config: NeuroCoreConfig,
    *,
    initial_data: dict[str, Any] | None = None,
    _tracker: _Tracker | None = None,
    _base_context: FlowContext | None = None,
    _skip: set[str] | None = None,
) -> FlowContext:
    """Execute a blueprint. Supports both sync and async skills.

    This is the core execution function. It:
    1. Validates skill references
    2. Creates and initializes skill instances with merged config
    3. Detects async skills and chooses execution path
    4. For sync-only sequential blueprints, delegates to FlowEngine
    5. For async blueprints and graph flows, uses an asyncio event loop

    Graph flows whose edges use ports/conditions, or that contain a cycle,
    run through flowengine's GraphExecutor. Other graph flows run on the
    concurrent layer executor — including graphs that declare nodes but no
    edges, which execute as a single concurrent layer.

    Args:
        blueprint: Parsed Blueprint.
        registry: SkillRegistry with discovered skills.
        neurocore_config: Project configuration.
        initial_data: Optional initial context data (key-value pairs).

    Returns:
        FlowContext with execution results.

    Raises:
        BlueprintError: If validation or instantiation fails.
        ExecutionError: If execution fails.

    The leading-underscore params are used internally by
    :func:`execute_blueprint_tracked` and :func:`resume_blueprint`.
    """
    # Validate all skill references
    errors = validate_blueprint(blueprint, registry)
    if errors:
        raise BlueprintError(
            "Blueprint validation failed:\n" + "\n".join(f" - {e}" for e in errors)
        )

    # Create and initialize skill instances
    skill_instances, merged_configs = _create_skill_instances(
        blueprint, registry, neurocore_config
    )

    has_async = any(is_async_skill(s) for s in skill_instances.values())
    is_graph = blueprint.flow.type == "graph"

    # Hybrid graph routing: flows using edge ports/conditions or cycles go
    # through flowengine's GraphExecutor (which honors them); plain DAGs keep
    # the concurrent layer executor below.
    if is_graph and blueprint.flow.edges and _graph_needs_executor(blueprint):
        try:
            return _execute_graph_via_engine(
                blueprint,
                skill_instances,
                merged_configs,
                initial_data,
                has_async=has_async,
                tracker=_tracker,
                base_context=_base_context,
            )
        except (BlueprintError, ExecutionError):
            raise
        except Exception as e:
            raise ExecutionError(f"Blueprint execution failed: {e}") from e

    if has_async or is_graph:
        # Async path: handles async skills and DAG concurrent execution.
        # A graph with nodes but no edges must still run its nodes; the
        # sequential runner only reads flow.steps and would execute nothing.
        runs_as_dag = is_graph and bool(
            blueprint.flow.edges
            or (blueprint.flow.nodes and not blueprint.flow.steps)
        )
        runner = _execute_dag_concurrent if runs_as_dag else _execute_blueprint_async
        try:
            return asyncio.run(
                runner(
                    blueprint,
                    skill_instances,
                    merged_configs,
                    initial_data,
                    tracker=_tracker,
                    base_context=_base_context,
                    skip=_skip,
                )
            )
        except (BlueprintError, ExecutionError):
            raise
        except Exception as e:
            raise ExecutionError(f"Blueprint execution failed: {e}") from e

    # Sync path: use FlowEngine for backward compatibility
    flow_config = _build_flow_config(blueprint, merged_configs)
    checkpoint_store = (
        checkpoint_store_for(_tracker.store) if _tracker is not None else None
    )

    try:
        engine = FlowEngine(
            flow_config,
            skill_instances,  # type: ignore[arg-type]  # Skill is a BaseComponent
            validate_types=False,
            checkpoint_store=checkpoint_store,
        )
    except Exception as e:
        raise ExecutionError(f"Failed to create FlowEngine: {e}") from e

    context = _base_context if _base_context is not None else FlowContext()
    if initial_data:
        for key, value in initial_data.items():
            context.set(key, value)

    try:
        result = engine.execute(context)
    except Exception as e:
        # FlowEngine mutates `context` in place, so completed steps are recorded
        # in its timings even on failure. Record those plus the failing step.
        if _tracker is not None:
            _record_sync_steps(_tracker, context, skill_instances, failed=True)
        raise ExecutionError(f"Blueprint execution failed: {e}") from e

    if _tracker is not None:
        _record_sync_steps(_tracker, result, skill_instances)

    # FlowEngine doesn't populate completed_nodes for sequential flows; derive
    # it from the step timings so resume/inspection behave consistently.
    for st in result.metadata.step_timings:
        if st.component not in result.metadata.completed_nodes:
            result.metadata.completed_nodes.append(st.component)
    return result
def _record_sync_steps(
    tracker: _Tracker,
    context: FlowContext,
    skill_instances: dict[str, Skill],
    *,
    failed: bool = False,
) -> None:
    """Save one StepRecord per entry in the context's step timings.

    Used after an engine-driven execution, where timings live in
    ``context.metadata.step_timings``. When ``failed`` is true, components
    named in ``context.metadata.errors`` are stored as FAILED; every other
    timing is stored as COMPLETED. The errors list is not read otherwise.
    """
    if failed:
        failed_components = {err.get("component") for err in context.metadata.errors}
    else:
        failed_components = set()

    for timing in context.metadata.step_timings:
        skill = skill_instances.get(timing.component)

        if timing.component in failed_components:
            status = StepStatus.FAILED
        else:
            status = StepStatus.COMPLETED

        # A timing may name a component with no matching skill instance.
        if skill:
            skill_type = skill.skill_meta.name
            output_keys = list(skill.skill_meta.provides or [])
        else:
            skill_type = None
            output_keys = []

        step_record = StepRecord(
            run_id=tracker.run_id,
            step_index=timing.step_index,
            component=timing.component,
            skill_type=skill_type,
            status=status,
            started_at=timing.started_at.isoformat(),
            duration_ms=timing.duration * 1000,
            output_keys=output_keys,
        )
        tracker.store.save_step(step_record)


def _finalize_run(
    store: RunStore,
    record: RunRecord,
    context: FlowContext,
    started_perf: float,
) -> None:
    """Write the terminal state of a run onto its record and save it.

    Stores the serialised context, the elapsed time since ``started_perf``
    (a ``time.time()`` reading) and a fresh ``updated_at``. The status is
    SUSPENDED, with the suspension node and reason copied over, when the
    context is suspended, and COMPLETED otherwise. A truthy
    ``checkpoint_id`` value in the context is copied to the record.
    """
    record.final_context = context.to_dict()
    record.duration_ms = (time.time() - started_perf) * 1000
    record.updated_at = utcnow_iso()

    meta = context.metadata
    if not meta.suspended:
        record.status = RunStatus.COMPLETED
    else:
        record.status = RunStatus.SUSPENDED
        record.suspended_at_node = meta.suspended_at_node
        record.suspension_reason = meta.suspension_reason

    checkpoint_id = context.get("checkpoint_id")
    if checkpoint_id:
        record.checkpoint_id = checkpoint_id

    store.save_run(record)
def execute_blueprint_tracked(
    blueprint: Blueprint,
    registry: SkillRegistry,
    neurocore_config: NeuroCoreConfig,
    *,
    initial_data: dict[str, Any] | None = None,
    run_store: RunStore | None = None,
    run_id: str | None = None,
    blueprint_path: Path | None = None,
) -> FlowContext:
    """Run a blueprint while persisting a run record and its step history.

    A RUNNING record is saved before execution starts and is updated to
    FAILED (with the error text) if execution raises, or finalised from the
    resulting context otherwise. When no store is supplied and none can be
    built from the config, this simply calls :func:`execute_blueprint`.

    Args:
        blueprint: Parsed Blueprint.
        registry: SkillRegistry with discovered skills.
        neurocore_config: Project configuration.
        initial_data: Optional initial context data.
        run_store: Store to persist into; built from the config when omitted.
        run_id: Identifier for the run; a random hex id is generated when
            omitted or empty.
        blueprint_path: Optional source path recorded on the run.

    Returns:
        The final FlowContext (may have ``metadata.suspended`` set if the run
        paused at a human-approval gate — resume with :func:`resume_blueprint`).

    Raises:
        Exception: Whatever :func:`execute_blueprint` raised, re-raised after
            the run has been marked FAILED.
    """
    store = run_store
    if store is None:
        store = build_run_store(neurocore_config)
    if store is None:
        # Persistence disabled: plain, untracked execution.
        return execute_blueprint(
            blueprint, registry, neurocore_config, initial_data=initial_data
        )

    rid = run_id or uuid4().hex
    tracker = _Tracker(
        store=store,
        run_id=rid,
        persist_snapshots=neurocore_config.persistence.persist_step_snapshots,
    )

    source_path = str(blueprint_path) if blueprint_path else None
    record = RunRecord(
        run_id=rid,
        blueprint_name=blueprint.name,
        blueprint_version=blueprint.version,
        blueprint_path=source_path,
        blueprint_snapshot=blueprint.model_dump(),
        flow_type=blueprint.flow.type,
        status=RunStatus.RUNNING,
        initial_data=initial_data or {},
    )
    store.save_run(record)

    started_perf = time.time()
    try:
        context = execute_blueprint(
            blueprint,
            registry,
            neurocore_config,
            initial_data=initial_data,
            _tracker=tracker,
        )
    except Exception as exc:
        elapsed_ms = (time.time() - started_perf) * 1000
        record.status = RunStatus.FAILED
        record.error = str(exc)
        record.duration_ms = elapsed_ms
        record.updated_at = utcnow_iso()
        store.save_run(record)
        raise

    _finalize_run(store, record, context, started_perf)
    return context
def resume_blueprint(
    run_id: str,
    registry: SkillRegistry,
    neurocore_config: NeuroCoreConfig,
    *,
    resume_data: dict[str, Any] | None = None,
    run_store: RunStore | None = None,
) -> FlowContext:
    """Resume a suspended or failed run, optionally injecting ``resume_data``.

    Two resume paths exist:

    * Checkpoint path — when the run has a ``checkpoint_id``, flowengine
      restores and continues from that checkpoint.
    * Restored-context path — otherwise the run's stored ``final_context``
      is rebuilt, its suspension flags are cleared, and the blueprint is
      re-executed skipping whatever is listed in ``completed_nodes``. If no
      context was ever stored (e.g. the run failed before being finalised),
      the run restarts from the beginning with its original ``initial_data``.

    Args:
        run_id: Identifier of the run to resume.
        registry: SkillRegistry with discovered skills.
        neurocore_config: Project configuration.
        resume_data: Optional payload; passed to the engine on the checkpoint
            path, stored under the ``resume_data`` context key otherwise.
        run_store: Store to load from; built from the config when omitted.

    Returns:
        The final FlowContext of the resumed run.

    Raises:
        ExecutionError: If persistence is disabled, the run does not exist,
            the run is not suspended/failed, or the checkpoint resume fails.
    """
    store = run_store if run_store is not None else build_run_store(neurocore_config)
    if store is None:
        raise ExecutionError("Persistence is disabled; cannot resume runs.")

    run = store.load_run(run_id)
    if run is None:
        raise ExecutionError(f"Run not found: {run_id}")
    if run.status not in (RunStatus.SUSPENDED, RunStatus.FAILED):
        raise ExecutionError(
            f"Run {run_id} has status {run.status}; only suspended or failed "
            f"runs can be resumed."
        )

    # Re-create the blueprint exactly as it was when the run started.
    blueprint = Blueprint(**run.blueprint_snapshot)
    tracker = _Tracker(
        store, run_id, neurocore_config.persistence.persist_step_snapshots
    )
    started_perf = time.time()

    # Sync path: flowengine owns the checkpoint.
    if run.checkpoint_id:
        skill_instances, merged_configs = _create_skill_instances(
            blueprint, registry, neurocore_config
        )
        flow_config = _build_flow_config(blueprint, merged_configs)
        engine = FlowEngine(
            flow_config,
            skill_instances,  # type: ignore[arg-type]  # Skill is a BaseComponent
            validate_types=False,
            checkpoint_store=checkpoint_store_for(store),
        )
        try:
            context = engine.resume(run.checkpoint_id, resume_data)
        except Exception as exc:
            run.status = RunStatus.FAILED
            run.error = str(exc)
            run.updated_at = utcnow_iso()
            store.save_run(run)
            raise ExecutionError(f"Resume failed: {exc}") from exc

        for st in context.metadata.step_timings:
            skill = skill_instances.get(st.component)
            store.save_step(
                StepRecord(
                    run_id=run_id,
                    step_index=st.step_index,
                    component=st.component,
                    skill_type=skill.skill_meta.name if skill else None,
                    status=StepStatus.COMPLETED,
                    started_at=st.started_at.isoformat(),
                    duration_ms=st.duration * 1000,
                    output_keys=list(skill.skill_meta.provides or []) if skill else [],
                )
            )

        run.checkpoint_id = None
        # Clear any error left over from an earlier failed attempt, matching
        # the restored-context path below.
        run.error = None
        _finalize_run(store, run, context, started_perf)
        return context

    # Async/DAG path: the stored final_context IS the checkpoint.
    context = FlowContext.from_dict(run.final_context or {})
    if not run.final_context:
        # Nothing was persisted to restore from, so the run starts over;
        # re-seed the original inputs, which would otherwise be lost.
        for key, value in (run.initial_data or {}).items():
            context.set(key, value)
    context.metadata.suspended = False
    context.metadata.suspended_at_node = None
    context.metadata.suspension_reason = None
    if resume_data is not None:
        context.set("resume_data", resume_data)

    skip = set(context.metadata.completed_nodes)
    try:
        context = execute_blueprint(
            blueprint,
            registry,
            neurocore_config,
            _tracker=tracker,
            _base_context=context,
            _skip=skip,
        )
    except Exception as exc:
        run.status = RunStatus.FAILED
        run.error = str(exc)
        run.updated_at = utcnow_iso()
        store.save_run(run)
        raise

    run.error = None
    _finalize_run(store, run, context, started_perf)
    return context
async def execute_blueprint_stream(
    blueprint: Blueprint,
    registry: SkillRegistry,
    config: NeuroCoreConfig,
    initial_data: dict[str, Any] | None = None,
) -> AsyncIterator[Any]:
    """Execute a blueprint, yielding FlowEvents as each step runs.

    Only the blueprint's sequential ``flow.steps`` are executed, in order.
    Event sequence: ``FLOW_STARTED``, then ``STEP_STARTED`` followed by
    ``STEP_COMPLETED`` for each step, then ``FLOW_COMPLETED``. When a skill
    raises, ``STEP_FAILED`` and ``FLOW_FAILED`` are yielded and the exception
    is re-raised. ``STEP_COMPLETED`` carries the context entries whose keys
    the skill declares in ``skill_meta.provides``.

    Durations are measured with a monotonic clock, so they are unaffected by
    system clock adjustments.

    Usage:
        async for event in execute_blueprint_stream(bp, registry, cfg):
            print(event.event_type, event.step_name)

    Raises:
        Exception: Whatever the failing skill raised, after the failure
            events have been yielded.
    """
    from neurocore.runtime.events import FlowEvent, FlowEventType

    instances, merged_configs = _create_skill_instances(blueprint, registry, config)
    steps = _get_sequential_steps(blueprint)

    flow_start = time.perf_counter()
    yield FlowEvent(
        FlowEventType.FLOW_STARTED,
        step_name="",
        data={"blueprint": blueprint.name},
    )

    context = FlowContext()
    if initial_data:
        for k, v in initial_data.items():
            context.set(k, v)

    for step in steps:
        skill = instances[step.component]
        step_start = time.perf_counter()
        yield FlowEvent(FlowEventType.STEP_STARTED, step_name=step.component)

        # Keep the try body to the skill call so that only skill failures
        # are reported as STEP_FAILED.
        try:
            context = await _run_skill_async(skill, context)
        except Exception as exc:
            duration = (time.perf_counter() - step_start) * 1000
            yield FlowEvent(
                FlowEventType.STEP_FAILED,
                step_name=step.component,
                duration_ms=duration,
                error=str(exc),
            )
            yield FlowEvent(
                FlowEventType.FLOW_FAILED,
                step_name="",
                duration_ms=(time.perf_counter() - flow_start) * 1000,
                error=str(exc),
            )
            raise

        duration = (time.perf_counter() - step_start) * 1000
        provided = skill.skill_meta.provides or []
        yield FlowEvent(
            FlowEventType.STEP_COMPLETED,
            step_name=step.component,
            duration_ms=duration,
            data={k: v for k, v in context.data.items() if k in provided},
        )

    yield FlowEvent(
        FlowEventType.FLOW_COMPLETED,
        step_name="",
        duration_ms=(time.perf_counter() - flow_start) * 1000,
    )
def load_and_run(
    blueprint_path: Path,
    *,
    project_root: Path | None = None,
    initial_data: dict[str, Any] | None = None,
    track: bool = True,
) -> FlowContext:
    """Load config, discover skills, load the blueprint and execute it.

    One-call convenience wrapper. In order, it:
    1. Loads the neurocore.yaml config via ``load_config``
    2. Discovers skills via ``discover_skills``
    3. Loads and parses the blueprint file
    4. Executes it — through :func:`execute_blueprint_tracked` when ``track``
       is true, otherwise through :func:`execute_blueprint`

    Args:
        blueprint_path: Path to the blueprint YAML file.
        project_root: Optional project root, forwarded to ``load_config``.
        initial_data: Optional initial context data.
        track: Persist a run record + step history (default True).

    Returns:
        FlowContext with execution results.
    """
    neurocore_config = load_config(project_root=project_root)
    registry = discover_skills(neurocore_config)
    blueprint = load_blueprint(blueprint_path)

    if not track:
        return execute_blueprint(
            blueprint,
            registry,
            neurocore_config,
            initial_data=initial_data,
        )

    return execute_blueprint_tracked(
        blueprint,
        registry,
        neurocore_config,
        initial_data=initial_data,
        blueprint_path=blueprint_path,
    )