AI 协作平台的架构抉择:多 Agent 协同、上下文管理与工程落地

AI 协作平台的架构抉择:多 Agent 协同、上下文管理与工程落地
AI 协作平台的架构抉择多 Agent 协同、上下文管理与工程落地一、协作效率的悖论为什么AI 团队不等于高效协作团队引入 AI 工具后协作效率不升反降这是一个被广泛忽视的现象。原因在于当前大多数 AI 协作方案是单点增强而非系统增强——每个成员各自使用 AI 助手但 AI 之间不通信上下文不共享决策不协同。具体表现产品经理用 AI 写需求文档设计师用 AI 生成设计稿开发用 AI 写代码——三个环节的 AI 各自为战产品经理的 AI 不知道设计师的 AI 改了方案开发的 AI 不知道需求已经更新。结果是人要花更多时间在 AI 之间做翻译和对齐。真正的 AI 协作平台需要解决三个核心问题第一多 Agent 之间的上下文共享与同步第二人类在 AI 协作链路中的决策节点设计第三协作过程中的知识沉淀与复用。这三个问题不解决AI 协作平台就只是多个独立 AI 工具的拼盘。二、多 Agent 协作的架构模式graph TB A[用户意图输入] -- B[Orchestrator 编排器] B -- C[需求分析 Agent] B -- D[设计评审 Agent] B -- E[代码审查 Agent] B -- F[测试生成 Agent] C -- G[共享上下文层] D -- G E -- G F -- G G -- H[知识库] G -- I[决策日志] B -- J{人工决策节点} J --|批准| K[执行] J --|修改| B J --|拒绝| L[终止] style B fill:#e1f5fe style G fill:#e8f5e9 style J fill:#fff3e0 style H fill:#f3e5f52.1 编排器模式Orchestrator编排器是多 Agent 协作的核心组件负责将用户意图拆解为子任务、分配给合适的 Agent、协调 Agent 之间的依赖关系。编排器本身不做具体工作只做调度和决策。from dataclasses import dataclass, field from typing import Optional, Callable from enum import Enum import uuid class AgentRole(Enum): ANALYZER analyzer # 需求分析 DESIGNER designer # 设计评审 REVIEWER reviewer # 代码审查 TESTER tester # 测试生成 WRITER writer # 文档撰写 class TaskStatus(Enum): PENDING pending RUNNING running WAITING_HUMAN waiting_human COMPLETED completed FAILED failed dataclass class AgentTask: Agent 任务单元 task_id: str field(default_factorylambda: uuid.uuid4().hex[:8]) role: AgentRole None description: str input_context: dict field(default_factorydict) output: Optional[dict] None status: TaskStatus TaskStatus.PENDING requires_human_approval: bool False depends_on: list[str] field(default_factorylist) # 依赖的任务 ID dataclass class SharedContext: 共享上下文层所有 Agent 可读写的公共状态 project_id: str requirements: dict field(default_factorydict) design_decisions: dict field(default_factorydict) code_changes: list[dict] field(default_factorylist) review_comments: list[dict] field(default_factorylist) decision_log: list[dict] field(default_factorylist) def update(self, source_agent: str, key: str, value: dict) - None: 更新上下文并记录变更日志 setattr(self, key, value) self.decision_log.append({ agent: source_agent, key: key, timestamp: __import__(time).time(), summary: f{source_agent} 更新了 {key}, }) class Orchestrator: 多 Agent 编排器负责任务拆解、调度和人工决策节点管理 def __init__(self, context: SharedContext): self.context context self.tasks: list[AgentTask] [] self.agent_registry: dict[AgentRole, Callable] {} def register_agent(self, role: AgentRole, handler: Callable) - None: 注册 Agent 处理函数 self.agent_registry[role] handler def plan(self, user_intent: str) - list[AgentTask]: 将用户意图拆解为任务链 # 根据意图类型生成任务序列 task_chain [ AgentTask( roleAgentRole.ANALYZER, descriptionf分析用户意图: {user_intent}, requires_human_approvalTrue, # 需求分析需人工确认 ), AgentTask( roleAgentRole.DESIGNER, description基于需求生成技术方案, depends_on[self.tasks[0].task_id if self.tasks else ], requires_human_approvalTrue, # 方案需人工审批 ), AgentTask( roleAgentRole.REVIEWER, description审查代码变更, depends_on[self.tasks[1].task_id if len(self.tasks) 1 else ], ), AgentTask( roleAgentRole.TESTER, description生成测试用例, depends_on[self.tasks[2].task_id if len(self.tasks) 2 else ], ), ] self.tasks task_chain return task_chain def execute_next(self) - Optional[dict]: 执行下一个可运行的任务 for task in self.tasks: if task.status ! TaskStatus.PENDING: continue # 检查依赖是否完成 deps_met all( any(t.task_id dep_id and t.status TaskStatus.COMPLETED for t in self.tasks) for dep_id in task.depends_on if dep_id # 跳过空依赖 ) if not deps_met: continue # 执行 Agent handler self.agent_registry.get(task.role) if not handler: task.status TaskStatus.FAILED continue task.status TaskStatus.RUNNING result handler(task.input_context, self.context) if task.requires_human_approval: task.status TaskStatus.WAITING_HUMAN return { action: human_approval_needed, task_id: task.task_id, result: result, } task.output result task.status TaskStatus.COMPLETED return {action: all_completed}2.2 共享上下文层的设计共享上下文是多 Agent 协作的关键基础设施。它解决了Agent 之间如何知道彼此做了什么的问题。设计原则是所有 Agent 只通过共享上下文通信不直接调用其他 Agent。import json from datetime import datetime class ContextStore: 持久化上下文存储支持版本控制和变更追踪 def __init__(self, project_id: str): self.project_id project_id self._versions: list[dict] [] self._current: dict { requirements: {}, design: {}, code: {}, reviews: [], decisions: [], } def write(self, agent_id: str, section: str, data: dict) - None: Agent 写入上下文 # 记录变更前的快照 snapshot json.dumps(self._current, ensure_asciiFalse) self._current[section] data self._current[decisions].append({ agent: agent_id, section: section, timestamp: datetime.utcnow().isoformat(), action: update, }) # 保存版本快照支持回溯 self._versions.append({ version: len(self._versions) 1, snapshot: snapshot, changed_by: agent_id, timestamp: datetime.utcnow().isoformat(), }) def read(self, section: str) - dict: Agent 读取上下文 return self._current.get(section, {}) def get_changes_since(self, version: int) - list[dict]: 获取指定版本以来的所有变更 return [v for v in self._versions if v[version] version] def rollback(self, version: int) - None: 回滚到指定版本 if 0 version len(self._versions): self._current json.loads( self._versions[version - 1][snapshot] )2.3 人工决策节点AI 协作平台不能是完全自动化的。在关键决策点需求确认、方案审批、上线决策必须设置人工审批节点。这不仅是安全要求更是效率要求——让 AI 在错误方向上高速运转比没有 AI 更危险。dataclass class HumanApprovalGate: 人工审批门在关键决策点暂停自动化流程 task_id: str agent_role: AgentRole decision_summary: str # AI 生成的决策摘要 risk_level: str # LOW / MEDIUM / HIGH auto_approve_threshold: str LOW # 低于此风险等级可自动通过 def should_auto_approve(self) - bool: 判断是否可以自动通过 risk_order {LOW: 0, MEDIUM: 1, HIGH: 2} return risk_order.get(self.risk_level, 2) \ risk_order.get(self.auto_approve_threshold, 0) def format_approval_request(self) - str: 格式化审批请求供人类决策者审阅 return ( f[审批请求] 任务 {self.task_id}\n f来源: {self.agent_role.value} Agent\n f风险等级: {self.risk_level}\n f决策摘要: {self.decision_summary}\n f操作: 批准(y) / 修改后批准(m) / 拒绝(n) )三、AI 协作平台的技术实现3.1 基于 Event-Driven 的异步协作架构import asyncio from collections import defaultdict class EventBus: 事件总线Agent 之间的异步通信通道 def __init__(self): self._subscribers: dict[str, list[asyncio.Queue]] defaultdict(list) async def publish(self, event_type: str, payload: dict) - None: 发布事件 for queue in self._subscribers[event_type]: await queue.put(payload) def subscribe(self, event_type: str) - asyncio.Queue: 订阅事件返回异步队列 queue asyncio.Queue() self._subscribers[event_type].append(queue) return queue # 使用示例Agent 之间通过事件总线协作 bus EventBus() async def code_review_agent(): 代码审查 Agent监听代码提交事件 queue bus.subscribe(code_committed) while True: commit await queue.get() review_result await review_code(commit) if review_result[issues]: # 发现问题发布审查事件 await bus.publish(review_completed, { commit_id: commit[id], status: issues_found, issues: review_result[issues], }) else: await bus.publish(review_completed, { commit_id: commit[id], status: approved, }) async def test_agent(): 测试 Agent监听审查通过事件 queue bus.subscribe(review_completed) while True: review await queue.get() if review[status] approved: tests await generate_tests(review[commit_id]) await bus.publish(tests_generated, { commit_id: review[commit_id], test_count: len(tests), })3.2 知识沉淀与复用class KnowledgeBase: 知识库沉淀协作过程中的决策和经验 def __init__(self): self._entries: list[dict] [] def record_decision(self, context: str, decision: str, rationale: str, outcome: Optional[str] None) - None: 记录决策及其理由 self._entries.append({ type: decision, context: context, decision: decision, rationale: rationale, outcome: outcome, timestamp: datetime.utcnow().isoformat(), }) def record_pattern(self, problem: str, solution: str, applicability: str) - None: 记录可复用的解决模式 self._entries.append({ type: pattern, problem: problem, solution: solution, applicability: applicability, timestamp: datetime.utcnow().isoformat(), }) def search_relevant(self, query: str, top_k: int 5) - list[dict]: 检索相关知识简化版生产环境应使用向量检索 # 按关键词匹配排序 scored [] query_terms set(query.lower().split()) for entry in self._entries: text f{entry.get(context, )} {entry.get(problem, )} \ f{entry.get(solution, )} {entry.get(rationale, )} text_terms set(text.lower().split()) overlap len(query_terms text_terms) if overlap 0: scored.append((overlap, entry)) scored.sort(keylambda x: x[0], reverseTrue) return [entry for _, entry in scored[:top_k]]四、AI 协作平台的边界与风险上下文膨胀问题。随着协作流程推进共享上下文的数据量不断增长。如果每个 Agent 都读取完整上下文Token 消耗会指数级增长。解决方案是按需裁剪上下文每个 Agent 只读取与当前任务相关的上下文片段而非全量上下文。但裁剪策略本身是一个难题——裁剪过度会导致 Agent 丢失关键信息裁剪不足则浪费 Token。Agent 间的级联失败。一个 Agent 的错误输出会成为下一个 Agent 的输入导致错误在链路中放大。例如需求分析 Agent 理解错了用户意图设计 Agent 基于错误需求生成了方案代码审查 Agent 基于错误方案审查了代码——三层放大后最终产出与用户意图完全偏离。缓解方案是在每个 Agent 的输出中增加置信度评分低置信度的输出自动触发人工审批。知识库的噪声积累。知识库中的记录质量参差不齐过时的决策模式会误导后续 Agent。必须建立知识库的淘汰机制超过 6 个月未被引用的条目降权与当前项目上下文不匹配的条目标记为可能过时。人工审批疲劳。如果每个决策点都需要人工审批审批者会因决策过多而降低审批质量。建议将自动审批阈值设为 LOW只对 HIGH 风险决策强制人工审批MEDIUM 风险决策采用默认通过 事后审计模式。五、总结AI 协作平台的核心挑战不是单个 Agent 的能力而是多 Agent 之间的协同效率。编排器模式、共享上下文层和人工决策节点是解决协同问题的三大支柱。落地路线建议第一步明确协作流程中的 Agent 角色和任务依赖关系用编排器统一调度第二步建立共享上下文层所有 Agent 只通过上下文通信避免直接耦合第三步在需求确认、方案审批和上线决策三个关键节点设置人工审批门第四步实现事件驱动的异步架构Agent 之间通过事件总线解耦第五步建立知识库的沉淀和淘汰机制确保知识质量不随时间退化。AI 协作的目标不是替代人而是让人的决策更高效、更准确。任何试图完全自动化的协作系统最终都会因为缺乏人的判断力而失败。