Skip to content

langgraph 解读

项目信息

  • 项目名称:LangGraph
  • 项目描述:LangGraph 是一个低级编排框架,专为构建、管理和部署长时间运行的有状态 AI Agent 而设计。它由 LangChain 公司开发维护,已被 Klarna、Replit、Elastic 等公司用于生产环境。LangGraph = 一个用于构建、部署有状态 AI Agent 的图执行引擎**(受 Google Pregel 启发)。
  • 项目地址:https://github.com/langchain-ai/langgraph
  • 官方文档:https://docs.langchain.com/oss/python/langgraph/overview

1. 项目概览

1.1 项目定位与核心价值

一个受 Google Pregel 与 Apache Beam 启发的、面向 LLM Agent 的有状态图执行引擎。

一句话定位:LangGraph 是由 LangChain Inc 开发的。它 LangGraph 是一个有状态、多角色 Agent 编排框架,专为构建、管理和部署长时间运行的有状态 AI Agent 而设计,提供了构建、管理和部署长时间运行的有状态 Agent 的低层基础架构。

LangGraph 由 LangChain 公司开发维护,已被 Klarna、Replit、Elastic 等公司用于生产环境。通过"图 + 状态"这一核心抽象,填补了"无状态 LLM 库"与"生产级 Agent 系统"之间的鸿沟。它特别适合:

  • 复杂多步骤 AI 工作流
  • 长时间运行、需要持久化的 Agent
  • 多 Agent 协作场景
  • 有人工介入的 AI 流程
  • 需要精确回放和审计的合规场景

解决的核心痛点

  1. 长时间运行 Agent 的可靠性问题:通过持久化执行 (Durable Execution) 让 Agent 在故障后从断点精确恢复
  2. 人机协同的复杂性:提供 Human-in-the-Loop 能力,允许在任意节点暂停、检查和修改状态
  3. 多角色 Agent 的状态管理:提供全面的记忆系统,包括短期工作记忆和跨会话的长期记忆

1.2 仓库规模与技术栈

指标数值
总文件数632
总代码行数237,138
源文件行数183,494
Python 库8 个
JS/TS 库1 个 (占位)
测试文件~445 (Python test files)
类别技术选择
后端语言Python 3.10+ (主力)
核心依赖langchain-core, pydantic, xxhash
构建工具Hatchling (打包), uv (依赖管理)
代码质量Ruff, MyPy, codespell
测试框架pytest + pytest-asyncio + syrupy
序列化ormsgpack, orjson
数据库PostgreSQL (psycopg), SQLite (aiosqlite), Redis
CLI 框架Click
HTTPhttpx, websockets

1.3 生态定位

[应用层]   Deep Agents (高级 Agent 框架)
              ↓ built on
[编排层] ★ LangGraph (本仓库) - 低级图编排引擎
              ↓ depends on
[基础层]   LangChain Core - 消息/工具/Runnable 抽象
              ↓ integrates with
[平台层]   LangSmith - 调试/监控/部署

关键特征

