节点系统
节点系统是 CortexForge 认知引擎的 kernel 子系统。节点是文件驱动认知管道中的工作单元,FileEventBus 负责事件分发,Circuit 管理整个调度生命周期。
一个完整的认知周期:
FileEvent → FileEventBus.dispatch_forever() 从队列取出
→ 遍历 nodes → on_event() 匹配?
→ 是 → state = READY → node.run() 被唤醒
→ execute() → [FileUpdate, ...]
→ FileEventBus.apply_update() 落盘 → publish 新 FileEvent
→ 否 → 继续等待下一事件类的层次
Node (ABC) # 抽象基类
├── Agent(Node) # LLM 驱动型
└── Router(Node) # 纯逻辑型Node 基类
class Node:
id: str # 唯一标识
guards: List[FilePattern] # 监听的 glob 模式列表
produces: List[FileDescriptor] # 产出的文件描述符列表
def on_event(event) -> bool # 事件匹配判断
async def execute() -> list[FileUpdate] # 执行逻辑
async def run() # 主循环💡 小贴士
Node.run() 在被 Circuit 以 asyncio.Task 托管后循环等待 _ready_event,被总线置位后调用 execute()。
Agent
class Agent(Node):
def __init__(self, node_id, host=None, *, system_prompt="", memory=None):
...持有 _host(ApplicationHost 引用)、_system_prompt(注入 LLM 的消息)、_memory(UnifiedMemoryManager 引用)。
💡 小贴士
Agent 通过 self.think(messages) 方法调用 LLM(Agent 基类方法),底层走 ModelGateway(src/brain/ai/gateway.py)——项目内所有 LLM 调用的统一入口。支持 gateway.fast / gateway.quality / gateway.multimodal 多角色。
Router
class Router(Node):
...只是标记了 type = "router"。Router 的实现类不调用 LLM,执行纯机械逻辑。
文件相关数据结构
FileDescriptor
@dataclass(slots=True)
class FileDescriptor:
path: str # 文件路径
schema: str = "json" # 格式(json / text)
lock: str = "write_overwrite"# 锁策略FilePattern
@dataclass(slots=True)
class FilePattern:
pattern: str # glob 模式,如 "inbox/event_*.json"
def match(self, file_path: str) -> boolFileEvent
@dataclass(slots=True)
class FileEvent:
path: str # 变更文件路径
change_type: str # 变更类型
timestamp: str # 时间戳
version: int = 0
metadata: dict = {} # 含 source_node 等FileUpdate
@dataclass(slots=True)
class FileUpdate:
descriptor: FileDescriptor
content: Any
mode: str = "overwrite" # overwrite / append节点状态机
状态定义在 NodeState 枚举中: IDLE、READY、RUNNING、WAITING、ERROR、TERMINATED。WAITING 状态已在 EventBridge 和节点基类中使用 (用于慢路径主动等待)。
FileEventBus — 事件总线
src/brain/kernel/event_bus.py
事件分发中枢,连接文件系统和节点。核心机制:
dispatch_forever()协程从asyncio.Queue取FileEvent- 遍历所有节点调用
on_event(),匹配的节点标记为READY并_ready_event.set() - 节点被唤醒后执行
execute(),产出FileUpdate apply_update()带asyncio.Lock写入文件,写入后自动publish新的FileEvent
# 写入带锁,保证同一文件并发安全
async def apply_update(self, update, node_id):
lock = self._get_lock(descriptor.path)
async with lock:
self._write_file(...)
self.publish(FileEvent(path=descriptor.path, change_type="write"))文件格式支持 "json" 和普通文本,json 模式下可选 "append" 模式 (向数组追加元素)。
Circuit — 电路管理器
src/brain/kernel/circuit.py
职责单一: 创建 FileEventBus,注入所有节点,管理 dispatch_forever 和各 node.run() 协程的生命周期。
启动流程:
FileEventBus(nodes)创建事件总线和asyncio.Queuedispatch_forever()协程启动,持续从队列取事件- 每个节点创建
node.run()协程,等待_ready_event被置位 _bootstrap_heartbeat()注入初始脉冲,激活第一个节点
关闭: circuit.stop() 将所有节点标记 TERMINATED,取消各 node.run() 协程和 dispatch_forever。
💡 小贴士
Circuit 与 EventBridge 的完整协作流程见 内核运行时。
已实现的节点列表
Agent 节点 (LLM 驱动)
src/brain/nodes/agents/:
| 文件 | 注册名 | 状态 | 职责 |
|---|---|---|---|
internalizer.py | internalizer | ✅ 启用 | B→A 转义者:事件 → 第一人称体验 |
externalizer.py | externalizer | ✅ 启用 | A→B 转义者:自我决定 → JSON 动作 |
memory_consolidator.py | memory_consolidator | 禁用 | 记忆沉淀与 now.md 归档 |
action_planner.py | action_planner | 禁用 | (旧 Kernel-β) LLM 动作规划 |
impulse_gate.py | impulse_gate | 禁用 | (旧 Kernel-β) 门控判断 |
polaris_agent.py | polaris | 禁用 | (旧 Kernel-α) 单体 Agent |
Router 节点 (纯机械)
src/brain/nodes/routers/:
| 文件 | 注册名 | 状态 | 职责 |
|---|---|---|---|
message_preprocessor.py | message_preprocessor | ✅ 启用 | 事件收束 & 消息防抖 |
command_dispatcher.py | command_dispatcher | ✅ 启用 | JSON 解析 → 命令派发 |
heartbeat_generator.py | heartbeat_generator | ✅ 启用 | 周期性心跳 (60s) |
timer_scheduler.py | timer_scheduler | ✅ 启用 | cron 规则 → 节律触发器 |
metrics_collector.py | metrics_collector | 禁用 | 文件流转统计 |
switch_router.py | switch_router | 禁用 | 条件分支 |
merge_router.py | merge_router | 禁用 | 归并多个文件 |
broadcast_router.py | broadcast_router | 禁用 | 广播到多个下游 |
dead_letter_router.py | dead_letter_router | 禁用 | 超期文件回收 |
非节点组件
| 文件 | 类 | 职责 |
|---|---|---|
src/brain/nodes/self_stream.py | SelfStream | Pool A 数据访问层 (now.md / state.md / memories/) |
src/brain/nodes/event_bridge.py | (函数) | AppEvent → 文件事件 桥接 |
设计约束
- Agent 不直接调用 LLM API — 走
ModelGateway统一入口 (src/brain/ai/gateway.py) - 节点间不直接通信,仅通过文件 (
FileEventBus) 传递数据 - 节点无内部状态 (LLM 上下文除外),
execute()后可回收 - 文件生命周期:
pending/→done/→archived/ - 同一文件并发写入由
FileEventBus.apply_update()的asyncio.Lock串行化 - Agent 通过
self.think(messages)调用 LLM(Agent基类方法),底层走gateway.quality或gateway.fast