langgraph 解读
项目信息
- 项目名称:LangGraph
- 项目描述:LangGraph 是一个低级编排框架,专为构建、管理和部署长时间运行的有状态 AI Agent 而设计。它由 LangChain 公司开发维护,已被 Klarna、Replit、Elastic 等公司用于生产环境。LangGraph = 一个用于构建、部署有状态 AI Agent 的图执行引擎**(受 Google Pregel 启发)。
- 项目地址:https://github.com/langchain-ai/langgraph
- 官方文档:https://docs.langchain.com/oss/python/langgraph/overview
1. 项目概览
1.1 项目定位与核心价值
一个受 Google Pregel 与 Apache Beam 启发的、面向 LLM Agent 的有状态图执行引擎。
一句话定位:LangGraph 是由 LangChain Inc 开发的。它 LangGraph 是一个有状态、多角色 Agent 编排框架,专为构建、管理和部署长时间运行的有状态 AI Agent 而设计,提供了构建、管理和部署长时间运行的有状态 Agent 的低层基础架构。
LangGraph 由 LangChain 公司开发维护,已被 Klarna、Replit、Elastic 等公司用于生产环境。通过"图 + 状态"这一核心抽象,填补了"无状态 LLM 库"与"生产级 Agent 系统"之间的鸿沟。它特别适合:
- 复杂多步骤 AI 工作流
- 长时间运行、需要持久化的 Agent
- 多 Agent 协作场景
- 有人工介入的 AI 流程
- 需要精确回放和审计的合规场景
解决的核心痛点:
- 长时间运行 Agent 的可靠性问题:通过持久化执行 (Durable Execution) 让 Agent 在故障后从断点精确恢复
- 人机协同的复杂性:提供 Human-in-the-Loop 能力,允许在任意节点暂停、检查和修改状态
- 多角色 Agent 的状态管理:提供全面的记忆系统,包括短期工作记忆和跨会话的长期记忆
1.2 仓库规模与技术栈
| 指标 | 数值 |
|---|---|
| 总文件数 | 632 |
| 总代码行数 | 237,138 |
| 源文件行数 | 183,494 |
| Python 库 | 8 个 |
| JS/TS 库 | 1 个 (占位) |
| 测试文件 | ~445 (Python test files) |
| 类别 | 技术选择 |
|---|---|
| 后端语言 | Python 3.10+ (主力) |
| 核心依赖 | langchain-core, pydantic, xxhash |
| 构建工具 | Hatchling (打包), uv (依赖管理) |
| 代码质量 | Ruff, MyPy, codespell |
| 测试框架 | pytest + pytest-asyncio + syrupy |
| 序列化 | ormsgpack, orjson |
| 数据库 | PostgreSQL (psycopg), SQLite (aiosqlite), Redis |
| CLI 框架 | Click |
| HTTP | httpx, websockets |
1.3 生态定位
[应用层] Deep Agents (高级 Agent 框架)
↓ built on
[编排层] ★ LangGraph (本仓库) - 低级图编排引擎
↓ depends on
[基础层] LangChain Core - 消息/工具/Runnable 抽象
↓ integrates with
[平台层] LangSmith - 调试/监控/部署关键特征
| 特征 | 说明 |
|---|---|
| 持久化执行 (Durable Execution) | Agent 可在失败中恢复并从中断点继续运行 |
| 人在回路 (Human-in-the-Loop) | 在执行任意点暂停、检查/修改状态 |
| 综合记忆 (Comprehensive Memory) | 短期工作记忆 + 长期持久记忆 |
| 可调试性 | 与 LangSmith 深度集成,支持轨迹回放 |
| 生产级部署 | 通过 LangSmith Deployment 平台可一键部署 |
| 多语言 SDK | Python(主)+ JS/TS(独立仓库 langgraphjs) |
1.4 行业应用与应用场景
Trusted by companies shaping the future of agents – including Klarna, Replit, Elastic
解决的核心问题
| 问题 | 传统方案痛点 | LangGraph 方案 |
|---|---|---|
| LLM 调用无状态 | 无法跨调用共享上下文 | 状态图 + Channel 系统持久化 |
| 长任务中断恢复难 | 重新执行耗 token/时间 | Checkpoint 机制精确恢复 |
| 多角色协作复杂 | 需手写状态机 | 声明式图 + 边/条件边 |
| 工具调用编排混乱 | if-else 嵌套 | ToolNode + ReAct Agent |
| Agent 调试困难 | 黑盒无轨迹 | LangSmith tracing + 状态回放 |
典型使用场景
1. 多轮对话机器人 (Chatbot)
└─ StateGraph + MessagesState + Checkpoint
2. ReAct 工具调用 Agent
└─ create_react_agent (prebuilt) + ToolNode
3. RAG (检索增强生成)
└─ 分支路由 + 子图 + 长期记忆 Store
4. 多 Agent 协作 (Multi-Agent)
└─ 子图 + Send/Command + 状态隔离
5. 人机协作 (Human-in-the-Loop)
└─ interrupt() + checkpointer
6. 计划与执行 (Plan-and-Execute)
└─ LLM 规划 + 执行循环 + 重规划
7. 反思与改进 (Reflexion)
└─ 自我评估节点 + 反馈循环
8. 长期记忆 (Long-term Memory)
└─ BaseStore + 语义检索1.5 技术栈与依赖
主语言与版本
| 库 | 主语言 | Python 版本 | 框架依赖 |
|---|---|---|---|
| langgraph | Python 3.10+ | 3.10-3.13 | langchain-core, pydantic v2, xxhash |
| langgraph-checkpoint | Python 3.10+ | - | langchain-core, ormsgpack |
| langgraph-prebuilt | Python 3.10+ | - | langgraph-checkpoint, langchain-core |
| langgraph-sdk | Python 3.10+ | - | httpx, orjson, langchain-core, websockets |
| langgraph-cli | Python 3.10+ | - | typer, click, docker |
| langgraph-checkpoint-postgres | Python 3.10+ | - | psycopg, asyncpg |
| langgraph-checkpoint-sqlite | Python 3.10+ | - | aiosqlite |
| sdk-js | 仅 README 占位 | - | JS SDK 在独立仓库 |
核心第三方依赖
- langchain-core (Runnable 抽象、Messages 类型)
- pydantic v2 (类型校验、状态 Schema)
- ormsgpack (高效序列化,比 msgpack 快)
- xxhash (快速哈希)
- psycopg / asyncpg (Postgres 驱动)
- aiosqlite (SQLite 异步驱动)
- httpx / orjson / websockets (SDK 通信)
- typer / click (CLI)
- redis (可选缓存)内部架构依赖图
checkpoint (基类)
├── checkpoint-sqlite (实现)
├── checkpoint-postgres (实现)
├── prebuilt (依赖)
└── langgraph (核心)
prebuilt
└── checkpoint
langgraph (核心)
├── checkpoint
├── prebuilt
└── sdk-py (轻量)
cli
└── (部署工具,不直接被生产代码使用)
sdk-py
├── langgraph
└── cli
sdk-js (独立仓库)典型 Case 代码样例
最简工作流(入门示例)
from langgraph.graph import START, StateGraph
from typing_extensions import TypedDict
class State(TypedDict):
text: str
def node_a(state: State) -> dict:
return {"text": state["text"] + "a"}
def node_b(state: State) -> dict:
return {"text": state["text"] + "b"}
graph = StateGraph(State)
graph.add_node("node_a", node_a)
graph.add_node("node_b", node_b)
graph.add_edge(START, "node_a")
graph.add_edge("node_a", "node_b")
result = graph.compile().invoke({"text": ""})
# 输出: {'text': 'ab'}ReAct Agent(工具调用)
from langchain_anthropic import ChatAnthropic
from langgraph.prebuilt import create_react_agent
def search(query: str):
"""Call to surf the web."""
if "sf" in query.lower():
return "It's 60 degrees and foggy."
return "It's 90 degrees and sunny."
tools = [search]
model = ChatAnthropic(model="claude-3-7-sonnet-latest")
app = create_react_agent(model, tools)
app.invoke({
"messages": [{"role": "user", "content": "what is the weather in sf"}]
})有状态对话(带 Checkpoint)
from langgraph.graph import StateGraph
from langgraph.checkpoint.memory import InMemorySaver
# 编译时挂载 checkpointer
app = workflow.compile(checkpointer=InMemorySaver())
# 通过 thread_id 隔离会话
config = {"configurable": {"thread_id": "user-123"}}
app.invoke({"messages": [...]}, config)
# 下次调用会自动恢复
app.invoke({"messages": [...]}, config) # 看到历史Human-in-the-Loop
from langgraph.types import interrupt
from langgraph.prebuilt.interrupt import HumanInterrupt, HumanResponse
def my_node(state):
request: HumanInterrupt = {
"action_request": {"action": "send_email", "args": {...}},
"config": {"allow_ignore": True, "allow_respond": True},
"description": "..."
}
response = interrupt([request])[0]
# 根据 response 继续执行2. 核心设计思想
2.1 灵感来源
LangGraph is inspired by Pregel and Apache Beam. The public interface draws inspiration from NetworkX.
- Pregel (Google): 分布式图计算 BSP(Bulk Synchronous Parallel)模型
- Apache Beam: 统一的批流处理抽象
- NetworkX: 简洁优雅的图操作 API
2.2 三大核心抽象
1. Channel (通道)
└─ 节点间数据流转的载体
└─ 类型: LastValue / Topic / Binop / Ephemeral / NamedBarrier
2. Node (节点)
└─ 计算单元(Python 函数 / Runnable)
└─ 读 Channel → 计算 → 写 Channel
3. Edge (边) + Branch (条件边)
└─ 控制节点执行顺序
└─ 静态边 + 动态条件分支2.3 Superstep 模型
Pregel 采用 BSP 模型:
Step 1: 所有节点并发执行(读 Channel 旧值)
Step 2: 同步屏障(Barrier)
Step 3: 写 Channel(原子提交)
Step 4: 进入下一个 Superstep2.4 与生态的关系
LangChain 生态
LangChain (高层 LLM 应用框架)
└── Deep Agents (高层 Agent API)
└── LangGraph (底层图执行引擎)
├── Checkpoint (持久化)
├── Prebuilt (ReAct, ToolNode)
└── SDK (REST API)
└── CLI (本地开发/部署)
└── LangSmith (云端部署/可观测性)关键术语
| 术语 | 含义 |
|---|---|
| Pregel | 核心执行引擎(类比 Google Pregel) |
| StateGraph | 高级有状态图 API(用户主要入口) |
| Channel | 节点间数据流通道 |
| Checkpoint | 状态快照(线程/会话) |
| Thread | 会话线程 ID |
| Superstep | 图执行的一个原子步骤 |
| Interrupt | 人类介入的暂停点 |
| Branch | 条件边(运行时决定下一个节点) |
| Send | 动态派发(map-reduce 模式) |
| Store | 长期记忆(跨线程) |
| Cache | LLM 响应缓存 |
| ToolNode | 工具调用执行节点 |
| create_react_agent | ReAct 模式 Agent 工厂 |
2.5 选型对比(LangGraph vs 竞品)
| 维度 | LangGraph | 竞品 A | 竞品 B |
|---|---|---|---|
| 抽象层级 | 低(用户控制) | 高 | 中 |
| 持久化 | 原生 Checkpoint | 需自实现 | 弱 |
| 图执行 | BSP / Superstep | DAG | 事件流 |
| 与 LangChain 集成 | 深度 | 无 | 弱 |
| 多语言 | Py + JS | 单一 | 单一 |
| 生产级部署 | LangSmith 平台 | 需自建 | 需自建 |
- 设计哲学 - "Low-level framework",不抽象 prompts 和架构
- 生态完整 - 从单机到生产部署的完整链路
- 可扩展性 - Channel 抽象允许自定义状态管理
- 性能优化 - 使用 ormsgpack、xxhash 等高性能库
- 安全考虑 - Checkpoint 反序列化可限制模块(
LANGGRAPH_STRICT_MSGPACK) - 标准兼容 - 兼容 Python 3.10-3.13,pydantic v2
- 可测试性 - checkpoint-conformance 子库提供一致性测试套件
- JS 生态独立 - JS SDK 迁至独立仓库
langgraphjs
3. 整体架构设计
3.1 架构概述
LangGraph 采用分层架构 + 图式计算模型。整体分为四层:
- 用户 API 层 (
graph/):提供声明式的图构建 DSL,用户通过StateGraph、MessageGraph定义节点、边、条件分支 - 执行引擎层 (
pregel/):实现 Pregel 风格的图执行引擎,负责节点调度、状态管理、检查点、流式输出 - 状态通道层 (
channels/):管理图状态的具体存储和更新策略,支持 LastValue、BinaryOperator、Delta 等多种通道类型 - 持久化基础层 (
checkpoint/,cache/,store/):提供可插拔的检查点、缓存和长期记忆后端
3.2 整体架构图
+===================================================================+
| 用户 API 层 (Graph API) |
| |
| +---------------------------+ +----------------------------+ |
| | StateGraph | | MessageGraph | |
| | add_node / add_edge | | (消息优先图) | |
| | add_conditional_edges | | | |
| | compile() | | | |
| +-------------+-------------+ +-------------+--------------+ |
| | | |
+================|==============================|====================+
| |
v v
+===================================================================+
| 执行引擎层 (Pregel Runtime) |
| |
| +----------------------------------------------------------------+ |
| | Pregel (主循环控制器) | |
| | +-------------+ +-------------+ +-------------+ | |
| | | _loop.py | | _runner.py | | _executor.py| | |
| | | Superstep | | Task 调度 | | 并行执行器 | | |
| | | 循环调度 | | 管理 | | | | |
| | +------+------+ +------+------+ +------+------+ | |
| | | | | | |
| +---------+----------------+----------------+--------------------+ |
| | | | |
| +---------v----------------v----------------v--------------------+ |
| | 子模块 (横切关注点) | |
| | +----------+ +----------+ +----------+ +----------+ | |
| | |_checkpoint| _read.py | _write.py | _retry.py | | |
| | | 检查点 | 状态读取 | 状态写入 | 重试策略 | | |
| | +----------+ +----------+ +----------+ +----------+ | |
| | +----------+ +----------+ +----------+ +----------+ | |
| | |_algo.py | _validate | _config.py| _io.py | | |
| | | 图算法 | 结构验证 | 配置管理 | I/O 映射 | | |
| | +----------+ +----------+ +----------+ +----------+ | |
| +---------------------------------------------------------------+ |
+================|====================================================+
|
v
+===================================================================+
| 状态通道层 (Channels) |
| |
| +----------+ +----------+ +----------+ +----------+ |
| |LastValue | |BinaryOp | |Delta | |Topic | |
| |覆盖策略 | |聚合策略 | |增量(β) | |发布订阅 | |
| +----------+ +----------+ +----------+ +----------+ |
+================|====================================================+
|
v
+===================================================================+
| 持久化基础层 (Persistence) |
| |
| +------------------+ +------------------+ +------------------+ |
| | checkpoint/ | | cache/ | | store/ | |
| | BaseCheckpoint | | BaseCache | | BaseStore | |
| | Saver | | | | (长期记忆) | |
| | +-----------+ | | +-----------+ | | +-----------+ | |
| | | Postgres | | | | Memory | | | | Memory | | |
| | | SQLite | | | | Redis | | | | (可扩展) | | |
| | | Memory | | | +-----------+ | | +-----------+ | |
| | +-----------+ | | | | | |
| +------------------+ +------------------+ +------------------+ |
+===================================================================+分层职责说明
| 层 | 职责 | 对外接口 | 依赖关系 |
|---|---|---|---|
| 用户 API 层 | 提供声明式图构建 DSL | StateGraph, MessageGraph, add_node, add_edge, compile() | 依赖执行引擎层 |
| 执行引擎层 | 图编译、节点调度、状态管理、流式输出 | Pregel, PregelProtocol | 依赖状态通道层 + 持久化层 |
| 状态通道层 | 管理不同更新策略的状态存储 | BaseChannel | 仅被执行引擎层使用 |
| 持久化基础层 | 检查点、缓存、长期记忆的抽象和实现 | BaseCheckpointSaver, BaseCache, BaseStore | 独立可插拔 |
3.3 目录结构
langgraph/ # 【核心基建】LangGraph 单体仓库根目录
├── .github/ # 【配置】GitHub CI/CD 与社区管理
│ ├── workflows/ # 【配置】GitHub Actions CI 工作流 (lint, test, release)
│ ├── ISSUE_TEMPLATE/ # 【配置】Issue 模板
│ ├── PULL_REQUEST_TEMPLATE.md # 【配置】PR 模板
│ ├── THREAT_MODEL.md # 【配置】安全威胁模型 (58KB 详细文档)
│ ├── dependabot.yml # 【配置】依赖自动更新配置
│ ├── actions/ # 【配置】自定义 Composite Actions
│ ├── images/ # 【配置】Logo 资源 (dark/light)
│ └── scripts/ # 【工具集】CI 辅助脚本
├── docs/ # 【配置】文档辅助资源
│ ├── generate_redirects.py # 【工具集】重定向规则生成脚本
│ ├── redirects.json # 【配置】URL 重定向映射表
│ └── llms.txt # 【配置】LLM 友好的文档索引文件
├── libs/ # 【核心基建】所有库的父目录
│ ├── checkpoint/ # 【核心基建】检查点基础接口 (checkpoint base)
│ │ ├── pyproject.toml # 【配置】langgraph-checkpoint v4.1.1
│ │ └── langgraph/
│ │ ├── checkpoint/
│ │ │ ├── base/
│ │ │ │ ├── __init__.py # 【核心基建】BaseCheckpointSaver - 检查点抽象基类
│ │ │ │ └── id.py # 【工具集】UUID6 分布式 ID 生成器
│ │ │ ├── serde/
│ │ │ │ ├── __init__.py # 【核心基建】序列化模块导出
│ │ │ │ ├── types.py # 【核心基建】ChannelProtocol, 序列化类型定义
│ │ │ │ ├── base.py # 【核心基建】SerializerProtocol 序列化协议
│ │ │ │ ├── jsonplus.py # 【核心基建】JsonPlusSerializer - JSON+MessagePack 序列化
│ │ │ │ ├── _msgpack.py # MessagePack 序列化内部实现
│ │ │ │ ├── encrypted.py # 【核心基建】EncryptedSerializer - 加密序列化
│ │ │ │ └── event_hooks.py # 【核心基建】序列化事件钩子
│ │ │ └── memory/
│ │ │ └── __init__.py # 【核心基建】InMemorySaver - 内存检查点 (开发/测试用)
│ │ ├── cache/
│ │ │ ├── base/
│ │ │ │ └── __init__.py # 【核心基建】BaseCache - 缓存抽象基类
│ │ │ ├── memory/
│ │ │ │ └── __init__.py # 【工具集】InMemoryCache - 内存缓存实现
│ │ │ └── redis/
│ │ │ └── __init__.py # 【工具集】RedisCache - Redis 缓存实现
│ │ └── store/
│ │ ├── base/
│ │ │ ├── __init__.py # 【核心基建】BaseStore - 长期记忆存储基类
│ │ │ ├── embed.py # 【核心基建】Embedding 向量化搜索辅助
│ │ │ └── batch.py # 【核心基建】批量操作辅助
│ │ └── memory/
│ │ └── __init__.py # 【工具集】InMemoryStore - 内存长期记忆存储
│ ├── checkpoint-conformance/ # 【工具集】检查点一致性测试套件
│ │ ├── pyproject.toml # 【配置】langgraph-checkpoint-conformance v0.0.2
│ │ └── langgraph/
│ │ └── checkpoint/
│ │ └── conformance/ # 【工具集】Checkpointer 实现的一致性验证测试
│ ├── checkpoint-postgres/ # 【业务模块】Postgres 检查点持久化实现
│ │ ├── pyproject.toml # 【配置】langgraph-checkpoint-postgres v3.1.0
│ │ └── langgraph/
│ │ └── checkpoint/
│ │ └── postgres/ # 【业务模块】AsyncPostgresSaver - psycopg 异步实现
│ ├── checkpoint-sqlite/ # 【业务模块】SQLite 检查点持久化实现
│ │ ├── pyproject.toml # 【配置】langgraph-checkpoint-sqlite v3.1.0
│ │ └── langgraph/
│ │ └── checkpoint/
│ │ └── sqlite/ # 【业务模块】AsyncSqliteSaver - aiosqlite 实现
│ ├── cli/ # 【业务模块】LangGraph CLI 命令行工具
│ │ ├── pyproject.toml # 【配置】langgraph-cli (Click 框架)
│ │ └── langgraph_cli/
│ │ ├── cli.py # 【核心基建】CLI 入口 (Click 命令组注册)
│ │ ├── deploy.py # 【业务模块】deploy 命令 - 生产部署到 LangSmith
│ │ ├── dev.py # 【业务模块】dev 命令 - 本地开发服务器
│ │ ├── docker.py # 【业务模块】Docker 镜像构建与容器管理
│ │ ├── config.py # 【工具集】langgraph.json 配置解析
│ │ ├── analytics.py # 【工具集】匿名使用统计 (可 opt-out)
│ │ ├── exec.py # 【工具集】子进程执行与 Runner 管理
│ │ ├── progress.py # 【工具集】终端进度显示
│ │ ├── templates.py # 【工具集】项目模板脚手架 (new 命令)
│ │ ├── schemas.py # 【核心基建】配置数据模型
│ │ ├── dependency_tracking.py # 【工具集】Python 依赖追踪
│ │ ├── host_backend.py # 【工具集】主机后端抽象
│ │ ├── archive.py # 【工具集】源码打包归档
│ │ ├── util.py # 【工具集】通用工具函数
│ │ ├── uv_lock.py # 【工具集】uv.lock 文件解析
│ │ ├── _ignore.py # 文件忽略规则
│ │ ├── __main__.py # 【核心基建】python -m langgraph_cli 入口
│ │ └── constants.py # 【配置】CLI 常量
│ ├── langgraph/ # 【核心基建】核心框架 - 低级编排引擎
│ │ ├── pyproject.toml # 【配置】langgraph v1.2.4
│ │ └── langgraph/
│ │ ├── graph/ # 【核心基建】图构建 API 层 (用户接口)
│ │ │ ├── __init__.py # 【核心基建】导出 StateGraph, MessageGraph, END, START
│ │ │ ├── state.py # 【核心基建】StateGraph 构建器 - 图定义 DSL
│ │ │ ├── message.py # 【核心基建】MessageGraph - 消息优先图
│ │ │ ├── _node.py # 节点内部实现
│ │ │ ├── _branch.py # 条件分支内部实现
│ │ │ └── ui.py # 【工具集】Mermaid 图形可视化生成
│ │ ├── pregel/ # 【核心基建】Pregel 执行引擎 (核心运行时)
│ │ │ ├── __init__.py # 【核心基建】导出 Pregel, NodeBuilder
│ │ │ ├── main.py # 【核心基建】Pregel 主循环 - 编译图执行入口 (4335 行)
│ │ │ ├── _loop.py # 【核心基建】执行循环 - Superstep 调度核心
│ │ │ ├── _runner.py # 【核心基建】任务执行器 - 单节点运行管理
│ │ │ ├── _executor.py # 【核心基建】并行执行器 - 多任务并发调度
│ │ │ ├── _algo.py # 【核心基建】图算法 - 节点调度拓扑排序
│ │ │ ├── _read.py # 【核心基建】ChannelRead - 状态读取操作
│ │ │ ├── _write.py # 【核心基建】ChannelWrite - 状态写入操作
│ │ │ ├── _checkpoint.py # 【核心基建】检查点管理 - 保存/恢复/父链遍历
│ │ │ ├── _validate.py # 【核心基建】图结构验证 - 合法性检查
│ │ │ ├── _config.py # 【核心基建】配置管理 - configurable 字段合并
│ │ │ ├── _io.py # 【核心基建】I/O 映射 - 节点输入输出绑定
│ │ │ ├── _call.py # 【核心基建】节点调用 - Runnable 执行包装
│ │ │ ├── _retry.py # 【核心基建】重试策略 - 指数退避执行
│ │ │ ├── _messages.py # 【核心基建】消息流处理 - token 级流式
│ │ │ ├── _tools.py # 【核心基建】工具集成辅助
│ │ │ ├── _draw.py # 【工具集】图绘制辅助
│ │ │ ├── _log.py # 【工具集】执行日志记录
│ │ │ ├── _utils.py # 【工具集】Pregel 通用工具
│ │ │ ├── protocol.py # 【核心基建】PregelProtocol - 编译图的抽象协议
│ │ │ ├── types.py # 【核心基建】Pregel 内部类型定义
│ │ │ ├── debug.py # 【核心基建】Debug 流模式实现
│ │ │ ├── remote.py # 【核心基建】远程图执行支持
│ │ │ └── _remote_run_stream.py # 远程运行流式传输
│ │ ├── channels/ # 【核心基建】状态通道系统 (状态管理核心)
│ │ │ ├── __init__.py # 【核心基建】通道模块导出
│ │ │ ├── base.py # 【核心基建】BaseChannel - 通道抽象基类
│ │ │ ├── last_value.py # 【核心基建】LastValue - 最后写入覆盖策略
│ │ │ ├── binop.py # 【核心基建】BinaryOperatorAggregate - 二元聚合 (operator.add)
│ │ │ ├── delta.py # 【核心基建】DeltaChannel - 增量通道 (Beta, 大状态优化)
│ │ │ ├── ephemeral_value.py # 【工具集】EphemeralValue - 临时的隐藏状态
│ │ │ ├── named_barrier_value.py # 【工具集】NamedBarrierValue - 命名屏障等待
│ │ │ ├── any_value.py # 【工具集】AnyValue - 任意值策略
│ │ │ └── topic.py # 【工具集】Topic - 发布/订阅通道
│ │ ├── managed/ # 【核心基建】托管值系统 (框架注入的自动计算值)
│ │ │ ├── __init__.py # 【核心基建】托管值模块导出
│ │ │ ├── base.py # 【核心基建】ManagedValueSpec - 托管值规范
│ │ │ └── is_last_step.py # 【核心基建】IsLastStepManager - 最后一步检测
│ │ ├── stream/ # 【核心基建】流式输出子系统
│ │ │ ├── __init__.py # 【核心基建】流式模块导出
│ │ │ ├── _types.py # 【核心基建】流式类型定义
│ │ │ ├── _convert.py # 【核心基建】v1 → v2 流式格式转换
│ │ │ ├── _mux.py # 【核心基建】多模式流多路复用
│ │ │ ├── run_stream.py # 【核心基建】运行流式包装器
│ │ │ ├── stream_channel.py # 【核心基建】通道流式监听
│ │ │ └── transformers.py # 【核心基建】流式事件转换器
│ │ ├── func/ # 【核心基建】函数式 API
│ │ │ └── __init__.py # 【核心基建】@task, @entrypoint 函数式装饰器
│ │ ├── _internal/ # 内部实现 (不应外部依赖)
│ │ │ ├── __init__.py # 内部模块导出
│ │ │ ├── _cache.py # 节点结果缓存
│ │ │ ├── _config.py # 内部配置辅助
│ │ │ ├── _constants.py # 内部常量定义
│ │ │ ├── _fields.py # 内部字段处理
│ │ │ ├── _future.py # 异步 Future 包装
│ │ │ ├── _pydantic.py # Pydantic 模型创建
│ │ │ ├── _queue.py # 内部队列 (Sync/Async)
│ │ │ ├── _replay.py # 状态回放
│ │ │ ├── _retry.py # 默认重试策略
│ │ │ ├── _runnable.py # Runnable 包装
│ │ │ ├── _scratchpad.py # 暂存区 (per-invocation)
│ │ │ ├── _serde.py # 序列化辅助
│ │ │ ├── _timeout.py # 超时策略
│ │ │ └── _typing.py # 内部类型工具
│ │ ├── utils/ # 【工具集】通用工具
│ │ │ └── __init__.py # 【工具集】工具函数导出
│ │ ├── types.py # 【核心基建】全局类型定义 (Send, Command, Interrupt 等)
│ │ ├── config.py # 【核心基建】get_config() - 运行时配置获取
│ │ ├── constants.py # 【配置】全局常量 (START, END, TAG_HIDDEN)
│ │ ├── errors.py # 【核心基建】异常类型 (GraphInterrupt, InvalidUpdateError 等)
│ │ ├── runtime.py # 【核心基建】LangGraphRuntime - Agent 运行时上下文
│ │ ├── callbacks.py # 【核心基建】LangGraph 回调管理器
│ │ ├── typing.py # 【核心基建】泛型类型变量 (StateT, InputT, OutputT)
│ │ ├── version.py # 【配置】版本号
│ │ └── warnings.py # 【工具集】弃用警告工具类
│ ├── prebuilt/ # 【业务模块】高级 API - 预构建 Agent/Tool 组件
│ │ ├── pyproject.toml # 【配置】langgraph-prebuilt v1.1.0
│ │ └── langgraph/
│ │ └── prebuilt/
│ │ ├── __init__.py # 【核心基建】导出 create_react_agent, ToolNode, tools_condition
│ │ ├── chat_agent_executor.py # 【业务模块】create_react_agent - ReAct Agent 工厂
│ │ ├── tool_node.py # 【业务模块】ToolNode - 工具调用执行节点
│ │ ├── tool_validator.py # 【业务模块】ValidationNode - 工具参数验证
│ │ ├── _tool_call_transformer.py # 工具调用流式转换
│ │ ├── _tool_call_stream.py # 工具调用流式处理
│ │ └── interrupt.py # 【业务模块】中断辅助工具
│ ├── sdk-js/ # 【业务模块】JS/TS SDK (仅 README 占位)
│ │ └── README.md # 【配置】SDK 说明 (实际代码在 langgraphjs 仓库)
│ └── sdk-py/ # 【业务模块】Python SDK - LangGraph Server 客户端
│ ├── pyproject.toml # 【配置】langgraph-sdk (动态版本)
│ └── langgraph_sdk/
│ ├── __init__.py # 【核心基建】SDK 版本与模块导出
│ ├── client.py # 【核心基建】LangGraphClient - HTTP/WS 主客户端
│ ├── schema.py # 【核心基建】API 数据模型 (Assistants, Threads, Runs)
│ ├── sse.py # 【核心基建】Server-Sent Events 流式解析
│ ├── runtime.py # 【核心基建】Runtime 管理
│ ├── auth/ # 【业务模块】认证模块 (API Key, Token)
│ ├── encryption/ # 【业务模块】加密模块
│ ├── errors.py # 【核心基建】SDK 异常类型
│ ├── cache.py # 【工具集】SDK 缓存
│ ├── _async/ # 异步客户端实现
│ ├── _sync/ # 同步客户端实现
│ ├── _shared/ # 共享内部逻辑
│ └── stream/ # 【核心基建】流式响应处理
├── README.md # 【配置】英文项目说明
├── README-CN.md # 【配置】中文项目说明 (本分析生成)
├── CLAUDE.md # 【配置】AI Agent 工作指令
├── Makefile # 【配置】顶层构建脚本
└── .gitignore # 【配置】Git 忽略规则4. 模块依赖与调用关系
4.1 全局入口与核心路由
逻辑说明:用户通过
StateGraph构建图,调用compile()生成CompiledStateGraph(继承自Pregel)。Pregel是核心执行引擎,实现了PregelProtocol和 LangChain 的Runnable接口。执行时通过invoke()/stream()/astream()进入主循环_loop.py,由_algo.py计算调度拓扑,_executor.py并发执行任务,_runner.py管理单个节点运行。调用拓扑:
StateGraph.compile() (用户入口)
+---> Pregel.__init__()
|
+---> _validate.validate_graph() (验证节点/边合法性)
|
+---> Channel.subscribe_to() (建立通道-节点订阅关系)
|
+---> Pregel.invoke() / astream() (执行入口)
|
+---> _loop.PregelLoop (主循环)
|
+---> _checkpoint.get_checkpoint() (恢复/创建检查点)
|
+---> _algo.prepare_next_tasks() (计算下一步任务)
| |
| +---> _read.ChannelRead (读取通道值)
| +---> _write.ChannelWrite (准备通道写入)
|
+---> _executor.execute_tasks() (并行执行任务)
| |
| +---> _runner.PregelRunner (单任务管理)
| | |
| | +---> _call.call() (实际执行节点)
| | +---> _retry.PregelRetry (重试逻辑)
| | +---> _messages.stream_messages() (消息流)
| |
| +---> _io.map_input() (输入映射)
| +---> _io.map_output_updates() (输出映射)
|
+---> _checkpoint.create_checkpoint() (保存检查点)
|
+---> stream/ (流式输出分发)
|
+---> _mux.StreamMultiplexer (多模式多路复用)
+---> _convert.convert_to_v2() (格式转换)4.2 核心业务实体与关联
- 实体定义:
| 实体 | 描述 | 核心属性 |
|---|---|---|
| StateGraph | 用户构建的图定义 | nodes, edges, conditional_edges, state_schema |
| Pregel (CompiledStateGraph) | 编译后的可执行图 | channels, nodes, checkpointer, stream_modes |
| Checkpoint | 某一时刻的状态快照 | channel_values, channel_versions, versions_seen |
| PregelExecutableTask | 待执行的任务单元 | name, input, proc (Runnable), config, triggers |
| Channel | 状态通道 | 类型: LastValue / BinaryOperator / Delta / Topic |
| Send | 动态路由消息 | node (目标节点), arg (携带状态) |
| Command | 执行控制指令 | graph, update, resume, goto |
| Interrupt | 中断信号 | value, id |
- 实体引用拓扑:
[StateGraph] 1 ----> 1 [Pregel (CompiledStateGraph)]
|
+-- 1 ----> N [BaseChannel]
| |
| +-- LastValue
| +-- BinaryOperatorAggregate
| +-- DeltaChannel
| +-- Topic
|
+-- 1 ----> 0..1 [BaseCheckpointSaver]
| |
| +-- InMemorySaver
| +-- AsyncPostgresSaver
| +-- AsyncSqliteSaver
|
+-- 1 ----> N [PregelExecutableTask]
| |
| +-- proc: Runnable (实际节点函数)
| +-- triggers: 触发通道
|
+-- 1 ----> N [Send] (动态路由)
+-- 1 ----> N [Command] (控制指令)
+-- 0..N -> N [Subgraph] (嵌套子图)5. 核心模块详解
模块一:Pregel 执行引擎
模块名称:Pregel Execution Engine (
pregel/)设计说明:Pregel 是整个 LangGraph 的心脏,实现了受 Google Pregel 启发的超步 (Superstep) 执行模型。每轮超步包含三个步骤:(1) 计划 — 根据通道版本确定需要执行的节点;(2) 执行 — 并行执行所有就绪任务;(3) 更新 — 将结果写入通道并保存检查点。采用策略模式支持多种耐久性模式 (
sync,async,exit)。内部结构图:
+===========================================================+
| Pregel (主控制器) |
| compile() | invoke() | stream() | astream() |
+==========================+================================+
|
+----------- v -----------+
| _loop.py (主循环) |
| PregelLoop.superstep() |
+-----------+-------------+
|
+-----------------+------------------+
| | |
+-----v-----+ +------v------+ +------v------+
|_algo.py | |_executor.py | |_checkpoint.py|
|调度算法 | |并行执行器 | |检查点管理 |
|prepare_ | |execute_ | |save/load/ |
|next_tasks | |tasks() | |parent_chain |
+-----------+ +------+------+ +--------------+
|
+----------v----------+
| _runner.py |
| PregelRunner |
| call() / tick() |
+----------+----------+
|
+-----------------+-------------------+
| | |
+-----v-----+ +------v------+ +-------v------+
|_call.py | |_retry.py | |_messages.py |
|节点调用 | |重试/退避 | |消息流式处理 |
+-----------+ +-------------+ +--------------+模块二:StateGraph 图构建器
模块名称:StateGraph Builder (
graph/state.py)设计说明:
StateGraph是用户构建 Agent 工作流的主入口,采用构建器模式 + 编译模式。用户通过链式调用add_node(),add_edge(),add_conditional_edges()定义图结构,最后调用compile()将声明式定义编译为可执行的Pregel实例。期间自动进行通道创建、节点包装 (coerce_to_runnable)、图结构验证。内部结构图:
+===========================================================+
| StateGraph |
| +-----------------------------------------------------+ |
| | nodes: dict[str, StateNodeSpec] | |
| | channels: dict[str, BaseChannel] | |
| | edges: set[tuple[str, str]] | |
| | branches: dict[str, list[BranchSpec]] | |
| | entry_point: str | |
| +-----------------------------------------------------+ |
| |
| add_node(name, action) ---> 包装为 StateNodeSpec |
| add_edge(from, to) ---> 添加普通边 |
| add_conditional_edges() ---> 添加 BranchSpec |
| compile(checkpointer) ---> + |
+==================================|========================+
|
v
+===========================================================+
| CompiledStateGraph (Pregel) |
| +-----------------------------------------------------+ |
| | 自动创建: | |
| | - ChannelWrite / ChannelRead (每个节点) | |
| | - branch:to:{node} 通道 (条件路由) | |
| | - __start__ / __end__ 虚拟节点 | |
| | - 调度拓扑 (PregelNode 列表) | |
| +-----------------------------------------------------+ |
+===========================================================+模块三:状态通道系统
模块名称:Channel System (
channels/)设计说明:通道是 LangGraph 状态管理的核心抽象,每个状态 key 对应一个通道实例。通道定义了状态如何在多节点并发写入时合并。支持多种合并策略:
LastValue(直接覆盖)、BinaryOperatorAggregate(如operator.add累加)、DeltaChannel(增量更新,用于大状态优化)、Topic(发布/订阅模式)。内部结构图:
+===========================================================+
| BaseChannel (抽象基类) |
| update(values) | get() | checkpoint() | consume() |
+==========================+================================+
|
+----------------+-------------------+
| | |
+-------v------+ +-----v------+ +--------v--------+
| LastValue | | BinaryOp | | DeltaChannel |
| (覆盖策略) | | Aggregate | | (增量Beta) |
| | | (聚合策略) | | |
| update: | | update: | | update: 追加 |
| 覆盖旧值 | | 累加合并 | | 增量写入 |
+--------------+ +------------+ +-----------------+
| | |
+-------v------+ +-----v------+ +--------v--------+
|EphemeralValue| |NamedBarrier| | Topic |
|(临时不可见) | |(命名屏障) | | (发布/订阅) |
+--------------+ +------------+ +------------------+模块四:检查点持久化系统
模块名称:Checkpoint System (
checkpoint/)设计说明:采用端口-适配器模式。
BaseCheckpointSaver定义检查点的标准接口 (get, put, list, put_writes, delete_thread, copy_thread, prune),具体实现 (Memory, SQLite, Postgres) 作为适配器注入。序列化层 (serde/) 使用 JSON + MessagePack 混合方案,支持加密序列化。使用 UUID6 作为检查点 ID,保证唯一性和时间有序性。内部结构图:
+===========================================================+
| BaseCheckpointSaver (端口) |
| |
| get(config) -> Checkpoint |
| put(config, checkpoint, metadata) -> Config |
| put_writes(config, writes, task_id) |
| list(config, filter, before, limit) -> Iterator |
| get_tuple(config) -> CheckpointTuple |
| delete_thread(thread_id) |
| copy_thread(src, dst) |
| prune(thread_ids, strategy) |
+==========================+================================+
|
+---------------+---------------+
| | |
+--------v-------+ +-----v------+ +----v---------+
| InMemorySaver | | Postgres | | SQLite |
| (内存, 开发) | | Saver | | Saver |
| | | (生产) | | (轻量/边缘) |
| dict 存储 | | psycopg | | aiosqlite |
+----------------+ +------------+ +--------------+
|
| serde: SerializerProtocol
v
+===========================================================+
| 序列化层 (serde/) |
| JsonPlusSerializer (JSON + MessagePack 混合) |
| EncryptedSerializer (AES 加密包装) |
+===========================================================+模块五:CLI 命令行系统
模块名称:LangGraph CLI (
cli/)设计说明:基于 Click 框架构建的命令行工具,提供
langgraph dev(本地开发)、langgraph build(构建 Docker 镜像)、langgraph deploy(部署到 LangSmith Cloud)、langgraph new(脚手架创建项目) 等命令。通过langgraph.json配置文件声明依赖、图和环境变量。内部结构图:
+===========================================================+
| CLI 入口 (cli.py) |
| @click.group() -> langgraph 命令 |
+==========================+================================+
|
+----------------------+----------------------+
| | |
+---v--------+ +---------v------+ +---------v------+
| dev.py | | deploy.py | | templates.py |
| langgraph | | langgraph | | langgraph |
| dev | | deploy | | new |
| | | | | |
| 本地开发 | | 部署到 | | 项目脚手架 |
| 服务器 | | LangSmith | | 生成 |
+----+-------+ +-------+--------+ +----------------+
| |
v v
+----+-------+ +-------+--------+
| docker.py | | config.py |
| Docker 镜像| | langgraph.json |
| 构建与运行| | 配置解析 |
+------------+ +----------------+6. 关键数据流程
场景一:标准图执行流程 (invoke)
场景说明:用户通过 graph.invoke(input, config) 触发一次完整的图执行。从输入状态开始,经过多个节点的串联执行,最终返回最终状态。过程中自动保存检查点,支持从中断恢复。
场景二:Human-in-the-Loop 中断与恢复
- 场景说明:节点调用
interrupt()函数暂停执行,抛出GraphInterrupt异常。检查点保存当前状态。用户通过Command(resume=value)恢复执行,从同一节点的开头重跑。
场景三:Map-Reduce 并行模式 (Send)
- 场景说明:条件边函数返回多个
Send对象,每个指向同一个节点但携带不同的状态。执行引擎并行执行所有 Send 任务,结果通过 BinaryOperator 通道自动聚合。
7. 接口与契约规范
7.1 核心内部模块契约
/**
* PregelProtocol - 编译图的抽象协议
*
* 所有编译后的图 (CompiledStateGraph) 和子图都实现此接口。
* 继承自 LangChain 的 Runnable 接口,提供完整的执行生命周期。
*/
interface PregelProtocol<StateT, ContextT, InputT, OutputT> extends Runnable<InputT, Any> {
// === 配置 ===
with_config(config?: RunnableConfig, ...kwargs: Any): PregelProtocol;
// === 图结构 ===
get_graph(config?: RunnableConfig, xray?: number | boolean): DrawableGraph;
// === 状态检查 ===
get_state(config: RunnableConfig, subgraphs?: boolean): StateSnapshot;
get_state_history(
config: RunnableConfig,
filter?: Record<string, Any>,
before?: RunnableConfig,
limit?: number
): Iterator<StateSnapshot>;
// === 状态更新 ===
update_state(
config: RunnableConfig,
values: Record<string, Any> | Any | null,
as_node?: string
): RunnableConfig;
// === 执行 (带版本控制) ===
invoke(
input: InputT | Command | null,
config?: RunnableConfig,
context?: ContextT,
stream_mode?: StreamMode | StreamMode[],
version?: "v1" | "v2"
): GraphOutput<OutputT> | Record<string, Any> | Any;
stream(
input: InputT | Command | null,
config?: RunnableConfig,
stream_mode?: StreamMode | StreamMode[],
version?: "v1" | "v2"
): Iterator<StreamPart<StateT, OutputT>>;
}/**
* BaseCheckpointSaver - 检查点持久化抽象
*
* 所有检查点实现 (Memory, Postgres, SQLite) 都继承此基类。
* 采用模板方法模式:同步方法默认调用异步实现。
*/
interface BaseCheckpointSaver<V extends int | float | str> {
// 序列化器
serde: SerializerProtocol;
// === 读取 ===
get(config: RunnableConfig): Checkpoint | null;
get_tuple(config: RunnableConfig): CheckpointTuple | null;
// === 写入 ===
put(
config: RunnableConfig,
checkpoint: Checkpoint,
metadata: CheckpointMetadata,
new_versions: ChannelVersions
): RunnableConfig;
put_writes(
config: RunnableConfig,
writes: Sequence<[string, Any]>,
task_id: string,
task_path?: string
): void;
// === 列表 ===
list(
config?: RunnableConfig,
filter?: Record<string, Any>,
before?: RunnableConfig,
limit?: number
): Iterator<CheckpointTuple>;
// === 管理 ===
delete_thread(thread_id: string): void;
delete_for_runs(run_ids: string[]): void;
copy_thread(source_thread_id: string, target_thread_id: string): void;
prune(thread_ids: string[], strategy?: "keep_latest" | "delete"): void;
// === 版本 ===
get_next_version(current: V | null, channel: null): V;
}/**
* ChannelProtocol - 状态通道抽象
*
* 每个状态 key 对应一个 Channel 实例,定义状态更新和读取策略。
*/
interface ChannelProtocol<ValueType> {
/** 唯一标识 */
key: string;
/** 更新状态 (可能被多个节点并发调用) */
update(values: Sequence<ValueType>): void;
/** 获取当前值 */
get(): ValueType;
/** 消费值 (用于触发节点执行) */
consume() -> boolean;
/** 生成检查点快照 */
checkpoint(): ValueType | null;
/** 从检查点恢复 */
from_checkpoint(checkpoint: ValueType): void;
}7.2 对外 API 契约 (LangGraph Server API)
# LangGraph Server 创建 Assistants API
paths:
/assistants:
post:
summary: 创建 Assistant
requestBody:
content:
application/json:
schema:
type: object
required: [graph_id, config]
properties:
graph_id:
type: string
description: 已注册的图 ID
config:
type: object
description: 默认配置
metadata:
type: object
responses:
'201':
description: Assistant 创建成功
content:
application/json:
schema:
$ref: '#/components/schemas/Assistant'
/threads:
post:
summary: 创建对话线程
requestBody:
content:
application/json:
schema:
type: object
properties:
metadata:
type: object
responses:
'201':
description: Thread 创建成功
content:
application/json:
schema:
$ref: '#/components/schemas/Thread'
/threads/{thread_id}/runs:
post:
summary: 在指定线程上执行图
parameters:
- name: thread_id
in: path
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
type: object
required: [assistant_id]
properties:
assistant_id:
type: string
input:
type: object
config:
type: object
stream_mode:
type: array
items:
type: string
enum: [values, updates, messages, custom, debug]
responses:
'200':
description: Run 创建成功,返回流式事件
content:
text/event-stream:
schema:
type: string
/threads/{thread_id}/runs/{run_id}/join:
get:
summary: 等待 Run 完成并返回最终状态
parameters:
- name: thread_id
in: path
required: true
schema:
type: string
- name: run_id
in: path
required: true
schema:
type: string
responses:
'200':
description: Run 完成
content:
application/json:
schema:
$ref: '#/components/schemas/Run'8. 快速开始
8.1 环境配置
# 安装核心库
pip install -U langgraph
# 安装检查点后端 (可选)
pip install -U langgraph-checkpoint-sqlite # 开发/轻量
pip install -U langgraph-checkpoint-postgres # 生产8.2 构建第一个 Agent
from langgraph.graph import StateGraph, END, START
from typing import TypedDict, Annotated
import operator
class State(TypedDict):
messages: Annotated[list, operator.add]
graph = StateGraph(State)
graph.add_node("agent", call_llm)
graph.add_node("tools", call_tools)
graph.add_edge("agent", "tools")
graph.add_conditional_edges("tools", decide_next, {"continue": "agent", "end": END})
graph.add_edge(START, "agent")
app = graph.compile()
result = app.invoke({"messages": [HumanMessage(content="你好!")]})