LangGraphJS 解读
项目信息
- 项目名称:LangGraphJS
- 项目描述:LangGraphJS 是一个低层级、可组合的 Agent 编排框架,专为构建具备持久化状态、支持人工干预、可长期运行的 AI Agent 工作流而生。它是 LangGraph(Python 版的 JavaScript/TypeScript 对应实现),被 Replit、Uber、LinkedIn、GitLab 等企业用于生产环境。
- 项目地址:https://github.com/langchain-ai/langgraphjs
- 官方文档:https://docs.langchain.com/oss/javascript/langgraph/overview
1. 项目概览
1.1 项目定位与核心价值
LangGraphJS 是一个低层级、可组合的 Agent 编排框架,专为构建具备持久化状态、支持人工干预、可长期运行的 AI Agent 工作流而生。它是 LangGraph(Python 版的 JavaScript/TypeScript 对应实现),被 Replit、Uber、LinkedIn、GitLab 等企业用于生产环境。
- 安装:
npm install @langchain/langgraph @langchain/core - 仓库:https://github.com/langchain-ai/langgraphjs
- 许可证:MIT
解决的核心痛点:
| 痛点 | 解决方案 |
|---|---|
| Agent 状态不可控 | 通过有状态图(Stateful Graph)让每一步执行可追踪、可中断、可恢复 |
| 长流程不可靠 | 检查点(Checkpoint)机制提供持久执行(Durable Execution),自动从断点恢复 |
| 人工介入困难 | 内置 interrupt() 断点中断和 Command({ resume }) 人工审批机制 |
| 跨会话记忆缺失 | BaseStore 长期键值存储 + 短期状态 Channels 双轨记忆系统 |
1.2 目标用户与使用场景
目标用户:
- LLM 应用开发者(需要构建复杂 Agent 工作流的 Node.js/TypeScript 开发者)
- 企业级用户(已有 Replit、Uber、LinkedIn、GitLab 等生产验证)
- 框架集成者(需在现有应用中嵌入可控 AI 工作流)
核心业务场景:
- 智能客服系统:多轮对话 + 多分支路由,在退款/投诉等关键节点引入人工审批
- 数据分析流水线:数据采集→清洗→分析→报告生成的多 Agent 自动化链路
- 代码生成与审查:代码编写→单元测试→代码审查→安全扫描的多步骤工作流
- RAG 增强检索:查询改写→检索→重排序→答案生成的多阶段流程
1.3 技术栈与选型对比
| 层级 | 技术选择 | 选型理由 |
|---|---|---|
| 语言/运行时 | TypeScript (ES2021), Node.js >=18 | 类型安全 + 现代 JS 特性 |
| 包管理 | pnpm + Turborepo (monorepo) | 高效多包管理,增量构建 |
| 代码检查 | oxlint + oxfmt (oxc) | Rust 实现,比 ESLint 快 50-100x |
| 测试 | Vitest + Playwright | 快速单元测试 + E2E 浏览器测试 |
| 执行模型 | Pregel (Google Pregel 论文) | 超步执行模型天然适合状态恢复 |
| 状态管理 | 自研 Channels 系统 | 版本化的状态变更追踪 |
| 数据校验 | Zod v3/v4 + @standard-schema | 运行时类型校验与 Schema 生成 |
| 多框架适配 | React 19, Svelte 5, Vue 3, Angular | 覆盖主流前端生态 |
1.4 生态位分析
Deep Agents (高层 Agent 封装)
↑ 基于
LangGraph (本仓库 - 底层编排引擎)
↑ 可选集成
LangChain (工具/模型集成)
↑ 依赖
@langchain/core (核心抽象)2. 整体架构设计
2.1 架构概述
LangGraphJS 采用经典的四层分层架构,自底向上依次为:
- Channels 层:提供状态通道原语(LastValue、Topic、BinaryOperatorAggregate 等),是状态管理的原子单元。每个 Channel 都是一个类型安全的、可版本化的状态容器,支持从检查点恢复。
- Checkpointer 层:负责状态持久化与恢复,提供 BaseCheckpointSaver 抽象接口,支持 SQLite、PostgreSQL、MongoDB、Redis 等多种后端。同时提供 Store(长期存储)和 Cache(缓存)机制。
- Pregel 层:核心执行引擎,实现基于消息传递的超步(Superstep)执行模型。每个超步中,框架确定哪些节点应该执行(基于通道更新触发),并在每个超步结束时持久化检查点。
- Graph 层:面向用户的高级 API,提供 Graph、StateGraph、MessageGraph 等便于定义工作流的声明式接口。StateGraph 是使用最广泛的入口,基于 Annotation 系统定义状态结构。
外部对接 LangSmith(可观测性)、LangChain(模型/工具集成)、多框架 SDK(React/Svelte/Vue/Angular)。
2.2 整体架构图
+=======================================================================+
| LangGraphJS 整体架构 |
+=======================================================================+
|
+-----------------------------------------------------------------------+
| Graph 层 (用户 API 层) |
| |
| +------------------+ +------------------+ +--------------------+ |
| | StateGraph | | Graph | | MessageGraph | |
| | [addNode] | | [addNode] | | [消息专用简化接口] | |
| | [addEdge] | | [addEdge] | +--------------------+ |
| | [addConditional] | | [addConditional] | |
| | [compile] ───────> | CompiledGraph | <───── 返回给用户 |
| +------------------+ +------------------+ |
| |
| +------------------+ +------------------------------------------+ |
| | Annotation | | Functional API: entrypoint() / task() | |
| | [状态结构定义] | | [函数式编程模型] | |
| +------------------+ +------------------------------------------+ |
+----------------------------+------------------------------------------+
| 基于
v
+-----------------------------------------------------------------------+
| Pregel 层 (执行引擎层) |
| |
| +---------------------+ +---------------------+ |
| | PregelLoop | | PregelRunner | |
| | [超步循环控制] |<-->| [并行任务执行] | |
| | [中断/恢复判断] | | [重试策略] | |
| +----------+----------+ +----------+----------+ |
| | | |
| +----------v--------------------------v----------+ |
| | Pregel (主控制器) | |
| | [invoke/stream/updateState] | |
| | [通道读取] [任务调度] [检查点持久化] | |
| +----------+----------------------+--------------+ |
| | | |
| +----------v----------+ +--------v-----------+ |
| | algo.ts | | io.ts | |
| | [_applyWrites] | | [readChannels] | |
| | [_prepareNextTasks] | | [mapInput] | |
| | [shouldInterrupt] | | [mapOutputUpdates] | |
| +----------------------+ +--------------------+ |
+----------------------------+------------------------------------------+
| 使用
v
+-----------------------------------------------------------------------+
| Channels 层 (状态通道层) |
| |
| +-------------+ +------------+ +------------------+ +-----------+ |
| | LastValue | | Topic | | BinaryOperator | | AnyValue | |
| | [单值存储] | | [发布/订阅] | | Aggregate | | [任意值] | |
| | [每步一值] | | [去重/累积] | | [归约操作] | | | |
| +------+------+ +-----+------+ +--------+---------+ +-----+-----+ |
| | | | | |
| +------v--------------v-----------------v------------------v-----+ |
| | BaseChannel<Value, Update, Checkpoint> | |
| | [fromCheckpoint()] [update()] [get()] [checkpoint()] | |
| +----------------------------------------------------------------+ |
| +------------------+ +------------------+ +------------------+ |
| | EphemeralValue | | NamedBarrierValue| |DynamicBarrierValue| |
| | [临时值,不持久化] | | [命名屏障同步] | | [动态屏障同步] | |
| +------------------+ +------------------+ +------------------+ |
+----------------------------+------------------------------------------+
| 持久化
v
+-----------------------------------------------------------------------+
| Checkpoint 层 (持久化层) |
| |
| +---------------------+ +---------------------+ |
| | BaseCheckpointSaver | | BaseStore | |
| | [检查点保存/加载] | | [长期键值存储] | |
| | [list()] [getTuple()]| | [put()] [get()] | |
| | [put()] [putWrites()]| | [search()] [list()] | |
| +----------+-----------+ +----------+----------+ |
| | | |
| +----------v--------------------------v--------------------------+ |
| | 后端实现 (插件化架构) | |
| | +--------+ +--------+ +---------+ +-------+ +--------+ | |
| | | SQLite | |Postgres| | MongoDB | | Redis | | Memory | | |
| | +--------+ +--------+ +---------+ +-------+ +--------+ | |
| +----------------------------------------------------------------+ |
| +----------------------------------------------------------------+ |
| | Serde (序列化层) | |
| | [JsonPlusSerializer] [fast-safe-stringify] | |
| +----------------------------------------------------------------+ |
+-----------------------------------------------------------------------+
|
v
+-----------------------------------------------------------------------+
| 外部系统 |
| +------------------+ +------------------+ +--------------------+ |
| | LangSmith | | LangChain Core | | LangGraph SDK | |
| | [可观测性/追踪] | | [模型/工具集成] | | [API 客户端] | |
| +------------------+ +------------------+ +--------------------+ |
+-----------------------------------------------------------------------+2.3 目录结构
langgraphjs/
├── libs/ # 【核心基建】所有可发布包 (pnpm monorepo)
│ │
│ ├── langgraph-core/ # 【核心基建】核心库 - 发布为 @langchain/langgraph
│ │ ├── src/
│ │ │ ├── channels/ # 【核心基建】状态通道系统
│ │ │ │ ├── base.ts # 【核心基建】BaseChannel 抽象基类 + createCheckpoint
│ │ │ │ ├── last_value.ts # 【核心基建】LastValue + LastValueAfterFinish
│ │ │ │ ├── topic.ts # 【核心基建】Topic 发布-订阅通道 (unique/accumulate)
│ │ │ │ ├── binop.ts # 【核心基建】BinaryOperator + BinaryOperatorAggregate
│ │ │ │ ├── any_value.ts # 【核心基建】AnyValue 任意值通道
│ │ │ │ ├── ephemeral_value.ts # 【核心基建】EphemeralValue 临时值 (不持久化)
│ │ │ │ ├── named_barrier_value.ts # 【核心基建】NamedBarrierValue 命名屏障同步
│ │ │ │ ├── dynamic_barrier_value.ts # 【核心基建】DynamicBarrierValue 动态屏障
│ │ │ │ └── untracked_value.ts # 【核心基建】UntrackedValue 非追踪通道
│ │ │ │
│ │ │ ├── graph/ # 【核心基建】图定义与编译层
│ │ │ │ ├── graph.ts # 【核心基建】Graph 基类 + CompiledGraph + Branch
│ │ │ │ ├── state.ts # 【核心基建】StateGraph - 用户主入口 API (1683行)
│ │ │ │ ├── annotation.ts # 【核心基建】Annotation/Ascription 状态定义 DSL
│ │ │ │ ├── messages_annotation.ts # 【业务模块】MessagesAnnotation/MessagesZodState
│ │ │ │ ├── message.ts # 【业务模块】pushMessage 消息操作工具
│ │ │ │ ├── types.ts # 【核心基建】ExtractStateType/StateDefinitionInit 类型
│ │ │ │ └── zod/ # 【工具集】Zod Schema 集成
│ │ │ │ ├── index.ts # 【工具集】zod() 导出
│ │ │ │ ├── schema.ts # 【工具集】ZodSchemaAdapter for Annotation
│ │ │ │ └── meta.ts # 【工具集】SchemaMetaRegistry
│ │ │ │
│ │ │ ├── pregel/ # 【核心基建】Pregel 消息传递执行引擎
│ │ │ │ ├── index.ts # 【核心基建】Pregel 主控制器 - invoke/stream (2685行)
│ │ │ │ ├── algo.ts # 【核心基建】_applyWrites + _prepareNextTasks + shouldInterrupt
│ │ │ │ ├── loop.ts # 【核心基建】PregelLoop - 超步循环控制
│ │ │ │ ├── runner.ts # 【核心基建】PregelRunner - 并行任务执行器
│ │ │ │ ├── io.ts # 【核心基建】readChannels + mapInput + mapOutput*
│ │ │ │ ├── read.ts # 【核心基建】PregelNode + ChannelRead
│ │ │ │ ├── write.ts # 【核心基建】ChannelWrite + ChannelWriteEntry
│ │ │ │ ├── stream.ts # 【核心基建】IterableReadableWritableStream
│ │ │ │ ├── messages.ts # 【核心基建】StreamMessagesHandler
│ │ │ │ ├── messages-v2.ts # 【核心基建】StreamProtocolMessagesHandler (v2协议)
│ │ │ │ ├── debug.ts # 【工具集】printStepCheckpoint/printStepTasks 调试
│ │ │ │ ├── retry.ts # 【核心基建】_runWithRetry 重试策略
│ │ │ │ ├── call.ts # 【核心基建】getRunnableForFunc 函数包装
│ │ │ │ ├── types.ts # 【核心基建】Pregel 内部类型定义
│ │ │ │ └── utils/ # 【工具集】工具函数
│ │ │ │ ├── index.ts # 【工具集】RetryPolicy/CachePolicy/Subgraph 检测
│ │ │ │ ├── subgraph.ts # 【工具集】isPregelLike 子图检测
│ │ │ │ └── config.ts # 【工具集】getStore/getWriter/getConfig
│ │ │ │
│ │ │ ├── func/ # 【业务模块】Functional API - entrypoint/task
│ │ │ │ └── index.ts # 【业务模块】entrypoint() + task() 函数式编程入口
│ │ │ │
│ │ │ ├── prebuilt/ # 【业务模块】预构建 Agent 组件
│ │ │ │ └── index.ts # 【业务模块】createReactAgent/ToolNode 等开箱即用
│ │ │ │
│ │ │ ├── state/ # 【核心基建】状态 Schema 管理
│ │ │ │ ├── index.ts # 【核心基建】StateSchema 定义与工厂
│ │ │ │ ├── prebuilt/ # 【工具集】预构建状态 (MessagesState 等)
│ │ │ │ └── values/ # 【工具集】值类型状态定义
│ │ │ │
│ │ │ ├── stream/ # 【核心基建】流式输出系统
│ │ │ │ ├── index.ts # 【核心基建】GraphRunStream/EventLog/ProtocolEvent
│ │ │ │ └── transformers/ # 【核心基建】流转换器 (消息/值/生命周期/子图发现)
│ │ │ │
│ │ │ ├── setup/ # 【工具集】初始化
│ │ │ │ └── async_local_storage.ts # 【工具集】AsyncLocalStorage 全局单例初始化
│ │ │ │
│ │ │ ├── tests/ # 【质量保证】测试套件
│ │ │ │ ├── pregel.test.ts # 【质量保证】Pregel 核心测试 (12,721行)
│ │ │ │ ├── prebuilt.test.ts # 【质量保证】预构建组件测试 (3,821行)
│ │ │ │ ├── python_port/ # 【质量保证】Python 版本兼容性测试移植
│ │ │ │ ├── pregel/ # 【质量保证】Pregel 专项测试
│ │ │ │ ├── prebuilt/ # 【质量保证】预构建专项测试
│ │ │ │ └── data/ # 【质量保证】测试数据
│ │ │ │
│ │ │ ├── web.ts # 【核心基建】浏览器端入口 (主导出)
│ │ │ ├── index.ts # 【核心基建】Node.js 入口 (含 AsyncLocalStorage 初始化)
│ │ │ ├── remote.ts # 【业务模块】远程图调用 (langgraph/remote)
│ │ │ ├── constants.ts # 【核心基建】全局常量 (START/END/Command/Send)
│ │ │ ├── errors.ts # 【核心基建】错误类层次 (GraphRecursionError 等)
│ │ │ ├── interrupt.ts # 【核心基建】interrupt() 函数
│ │ │ ├── writer.ts # 【核心基建】writer() 函数
│ │ │ ├── utils.ts # 【工具集】通用工具函数
│ │ │ └── hash.ts # 【工具集】XXH3 哈希实现
│ │ ├── spec/ # 【⚠️ 待分析】规范文件
│ │ ├── scripts/ # 【工具集】构建脚本
│ │ └── package.json # 【配置】npm 包配置
│ │
│ ├── langgraph/ # 【业务模块】顶层 re-export 包装 (npm: langgraph)
│ │ └── src/ # 轻量重导出, 依赖 @langchain/langgraph
│ │
│ ├── checkpoint/ # 【核心基建】检查点基础库 (npm: @langchain/langgraph-checkpoint)
│ │ └── src/
│ │ ├── base.ts # 【核心基建】Checkpoint 数据结构 + BaseCheckpointSaver
│ │ ├── types.ts # 【核心基建】PendingWrite/CheckpointTuple/CheckpointMetadata
│ │ ├── id.ts # 【工具集】uuid6 生成
│ │ ├── memory.ts # 【工具集】MemorySaver 内存检查点实现
│ │ ├── serde/
│ │ │ ├── base.ts # 【核心基建】SerializerProtocol 序列化协议
│ │ │ ├── jsonplus.ts # 【核心基建】JsonPlusSerializer 增强 JSON
│ │ │ ├── types.ts # 【核心基建】SendProtocol/SCHEDULED 协议类型
│ │ │ └── utils/ # 【工具集】fast-safe-stringify
│ │ ├── store/
│ │ │ └── index.ts # 【核心基建】BaseStore/AsyncBatchedStore/InMemoryStore
│ │ └── cache/
│ │ └── index.ts # 【核心基建】BaseCache/LRUCache
│ │
│ ├── checkpoint-sqlite/ # 【工具集】SQLite 检查点后端 (better-sqlite3)
│ ├── checkpoint-postgres/ # 【工具集】PostgreSQL 检查点后端 (pg)
│ ├── checkpoint-mongodb/ # 【工具集】MongoDB 检查点后端
│ ├── checkpoint-redis/ # 【工具集】Redis 检查点后端 (集成测试用)
│ ├── checkpoint-validation/ # 【质量保证】检查点数据校验工具
│ │
│ ├── sdk/ # 【核心基建】LangGraph API 客户端 SDK
│ │ └── src/
│ │ ├── client.ts # 【核心基建】Client 主类 (HTTP/SSE/WS)
│ │ ├── client/
│ │ │ ├── stream/ # 【核心基建】ThreadStream 流式客户端 (2451行)
│ │ │ ├── assistants/ # 【业务模块】Assistants CRUD API
│ │ │ ├── threads/ # 【业务模块】Threads CRUD API
│ │ │ ├── runs/ # 【业务模块】Runs 执行 API
│ │ │ ├── crons/ # 【业务模块】Cron Job API
│ │ │ └── store/ # 【业务模块】Store API
│ │ ├── react/ # 【UI 视图】React useStream/useStreamState Hooks
│ │ ├── react-ui/ # 【UI 视图】React 服务端渲染 UI 组件
│ │ ├── ui/ # 【UI 视图】共享 UI 工具 (Manager/Errors)
│ │ ├── stream/ # 【核心基建】流处理管道 (discovery/projections)
│ │ ├── auth/ # 【业务模块】认证模块
│ │ ├── logging/ # 【工具集】日志模块
│ │ ├── utils/ # 【工具集】通用工具函数
│ │ └── singletons/ # 【工具集】单例 (fetch override)
│ │
│ ├── sdk-react/ # 【UI 视图】React SDK 适配层
│ ├── sdk-svelte/ # 【UI 视图】Svelte SDK 适配层
│ ├── sdk-vue/ # 【UI 视图】Vue SDK 适配层
│ ├── sdk-angular/ # 【UI 视图】Angular SDK 适配层
│ │
│ ├── langgraph-cli/ # 【工具集】CLI 命令行工具
│ │ └── src/ # CLI 命令实现
│ ├── langgraph-api/ # 【工具集】API 服务端实现
│ │ ├── src/ # API 服务端源码
│ │ └── tests/ # API 测试
│ │
│ ├── langgraph-cua/ # 【业务模块】Computer Use Agent (CUA)
│ │ ├── src/ # CUA 核心实现
│ │ ├── examples/ # CUA 示例
│ │ └── scripts/ # CUA 脚本
│ │
│ ├── langgraph-supervisor/ # 【业务模块】Supervisor Agent 模式
│ │ ├── src/ # Supervisor 核心实现
│ │ ├── static/ # 静态资源
│ │ └── scripts/ # Supervisor 脚本
│ │
│ ├── langgraph-swarm/ # 【业务模块】Swarm Agent 模式
│ │ ├── src/ # Swarm 核心实现
│ │ ├── static/ # 静态资源
│ │ ├── examples/ # Swarm 示例
│ │ └── scripts/ # Swarm 脚本
│ │
│ ├── langgraph-ui/ # 【UI 视图】共享 UI 组件库
│ │ └── src/ # UI 组件源码
│ │
│ └── create-langgraph/ # 【工具集】项目脚手架 (npm create langgraph)
│
├── examples/ # 【配置】示例项目集 (22 个子项目)
│ ├── quickstart/ # 【业务模块】快速入门示例
│ ├── how-tos/ # 【业务模块】操作指南 (50+ 示例条目)
│ ├── chatbots/ # 【业务模块】聊天机器人
│ ├── multi_agent/ # 【业务模块】多 Agent 协作
│ ├── rag/ # 【业务模块】RAG 检索增强生成
│ ├── agent_executor/ # 【业务模块】Agent 执行器
│ ├── plan-and-execute/ # 【业务模块】计划→执行模式
│ ├── reflection/ # 【业务模块】反思模式
│ ├── rewoo/ # 【业务模块】ReWOO (Reasoning WithOut Observation)
│ ├── streaming/ # 【业务模块】流式传输
│ ├── sql-agent/ # 【业务模块】SQL Agent
│ ├── chatbot-simulation-evaluation/ # 【业务模块】聊天模拟评估
│ ├── ai-elements/ # 【UI 视图】AI Elements 组件库
│ ├── assistant-ui-claude/ # 【UI 视图】助手 UI (Claude 风格)
│ ├── ui-react/ # 【UI 视图】React UI 示例
│ ├── ui-react-transport/ # 【UI 视图】React Transport 示例
│ ├── ui-svelte/ # 【UI 视图】Svelte UI 示例
│ ├── ui-vue/ # 【UI 视图】Vue UI 示例
│ ├── ui-angular/ # 【UI 视图】Angular UI 示例
│ └── ui-multimodal/ # 【UI 视图】多模态 UI 示例
│
├── docs/ # 【配置】文档站点
├── internal/ # 【工具集】内部开发工具
│ ├── bench/ # 【工具集】性能基准测试
│ ├── build/ # 【工具集】构建工具链
│ └── environment_tests/ # 【工具集】多环境兼容性测试
├── scripts/ # 【工具集】构建与发布脚本
├── .github/ # 【配置】CI/CD 工作流
├── .changeset/ # 【配置】变更集管理
├── .vscode/ # 【配置】VS Code 编辑器配置
├── reports/ # 【配置】分析报告输出
├── package.json # 【配置】根 monorepo 配置
├── pnpm-workspace.yaml # 【配置】pnpm 工作空间定义
├── pnpm-lock.yaml # 【配置】依赖锁定文件
├── turbo.json # 【配置】Turborepo 构建配置
├── tsconfig.json # 【配置】TypeScript 全局配置
├── deno.json # 【配置】Deno 运行时配置
├── CLAUDE.md # 【配置】AI 辅助开发指南
├── CONTRIBUTING.md # 【配置】贡献指南
├── LICENSE # 【配置】MIT 许可证
├── README.md # 【配置】项目 README (symlink)
└── README-CN.md # 【配置】中文 README3. 模块依赖与调用关系
3.1 全局入口与核心路由
逻辑说明:LangGraphJS 对外暴露多个子路径入口。主入口 @langchain/langgraph 导出 StateGraph、Graph 等核心 API,以及 MemorySaver(内存检查点)。子路径如 /channels、/pregel、/prebuilt 分别导出特定层的功能。SDK 独立包 @langchain/langgraph-sdk 提供与 LangGraph API 服务端的通信能力。
调用拓扑:
@langchain/langgraph (libs/langgraph-core/src/web.ts)
|
+---> graph/ (Graph API)
| +---> StateGraph ────> Pregel (编译目标)
| +---> MessageGraph ──> Pregel (编译目标)
| +---> Annotation ────> Channels (状态通道工厂)
| +---> Graph (低层图基类)
|
+---> pregel/ (执行引擎)
| +---> PregelLoop ────> algo.ts (_applyWrites, _prepareNextTasks)
| +---> PregelRunner ──> retry.ts (_runWithRetry)
| +---> io.ts (readChannels, mapInput, mapOutput*)
| +---> debug.ts (printStep*)
| +---> write.ts (ChannelWrite)
| +---> read.ts (PregelNode, ChannelRead)
| +---> stream.ts (IterableReadableWritableStream)
| +---> messages.ts (StreamMessagesHandler)
| +---> utils/ (RetryPolicy, CachePolicy, subgraph detection)
|
+---> channels/ (状态通道)
| +---> BaseChannel (抽象基类)
| +---> LastValue (单值通道)
| +---> LastValueAfterFinish (屏障后可用)
| +---> Topic (发布-订阅通道)
| +---> BinaryOperatorAggregate (归约通道)
| +---> AnyValue (任意值通道)
| +---> EphemeralValue (临时值,不持久化)
| +---> NamedBarrierValue (命名屏障)
| +---> DynamicBarrierValue (动态屏障)
| +---> binop.ts (BinaryOperator 类型)
|
+---> func/ (函数式 API)
| +---> entrypoint() ────> Pregel (轻量入口)
| +---> task() ──────────> Pregel (任务定义)
|
+---> prebuilt/ (预构建组件)
| +---> createReactAgent()
| +---> ToolNode
| +---> AgentState (预定义状态)
|
+---> state/ (状态管理)
| +---> StateSchema
| +---> prebuilt/ (预构建状态)
| +---> values/ (值类型状态)
|
+---> stream/ (流式输出)
| +---> GraphRunStream (图运行流)
| +---> transformers/ (消息/值/生命周期转换器)
| +---> EventLog (事件日志)
|
+---> constants.ts (核心常量: START, END, Command, Send, INTERRUPT)
+---> errors.ts (错误类型层次)
@langchain/langgraph-checkpoint (libs/checkpoint/src/)
|
+---> base.ts ────> Checkpoint, emptyCheckpoint, BaseCheckpointSaver
+---> types.ts ───> PendingWrite, CheckpointMetadata, CheckpointTuple
+---> serde/ ─────> SerializerProtocol, JsonPlusSerializer
+---> store/ ─────> BaseStore, AsyncBatchedStore, InMemoryStore
+---> cache/ ─────> BaseCache, LRUCache
+---> memory.ts ──> MemorySaver (内存检查点实现)
@langchain/langgraph-sdk (libs/sdk/src/)
|
+---> client.ts ────────> Client (HTTP/SSE/WS 客户端)
+---> client/stream/ ───> ThreadStream, MessageAssembler
+---> client/assistants/> Assistant API
+---> client/threads/ ──> Thread API
+---> client/runs/ ─────> Run API
+---> client/crons/ ────> Cron Job API
+---> client/store/ ────> Store API
+---> auth/ ────────────> 认证模块
+---> react/ ───────────> useStream() / useStreamState() Hooks
+---> react-ui/ ────────> UI 服务端渲染组件
+---> ui/ ──────────────> 共享 UI 工具
+---> stream/ ───────────> 流处理管道2.2 核心业务实体与关联
实体定义:
- Graph:工作流的有向图定义,包含节点(Node)和边(Edge)
- StateGraph:带共享状态的有向图,节点可读写状态
- Channel:状态的原子存储单元,有类型化的 ValueType / UpdateType / CheckpointType
- Checkpoint:整个图在某时刻的状态快照,包含所有 channel_values 和 channel_versions
- Pregel:编译后的可执行图,负责协调执行
- Task:Pregel 调度执行的单个工作单元
- Thread:一次会话/执行实例,有其独立的检查点链
- Run:一次具体的图调用/执行
实体引用拓扑:
[StateGraph] 1 ----> 1 [CompiledGraph]
|
| 编译为
v
[CompiledGraph] 1 ----> 1 [Pregel]
|
| 管理 N 个
v
[Pregel] 1 ----> N [Channel]
|
| 每超步持久化
v
[Pregel] 1 ----> N [Checkpoint]
|
| 调度 N 个
v
[Pregel] 1 ----> N [Task] (per superstep)
[Thread] 1 ----> N [Checkpoint] (历史链)
[Thread] 1 ----> N [Run] (执行记录)
[BaseCheckpointSaver] 1 ----> N [Checkpoint] (持久化读写)
[BaseStore] 1 ----> N [Item] (长期存储条目)
[Client (SDK)] 1 ----> N [Thread] (远程管理)
[Client (SDK)] 1 ----> N [Run] (远程执行)
[Client (SDK)] 1 ----> N [Assistant] (助手配置)4. 核心模块详解
模块一:Pregel 执行引擎
设计说明:Pregel 是 LangGraphJS 的核心执行引擎,实现了受 Google Pregel 启发的消息传递超步执行模型。每个超步(Superstep)包含三个阶段:
- (1) 计划阶段 - 确定哪些节点应该执行;
- (2) 执行阶段 - 并行运行已计划的节点;
- (3) 更新阶段 - 将节点输出写入通道并持久化检查点。
关键设计模式包括:策略模式(可插拔的 checkpoint/retry/cache 策略)、观察者模式(stream events)、状态机模式(中断/恢复/错误处理)。
内部结构图:
+-----------------------------------------------------------------------+
| Pregel 执行引擎 |
| |
| Pregel.invoke(input, config) |
| | |
| v |
| +---------------------------+ |
| | PregelLoop (超步循环) | |
| | | |
| | for each superstep: | |
| | 1. _prepareNextTasks() |──> algo.ts |
| | - 根据通道更新确定 | | |
| | 哪些节点被触发 | v |
| | - 生成 Task 列表 | 检查 trigger_to_nodes |
| | | 映射 + channel_versions |
| | 2. PregelRunner.tick() |──> runner.ts |
| | - 并行执行所有 Task | | |
| | - 收集 PendingWrite | v |
| | | [_runWithRetry] + [AbortSignal] |
| | 3. _applyWrites() |──> algo.ts |
| | - 将 writes 应用到 | | |
| | 对应 Channel.update() | v |
| | - 更新 channel_versions | 各 Channel.update(values) |
| | | |
| | 4. checkpoint.save() |──> BaseCheckpointSaver |
| | - 持久化当前状态快照 | | |
| | | v |
| | 5. shouldInterrupt()? |──> 检查 INTERRUPT 标记 |
| | if yes: break loop | |
| +---------------------------+ |
| | |
| v |
| 返回最终 State / 流式输出 |
+-----------------------------------------------------------------------+模块二:Channels 状态通道系统
设计说明:Channels 是 LangGraph 的状态原子单元。每个 Channel 是一个泛型类 BaseChannel<ValueType, UpdateType, CheckpointType>,定义了三个核心操作:
update()接收更新序列并改变内部状态;get()返回当前值;checkpoint()导出可序列化的快照。 不同的 Channel 子类实现了不同的语义:LastValue 存储最后写入的单个值(每步只接收一个值)、Topic 是发布/订阅模型(支持去重和累积)、BinaryOperatorAggregate 支持自定义归约函数。这种设计允许框架精确追踪每个状态字段的变化,实现细粒度的检查点恢复。
内部结构图:
+-----------------------------------------------------------------------+
| Channels 状态通道系统 |
| |
| +------------------------------------------------------------------+ |
| | BaseChannel<V, U, C> (抽象基类) | |
| | + fromCheckpoint(checkpoint?: C): this | |
| | + update(values: U[]): boolean | |
| | + get(): V | |
| | + checkpoint(): C | undefined | |
| | + consume(): boolean | |
| +----+-------------------+-------------------+--------------------+ |
| | | | |
| v v v |
| +---------+ +------------------+ +-----------------------+ |
| |LastValue| | Topic<V> | |BinaryOperatorAggregate| |
| |---------| |------------------| |-----------------------| |
| |单值存储 | |values: V[] | |reducer: (a,b)=>a | |
| |每步一值 | |seen: Set<V> | |value: V | |
| |最后写入 | |unique: bool | |支持自定义归约函数 | |
| |胜出 | |accumulate: bool | |如: (x,y)=>x.concat(y) | |
| +----+----+ +--------+---------+ +-----------+-----------+ |
| | | | |
| v v v |
| +---------+ +------------------+ +-----------------------+ |
| |AnyValue | |NamedBarrierValue | |DynamicBarrierValue | |
| |---------| |------------------| |-----------------------| |
| |任意类型 | |等待指定节点集合 | |等待动态确定的节点集合 | |
| |不做限制 | |全部完成后可读 | |用于并行任务同步 | |
| +---------+ +------------------+ +-----------------------+ |
| |
| +------------------+ +------------------+ |
| | EphemeralValue | |LastValueAfterFinish| |
| |------------------| |---------------------| |
| | 临时值,不持久化 | | 超步完成后才可读 | |
| | 每次超步重置 | | 读取后自动清空 | |
| +------------------+ +---------------------+ |
+-----------------------------------------------------------------------+模块三:Graph 定义与编译
设计说明:Graph 层是用户直接交互的 API 层。StateGraph 是最核心的类,它通过声明式 API 让用户定义节点(addNode)、边(addEdge)、条件边(addConditionalEdges),然后通过 compile() 方法生成 CompiledStateGraph。编译过程将用户定义的图结构转换为 Pregel 可执行的内部表示。Annotation API 提供类型安全的状态定义,每个字段可指定 reducer 和默认值。MessageGraph 是专门为消息状态优化的简化接口。
内部结构图:
+-----------------------------------------------------------------------+
| Graph 定义与编译流程 |
| |
| 用户代码: |
| const workflow = new StateGraph(MyAnnotation) |
| .addNode("nodeA", myFunc) |
| .addNode("nodeB", otherFunc) |
| .addEdge(START, "nodeA") |
| .addConditionalEdges("nodeA", router, {x: "nodeB", y: END}) |
| .compile({ checkpointer: mySaver }) |
| |
| +------------------------------------------------------------------+ |
| | StateGraph.compile() | |
| | | | |
| | v | |
| | 1. 验证图结构 (无孤立节点、无循环依赖) | |
| | 2. 为每个状态字段创建对应 Channel | |
| - Annotation.spec[key] ──> Channel 实例 | |
| | 3. 创建内部通道: | |
| | - __start__: EphemeralValue | |
| | - __end__: LastValue | |
| | - branch:to:nodeX: EphemeralValue (条件路由) | |
| | - 每个状态字段: LastValue / Topic (根据 reducer) | |
| | 4. 构建 Pregel 节点 (PregelNode): | |
| - 每个用户节点包装为 PregelNode | |
| | - 注入 ChannelRead (输入通道) 和 ChannelWrite (输出通道) | |
| | 5. 构建 trigger_to_nodes 映射: | |
| | - channel_name ──> [被触发的节点列表] | |
| | 6. 生成 CompiledStateGraph (持有 Pregel 实例) | |
| +------------------------------------------------------------------+ |
| | |
| v |
| CompiledStateGraph.invoke(state, config) |
| | |
| v |
| Pregel.invoke(input, config) ──> 进入 Pregel 执行引擎 |
+-----------------------------------------------------------------------+模块四:Checkpoint 持久化系统
设计说明:Checkpoint 系统负责 Agent 状态的持久化和恢复。系统核心是 Checkpoint 数据结构(包含 channel_values 和 channel_versions)和 BaseCheckpointSaver 抽象接口。每次超步结束后,框架自动调用 checkpointer.put() 保存当前快照。BaseStore 提供独立的长期键值存储(与检查点正交),用于跨会话的持久记忆。Cache 层提供热点数据缓存,减少数据库查询。序列化层(Serde)处理复杂类型的序列化/反序列化,支持自定义序列化器。
内部结构图:
+-----------------------------------------------------------------------+
| Checkpoint 持久化系统 |
| |
| +------------------------------------------------------------------+ |
| Checkpoint 结构: | |
| { | |
| v: 4, // 格式版本 | |
| id: "uuid6", // 检查点 ID | |
| ts: "ISO timestamp", // 时间戳 | |
| | channel_values: { // 各通道当前值 | |
| "messages": [...], | |
| "agent_status": "idle", | |
| }, | |
| | channel_versions: { // 各通道版本号 (用于冲突检测) | |
| "messages": 5, | |
| "agent_status": 3, | |
| }, | |
| | versions_seen: { // 每个节点看到的版本 (用于触发判断) | |
| "nodeA": { "messages": 3, "agent_status": 1 }, | |
| } | |
| } | |
+------------------------------------------------------------------+ |
|
+------------------------------------------------------------------+ |
| | BaseCheckpointSaver (抽象接口) | |
| + getTuple(config): CheckpointTuple | undefined | |
| + list(config, options): AsyncGenerator<CheckpointTuple> | |
| | + put(config, checkpoint, metadata, newVersions): | |
| | Promise<{configurable: ...}> | |
| | + putWrites(config, writes, taskId): Promise<void> | |
| | + deleteThread(threadId): Promise<void> | |
+--+---------------------------------------------------------------+ |
| |
v v v v |
+------+ +-------------+ +----------+ +--------+ |
| |Memory| |SqliteSaver | |PostgresSaver| |MongoDBSaver| |
| |Saver | |(better- | |(pg driver) | |(mongodb) | |
|------| | sqlite3) | | | | | |
| |内存 | |本地文件 | |远程数据库 | |文档数据库 | |
| +------+ +-------------+ +----------+ +--------+ |
|
+------------------------------------------------------------------+ |
| | BaseStore (长期键值存储) | |
| + put(namespace, key, value): Promise<void> | |
| + get(namespace, key): Promise<Item | null> | |
| | + search(namespace, filter): AsyncGenerator<Item> | |
| + listNamespaces(): AsyncGenerator<NamespacePath> | |
+------------------------------------------------------------------+ |
|
+------------------------------------------------------------------+ |
| | Cache (缓存层) | |
| | - 缓存热点检查点和 PendingWrite | |
| | - 减少数据库往返 | |
| | - 支持 TTL 和 LRU 淘汰策略 | |
+------------------------------------------------------------------+ |
+-----------------------------------------------------------------------+模块五:SDK 客户端与流式传输
设计说明:@langchain/langgraph-sdk 是独立于核心库的客户端包,用于与 LangGraph API 服务端通信。ThreadStream 是核心抽象,封装了 SSE/WebSocket 协议的流式交互。MessageAssembler 将碎片化的流式消息组装为完整结构。SDK 提供 React(useStream/useStreamState)、Svelte、Vue 的多框架 Hook。内部使用 p-queue 控制并发,p-retry 实现自动重试。
内部结构图:
+-----------------------------------------------------------------------+
| SDK 客户端架构 |
| |
+------------------------------------------------------------------+ |
| Client (HTTP 客户端) | |
| | + assistants: AssistantsClient ──> CRUD 助手配置 | |
| | + threads: ThreadsClient ────────> CRUD 会话线程 | |
| | + runs: RunsClient ──────────────> 创建/监控执行 | |
| | + crons: CronsClient ────────────> 定时任务管理 | |
| | + store: StoreClient ────────────> 长期存储操作 | |
+---------------------------+--------------------------------------+ |
| |
v |
+------------------------------------------------------------------+ |
| ThreadStream (流式客户端) | |
| + subscribe(options): ──> 订阅流式事件 | |
| - TransportAdapter (SSE / WebSocket) | |
| - MultiCursorBuffer (多光标缓冲) | |
| - MessageAssembler (消息组装) | |
| - MediaAssembler (多媒体组装) | |
| - ToolCallAssembler (工具调用组装) | |
| - SubgraphDiscoveryHandle (子图发现) | |
| - StreamingMessageHandle (流式消息句柄) | |
+---------------------------+--------------------------------------+ |
| |
v |
+------------------------------------------------------------------+ |
| 框架适配层 | |
| + react/ ──> useStream() Hook, useStreamState() Hook | |
| + react-ui/ ──> 服务端渲染 UI 组件 | |
| + sdk-react/ ──> React 包装 (npm: @langchain/langgraph-sdk-react)| |
| + sdk-svelte/ ─> Svelte 包装 | |
| + sdk-vue/ ────> Vue 包装 | |
| + sdk-angular/ > Angular 包装 | |
+------------------------------------------------------------------+ |
+-----------------------------------------------------------------------+5. 关键数据流程
场景一:StateGraph 完整执行流程
场景说明:用户构建一个包含两个节点(agent → tools)的 StateGraph,编译后调用 invoke()。展示从输入到输出的完整执行流程,包括通道初始化、节点调度、检查点持久化、中断处理等关键环节。
场景二:检查点恢复流程(Human-in-the-Loop)
场景说明:Agent 执行到人工审批节点时中断,用户检查状态后通过 Command 恢复执行。展示中断→检查状态→恢复→继续执行的完整流程。
场景三:流式传输与 SDK 交互
场景说明:前端通过 SDK 的 ThreadStream 订阅 LangGraph API 的流式事件,展示 SSE 连接建立、事件流式接收、消息组装和 UI 更新的完整链路。
6. 接口与契约规范
6.1 核心内部模块契约
/**
* 状态通道基础契约 - 所有通道类型的抽象基类
*/
export abstract class BaseChannel<
ValueType = unknown,
UpdateType = unknown,
CheckpointType = unknown,
> {
abstract lc_graph_name: string;
lg_is_channel: true;
// 从检查点恢复通道状态
abstract fromCheckpoint(checkpoint?: CheckpointType): this;
// 应用一组更新到通道
// @throws {InvalidUpdateError} 当更新序列无效时
abstract update(values: UpdateType[]): boolean;
// 获取当前通道值
// @throws {EmptyChannelError} 当通道为空时
abstract get(): ValueType;
// 导出可序列化的检查点快照
abstract checkpoint(): CheckpointType | undefined;
// 标记当前值为已消费 (默认 no-op)
consume(): boolean;
}/**
* 检查点持久化契约
*/
export interface BaseCheckpointSaver<
N extends string = string,
C extends string = string,
> {
// 获取指定配置对应的检查点
getTuple(config: RunnableConfig): Promise<CheckpointTuple | undefined>;
// 列出某线程的检查点历史
list(
config: RunnableConfig,
options?: CheckpointListOptions
): AsyncGenerator<CheckpointTuple>;
// 保存检查点
put(
config: RunnableConfig,
checkpoint: Checkpoint,
metadata: CheckpointMetadata,
newVersions: ChannelVersions
): Promise<{ configurable: Record<string, unknown> }>;
// 保存待处理的写入
putWrites(
config: RunnableConfig,
writes: PendingWrite[],
taskId: string
): Promise<void>;
// 删除线程的所有检查点
deleteThread(threadId: string): Promise<void>;
}/**
* 长期存储契约
*/
export abstract class BaseStore {
// 存储键值
abstract put(
namespace: string[],
key: string,
value: Record<string, unknown>
): Promise<void>;
// 读取键值
abstract get(
namespace: string[],
key: string
): Promise<Item | null>;
// 搜索(支持过滤、分页)
abstract search(
namespace: string[],
filter?: SearchFilter
): AsyncGenerator<Item>;
// 列出命名空间
abstract listNamespaces(
options?: ListNamespacesOptions
): AsyncGenerator<NamespacePath>;
}/**
* Pregel 执行参数
*/
export interface PregelParams {
channels: Record<string, BaseChannel>;
nodes: Record<string, PregelNode>;
triggerToNodes: Record<string, string[]>;
inputChannels: string | string[];
outputChannels: string | string[];
streamChannels: string | string[];
interruptAfterNodes?: string[];
interruptBeforeNodes?: string[];
checkpointer?: BaseCheckpointSaver;
store?: BaseStore;
cache?: BaseCache;
debug?: boolean;
}
/**
* 图编译选项
*/
export interface StateGraphOptions {
checkpointer?: BaseCheckpointSaver;
store?: BaseStore;
cache?: BaseCache;
interruptBefore?: string[];
interruptAfter?: string[];
streamTransformers?: StreamTransformer[];
}6.2 对外 API 契约(SDK 协议 - OpenSpec 风格)
components:
schemas:
Run:
type: object
required: [run_id, thread_id, assistant_id, status]
properties:
run_id:
type: string
description: 运行唯一标识符
thread_id:
type: string
description: 所属会话线程 ID
assistant_id:
type: string
description: 助手配置 ID
status:
type: string
enum: [pending, running, completed, failed, interrupted, cancelled]
result:
type: object
description: 执行结果 (status=completed 时)
error:
type: string
description: 错误信息 (status=failed 时)
ThreadCreate:
type: object
properties:
thread_id:
type: string
description: 可选,自定义线程 ID
metadata:
type: object
description: 附加元数据
StreamEvent:
type: object
properties:
event:
type: string
enum: [values, messages, updates, events, debug, error, end]
data:
type: object
description: 事件数据,类型取决于 event 字段
paths:
/threads:
post:
summary: 创建新线程
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/ThreadCreate'
responses:
'200':
description: 线程创建成功
content:
application/json:
schema:
$ref: '#/components/schemas/Thread'
/threads/{thread_id}/runs:
post:
summary: 在指定线程中创建执行
parameters:
- name: thread_id
in: path
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
type: object
required: [assistant_id]
properties:
assistant_id:
type: string
input:
type: object
config:
type: object
responses:
'200':
description: 执行已创建并启动
'409':
description: 线程已有正在进行的执行
/threads/{thread_id}/runs/stream:
post:
summary: 流式执行 (SSE)
parameters:
- name: thread_id
in: path
required: true
responses:
'200':
description: SSE 事件流
content:
text/event-stream:
schema:
$ref: '#/components/schemas/StreamEvent'
/threads/{thread_id}/state:
get:
summary: 获取线程当前状态
parameters:
- name: thread_id
in: path
required: true
responses:
'200':
description: 当前状态快照
post:
summary: 更新线程状态(用于人工干预)7. 快速开始
7.1 环境配置
# 要求 Node.js >=18
node --version
# 安装 pnpm (如未安装)
npm install -g pnpm
# 安装依赖
pnpm install7.2 安装与使用
npm install @langchain/langgraph @langchain/core7.3 最小示例
import { StateGraph, Annotation, END, START } from "@langchain/langgraph";
import { BaseMessage, HumanMessage, AIMessage } from "@langchain/core/messages";
// 1. 定义状态
const State = Annotation.Root({
messages: Annotation<BaseMessage[]>({
reducer: (left, right) => left.concat(right),
default: () => [],
}),
});
// 2. 构建图
const graph = new StateGraph(State)
.addNode("agent", async (state) => {
return { messages: [new AIMessage("Hello! How can I help?")] };
})
.addEdge(START, "agent")
.addEdge("agent", END)
.compile();
// 3. 执行
const result = await graph.invoke({
messages: [new HumanMessage("Hi!")],
});
console.log(result.messages);7.4 项目构建
# 构建所有包
pnpm build
# 运行测试
pnpm test
# 运行单个测试
pnpm test:single /path/to/yourtest.test.ts
# 代码检查与格式化
pnpm lint
pnpm format