特征说明
持久化执行 (Durable Execution)Agent 可在失败中恢复并从中断点继续运行
人在回路 (Human-in-the-Loop)在执行任意点暂停、检查/修改状态
综合记忆 (Comprehensive Memory)短期工作记忆 + 长期持久记忆
可调试性与 LangSmith 深度集成,支持轨迹回放
生产级部署通过 LangSmith Deployment 平台可一键部署
多语言 SDKPython(主)+ JS/TS(独立仓库 langgraphjs

1.4 行业应用与应用场景

Trusted by companies shaping the future of agents – including Klarna, Replit, Elastic

解决的核心问题

问题传统方案痛点LangGraph 方案
LLM 调用无状态无法跨调用共享上下文状态图 + Channel 系统持久化
长任务中断恢复难重新执行耗 token/时间Checkpoint 机制精确恢复
多角色协作复杂需手写状态机声明式图 + 边/条件边
工具调用编排混乱if-else 嵌套ToolNode + ReAct Agent
Agent 调试困难黑盒无轨迹LangSmith tracing + 状态回放

典型使用场景

text
1. 多轮对话机器人 (Chatbot)
   └─ StateGraph + MessagesState + Checkpoint
2. ReAct 工具调用 Agent
   └─ create_react_agent (prebuilt) + ToolNode
3. RAG (检索增强生成)
   └─ 分支路由 + 子图 + 长期记忆 Store
4. 多 Agent 协作 (Multi-Agent)
   └─ 子图 + Send/Command + 状态隔离
5. 人机协作 (Human-in-the-Loop)
   └─ interrupt() + checkpointer
6. 计划与执行 (Plan-and-Execute)
   └─ LLM 规划 + 执行循环 + 重规划
7. 反思与改进 (Reflexion)
   └─ 自我评估节点 + 反馈循环
8. 长期记忆 (Long-term Memory)
   └─ BaseStore + 语义检索

1.5 技术栈与依赖

主语言与版本

主语言Python 版本框架依赖
langgraphPython 3.10+3.10-3.13langchain-core, pydantic v2, xxhash
langgraph-checkpointPython 3.10+-langchain-core, ormsgpack
langgraph-prebuiltPython 3.10+-langgraph-checkpoint, langchain-core
langgraph-sdkPython 3.10+-httpx, orjson, langchain-core, websockets
langgraph-cliPython 3.10+-typer, click, docker
langgraph-checkpoint-postgresPython 3.10+-psycopg, asyncpg
langgraph-checkpoint-sqlitePython 3.10+-aiosqlite
sdk-js仅 README 占位-JS SDK 在独立仓库

核心第三方依赖

text
- langchain-core (Runnable 抽象、Messages 类型)
- pydantic v2 (类型校验、状态 Schema)
- ormsgpack (高效序列化,比 msgpack 快)
- xxhash (快速哈希)
- psycopg / asyncpg (Postgres 驱动)
- aiosqlite (SQLite 异步驱动)
- httpx / orjson / websockets (SDK 通信)
- typer / click (CLI)
- redis (可选缓存)

内部架构依赖图

text
checkpoint (基类)
  ├── checkpoint-sqlite (实现)
  ├── checkpoint-postgres (实现)
  ├── prebuilt (依赖)
  └── langgraph (核心)

prebuilt
  └── checkpoint

langgraph (核心)
  ├── checkpoint
  ├── prebuilt
  └── sdk-py (轻量)

cli
  └── (部署工具,不直接被生产代码使用)

sdk-py
  ├── langgraph
  └── cli

sdk-js (独立仓库)

典型 Case 代码样例

最简工作流(入门示例)

python
from langgraph.graph import START, StateGraph
from typing_extensions import TypedDict

class State(TypedDict):
    text: str

def node_a(state: State) -> dict:
    return {"text": state["text"] + "a"}

def node_b(state: State) -> dict:
    return {"text": state["text"] + "b"}

graph = StateGraph(State)
graph.add_node("node_a", node_a)
graph.add_node("node_b", node_b)
graph.add_edge(START, "node_a")
graph.add_edge("node_a", "node_b")
result = graph.compile().invoke({"text": ""})
# 输出: {'text': 'ab'}

ReAct Agent(工具调用)

python
from langchain_anthropic import ChatAnthropic
from langgraph.prebuilt import create_react_agent

def search(query: str):
    """Call to surf the web."""
    if "sf" in query.lower():
        return "It's 60 degrees and foggy."
    return "It's 90 degrees and sunny."

tools = [search]
model = ChatAnthropic(model="claude-3-7-sonnet-latest")
app = create_react_agent(model, tools)

app.invoke({
    "messages": [{"role": "user", "content": "what is the weather in sf"}]
})

有状态对话(带 Checkpoint)

python
from langgraph.graph import StateGraph
from langgraph.checkpoint.memory import InMemorySaver

# 编译时挂载 checkpointer
app = workflow.compile(checkpointer=InMemorySaver())

# 通过 thread_id 隔离会话
config = {"configurable": {"thread_id": "user-123"}}
app.invoke({"messages": [...]}, config)

# 下次调用会自动恢复
app.invoke({"messages": [...]}, config)  # 看到历史

Human-in-the-Loop

python
from langgraph.types import interrupt
from langgraph.prebuilt.interrupt import HumanInterrupt, HumanResponse

def my_node(state):
    request: HumanInterrupt = {
        "action_request": {"action": "send_email", "args": {...}},
        "config": {"allow_ignore": True, "allow_respond": True},
        "description": "..."
    }
    response = interrupt([request])[0]
    # 根据 response 继续执行

2. 核心设计思想

2.1 灵感来源

LangGraph is inspired by Pregel and Apache Beam. The public interface draws inspiration from NetworkX.

  • Pregel (Google): 分布式图计算 BSP(Bulk Synchronous Parallel)模型
  • Apache Beam: 统一的批流处理抽象
  • NetworkX: 简洁优雅的图操作 API

2.2 三大核心抽象

text
1. Channel (通道)
   └─ 节点间数据流转的载体
   └─ 类型: LastValue / Topic / Binop / Ephemeral / NamedBarrier

2. Node (节点)
   └─ 计算单元(Python 函数 / Runnable)
   └─ 读 Channel → 计算 → 写 Channel

3. Edge (边) + Branch (条件边)
   └─ 控制节点执行顺序
   └─ 静态边 + 动态条件分支

2.3 Superstep 模型

text
Pregel 采用 BSP 模型:
  Step 1: 所有节点并发执行(读 Channel 旧值)
  Step 2: 同步屏障(Barrier)
  Step 3: 写 Channel(原子提交)
  Step 4: 进入下一个 Superstep

2.4 与生态的关系

LangChain 生态

text
LangChain (高层 LLM 应用框架)
  └── Deep Agents (高层 Agent API)
        └── LangGraph (底层图执行引擎)
              ├── Checkpoint (持久化)
              ├── Prebuilt (ReAct, ToolNode)
              └── SDK (REST API)
                    └── CLI (本地开发/部署)
                          └── LangSmith (云端部署/可观测性)

关键术语

术语含义
Pregel核心执行引擎(类比 Google Pregel)
StateGraph高级有状态图 API(用户主要入口)
Channel节点间数据流通道
Checkpoint状态快照(线程/会话)
Thread会话线程 ID
Superstep图执行的一个原子步骤
Interrupt人类介入的暂停点
Branch条件边(运行时决定下一个节点)
Send动态派发(map-reduce 模式)
Store长期记忆(跨线程)
CacheLLM 响应缓存
ToolNode工具调用执行节点
create_react_agentReAct 模式 Agent 工厂

2.5 选型对比(LangGraph vs 竞品)

维度LangGraph竞品 A竞品 B
抽象层级低(用户控制)
持久化原生 Checkpoint需自实现
图执行BSP / SuperstepDAG事件流
与 LangChain 集成深度
多语言Py + JS单一单一
生产级部署LangSmith 平台需自建需自建
  • 设计哲学 - "Low-level framework",不抽象 prompts 和架构
  • 生态完整 - 从单机到生产部署的完整链路
  • 可扩展性 - Channel 抽象允许自定义状态管理
  • 性能优化 - 使用 ormsgpack、xxhash 等高性能库
  • 安全考虑 - Checkpoint 反序列化可限制模块(LANGGRAPH_STRICT_MSGPACK
  • 标准兼容 - 兼容 Python 3.10-3.13,pydantic v2
  • 可测试性 - checkpoint-conformance 子库提供一致性测试套件
  • JS 生态独立 - JS SDK 迁至独立仓库 langgraphjs

3. 整体架构设计

3.1 架构概述

LangGraph 采用分层架构 + 图式计算模型。整体分为四层:

  • 用户 API 层 (graph/):提供声明式的图构建 DSL,用户通过 StateGraphMessageGraph 定义节点、边、条件分支
  • 执行引擎层 (pregel/):实现 Pregel 风格的图执行引擎,负责节点调度、状态管理、检查点、流式输出
  • 状态通道层 (channels/):管理图状态的具体存储和更新策略,支持 LastValue、BinaryOperator、Delta 等多种通道类型
  • 持久化基础层 (checkpoint/, cache/, store/):提供可插拔的检查点、缓存和长期记忆后端

3.2 整体架构图

text
+===================================================================+
|                       用户 API 层 (Graph API)                      |
|                                                                    |
|  +---------------------------+  +----------------------------+    |
|  |       StateGraph          |  |      MessageGraph           |    |
|  |  add_node / add_edge     |  |  (消息优先图)               |    |
|  |  add_conditional_edges   |  |                             |    |
|  |  compile()               |  |                             |    |
|  +-------------+-------------+  +-------------+--------------+    |
|                |                              |                    |
+================|==============================|====================+
                 |                              |
                 v                              v
+===================================================================+
|                     执行引擎层 (Pregel Runtime)                     |
|                                                                    |
|  +----------------------------------------------------------------+ |
|  |                    Pregel (主循环控制器)                         | |
|  |  +-------------+  +-------------+  +-------------+            | |
|  |  | _loop.py    |  | _runner.py  |  | _executor.py|            | |
|  |  | Superstep   |  | Task 调度   |  | 并行执行器  |            | |
|  |  | 循环调度    |  | 管理        |  |             |            | |
|  |  +------+------+  +------+------+  +------+------+            | |
|  |         |                |                |                    | |
|  +---------+----------------+----------------+--------------------+ |
|            |                |                |                      |
|  +---------v----------------v----------------v--------------------+ |
|  |                    子模块 (横切关注点)                          | |
|  |  +----------+ +----------+ +----------+ +----------+         | |
|  |  |_checkpoint| _read.py  | _write.py | _retry.py  |         | |
|  |  | 检查点   | 状态读取   | 状态写入   | 重试策略   |         | |
|  |  +----------+ +----------+ +----------+ +----------+         | |
|  |  +----------+ +----------+ +----------+ +----------+         | |
|  |  |_algo.py  | _validate | _config.py| _io.py     |         | |
|  |  | 图算法   | 结构验证   | 配置管理   | I/O 映射   |         | |
|  |  +----------+ +----------+ +----------+ +----------+         | |
|  +---------------------------------------------------------------+ |
+================|====================================================+
                 |
                 v
+===================================================================+
|                     状态通道层 (Channels)                           |
|                                                                    |
|  +----------+  +----------+  +----------+  +----------+          |
|  |LastValue |  |BinaryOp  |  |Delta     |  |Topic     |          |
|  |覆盖策略  |  |聚合策略  |  |增量(β)   |  |发布订阅  |          |
|  +----------+  +----------+  +----------+  +----------+          |
+================|====================================================+
                 |
                 v
+===================================================================+
|                   持久化基础层 (Persistence)                        |
|                                                                    |
|  +------------------+  +------------------+  +------------------+ |
|  |   checkpoint/    |  |     cache/       |  |     store/       | |
|  |  BaseCheckpoint  |  |  BaseCache       |  |  BaseStore       | |
|  |  Saver           |  |                  |  |  (长期记忆)      | |
|  |  +-----------+   |  |  +-----------+   |  |  +-----------+   | |
|  |  | Postgres  |   |  |  | Memory    |   |  |  | Memory    |   | |
|  |  | SQLite    |   |  |  | Redis     |   |  |  | (可扩展)  |   | |
|  |  | Memory    |   |  |  +-----------+   |  |  +-----------+   | |
|  |  +-----------+   |  |                  |  |                  | |
|  +------------------+  +------------------+  +------------------+ |
+===================================================================+

分层职责说明

职责对外接口依赖关系
用户 API 层提供声明式图构建 DSLStateGraph, MessageGraph, add_node, add_edge, compile()依赖执行引擎层
执行引擎层图编译、节点调度、状态管理、流式输出Pregel, PregelProtocol依赖状态通道层 + 持久化层
状态通道层管理不同更新策略的状态存储BaseChannel仅被执行引擎层使用
持久化基础层检查点、缓存、长期记忆的抽象和实现BaseCheckpointSaver, BaseCache, BaseStore独立可插拔

3.3 目录结构

shell
langgraph/                                         # 【核心基建】LangGraph 单体仓库根目录
├── .github/                                       # 【配置】GitHub CI/CD 与社区管理
   ├── workflows/                                 # 【配置】GitHub Actions CI 工作流 (lint, test, release)
   ├── ISSUE_TEMPLATE/                            # 【配置】Issue 模板
   ├── PULL_REQUEST_TEMPLATE.md                   # 【配置】PR 模板
   ├── THREAT_MODEL.md                            # 【配置】安全威胁模型 (58KB 详细文档)
   ├── dependabot.yml                             # 【配置】依赖自动更新配置
   ├── actions/                                   # 【配置】自定义 Composite Actions
   ├── images/                                    # 【配置】Logo 资源 (dark/light)
   └── scripts/                                   # 【工具集】CI 辅助脚本
├── docs/                                          # 【配置】文档辅助资源
   ├── generate_redirects.py                      # 【工具集】重定向规则生成脚本
   ├── redirects.json                             # 【配置】URL 重定向映射表
   └── llms.txt                                   # 【配置】LLM 友好的文档索引文件
├── libs/                                          # 【核心基建】所有库的父目录
   ├── checkpoint/                                # 【核心基建】检查点基础接口 (checkpoint base)
   ├── pyproject.toml                         # 【配置】langgraph-checkpoint v4.1.1
   └── langgraph/
       ├── checkpoint/
   ├── base/
   ├── __init__.py                # 【核心基建】BaseCheckpointSaver - 检查点抽象基类
   └── id.py                      # 【工具集】UUID6 分布式 ID 生成器
   ├── serde/
   ├── __init__.py                # 【核心基建】序列化模块导出
   ├── types.py                   # 【核心基建】ChannelProtocol, 序列化类型定义
   ├── base.py                    # 【核心基建】SerializerProtocol 序列化协议
   ├── jsonplus.py                # 【核心基建】JsonPlusSerializer - JSON+MessagePack 序列化
   ├── _msgpack.py                # MessagePack 序列化内部实现
   ├── encrypted.py               # 【核心基建】EncryptedSerializer - 加密序列化
   └── event_hooks.py             # 【核心基建】序列化事件钩子
   └── memory/
       └── __init__.py                # 【核心基建】InMemorySaver - 内存检查点 (开发/测试用)
       ├── cache/
   ├── base/
   └── __init__.py                # 【核心基建】BaseCache - 缓存抽象基类
   ├── memory/
   └── __init__.py                # 【工具集】InMemoryCache - 内存缓存实现
   └── redis/
       └── __init__.py                # 【工具集】RedisCache - Redis 缓存实现
       └── store/
           ├── base/
   ├── __init__.py                # 【核心基建】BaseStore - 长期记忆存储基类
   ├── embed.py                   # 【核心基建】Embedding 向量化搜索辅助
   └── batch.py                   # 【核心基建】批量操作辅助
           └── memory/
               └── __init__.py                # 【工具集】InMemoryStore - 内存长期记忆存储
   ├── checkpoint-conformance/                    # 【工具集】检查点一致性测试套件
   ├── pyproject.toml                         # 【配置】langgraph-checkpoint-conformance v0.0.2
   └── langgraph/
       └── checkpoint/
           └── conformance/                   # 【工具集】Checkpointer 实现的一致性验证测试
   ├── checkpoint-postgres/                       # 【业务模块】Postgres 检查点持久化实现
   ├── pyproject.toml                         # 【配置】langgraph-checkpoint-postgres v3.1.0
   └── langgraph/
       └── checkpoint/
           └── postgres/                      # 【业务模块】AsyncPostgresSaver - psycopg 异步实现
   ├── checkpoint-sqlite/                         # 【业务模块】SQLite 检查点持久化实现
   ├── pyproject.toml                         # 【配置】langgraph-checkpoint-sqlite v3.1.0
   └── langgraph/
       └── checkpoint/
           └── sqlite/                        # 【业务模块】AsyncSqliteSaver - aiosqlite 实现
   ├── cli/                                       # 【业务模块】LangGraph CLI 命令行工具
   ├── pyproject.toml                         # 【配置】langgraph-cli (Click 框架)
   └── langgraph_cli/
       ├── cli.py                             # 【核心基建】CLI 入口 (Click 命令组注册)
       ├── deploy.py                          # 【业务模块】deploy 命令 - 生产部署到 LangSmith
       ├── dev.py                             # 【业务模块】dev 命令 - 本地开发服务器
       ├── docker.py                          # 【业务模块】Docker 镜像构建与容器管理
       ├── config.py                          # 【工具集】langgraph.json 配置解析
       ├── analytics.py                       # 【工具集】匿名使用统计 (可 opt-out)
       ├── exec.py                            # 【工具集】子进程执行与 Runner 管理
       ├── progress.py                        # 【工具集】终端进度显示
       ├── templates.py                       # 【工具集】项目模板脚手架 (new 命令)
       ├── schemas.py                         # 【核心基建】配置数据模型
       ├── dependency_tracking.py             # 【工具集】Python 依赖追踪
       ├── host_backend.py                    # 【工具集】主机后端抽象
       ├── archive.py                         # 【工具集】源码打包归档
       ├── util.py                            # 【工具集】通用工具函数
       ├── uv_lock.py                         # 【工具集】uv.lock 文件解析
       ├── _ignore.py                         # 文件忽略规则
       ├── __main__.py                        # 【核心基建】python -m langgraph_cli 入口
       └── constants.py                       # 【配置】CLI 常量
   ├── langgraph/                                 # 【核心基建】核心框架 - 低级编排引擎
   ├── pyproject.toml                         # 【配置】langgraph v1.2.4
   └── langgraph/
       ├── graph/                             # 【核心基建】图构建 API 层 (用户接口)
   ├── __init__.py                    # 【核心基建】导出 StateGraph, MessageGraph, END, START
   ├── state.py                       # 【核心基建】StateGraph 构建器 - 图定义 DSL
   ├── message.py                     # 【核心基建】MessageGraph - 消息优先图
   ├── _node.py                       # 节点内部实现
   ├── _branch.py                     # 条件分支内部实现
   └── ui.py                          # 【工具集】Mermaid 图形可视化生成
       ├── pregel/                            # 【核心基建】Pregel 执行引擎 (核心运行时)
   ├── __init__.py                    # 【核心基建】导出 Pregel, NodeBuilder
   ├── main.py                        # 【核心基建】Pregel 主循环 - 编译图执行入口 (4335 行)
   ├── _loop.py                       # 【核心基建】执行循环 - Superstep 调度核心
   ├── _runner.py                     # 【核心基建】任务执行器 - 单节点运行管理
   ├── _executor.py                   # 【核心基建】并行执行器 - 多任务并发调度
   ├── _algo.py                       # 【核心基建】图算法 - 节点调度拓扑排序
   ├── _read.py                       # 【核心基建】ChannelRead - 状态读取操作
   ├── _write.py                      # 【核心基建】ChannelWrite - 状态写入操作
   ├── _checkpoint.py                 # 【核心基建】检查点管理 - 保存/恢复/父链遍历
   ├── _validate.py                   # 【核心基建】图结构验证 - 合法性检查
   ├── _config.py                     # 【核心基建】配置管理 - configurable 字段合并
   ├── _io.py                         # 【核心基建】I/O 映射 - 节点输入输出绑定
   ├── _call.py                       # 【核心基建】节点调用 - Runnable 执行包装
   ├── _retry.py                      # 【核心基建】重试策略 - 指数退避执行
   ├── _messages.py                   # 【核心基建】消息流处理 - token 级流式
   ├── _tools.py                      # 【核心基建】工具集成辅助
   ├── _draw.py                       # 【工具集】图绘制辅助
   ├── _log.py                        # 【工具集】执行日志记录
   ├── _utils.py                      # 【工具集】Pregel 通用工具
   ├── protocol.py                    # 【核心基建】PregelProtocol - 编译图的抽象协议
   ├── types.py                       # 【核心基建】Pregel 内部类型定义
   ├── debug.py                       # 【核心基建】Debug 流模式实现
   ├── remote.py                      # 【核心基建】远程图执行支持
   └── _remote_run_stream.py          # 远程运行流式传输
       ├── channels/                          # 【核心基建】状态通道系统 (状态管理核心)
   ├── __init__.py                    # 【核心基建】通道模块导出
   ├── base.py                        # 【核心基建】BaseChannel - 通道抽象基类
   ├── last_value.py                  # 【核心基建】LastValue - 最后写入覆盖策略
   ├── binop.py                       # 【核心基建】BinaryOperatorAggregate - 二元聚合 (operator.add)
   ├── delta.py                       # 【核心基建】DeltaChannel - 增量通道 (Beta, 大状态优化)
   ├── ephemeral_value.py             # 【工具集】EphemeralValue - 临时的隐藏状态
   ├── named_barrier_value.py         # 【工具集】NamedBarrierValue - 命名屏障等待
   ├── any_value.py                   # 【工具集】AnyValue - 任意值策略
   └── topic.py                       # 【工具集】Topic - 发布/订阅通道
       ├── managed/                           # 【核心基建】托管值系统 (框架注入的自动计算值)
   ├── __init__.py                    # 【核心基建】托管值模块导出
   ├── base.py                        # 【核心基建】ManagedValueSpec - 托管值规范
   └── is_last_step.py                # 【核心基建】IsLastStepManager - 最后一步检测
       ├── stream/                            # 【核心基建】流式输出子系统
   ├── __init__.py                    # 【核心基建】流式模块导出
   ├── _types.py                      # 【核心基建】流式类型定义
   ├── _convert.py                    # 【核心基建】v1 → v2 流式格式转换
   ├── _mux.py                        # 【核心基建】多模式流多路复用
   ├── run_stream.py                  # 【核心基建】运行流式包装器
   ├── stream_channel.py              # 【核心基建】通道流式监听
   └── transformers.py                # 【核心基建】流式事件转换器
       ├── func/                              # 【核心基建】函数式 API
   └── __init__.py                    # 【核心基建】@task, @entrypoint 函数式装饰器
       ├── _internal/                         # 内部实现 (不应外部依赖)
   ├── __init__.py                    # 内部模块导出
   ├── _cache.py                      # 节点结果缓存
   ├── _config.py                     # 内部配置辅助
   ├── _constants.py                  # 内部常量定义
   ├── _fields.py                     # 内部字段处理
   ├── _future.py                     # 异步 Future 包装
   ├── _pydantic.py                   # Pydantic 模型创建
   ├── _queue.py                      # 内部队列 (Sync/Async)
   ├── _replay.py                     # 状态回放
   ├── _retry.py                      # 默认重试策略
   ├── _runnable.py                   # Runnable 包装
   ├── _scratchpad.py                 # 暂存区 (per-invocation)
   ├── _serde.py                      # 序列化辅助
   ├── _timeout.py                    # 超时策略
   └── _typing.py                     # 内部类型工具
       ├── utils/                             # 【工具集】通用工具
   └── __init__.py                    # 【工具集】工具函数导出
       ├── types.py                           # 【核心基建】全局类型定义 (Send, Command, Interrupt 等)
       ├── config.py                          # 【核心基建】get_config() - 运行时配置获取
       ├── constants.py                       # 【配置】全局常量 (START, END, TAG_HIDDEN)
       ├── errors.py                          # 【核心基建】异常类型 (GraphInterrupt, InvalidUpdateError 等)
       ├── runtime.py                         # 【核心基建】LangGraphRuntime - Agent 运行时上下文
       ├── callbacks.py                       # 【核心基建】LangGraph 回调管理器
       ├── typing.py                          # 【核心基建】泛型类型变量 (StateT, InputT, OutputT)
       ├── version.py                         # 【配置】版本号
       └── warnings.py                        # 【工具集】弃用警告工具类
   ├── prebuilt/                                  # 【业务模块】高级 API - 预构建 Agent/Tool 组件
   ├── pyproject.toml                         # 【配置】langgraph-prebuilt v1.1.0
   └── langgraph/
       └── prebuilt/
           ├── __init__.py                    # 【核心基建】导出 create_react_agent, ToolNode, tools_condition
           ├── chat_agent_executor.py         # 【业务模块】create_react_agent - ReAct Agent 工厂
           ├── tool_node.py                   # 【业务模块】ToolNode - 工具调用执行节点
           ├── tool_validator.py              # 【业务模块】ValidationNode - 工具参数验证
           ├── _tool_call_transformer.py      # 工具调用流式转换
           ├── _tool_call_stream.py           # 工具调用流式处理
           └── interrupt.py                   # 【业务模块】中断辅助工具
   ├── sdk-js/                                    # 【业务模块】JS/TS SDK (仅 README 占位)
   └── README.md                              # 【配置】SDK 说明 (实际代码在 langgraphjs 仓库)
   └── sdk-py/                                    # 【业务模块】Python SDK - LangGraph Server 客户端
       ├── pyproject.toml                         # 【配置】langgraph-sdk (动态版本)
       └── langgraph_sdk/
           ├── __init__.py                        # 【核心基建】SDK 版本与模块导出
           ├── client.py                          # 【核心基建】LangGraphClient - HTTP/WS 主客户端
           ├── schema.py                          # 【核心基建】API 数据模型 (Assistants, Threads, Runs)
           ├── sse.py                             # 【核心基建】Server-Sent Events 流式解析
           ├── runtime.py                         # 【核心基建】Runtime 管理
           ├── auth/                              # 【业务模块】认证模块 (API Key, Token)
           ├── encryption/                        # 【业务模块】加密模块
           ├── errors.py                          # 【核心基建】SDK 异常类型
           ├── cache.py                           # 【工具集】SDK 缓存
           ├── _async/                            # 异步客户端实现
           ├── _sync/                             # 同步客户端实现
           ├── _shared/                           # 共享内部逻辑
           └── stream/                            # 【核心基建】流式响应处理
├── README.md                                      # 【配置】英文项目说明
├── README-CN.md                                   # 【配置】中文项目说明 (本分析生成)
├── CLAUDE.md                                      # 【配置】AI Agent 工作指令
├── Makefile                                       # 【配置】顶层构建脚本
└── .gitignore                                     # 【配置】Git 忽略规则

4. 模块依赖与调用关系

4.1 全局入口与核心路由

  • 逻辑说明:用户通过 StateGraph 构建图,调用 compile() 生成 CompiledStateGraph (继承自 Pregel)。Pregel 是核心执行引擎,实现了 PregelProtocol 和 LangChain 的 Runnable 接口。执行时通过 invoke() / stream() / astream() 进入主循环 _loop.py,由 _algo.py 计算调度拓扑,_executor.py 并发执行任务,_runner.py 管理单个节点运行。

  • 调用拓扑

text
StateGraph.compile() (用户入口)
 +---> Pregel.__init__()
        |
        +---> _validate.validate_graph() (验证节点/边合法性)
        |
        +---> Channel.subscribe_to() (建立通道-节点订阅关系)
        |
        +---> Pregel.invoke() / astream() (执行入口)
               |
               +---> _loop.PregelLoop (主循环)
                      |
                      +---> _checkpoint.get_checkpoint() (恢复/创建检查点)
                      |
                      +---> _algo.prepare_next_tasks() (计算下一步任务)
                      |     |
                      |     +---> _read.ChannelRead (读取通道值)
                      |     +---> _write.ChannelWrite (准备通道写入)
                      |
                      +---> _executor.execute_tasks() (并行执行任务)
                      |     |
                      |     +---> _runner.PregelRunner (单任务管理)
                      |     |     |
                      |     |     +---> _call.call() (实际执行节点)
                      |     |     +---> _retry.PregelRetry (重试逻辑)
                      |     |     +---> _messages.stream_messages() (消息流)
                      |     |
                      |     +---> _io.map_input() (输入映射)
                      |     +---> _io.map_output_updates() (输出映射)
                      |
                      +---> _checkpoint.create_checkpoint() (保存检查点)
                      |
                      +---> stream/ (流式输出分发)
                             |
                             +---> _mux.StreamMultiplexer (多模式多路复用)
                             +---> _convert.convert_to_v2() (格式转换)

4.2 核心业务实体与关联

  • 实体定义
实体描述核心属性
StateGraph用户构建的图定义nodes, edges, conditional_edges, state_schema
Pregel (CompiledStateGraph)编译后的可执行图channels, nodes, checkpointer, stream_modes
Checkpoint某一时刻的状态快照channel_values, channel_versions, versions_seen
PregelExecutableTask待执行的任务单元name, input, proc (Runnable), config, triggers
Channel状态通道类型: LastValue / BinaryOperator / Delta / Topic
Send动态路由消息node (目标节点), arg (携带状态)
Command执行控制指令graph, update, resume, goto
Interrupt中断信号value, id
  • 实体引用拓扑
text
[StateGraph] 1 ----> 1 [Pregel (CompiledStateGraph)]
                             |
                             +-- 1 ----> N [BaseChannel]
                             |              |
                             |              +-- LastValue
                             |              +-- BinaryOperatorAggregate
                             |              +-- DeltaChannel
                             |              +-- Topic
                             |
                             +-- 1 ----> 0..1 [BaseCheckpointSaver]
                             |                 |
                             |                 +-- InMemorySaver
                             |                 +-- AsyncPostgresSaver
                             |                 +-- AsyncSqliteSaver
                             |
                             +-- 1 ----> N [PregelExecutableTask]
                             |              |
                             |              +-- proc: Runnable (实际节点函数)
                             |              +-- triggers: 触发通道
                             |
                             +-- 1 ----> N [Send] (动态路由)
                             +-- 1 ----> N [Command] (控制指令)
                             +-- 0..N -> N [Subgraph] (嵌套子图)

5. 核心模块详解

模块一:Pregel 执行引擎

  • 模块名称:Pregel Execution Engine (pregel/)

  • 设计说明:Pregel 是整个 LangGraph 的心脏,实现了受 Google Pregel 启发的超步 (Superstep) 执行模型。每轮超步包含三个步骤:(1) 计划 — 根据通道版本确定需要执行的节点;(2) 执行 — 并行执行所有就绪任务;(3) 更新 — 将结果写入通道并保存检查点。采用策略模式支持多种耐久性模式 (sync, async, exit)。

  • 内部结构图

text
+===========================================================+
|                    Pregel (主控制器)                        |
|  compile() | invoke() | stream() | astream()              |
+==========================+================================+
                           |
              +----------- v -----------+
              |    _loop.py (主循环)     |
              |  PregelLoop.superstep()  |
              +-----------+-------------+
                          |
        +-----------------+------------------+
        |                 |                  |
  +-----v-----+   +------v------+   +------v------+
  |_algo.py   |   |_executor.py |   |_checkpoint.py|
  |调度算法   |   |并行执行器   |   |检查点管理    |
  |prepare_   |   |execute_     |   |save/load/    |
  |next_tasks |   |tasks()      |   |parent_chain  |
  +-----------+   +------+------+   +--------------+
                          |
               +----------v----------+
               |  _runner.py         |
               |  PregelRunner       |
               |  call() / tick()    |
               +----------+----------+
                          |
        +-----------------+-------------------+
        |                 |                   |
  +-----v-----+   +------v------+   +-------v------+
  |_call.py   |   |_retry.py    |   |_messages.py  |
  |节点调用   |   |重试/退避    |   |消息流式处理 |
  +-----------+   +-------------+   +--------------+

模块二:StateGraph 图构建器

  • 模块名称:StateGraph Builder (graph/state.py)

  • 设计说明StateGraph 是用户构建 Agent 工作流的主入口,采用构建器模式 + 编译模式。用户通过链式调用 add_node(), add_edge(), add_conditional_edges() 定义图结构,最后调用 compile() 将声明式定义编译为可执行的 Pregel 实例。期间自动进行通道创建、节点包装 (coerce_to_runnable)、图结构验证。

  • 内部结构图

text
+===========================================================+
|                     StateGraph                             |
|  +-----------------------------------------------------+  |
|  | nodes: dict[str, StateNodeSpec]                      |  |
|  | channels: dict[str, BaseChannel]                    |  |
|  | edges: set[tuple[str, str]]                         |  |
|  | branches: dict[str, list[BranchSpec]]               |  |
|  | entry_point: str                                     |  |
|  +-----------------------------------------------------+  |
|                                                           |
|  add_node(name, action) ---> 包装为 StateNodeSpec        |
|  add_edge(from, to)       ---> 添加普通边                 |
|  add_conditional_edges()  ---> 添加 BranchSpec             |
|  compile(checkpointer)    ---> +                         |
+==================================|========================+
                                   |
                                   v
+===========================================================+
|                 CompiledStateGraph (Pregel)                |
|  +-----------------------------------------------------+  |
|  | 自动创建:                                            |  |
|  | - ChannelWrite / ChannelRead (每个节点)               |  |
|  | - branch:to:{node} 通道 (条件路由)                    |  |
|  | - __start__ / __end__ 虚拟节点                        |  |
|  | - 调度拓扑 (PregelNode 列表)                          |  |
|  +-----------------------------------------------------+  |
+===========================================================+

模块三:状态通道系统

  • 模块名称:Channel System (channels/)

  • 设计说明:通道是 LangGraph 状态管理的核心抽象,每个状态 key 对应一个通道实例。通道定义了状态如何在多节点并发写入时合并。支持多种合并策略:LastValue (直接覆盖)、BinaryOperatorAggregate (如 operator.add 累加)、DeltaChannel (增量更新,用于大状态优化)、Topic (发布/订阅模式)。

  • 内部结构图

text
+===========================================================+
|                   BaseChannel (抽象基类)                    |
|  update(values) | get() | checkpoint() | consume()        |
+==========================+================================+
                           |
          +----------------+-------------------+
          |                |                   |
  +-------v------+  +-----v------+  +--------v--------+
  | LastValue    |  | BinaryOp   |  | DeltaChannel    |
  | (覆盖策略)  |  | Aggregate  |  | (增量Beta)     |
  |              |  | (聚合策略) |  |                 |
  | update:      |  | update:    |  | update: 追加   |
  |   覆盖旧值   |  |   累加合并 |  |   增量写入     |
  +--------------+  +------------+  +-----------------+
          |                |                   |
  +-------v------+  +-----v------+  +--------v--------+
  |EphemeralValue|  |NamedBarrier|  | Topic            |
  |(临时不可见)  |  |(命名屏障)  |  | (发布/订阅)     |
  +--------------+  +------------+  +------------------+

模块四:检查点持久化系统

  • 模块名称:Checkpoint System (checkpoint/)

  • 设计说明:采用端口-适配器模式BaseCheckpointSaver 定义检查点的标准接口 (get, put, list, put_writes, delete_thread, copy_thread, prune),具体实现 (Memory, SQLite, Postgres) 作为适配器注入。序列化层 (serde/) 使用 JSON + MessagePack 混合方案,支持加密序列化。使用 UUID6 作为检查点 ID,保证唯一性和时间有序性。

  • 内部结构图

text
+===========================================================+
|                BaseCheckpointSaver (端口)                   |
|                                                           |
|  get(config) -> Checkpoint                                |
|  put(config, checkpoint, metadata) -> Config               |
|  put_writes(config, writes, task_id)                       |
|  list(config, filter, before, limit) -> Iterator           |
|  get_tuple(config) -> CheckpointTuple                     |
|  delete_thread(thread_id)                                  |
|  copy_thread(src, dst)                                     |
|  prune(thread_ids, strategy)                               |
+==========================+================================+
                           |
           +---------------+---------------+
           |               |               |
  +--------v-------+ +-----v------+ +----v---------+
  | InMemorySaver  | | Postgres   | | SQLite       |
  | (内存, 开发)   | | Saver      | | Saver        |
  |                | | (生产)     | | (轻量/边缘)  |
  | dict 存储      | | psycopg    | | aiosqlite    |
  +----------------+ +------------+ +--------------+
           |
           | serde: SerializerProtocol
           v
  +===========================================================+
  |                    序列化层 (serde/)                        |
  |  JsonPlusSerializer (JSON + MessagePack 混合)             |
  |  EncryptedSerializer (AES 加密包装)                        |
  +===========================================================+

模块五:CLI 命令行系统

  • 模块名称:LangGraph CLI (cli/)

  • 设计说明:基于 Click 框架构建的命令行工具,提供 langgraph dev (本地开发)、langgraph build (构建 Docker 镜像)、langgraph deploy (部署到 LangSmith Cloud)、langgraph new (脚手架创建项目) 等命令。通过 langgraph.json 配置文件声明依赖、图和环境变量。

  • 内部结构图

text
+===========================================================+
|                  CLI 入口 (cli.py)                          |
|  @click.group() -> langgraph 命令                          |
+==========================+================================+
                           |
    +----------------------+----------------------+
    |                      |                      |
+---v--------+   +---------v------+   +---------v------+
| dev.py     |   | deploy.py      |   | templates.py   |
| langgraph  |   | langgraph      |   | langgraph      |
| dev        |   | deploy         |   | new            |
|            |   |                |   |                |
| 本地开发   |   | 部署到        |   | 项目脚手架     |
| 服务器     |   | LangSmith      |   | 生成           |
+----+-------+   +-------+--------+   +----------------+
     |                   |
     v                   v
+----+-------+   +-------+--------+
| docker.py  |   | config.py      |
| Docker 镜像|   | langgraph.json |
| 构建与运行|   | 配置解析       |
+------------+   +----------------+

6. 关键数据流程

场景一:标准图执行流程 (invoke)

场景说明:用户通过 graph.invoke(input, config) 触发一次完整的图执行。从输入状态开始,经过多个节点的串联执行,最终返回最终状态。过程中自动保存检查点,支持从中断恢复。

场景二:Human-in-the-Loop 中断与恢复

  • 场景说明:节点调用 interrupt() 函数暂停执行,抛出 GraphInterrupt 异常。检查点保存当前状态。用户通过 Command(resume=value) 恢复执行,从同一节点的开头重跑。

场景三:Map-Reduce 并行模式 (Send)

  • 场景说明:条件边函数返回多个 Send 对象,每个指向同一个节点但携带不同的状态。执行引擎并行执行所有 Send 任务,结果通过 BinaryOperator 通道自动聚合。

7. 接口与契约规范

7.1 核心内部模块契约

typescript
/**
 * PregelProtocol - 编译图的抽象协议
 * 
 * 所有编译后的图 (CompiledStateGraph) 和子图都实现此接口。
 * 继承自 LangChain 的 Runnable 接口,提供完整的执行生命周期。
 */
interface PregelProtocol<StateT, ContextT, InputT, OutputT> extends Runnable<InputT, Any> {
  // === 配置 ===
  with_config(config?: RunnableConfig, ...kwargs: Any): PregelProtocol;

  // === 图结构 ===
  get_graph(config?: RunnableConfig, xray?: number | boolean): DrawableGraph;

  // === 状态检查 ===
  get_state(config: RunnableConfig, subgraphs?: boolean): StateSnapshot;
  get_state_history(
    config: RunnableConfig,
    filter?: Record<string, Any>,
    before?: RunnableConfig,
    limit?: number
  ): Iterator<StateSnapshot>;

  // === 状态更新 ===
  update_state(
    config: RunnableConfig,
    values: Record<string, Any> | Any | null,
    as_node?: string
  ): RunnableConfig;

  // === 执行 (带版本控制) ===
  invoke(
    input: InputT | Command | null,
    config?: RunnableConfig,
    context?: ContextT,
    stream_mode?: StreamMode | StreamMode[],
    version?: "v1" | "v2"
  ): GraphOutput<OutputT> | Record<string, Any> | Any;

  stream(
    input: InputT | Command | null,
    config?: RunnableConfig,
    stream_mode?: StreamMode | StreamMode[],
    version?: "v1" | "v2"
  ): Iterator<StreamPart<StateT, OutputT>>;
}
typescript
/**
 * BaseCheckpointSaver - 检查点持久化抽象
 * 
 * 所有检查点实现 (Memory, Postgres, SQLite) 都继承此基类。
 * 采用模板方法模式:同步方法默认调用异步实现。
 */
interface BaseCheckpointSaver<V extends int | float | str> {
  // 序列化器
  serde: SerializerProtocol;

  // === 读取 ===
  get(config: RunnableConfig): Checkpoint | null;
  get_tuple(config: RunnableConfig): CheckpointTuple | null;

  // === 写入 ===
  put(
    config: RunnableConfig,
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata,
    new_versions: ChannelVersions
  ): RunnableConfig;

  put_writes(
    config: RunnableConfig,
    writes: Sequence<[string, Any]>,
    task_id: string,
    task_path?: string
  ): void;

  // === 列表 ===
  list(
    config?: RunnableConfig,
    filter?: Record<string, Any>,
    before?: RunnableConfig,
    limit?: number
  ): Iterator<CheckpointTuple>;

  // === 管理 ===
  delete_thread(thread_id: string): void;
  delete_for_runs(run_ids: string[]): void;
  copy_thread(source_thread_id: string, target_thread_id: string): void;
  prune(thread_ids: string[], strategy?: "keep_latest" | "delete"): void;

  // === 版本 ===
  get_next_version(current: V | null, channel: null): V;
}
typescript
/**
 * ChannelProtocol - 状态通道抽象
 * 
 * 每个状态 key 对应一个 Channel 实例,定义状态更新和读取策略。
 */
interface ChannelProtocol<ValueType> {
  /** 唯一标识 */
  key: string;

  /** 更新状态 (可能被多个节点并发调用) */
  update(values: Sequence<ValueType>): void;

  /** 获取当前值 */
  get(): ValueType;

  /** 消费值 (用于触发节点执行) */
  consume() -> boolean;

  /** 生成检查点快照 */
  checkpoint(): ValueType | null;

  /** 从检查点恢复 */
  from_checkpoint(checkpoint: ValueType): void;
}

7.2 对外 API 契约 (LangGraph Server API)

yaml
# LangGraph Server 创建 Assistants API
paths:
  /assistants:
    post:
      summary: 创建 Assistant
      requestBody:
        content:
          application/json:
            schema:
              type: object
              required: [graph_id, config]
              properties:
                graph_id:
                  type: string
                  description: 已注册的图 ID
                config:
                  type: object
                  description: 默认配置
                metadata:
                  type: object
      responses:
        '201':
          description: Assistant 创建成功
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Assistant'

  /threads:
    post:
      summary: 创建对话线程
      requestBody:
        content:
          application/json:
            schema:
              type: object
              properties:
                metadata:
                  type: object
      responses:
        '201':
          description: Thread 创建成功
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Thread'

  /threads/{thread_id}/runs:
    post:
      summary: 在指定线程上执行图
      parameters:
        - name: thread_id
          in: path
          required: true
          schema:
            type: string
      requestBody:
        content:
          application/json:
            schema:
              type: object
              required: [assistant_id]
              properties:
                assistant_id:
                  type: string
                input:
                  type: object
                config:
                  type: object
                stream_mode:
                  type: array
                  items:
                    type: string
                    enum: [values, updates, messages, custom, debug]
      responses:
        '200':
          description: Run 创建成功,返回流式事件
          content:
            text/event-stream:
              schema:
                type: string

  /threads/{thread_id}/runs/{run_id}/join:
    get:
      summary: 等待 Run 完成并返回最终状态
      parameters:
        - name: thread_id
          in: path
          required: true
          schema:
            type: string
        - name: run_id
          in: path
          required: true
          schema:
            type: string
      responses:
        '200':
          description: Run 完成
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Run'

8. 快速开始

8.1 环境配置

bash
# 安装核心库
pip install -U langgraph

# 安装检查点后端 (可选)
pip install -U langgraph-checkpoint-sqlite   # 开发/轻量
pip install -U langgraph-checkpoint-postgres  # 生产

8.2 构建第一个 Agent

python
from langgraph.graph import StateGraph, END, START
from typing import TypedDict, Annotated
import operator

class State(TypedDict):
    messages: Annotated[list, operator.add]

graph = StateGraph(State)
graph.add_node("agent", call_llm)
graph.add_node("tools", call_tools)
graph.add_edge("agent", "tools")
graph.add_conditional_edges("tools", decide_next, {"continue": "agent", "end": END})
graph.add_edge(START, "agent")

app = graph.compile()
result = app.invoke({"messages": [HumanMessage(content="你好!")]})