一个循环统治一切
整个 AI 代理的核心,只是一个 30 行的 while 循环
当你按下回车键
你每天用 Claude Code 写代码,但你有没有想过——当你输入一句话后,背后到底发生了什么?
想象一台自动售货机:你按下按钮(输入指令),机器内部处理(AI 思考),然后吐出商品(执行结果)。如果需要多次操作,它会一直循环——直到你满意离开。
用户输入指令
LLM 思考决策
执行工具
返回结果
循环继续...
这个循环就是所有 AI 编程工具的核心。不管是 Claude Code、Cursor 还是 GitHub Copilot,底层都是同样的模式:LLM 决定做什么,代码负责执行。
30 行代码的魔法
下面就是驱动整个 AI 代理的核心代码。别被吓到——我们逐行解读。
def agent_loop(messages: list):
while True:
response = client.messages.create(
model=MODEL, system=SYSTEM,
messages=messages, tools=TOOLS,
max_tokens=8000,
)
messages.append({"role": "assistant",
"content": response.content})
if response.stop_reason != "tool_use":
return
results = []
for block in response.content:
if block.type == "tool_use":
output = run_bash(block.input["command"])
results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": output,
})
messages.append({"role": "user", "content": results})
定义一个叫 agent_loop 的函数,接收对话历史作为输入
无限循环开始——一直运行直到 AI 决定停下来
把对话发给 AI 模型,让它思考下一步该做什么
告诉 AI 用哪个模型、系统指令是什么
传入对话历史和可用的工具列表
最多生成 8000 个 token 的回复
把 AI 的回复加入对话历史,这样下一轮 AI 能记住自己说过什么
关键判断:AI 是否还要继续使用工具?
如果不需要了——循环结束,把文字回复返回给用户
如果还要用工具——准备一个空列表来收集工具结果
遍历 AI 回复中的每个内容块
如果这个块是「使用工具」的请求
执行工具(这里是运行一个终端命令)
把工具的执行结果打包
标记类型为「工具结果」
关联到 AI 请求的那个工具调用 ID
附上执行输出的内容
把工具结果加入对话历史,然后回到循环顶部,让 AI 继续思考
这就是所有 AI 代理的核心:模型决定做什么,代码只负责执行。循环本身永远不变——变的只是工具。理解这一点,你就理解了所有 AI 编程工具的工作原理。
工具分发表
想让 AI 多一个能力?不需要改循环代码——只需要在分发表里加一行。
TOOL_HANDLERS = {
"bash": lambda **kw: run_bash(kw["command"]),
"read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
"write_file": lambda **kw: run_write(kw["path"], kw["content"]),
"edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
}
这是一张「工具名 → 执行函数」的对照表
bash:AI 说「运行这个命令」→ 在终端中执行
read_file:AI 说「读这个文件」→ 返回文件内容
write_file:AI 说「创建这个文件」→ 写入磁盘
edit_file:AI 说「修改这段代码」→ 精确替换文本
bash
在终端中执行任意命令——安装依赖、运行测试、查看进程
read_file
读取文件内容——AI 需要先「看到」代码才能理解和修改
write_file
创建新文件——从零开始写一个完整的文件
edit_file
精确修改——找到旧代码,替换为新代码,不动其他部分
看它运转起来
点击「下一步」,一步步看 Agent Loop 如何处理一个真实请求。
检验你的理解
你让 AI 帮你创建一个新文件,AI 会用哪个工具?
如果 AI 的 stop_reason 不是 tool_use,会发生什么?
想让 AI 支持一个新工具「搜索网页」,你需要改循环代码吗?
AI 如何自己做计划
待办清单 + 子代理 = AI 不再迷路
AI 的便利贴看板
AI 在执行复杂任务时经常「忘记」自己该干什么——做到第三步突然跑去做别的。解决方法?给它一个待办清单。
就像项目经理用便利贴看板一样——把任务写在便利贴上,从「待办」移到「进行中」再到「完成」。
class TodoManager:
def update(self, items: list) -> str:
in_progress = [i for i in items
if i.get("status") == "in_progress"]
if len(in_progress) > 1:
return "Error: only 1 task can be in_progress"
self.items = items
return self.render()
def render(self) -> str:
lines = []
for i, item in enumerate(self.items):
mark = {"pending": " ",
"in_progress": ">",
"completed": "x"}
lines.append(
f"[{mark[item['status']]}] #{i+1}: {item['subject']}")
done = sum(1 for i in self.items
if i["status"] == "completed")
lines.append(f"({done}/{len(self.items)} completed)")
return "\n".join(lines)
定义一个待办清单管理器
「更新」功能——AI 每次修改待办清单时调用
找出所有标记为「进行中」的任务
规则:同时只能有 1 个任务在进行中——防止 AI 同时干太多事
如果违反规则,返回错误提示
保存更新后的任务列表
渲染成可视化的待办清单返回给 AI
「渲染」功能——把任务列表变成好看的文字
遍历每个任务
[ ] = 待办,[>] = 进行中,[x] = 已完成
拼成类似:[>] #1: 重构登录模块
统计已完成的数量
最后显示进度:(2/5 completed)
把所有行拼在一起返回
提醒注入:防止 AI 忘事
AI 有时会沉浸在工作中,忘记更新待办清单。解决方法?每隔 3 轮自动「偷偷」提醒它。
rounds_since_todo = 0
if rounds_since_todo >= 3:
results.append({
"type": "text",
"text": "<reminder>Update your todos.</reminder>"
})
记录 AI 上次更新待办清单后过了几轮
如果已经过了 3 轮还没更新...
在工具返回结果中偷偷加一条提醒
类型是普通文本(不是工具结果)
内容是:「记得更新你的待办清单!」
这条提醒被混在工具结果里发送给 AI。AI 看到后会自觉地更新待办清单——它以为这是系统要求,其实是我们的小心机。这种「在工具返回中注入额外信息」的模式在harness 工程中非常常见。
子代理:干净的上下文隔离
当主 AI 把一个子任务交给子代理时,子代理用全新的对话历史工作,完成后只返回一个摘要。主代理的上下文完全不受影响。
def run_subagent(prompt: str) -> str:
sub_messages = [{"role": "user",
"content": prompt}]
for _ in range(30):
response = client.messages.create(
model=MODEL,
system=SUBAGENT_SYSTEM,
messages=sub_messages,
tools=CHILD_TOOLS,
max_tokens=8000,
)
sub_messages.append(
{"role": "assistant",
"content": response.content})
if response.stop_reason != "tool_use":
break
# ... execute tools ...
return "".join(b.text for b in response.content
if hasattr(b, "text"))
创建一个子代理来处理子任务
关键:全新的对话历史!只有用户交代的任务描述
最多循环 30 次(安全上限,防止无限循环)
调用 AI 模型——和主循环完全一样的模式
用同一个模型
但用子代理专用的系统指令
传入子代理自己的对话历史
子代理可用的工具集(通常是主代理的子集)
把 AI 回复加入子代理的对话历史
任务完成?跳出循环
继续执行工具...
只返回最终的文字摘要——所有中间过程都留在子代理里
检验你的理解
AI 正在执行一个 5 步任务,到第 3 步时突然去做了别的事。最可能的原因是?
为什么子代理要用一个全新的 messages 列表?
你让 AI 同时重构 3 个独立模块。最佳方案是?
知识加载与记忆管理
像图书馆员一样——只记目录,按需取书
AI 的图书馆
如果把所有知识都塞进 AI 的「大脑」,它会被信息淹没。好的图书馆员不会背下每本书,但她清楚每本书在哪——需要时直接去取。
每个技能只用 ~100 个 token 存放名称和描述,常驻在系统提示中
AI 需要某个技能时,通过 tool_result 注入完整文档,用完即走
按需加载的秘密
AI 说「我需要 PDF 技能」→ 系统从文件中读取完整指令 → 注入到对话中。
class SkillLoader:
def _load_all(self):
for f in sorted(self.skills_dir.rglob("SKILL.md")):
text = f.read_text()
meta, body = self._parse_frontmatter(text)
self.skills[meta["name"]] = {
"meta": meta, "body": body,
}
def get_content(self, name: str) -> str:
skill = self.skills.get(name)
if not skill:
return f"Skill '{name}' not found."
return f"<skill name=\"{name}\">\n{skill['body']}\n</skill>"
定义一个技能加载器
「扫描所有技能」功能——启动时运行一次
在技能目录中找到所有 SKILL.md 文件
读取文件内容
分离「元数据」(名称、描述)和「正文」(详细指令)
按名称存入字典,方便后续查找
「获取技能」功能——AI 需要时才调用
根据名称查找技能
如果没找到,返回提示信息
返回完整的技能文档,用 XML 标签包裹,注入到对话中
name:
技能名称——AI 用这个名字来请求加载
description:
一行描述——让 AI 知道这个技能能做什么
tags:
搜索标签——帮助 AI 找到正确的技能
三层记忆压缩
AI 的上下文窗口是有限的——就像手机内存。压缩就是在不丢关键信息的前提下释放空间。
把旧的工具结果替换为占位符「[之前:使用了 bash]」,但保留 read_file 的结果(可能还要参考)
当 token 超过阈值(如 50000),先保存完整记录到文件,然后让 AI 总结整个对话
AI 可以随时调用 compact 工具,立即执行压缩——当它觉得上下文太臃肿时
def auto_compact(messages: list) -> list:
transcript_path = TRANSCRIPTS_DIR / \
f"transcript_{int(time.time())}.jsonl"
with open(transcript_path, "w") as f:
for msg in messages:
f.write(json.dumps(msg) + "\n")
summary_response = client.messages.create(
model=MODEL,
system="Summarize this conversation.",
messages=[{"role": "user",
"content": str(messages)}],
max_tokens=2000,
)
summary = summary_response.content[0].text
return [
{"role": "user",
"content": f"<summary>{summary}</summary>"},
{"role": "assistant",
"content": "Understood. Continuing."},
]
自动压缩函数——当对话太长时调用
先给完整对话记录创建一个备份文件
(用时间戳命名,保证唯一)
打开文件准备写入
把每条消息逐行写入
(这就是完整的「飞行记录仪」备份)
请另一个 AI 来总结这段对话
指令:「总结这段对话,保留关键决策和发现」
把整个对话历史作为输入
取出总结文本
用总结替换所有旧消息——从此 AI 只记得摘要
用户视角:一条总结消息
AI 视角:「明白了,继续工作」
检验你的理解
大约每 4 个字符 = 1 个 token。一段 200 字的中文大约消耗 150-200 个 token。AI 的上下文窗口通常有 10 万到 100 万 token——听起来很多,但长时间工作后很容易用满。
AI 需要用 PDF 处理技能,但系统提示里只有技能名称。完整内容在哪里加载?
AI 已经工作了 2 小时,对话超过 50000 tokens。接下来会自动发生什么?
为什么微压缩只替换旧的 bash 结果,不替换 read_file 的结果?
任务系统与后台运行
把任务写在「墙上」而不是「脑子里」——持久化 + 不阻塞
写在墙上的任务
上一模块讲到上下文压缩会清除对话历史——但任务不能丢。解决方法?把任务写在文件系统里,就像施工现场把蓝图钉在墙上。
class TaskManager:
def create(self, subject: str,
description: str = "") -> str:
task = {
"id": self._next_id,
"subject": subject,
"description": description,
"status": "pending",
"blockedBy": [],
"owner": "",
}
path = TASKS_DIR / f"task_{task['id']}.json"
path.write_text(json.dumps(task, indent=2))
self._next_id += 1
return f"Created task #{task['id']}"
定义一个任务管理器
「创建任务」功能
构建一个任务对象:
自动分配唯一 ID
任务标题(如「实现登录功能」)
任务描述(具体要做什么)
初始状态:待办
依赖列表——这个任务要等哪些任务先完成
负责人——哪个代理在做这个任务
写入文件系统——每个任务一个 JSON 文件
(即使对话被压缩,文件依然存在)
返回创建成功的提示
pending
待办——任务已创建但还没开始
in_progress
进行中——某个代理正在执行
completed
已完成——任务已经搞定
依赖图:谁先谁后
有些任务必须等其他任务完成才能开始。依赖图让 AI 自动管理执行顺序。
def _clear_dependency(self, completed_id: int):
for f in TASKS_DIR.glob("task_*.json"):
task = json.loads(f.read_text())
if completed_id in task.get("blockedBy", []):
task["blockedBy"].remove(completed_id)
f.write_text(json.dumps(task, indent=2))
当一个任务完成时,清除它的「阻塞关系」
遍历所有任务文件
读取任务内容
如果这个任务的「等待列表」里有刚完成的任务 ID
把它移出等待列表——不再阻塞
保存更新后的任务文件
后台运行:发射后不管
AI 不需要傻等一个耗时 5 分钟的编译——发起后台任务,立即继续处理其他事情。就像派一个工人去搅拌水泥,你继续搭框架。
class BackgroundManager:
def run(self, command: str) -> str:
task_id = str(uuid.uuid4())[:8]
self.tasks[task_id] = {
"status": "running",
"result": None,
"command": command,
}
thread = threading.Thread(
target=self._execute,
args=(task_id, command),
daemon=True,
)
thread.start()
return f"Background task {task_id} started"
后台任务管理器
「运行」功能——接收一个终端命令
生成一个唯一 ID(8 位随机字符)
记录这个后台任务的状态
状态:运行中
结果:还没有(等执行完才有)
记录要执行的命令
启动一个新线程来执行命令
指定执行函数
传入任务 ID 和命令
设为守护线程(主程序退出时自动结束)
立即启动,不等待
立即返回——AI 可以继续做别的事
后台任务完成后,结果会被推入一个通知队列。主循环在每次迭代前检查队列——如果有完成的后台任务,把结果注入到对话中。AI 无需主动轮询,结果会自动送达。
检验你的理解
AI 正在编译一个大项目(需要 5 分钟),同时用户问了一个简单问题。AI 应该怎么做?
任务为什么要保存在文件系统而不是内存中?
Task 3 的 blockedBy 列表是 [1, 2]。Task 1 完成了,Task 3 能开始吗?
AI 团队协作
当一个 AI 不够用——组建团队,异步通信,自主领取任务
一支 AI 电影剧组
想象一个电影剧组:导演(主代理)协调所有人,摄影师、音效师、演员(队友代理)各有专长。每个人有自己的对讲机频道,异步沟通,互不阻塞。
分配任务、协调团队、审批计划、发起关机
专注 UI 开发、CSS 动画、用户交互
专注 API 接口、数据库、业务逻辑
专注编写测试、运行测试、报告 bug
JSONL 收件箱:异步通信
每个队友有一个 .jsonl 文件作为收件箱——写入只需追加一行,读取后立即清空。
class MessageBus:
def send(self, sender: str, to: str,
content: str,
msg_type: str = "message"):
msg = {
"type": msg_type,
"from": sender,
"content": content,
"timestamp": time.time(),
}
inbox = self.inbox_dir / f"{to}.jsonl"
with open(inbox, "a") as f:
f.write(json.dumps(msg) + "\n")
def read_inbox(self, name: str) -> list:
inbox = self.inbox_dir / f"{name}.jsonl"
if not inbox.exists():
return []
text = inbox.read_text().strip()
msgs = [json.loads(l)
for l in text.splitlines()]
inbox.write_text("")
return msgs
消息总线——团队的通信系统
「发送消息」功能
指定发件人、收件人
消息内容和类型
构建一条消息:
消息类型(普通消息、关机请求等)
谁发的
说了什么
什么时候发的
找到收件人的收件箱文件
以追加模式打开——不会覆盖已有消息
写入一行 JSON(一条消息 = 一行)
「读取收件箱」功能
找到自己的收件箱文件
如果文件不存在,说明没有新消息
读取文件内容
逐行解析成消息列表
清空收件箱——读完就删,防止重复处理
返回所有新消息
协议握手:安全关机
为什么不能直接强制关闭队友?因为它可能正在写文件或执行关键操作。协议握手确保每个队友都能优雅地完成手头工作再退出。
每个关机请求都有唯一的 request_id。当多个关机请求同时进行时,request_id 确保每个响应都能准确匹配到对应的请求——这就是请求关联模式。
自主代理:自己找活干
最酷的部分——队友不需要导演时刻分配任务。它们会自动进入空闲模式,每 5 秒检查收件箱和任务板,自己认领未分配的任务。
unclaimed = scan_unclaimed_tasks()
if unclaimed:
task = unclaimed[0]
claim_task(task["id"], name)
messages.append({
"role": "user",
"content": f"<auto-claimed>Task #{task['id']}: {task['subject']}"
})
resume = True
扫描所有未认领的任务(状态为 pending、无 owner、无阻塞)
如果找到了未认领的任务...
取第一个
认领它——把 owner 设为自己的名字,状态改为 in_progress
把「自动认领了任务」这件事告诉 AI
注入到对话中:「你自动认领了 Task #7:实现用户认证」
标记为「需要恢复工作状态」
检验你的理解
为什么用 JSONL 文件作为收件箱,而不是内存中的列表?
导演发送 shutdown_request 给 Frontend,但 Frontend 回复 approve: false。会发生什么?
自主代理在空闲 60 秒后自动关机。为什么需要这个超时?
平行宇宙:隔离执行
给每个任务一个独立的代码副本——并行不冲突
两个 AI 同时改一个文件?
当多个 AI 代理并行工作时,如果它们修改同一个文件——必然冲突。解决方法?给每个任务一个独立的平行宇宙。
写入磁盘...
覆盖了 A 的修改!代码丢失!
每个 agent 在自己的目录副本中工作,完成后合并回主线
双平面架构
控制平面(任务)协调工作,执行平面(Worktree)隔离代码。两者通过任务 ID 绑定。
控制平面
.tasks/task_12.json — 记录任务状态、依赖、负责人。知道「要做什么」。
执行平面
.worktrees/feature-xyz/ — 独立的代码副本 + 独立的 Git 分支。知道「在哪做」。
绑定关系
task["worktree"] = "feature-xyz" — 任务和 worktree 通过 ID 关联。
class WorktreeManager:
def create(self, name: str,
task_id: int = None,
base_ref: str = "HEAD") -> str:
wt_path = self.base_dir / name
branch = f"wt/{name}"
subprocess.run(
["git", "worktree", "add",
"-b", branch,
str(wt_path), base_ref],
cwd=str(self.repo_root),
check=True,
)
entry = {
"name": name,
"path": str(wt_path),
"branch": branch,
"task_id": task_id,
"status": "active",
}
self._add_to_index(entry)
if task_id:
self._bind_task(task_id, name)
self.events.emit(
"worktree.create.after",
worktree=entry)
Worktree 管理器
「创建 worktree」功能
可以绑定到一个任务 ID
从哪个版本开始复制(默认是最新版)
确定 worktree 的目录路径
创建一个专用的 Git 分支(如 wt/auth-refactor)
执行 Git 命令创建 worktree
git worktree add
创建新分支
放在指定路径,基于指定版本
在仓库根目录执行
如果失败就报错
记录这个 worktree 的信息:
名称
目录路径
Git 分支名
绑定的任务 ID
状态:活跃
添加到索引文件中
如果指定了任务 ID
把任务和 worktree 绑定
发出事件——记录到审计日志
事件审计:系统的飞行记录仪
每一个操作都被记录在 events.jsonl 中——出了问题回放事件就能找到原因。
worktree.create.before
创建 worktree 之前——还没执行
worktree.create.after
创建 worktree 之后——成功了
worktree.create.failed
创建失败了——记录错误原因
worktree.remove.after
删除 worktree 之后——清理完毕
task.completed
任务完成——worktree 可以清理了
在多代理并行工作的系统中,出了问题很难定位「谁在什么时候做了什么」。事件日志就是系统的飞行记录仪——回放事件序列就能还原现场。这就是为什么所有生产级系统都需要审计日志。
全景图:12 个机制的完整拼图
点击任意组件查看它的作用。从一个 30 行的循环开始,我们一步步搭建出了一个多代理自主协作系统。
核心引擎
规划与隔离
知识与记忆
持久化与并发
多代理协作
循环从未改变。从 Session 1 到 Session 12,核心的 while 循环一个字都没变。每个 session 只是在循环周围添加一个新的 harness 机制——工具、知识、压缩、任务、团队、隔离。模型是智能,harness 是世界。构建好 harness,模型会完成剩下的一切。