Skip to content

节点系统

节点系统是 CortexForge 认知引擎的 kernel 子系统。节点是文件驱动认知管道中的工作单元,FileEventBus 负责事件分发,Circuit 管理整个调度生命周期。

一个完整的认知周期:

FileEvent → FileEventBus.dispatch_forever() 从队列取出
  → 遍历 nodes → on_event() 匹配?
      → 是 → state = READY → node.run() 被唤醒
              → execute() → [FileUpdate, ...]
                  → FileEventBus.apply_update() 落盘 → publish 新 FileEvent
      → 否 → 继续等待下一事件

类的层次

Node (ABC)             # 抽象基类
├── Agent(Node)        # LLM 驱动型
└── Router(Node)       # 纯逻辑型

Node 基类

python
class Node:
    id: str                          # 唯一标识
    guards: List[FilePattern]        # 监听的 glob 模式列表
    produces: List[FileDescriptor]   # 产出的文件描述符列表

    def on_event(event) -> bool      # 事件匹配判断
    async def execute() -> list[FileUpdate]  # 执行逻辑
    async def run()                  # 主循环

💡 小贴士

Node.run() 在被 Circuitasyncio.Task 托管后循环等待 _ready_event,被总线置位后调用 execute()

Agent

python
class Agent(Node):
    def __init__(self, node_id, host=None, *, system_prompt="", memory=None):
        ...

持有 _host(ApplicationHost 引用)、_system_prompt(注入 LLM 的消息)、_memory(UnifiedMemoryManager 引用)。

💡 小贴士

Agent 通过 self.think(messages) 方法调用 LLM(Agent 基类方法),底层走 ModelGatewaysrc/brain/ai/gateway.py)——项目内所有 LLM 调用的统一入口。支持 gateway.fast / gateway.quality / gateway.multimodal 多角色。

Router

python
class Router(Node):
    ...

只是标记了 type = "router"。Router 的实现类不调用 LLM,执行纯机械逻辑。

文件相关数据结构

FileDescriptor

python
@dataclass(slots=True)
class FileDescriptor:
    path: str                    # 文件路径
    schema: str = "json"         # 格式(json / text)
    lock: str = "write_overwrite"# 锁策略

FilePattern

python
@dataclass(slots=True)
class FilePattern:
    pattern: str    # glob 模式,如 "inbox/event_*.json"

    def match(self, file_path: str) -> bool

FileEvent

python
@dataclass(slots=True)
class FileEvent:
    path: str                # 变更文件路径
    change_type: str         # 变更类型
    timestamp: str           # 时间戳
    version: int = 0
    metadata: dict = {}      # 含 source_node 等

FileUpdate

python
@dataclass(slots=True)
class FileUpdate:
    descriptor: FileDescriptor
    content: Any
    mode: str = "overwrite"   # overwrite / append

节点状态机

状态定义在 NodeState 枚举中: IDLEREADYRUNNINGWAITINGERRORTERMINATEDWAITING 状态已在 EventBridge 和节点基类中使用 (用于慢路径主动等待)。

FileEventBus — 事件总线

src/brain/kernel/event_bus.py

事件分发中枢,连接文件系统和节点。核心机制:

  1. dispatch_forever() 协程从 asyncio.QueueFileEvent
  2. 遍历所有节点调用 on_event(),匹配的节点标记为 READY_ready_event.set()
  3. 节点被唤醒后执行 execute(),产出 FileUpdate
  4. apply_update()asyncio.Lock 写入文件,写入后自动 publish 新的 FileEvent
python
# 写入带锁,保证同一文件并发安全
async def apply_update(self, update, node_id):
    lock = self._get_lock(descriptor.path)
    async with lock:
        self._write_file(...)
    self.publish(FileEvent(path=descriptor.path, change_type="write"))

文件格式支持 "json" 和普通文本,json 模式下可选 "append" 模式 (向数组追加元素)。

Circuit — 电路管理器

src/brain/kernel/circuit.py

职责单一: 创建 FileEventBus,注入所有节点,管理 dispatch_forever 和各 node.run() 协程的生命周期。

启动流程:

  1. FileEventBus(nodes) 创建事件总线和 asyncio.Queue
  2. dispatch_forever() 协程启动,持续从队列取事件
  3. 每个节点创建 node.run() 协程,等待 _ready_event 被置位
  4. _bootstrap_heartbeat() 注入初始脉冲,激活第一个节点

关闭: circuit.stop() 将所有节点标记 TERMINATED,取消各 node.run() 协程和 dispatch_forever

💡 小贴士

Circuit 与 EventBridge 的完整协作流程见 内核运行时

已实现的节点列表

Agent 节点 (LLM 驱动)

src/brain/nodes/agents/

文件注册名状态职责
internalizer.pyinternalizer✅ 启用B→A 转义者:事件 → 第一人称体验
externalizer.pyexternalizer✅ 启用A→B 转义者:自我决定 → JSON 动作
memory_consolidator.pymemory_consolidator禁用记忆沉淀与 now.md 归档
action_planner.pyaction_planner禁用(旧 Kernel-β) LLM 动作规划
impulse_gate.pyimpulse_gate禁用(旧 Kernel-β) 门控判断
polaris_agent.pypolaris禁用(旧 Kernel-α) 单体 Agent

Router 节点 (纯机械)

src/brain/nodes/routers/

文件注册名状态职责
message_preprocessor.pymessage_preprocessor✅ 启用事件收束 & 消息防抖
command_dispatcher.pycommand_dispatcher✅ 启用JSON 解析 → 命令派发
heartbeat_generator.pyheartbeat_generator✅ 启用周期性心跳 (60s)
timer_scheduler.pytimer_scheduler✅ 启用cron 规则 → 节律触发器
metrics_collector.pymetrics_collector禁用文件流转统计
switch_router.pyswitch_router禁用条件分支
merge_router.pymerge_router禁用归并多个文件
broadcast_router.pybroadcast_router禁用广播到多个下游
dead_letter_router.pydead_letter_router禁用超期文件回收

非节点组件

文件职责
src/brain/nodes/self_stream.pySelfStreamPool A 数据访问层 (now.md / state.md / memories/)
src/brain/nodes/event_bridge.py(函数)AppEvent → 文件事件 桥接

设计约束

  • Agent 不直接调用 LLM API — 走 ModelGateway 统一入口 (src/brain/ai/gateway.py)
  • 节点间不直接通信,仅通过文件 (FileEventBus) 传递数据
  • 节点无内部状态 (LLM 上下文除外),execute() 后可回收
  • 文件生命周期: pending/done/archived/
  • 同一文件并发写入由 FileEventBus.apply_update()asyncio.Lock 串行化
  • Agent 通过 self.think(messages) 调用 LLM(Agent 基类方法),底层走 gateway.qualitygateway.fast

下一步阅读

Built with VitePress