Skip to content

LangGraphJS 解读

项目信息

1. 项目概览

1.1 项目定位与核心价值

LangGraphJS 是一个低层级、可组合的 Agent 编排框架,专为构建具备持久化状态、支持人工干预、可长期运行的 AI Agent 工作流而生。它是 LangGraph(Python 版的 JavaScript/TypeScript 对应实现),被 Replit、Uber、LinkedIn、GitLab 等企业用于生产环境。

解决的核心痛点

痛点解决方案
Agent 状态不可控通过有状态图(Stateful Graph)让每一步执行可追踪、可中断、可恢复
长流程不可靠检查点(Checkpoint)机制提供持久执行(Durable Execution),自动从断点恢复
人工介入困难内置 interrupt() 断点中断和 Command({ resume }) 人工审批机制
跨会话记忆缺失BaseStore 长期键值存储 + 短期状态 Channels 双轨记忆系统

1.2 目标用户与使用场景

目标用户

  • LLM 应用开发者(需要构建复杂 Agent 工作流的 Node.js/TypeScript 开发者)
  • 企业级用户(已有 Replit、Uber、LinkedIn、GitLab 等生产验证)
  • 框架集成者(需在现有应用中嵌入可控 AI 工作流)

核心业务场景

  • 智能客服系统:多轮对话 + 多分支路由,在退款/投诉等关键节点引入人工审批
  • 数据分析流水线:数据采集→清洗→分析→报告生成的多 Agent 自动化链路
  • 代码生成与审查:代码编写→单元测试→代码审查→安全扫描的多步骤工作流
  • RAG 增强检索:查询改写→检索→重排序→答案生成的多阶段流程

1.3 技术栈与选型对比

层级技术选择选型理由
语言/运行时TypeScript (ES2021), Node.js >=18类型安全 + 现代 JS 特性
包管理pnpm + Turborepo (monorepo)高效多包管理,增量构建
代码检查oxlint + oxfmt (oxc)Rust 实现,比 ESLint 快 50-100x
测试Vitest + Playwright快速单元测试 + E2E 浏览器测试
执行模型Pregel (Google Pregel 论文)超步执行模型天然适合状态恢复
状态管理自研 Channels 系统版本化的状态变更追踪
数据校验Zod v3/v4 + @standard-schema运行时类型校验与 Schema 生成
多框架适配React 19, Svelte 5, Vue 3, Angular覆盖主流前端生态

1.4 生态位分析

Deep Agents (高层 Agent 封装)
    ↑ 基于
LangGraph (本仓库 - 底层编排引擎)
    ↑ 可选集成
LangChain (工具/模型集成)
    ↑ 依赖
@langchain/core (核心抽象)

2. 整体架构设计

2.1 架构概述

LangGraphJS 采用经典的四层分层架构,自底向上依次为:

  • Channels 层:提供状态通道原语(LastValue、Topic、BinaryOperatorAggregate 等),是状态管理的原子单元。每个 Channel 都是一个类型安全的、可版本化的状态容器,支持从检查点恢复。
  • Checkpointer 层:负责状态持久化与恢复,提供 BaseCheckpointSaver 抽象接口,支持 SQLite、PostgreSQL、MongoDB、Redis 等多种后端。同时提供 Store(长期存储)和 Cache(缓存)机制。
  • Pregel 层:核心执行引擎,实现基于消息传递的超步(Superstep)执行模型。每个超步中,框架确定哪些节点应该执行(基于通道更新触发),并在每个超步结束时持久化检查点。
  • Graph 层:面向用户的高级 API,提供 Graph、StateGraph、MessageGraph 等便于定义工作流的声明式接口。StateGraph 是使用最广泛的入口,基于 Annotation 系统定义状态结构。

外部对接 LangSmith(可观测性)、LangChain(模型/工具集成)、多框架 SDK(React/Svelte/Vue/Angular)。

2.2 整体架构图

text
+=======================================================================+
|                         LangGraphJS 整体架构                           |
+=======================================================================+
                                                                         |
+-----------------------------------------------------------------------+
|                    Graph 层 (用户 API 层)                              |
|                                                                       |
|  +------------------+  +------------------+  +--------------------+   |
|  |    StateGraph    |  |    Graph         |  |  MessageGraph      |   |
|  | [addNode]        |  | [addNode]        |  | [消息专用简化接口]  |   |
|  | [addEdge]        |  | [addEdge]        |  +--------------------+   |
|  | [addConditional] |  | [addConditional] |                          |
|  | [compile]  ───────> | CompiledGraph    |  <───── 返回给用户       |
|  +------------------+  +------------------+                          |
|                                                                       |
|  +------------------+  +------------------------------------------+   |
|  |  Annotation       | | Functional API: entrypoint() / task()     |   |
|  |  [状态结构定义]    | | [函数式编程模型]                          |   |
|  +------------------+  +------------------------------------------+   |
+----------------------------+------------------------------------------+
                             | 基于
                             v
+-----------------------------------------------------------------------+
|                    Pregel 层 (执行引擎层)                              |
|                                                                       |
|  +---------------------+    +---------------------+                   |
|  |    PregelLoop        |    |   PregelRunner      |                  |
|  | [超步循环控制]        |<-->| [并行任务执行]       |                  |
|  | [中断/恢复判断]       |    | [重试策略]           |                  |
|  +----------+----------+    +----------+----------+                   |
|             |                          |                              |
|  +----------v--------------------------v----------+                   |
|  |              Pregel (主控制器)                  |                   |
|  |  [invoke/stream/updateState]                    |                   |
|  |  [通道读取]  [任务调度]  [检查点持久化]          |                   |
|  +----------+----------------------+--------------+                   |
|             |                      |                                  |
|  +----------v----------+  +--------v-----------+                     |
|  |    algo.ts           |  |    io.ts           |                    |
|  | [_applyWrites]        |  | [readChannels]     |                    |
|  | [_prepareNextTasks]   |  | [mapInput]         |                    |
|  | [shouldInterrupt]     |  | [mapOutputUpdates] |                    |
|  +----------------------+  +--------------------+                     |
+----------------------------+------------------------------------------+
                             | 使用
                             v
+-----------------------------------------------------------------------+
|                    Channels 层 (状态通道层)                            |
|                                                                       |
|  +-------------+ +------------+ +------------------+ +-----------+    |
|  |  LastValue  | |  Topic     | | BinaryOperator   | | AnyValue  |    |
|  | [单值存储]   | | [发布/订阅] | | Aggregate        | | [任意值]  |    |
|  | [每步一值]   | | [去重/累积] | | [归约操作]        | |           |    |
|  +------+------+ +-----+------+ +--------+---------+ +-----+-----+   |
|         |              |                 |                  |         |
|  +------v--------------v-----------------v------------------v-----+   |
|  |                    BaseChannel<Value, Update, Checkpoint>       |   |
|  |  [fromCheckpoint()] [update()] [get()] [checkpoint()]          |   |
|  +----------------------------------------------------------------+   |
|  +------------------+  +------------------+  +------------------+    |
|  | EphemeralValue   |  | NamedBarrierValue|  |DynamicBarrierValue|   |
|  | [临时值,不持久化] |  | [命名屏障同步]    |  | [动态屏障同步]    |   |
|  +------------------+  +------------------+  +------------------+    |
+----------------------------+------------------------------------------+
                             | 持久化
                             v
+-----------------------------------------------------------------------+
|                    Checkpoint 层 (持久化层)                            |
|                                                                       |
|  +---------------------+    +---------------------+                   |
|  | BaseCheckpointSaver  |    |    BaseStore        |                  |
|  | [检查点保存/加载]     |    | [长期键值存储]       |                  |
|  | [list()] [getTuple()]|    | [put()] [get()]     |                  |
|  | [put()] [putWrites()]|    | [search()] [list()]  |                  |
|  +----------+-----------+    +----------+----------+                   |
|             |                          |                              |
|  +----------v--------------------------v--------------------------+   |
|  |              后端实现 (插件化架构)                               |   |
|  |  +--------+  +--------+  +---------+  +-------+  +--------+    |   |
|  |  | SQLite |  |Postgres|  | MongoDB |  | Redis |  | Memory |    |   |
|  |  +--------+  +--------+  +---------+  +-------+  +--------+    |   |
|  +----------------------------------------------------------------+   |
|  +----------------------------------------------------------------+   |
|  |  Serde (序列化层)                                               |   |
|  |  [JsonPlusSerializer] [fast-safe-stringify]                    |   |
|  +----------------------------------------------------------------+   |
+-----------------------------------------------------------------------+
                             |
                             v
+-----------------------------------------------------------------------+
|                    外部系统                                            |
|  +------------------+  +------------------+  +--------------------+   |
|  |  LangSmith        |  |  LangChain Core  |  |  LangGraph SDK     |  |
|  | [可观测性/追踪]    |  | [模型/工具集成]   |  | [API 客户端]       |  |
|  +------------------+  +------------------+  +--------------------+   |
+-----------------------------------------------------------------------+

2.3 目录结构

shell
langgraphjs/
├── libs/                                  # 【核心基建】所有可发布包 (pnpm monorepo)

   ├── langgraph-core/                    # 【核心基建】核心库 - 发布为 @langchain/langgraph
   ├── src/
   ├── channels/                  # 【核心基建】状态通道系统
   ├── base.ts               # 【核心基建】BaseChannel 抽象基类 + createCheckpoint
   ├── last_value.ts          # 【核心基建】LastValue + LastValueAfterFinish
   ├── topic.ts              # 【核心基建】Topic 发布-订阅通道 (unique/accumulate)
   ├── binop.ts              # 【核心基建】BinaryOperator + BinaryOperatorAggregate
   ├── any_value.ts          # 【核心基建】AnyValue 任意值通道
   ├── ephemeral_value.ts    # 【核心基建】EphemeralValue 临时值 (不持久化)
   ├── named_barrier_value.ts # 【核心基建】NamedBarrierValue 命名屏障同步
   ├── dynamic_barrier_value.ts # 【核心基建】DynamicBarrierValue 动态屏障
   └── untracked_value.ts    # 【核心基建】UntrackedValue 非追踪通道

   ├── graph/                    # 【核心基建】图定义与编译层
   ├── graph.ts              # 【核心基建】Graph 基类 + CompiledGraph + Branch
   ├── state.ts              # 【核心基建】StateGraph - 用户主入口 API (1683行)
   ├── annotation.ts         # 【核心基建】Annotation/Ascription 状态定义 DSL
   ├── messages_annotation.ts # 【业务模块】MessagesAnnotation/MessagesZodState
   ├── message.ts            # 【业务模块】pushMessage 消息操作工具
   ├── types.ts              # 【核心基建】ExtractStateType/StateDefinitionInit 类型
   └── zod/                  # 【工具集】Zod Schema 集成
       ├── index.ts          # 【工具集】zod() 导出
       ├── schema.ts         # 【工具集】ZodSchemaAdapter for Annotation
       └── meta.ts           # 【工具集】SchemaMetaRegistry

   ├── pregel/                   # 【核心基建】Pregel 消息传递执行引擎
   ├── index.ts              # 【核心基建】Pregel 主控制器 - invoke/stream (2685行)
   ├── algo.ts               # 【核心基建】_applyWrites + _prepareNextTasks + shouldInterrupt
   ├── loop.ts               # 【核心基建】PregelLoop - 超步循环控制
   ├── runner.ts             # 【核心基建】PregelRunner - 并行任务执行器
   ├── io.ts                 # 【核心基建】readChannels + mapInput + mapOutput*
   ├── read.ts               # 【核心基建】PregelNode + ChannelRead
   ├── write.ts              # 【核心基建】ChannelWrite + ChannelWriteEntry
   ├── stream.ts             # 【核心基建】IterableReadableWritableStream
   ├── messages.ts           # 【核心基建】StreamMessagesHandler
   ├── messages-v2.ts        # 【核心基建】StreamProtocolMessagesHandler (v2协议)
   ├── debug.ts              # 【工具集】printStepCheckpoint/printStepTasks 调试
   ├── retry.ts              # 【核心基建】_runWithRetry 重试策略
   ├── call.ts               # 【核心基建】getRunnableForFunc 函数包装
   ├── types.ts              # 【核心基建】Pregel 内部类型定义
   └── utils/                # 【工具集】工具函数
       ├── index.ts          # 【工具集】RetryPolicy/CachePolicy/Subgraph 检测
       ├── subgraph.ts       # 【工具集】isPregelLike 子图检测
       └── config.ts         # 【工具集】getStore/getWriter/getConfig

   ├── func/                     # 【业务模块】Functional API - entrypoint/task
   └── index.ts              # 【业务模块】entrypoint() + task() 函数式编程入口

   ├── prebuilt/                 # 【业务模块】预构建 Agent 组件
   └── index.ts              # 【业务模块】createReactAgent/ToolNode 等开箱即用

   ├── state/                    # 【核心基建】状态 Schema 管理
   ├── index.ts              # 【核心基建】StateSchema 定义与工厂
   ├── prebuilt/             # 【工具集】预构建状态 (MessagesState 等)
   └── values/               # 【工具集】值类型状态定义

   ├── stream/                   # 【核心基建】流式输出系统
   ├── index.ts              # 【核心基建】GraphRunStream/EventLog/ProtocolEvent
   └── transformers/         # 【核心基建】流转换器 (消息/值/生命周期/子图发现)

   ├── setup/                    # 【工具集】初始化
   └── async_local_storage.ts # 【工具集】AsyncLocalStorage 全局单例初始化

   ├── tests/                    # 【质量保证】测试套件
   ├── pregel.test.ts        # 【质量保证】Pregel 核心测试 (12,721行)
   ├── prebuilt.test.ts      # 【质量保证】预构建组件测试 (3,821行)
   ├── python_port/          # 【质量保证】Python 版本兼容性测试移植
   ├── pregel/               # 【质量保证】Pregel 专项测试
   ├── prebuilt/             # 【质量保证】预构建专项测试
   └── data/                 # 【质量保证】测试数据

   ├── web.ts                    # 【核心基建】浏览器端入口 (主导出)
   ├── index.ts                  # 【核心基建】Node.js 入口 (含 AsyncLocalStorage 初始化)
   ├── remote.ts                 # 【业务模块】远程图调用 (langgraph/remote)
   ├── constants.ts              # 【核心基建】全局常量 (START/END/Command/Send)
   ├── errors.ts                 # 【核心基建】错误类层次 (GraphRecursionError 等)
   ├── interrupt.ts             # 【核心基建】interrupt() 函数
   ├── writer.ts                # 【核心基建】writer() 函数
   ├── utils.ts                 # 【工具集】通用工具函数
   └── hash.ts                  # 【工具集】XXH3 哈希实现
   ├── spec/                         # 【⚠️ 待分析】规范文件
   ├── scripts/                      # 【工具集】构建脚本
   └── package.json                  # 【配置】npm 包配置

   ├── langgraph/                        # 【业务模块】顶层 re-export 包装 (npm: langgraph)
   └── src/                          # 轻量重导出, 依赖 @langchain/langgraph

   ├── checkpoint/                       # 【核心基建】检查点基础库 (npm: @langchain/langgraph-checkpoint)
   └── src/
       ├── base.ts                   # 【核心基建】Checkpoint 数据结构 + BaseCheckpointSaver
       ├── types.ts                  # 【核心基建】PendingWrite/CheckpointTuple/CheckpointMetadata
       ├── id.ts                     # 【工具集】uuid6 生成
       ├── memory.ts                 # 【工具集】MemorySaver 内存检查点实现
       ├── serde/
   ├── base.ts               # 【核心基建】SerializerProtocol 序列化协议
   ├── jsonplus.ts           # 【核心基建】JsonPlusSerializer 增强 JSON
   ├── types.ts              # 【核心基建】SendProtocol/SCHEDULED 协议类型
   └── utils/                # 【工具集】fast-safe-stringify
       ├── store/
   └── index.ts              # 【核心基建】BaseStore/AsyncBatchedStore/InMemoryStore
       └── cache/
           └── index.ts              # 【核心基建】BaseCache/LRUCache

   ├── checkpoint-sqlite/                # 【工具集】SQLite 检查点后端 (better-sqlite3)
   ├── checkpoint-postgres/              # 【工具集】PostgreSQL 检查点后端 (pg)
   ├── checkpoint-mongodb/               # 【工具集】MongoDB 检查点后端
   ├── checkpoint-redis/                 # 【工具集】Redis 检查点后端 (集成测试用)
   ├── checkpoint-validation/            # 【质量保证】检查点数据校验工具

   ├── sdk/                              # 【核心基建】LangGraph API 客户端 SDK
   └── src/
       ├── client.ts                 # 【核心基建】Client 主类 (HTTP/SSE/WS)
       ├── client/
   ├── stream/               # 【核心基建】ThreadStream 流式客户端 (2451行)
   ├── assistants/           # 【业务模块】Assistants CRUD API
   ├── threads/              # 【业务模块】Threads CRUD API
   ├── runs/                 # 【业务模块】Runs 执行 API
   ├── crons/                # 【业务模块】Cron Job API
   └── store/                # 【业务模块】Store API
       ├── react/                    # 【UI 视图】React useStream/useStreamState Hooks
       ├── react-ui/                 # 【UI 视图】React 服务端渲染 UI 组件
       ├── ui/                       # 【UI 视图】共享 UI 工具 (Manager/Errors)
       ├── stream/                   # 【核心基建】流处理管道 (discovery/projections)
       ├── auth/                     # 【业务模块】认证模块
       ├── logging/                  # 【工具集】日志模块
       ├── utils/                    # 【工具集】通用工具函数
       └── singletons/               # 【工具集】单例 (fetch override)

   ├── sdk-react/                        # 【UI 视图】React SDK 适配层
   ├── sdk-svelte/                       # 【UI 视图】Svelte SDK 适配层
   ├── sdk-vue/                          # 【UI 视图】Vue SDK 适配层
   ├── sdk-angular/                      # 【UI 视图】Angular SDK 适配层

   ├── langgraph-cli/                    # 【工具集】CLI 命令行工具
   └── src/                          # CLI 命令实现
   ├── langgraph-api/                    # 【工具集】API 服务端实现
   ├── src/                          # API 服务端源码
   └── tests/                        # API 测试

   ├── langgraph-cua/                    # 【业务模块】Computer Use Agent (CUA)
   ├── src/                          # CUA 核心实现
   ├── examples/                     # CUA 示例
   └── scripts/                      # CUA 脚本

   ├── langgraph-supervisor/             # 【业务模块】Supervisor Agent 模式
   ├── src/                          # Supervisor 核心实现
   ├── static/                       # 静态资源
   └── scripts/                      # Supervisor 脚本

   ├── langgraph-swarm/                  # 【业务模块】Swarm Agent 模式
   ├── src/                          # Swarm 核心实现
   ├── static/                       # 静态资源
   ├── examples/                     # Swarm 示例
   └── scripts/                      # Swarm 脚本

   ├── langgraph-ui/                     # 【UI 视图】共享 UI 组件库
   └── src/                          # UI 组件源码

   └── create-langgraph/                 # 【工具集】项目脚手架 (npm create langgraph)

├── examples/                             # 【配置】示例项目集 (22 个子项目)
   ├── quickstart/                       # 【业务模块】快速入门示例
   ├── how-tos/                          # 【业务模块】操作指南 (50+ 示例条目)
   ├── chatbots/                         # 【业务模块】聊天机器人
   ├── multi_agent/                      # 【业务模块】多 Agent 协作
   ├── rag/                              # 【业务模块】RAG 检索增强生成
   ├── agent_executor/                   # 【业务模块】Agent 执行器
   ├── plan-and-execute/                 # 【业务模块】计划→执行模式
   ├── reflection/                       # 【业务模块】反思模式
   ├── rewoo/                            # 【业务模块】ReWOO (Reasoning WithOut Observation)
   ├── streaming/                        # 【业务模块】流式传输
   ├── sql-agent/                        # 【业务模块】SQL Agent
   ├── chatbot-simulation-evaluation/     # 【业务模块】聊天模拟评估
   ├── ai-elements/                      # 【UI 视图】AI Elements 组件库
   ├── assistant-ui-claude/              # 【UI 视图】助手 UI (Claude 风格)
   ├── ui-react/                         # 【UI 视图】React UI 示例
   ├── ui-react-transport/               # 【UI 视图】React Transport 示例
   ├── ui-svelte/                        # 【UI 视图】Svelte UI 示例
   ├── ui-vue/                           # 【UI 视图】Vue UI 示例
   ├── ui-angular/                       # 【UI 视图】Angular UI 示例
   └── ui-multimodal/                    # 【UI 视图】多模态 UI 示例

├── docs/                                 # 【配置】文档站点
├── internal/                             # 【工具集】内部开发工具
   ├── bench/                            # 【工具集】性能基准测试
   ├── build/                            # 【工具集】构建工具链
   └── environment_tests/                # 【工具集】多环境兼容性测试
├── scripts/                              # 【工具集】构建与发布脚本
├── .github/                              # 【配置】CI/CD 工作流
├── .changeset/                           # 【配置】变更集管理
├── .vscode/                              # 【配置】VS Code 编辑器配置
├── reports/                              # 【配置】分析报告输出
├── package.json                          # 【配置】根 monorepo 配置
├── pnpm-workspace.yaml                   # 【配置】pnpm 工作空间定义
├── pnpm-lock.yaml                        # 【配置】依赖锁定文件
├── turbo.json                            # 【配置】Turborepo 构建配置
├── tsconfig.json                         # 【配置】TypeScript 全局配置
├── deno.json                             # 【配置】Deno 运行时配置
├── CLAUDE.md                             # 【配置】AI 辅助开发指南
├── CONTRIBUTING.md                       # 【配置】贡献指南
├── LICENSE                               # 【配置】MIT 许可证
├── README.md                             # 【配置】项目 README (symlink)
└── README-CN.md                          # 【配置】中文 README

3. 模块依赖与调用关系

3.1 全局入口与核心路由

逻辑说明:LangGraphJS 对外暴露多个子路径入口。主入口 @langchain/langgraph 导出 StateGraph、Graph 等核心 API,以及 MemorySaver(内存检查点)。子路径如 /channels/pregel/prebuilt 分别导出特定层的功能。SDK 独立包 @langchain/langgraph-sdk 提供与 LangGraph API 服务端的通信能力。

调用拓扑

text
@langchain/langgraph (libs/langgraph-core/src/web.ts)
 |
 +---> graph/ (Graph API)
 |      +---> StateGraph ────> Pregel (编译目标)
 |      +---> MessageGraph ──> Pregel (编译目标)
 |      +---> Annotation ────> Channels (状态通道工厂)
 |      +---> Graph (低层图基类)
 |
 +---> pregel/ (执行引擎)
 |      +---> PregelLoop ────> algo.ts (_applyWrites, _prepareNextTasks)
 |      +---> PregelRunner ──> retry.ts (_runWithRetry)
 |      +---> io.ts (readChannels, mapInput, mapOutput*)
 |      +---> debug.ts (printStep*)
 |      +---> write.ts (ChannelWrite)
 |      +---> read.ts (PregelNode, ChannelRead)
 |      +---> stream.ts (IterableReadableWritableStream)
 |      +---> messages.ts (StreamMessagesHandler)
 |      +---> utils/ (RetryPolicy, CachePolicy, subgraph detection)
 |
 +---> channels/ (状态通道)
 |      +---> BaseChannel (抽象基类)
 |      +---> LastValue (单值通道)
 |      +---> LastValueAfterFinish (屏障后可用)
 |      +---> Topic (发布-订阅通道)
 |      +---> BinaryOperatorAggregate (归约通道)
 |      +---> AnyValue (任意值通道)
 |      +---> EphemeralValue (临时值,不持久化)
 |      +---> NamedBarrierValue (命名屏障)
 |      +---> DynamicBarrierValue (动态屏障)
 |      +---> binop.ts (BinaryOperator 类型)
 |
 +---> func/ (函数式 API)
 |      +---> entrypoint() ────> Pregel (轻量入口)
 |      +---> task() ──────────> Pregel (任务定义)
 |
 +---> prebuilt/ (预构建组件)
 |      +---> createReactAgent()
 |      +---> ToolNode
 |      +---> AgentState (预定义状态)
 |
 +---> state/ (状态管理)
 |      +---> StateSchema
 |      +---> prebuilt/ (预构建状态)
 |      +---> values/ (值类型状态)
 |
 +---> stream/ (流式输出)
 |      +---> GraphRunStream (图运行流)
 |      +---> transformers/ (消息/值/生命周期转换器)
 |      +---> EventLog (事件日志)
 |
 +---> constants.ts (核心常量: START, END, Command, Send, INTERRUPT)
 +---> errors.ts (错误类型层次)

@langchain/langgraph-checkpoint (libs/checkpoint/src/)
 |
 +---> base.ts ────> Checkpoint, emptyCheckpoint, BaseCheckpointSaver
 +---> types.ts ───> PendingWrite, CheckpointMetadata, CheckpointTuple
 +---> serde/ ─────> SerializerProtocol, JsonPlusSerializer
 +---> store/ ─────> BaseStore, AsyncBatchedStore, InMemoryStore
 +---> cache/ ─────> BaseCache, LRUCache
 +---> memory.ts ──> MemorySaver (内存检查点实现)

@langchain/langgraph-sdk (libs/sdk/src/)
 |
 +---> client.ts ────────> Client (HTTP/SSE/WS 客户端)
 +---> client/stream/ ───> ThreadStream, MessageAssembler
 +---> client/assistants/> Assistant API
 +---> client/threads/ ──> Thread API
 +---> client/runs/ ─────> Run API
 +---> client/crons/ ────> Cron Job API
 +---> client/store/ ────> Store API
 +---> auth/ ────────────> 认证模块
 +---> react/ ───────────> useStream() / useStreamState() Hooks
 +---> react-ui/ ────────> UI 服务端渲染组件
 +---> ui/ ──────────────> 共享 UI 工具
 +---> stream/ ───────────> 流处理管道

2.2 核心业务实体与关联

实体定义

  • Graph:工作流的有向图定义,包含节点(Node)和边(Edge)
  • StateGraph:带共享状态的有向图,节点可读写状态
  • Channel:状态的原子存储单元,有类型化的 ValueType / UpdateType / CheckpointType
  • Checkpoint:整个图在某时刻的状态快照,包含所有 channel_values 和 channel_versions
  • Pregel:编译后的可执行图,负责协调执行
  • Task:Pregel 调度执行的单个工作单元
  • Thread:一次会话/执行实例,有其独立的检查点链
  • Run:一次具体的图调用/执行

实体引用拓扑

text
[StateGraph] 1 ----> 1 [CompiledGraph]
                              |
                              | 编译为
                              v
[CompiledGraph] 1 ----> 1 [Pregel]
                              |
                              | 管理 N 个
                              v
[Pregel] 1 ----> N [Channel]
                              |
                              | 每超步持久化
                              v
[Pregel] 1 ----> N [Checkpoint]
                              |
                              | 调度 N 个
                              v
[Pregel] 1 ----> N [Task] (per superstep)

[Thread] 1 ----> N [Checkpoint] (历史链)
[Thread] 1 ----> N [Run] (执行记录)

[BaseCheckpointSaver] 1 ----> N [Checkpoint] (持久化读写)
[BaseStore] 1 ----> N [Item] (长期存储条目)

[Client (SDK)] 1 ----> N [Thread] (远程管理)
[Client (SDK)] 1 ----> N [Run] (远程执行)
[Client (SDK)] 1 ----> N [Assistant] (助手配置)

4. 核心模块详解

模块一:Pregel 执行引擎

设计说明:Pregel 是 LangGraphJS 的核心执行引擎,实现了受 Google Pregel 启发的消息传递超步执行模型。每个超步(Superstep)包含三个阶段:

  • (1) 计划阶段 - 确定哪些节点应该执行;
  • (2) 执行阶段 - 并行运行已计划的节点;
  • (3) 更新阶段 - 将节点输出写入通道并持久化检查点。

关键设计模式包括:策略模式(可插拔的 checkpoint/retry/cache 策略)、观察者模式(stream events)、状态机模式(中断/恢复/错误处理)。

内部结构图

text
+-----------------------------------------------------------------------+
|                        Pregel 执行引擎                                 |
|                                                                       |
|  Pregel.invoke(input, config)                                         |
|       |                                                               |
|       v                                                               |
|  +---------------------------+                                        |
|  | PregelLoop (超步循环)      |                                        |
|  |                           |                                        |
|  |  for each superstep:      |                                        |
|  |  1. _prepareNextTasks()   |──> algo.ts                            |
|  |     - 根据通道更新确定      |     |                                  |
|  |       哪些节点被触发        |     v                                  |
|  |     - 生成 Task 列表       |  检查 trigger_to_nodes                 |
|  |                           |  映射 + channel_versions               |
|  |  2. PregelRunner.tick()   |──> runner.ts                          |
|  |     - 并行执行所有 Task     |     |                                  |
|  |     - 收集 PendingWrite    |     v                                  |
|  |                           |  [_runWithRetry] + [AbortSignal]        |
|  |  3. _applyWrites()        |──> algo.ts                            |
|  |     - 将 writes 应用到      |     |                                  |
|  |       对应 Channel.update() |     v                                  |
|  |     - 更新 channel_versions |  各 Channel.update(values)            |
|  |                           |                                        |
|  |  4. checkpoint.save()     |──> BaseCheckpointSaver                |
|  |     - 持久化当前状态快照    |     |                                  |
|  |                           |     v                                  |
|  |  5. shouldInterrupt()?    |──> 检查 INTERRUPT 标记                 |
|  |     if yes: break loop    |                                        |
|  +---------------------------+                                        |
|       |                                                               |
|       v                                                               |
|  返回最终 State / 流式输出                                             |
+-----------------------------------------------------------------------+

模块二:Channels 状态通道系统

设计说明:Channels 是 LangGraph 的状态原子单元。每个 Channel 是一个泛型类 BaseChannel<ValueType, UpdateType, CheckpointType>,定义了三个核心操作:

  • update() 接收更新序列并改变内部状态;
  • get() 返回当前值;
  • checkpoint() 导出可序列化的快照。 不同的 Channel 子类实现了不同的语义:LastValue 存储最后写入的单个值(每步只接收一个值)、Topic 是发布/订阅模型(支持去重和累积)、BinaryOperatorAggregate 支持自定义归约函数。这种设计允许框架精确追踪每个状态字段的变化,实现细粒度的检查点恢复。

内部结构图

text
+-----------------------------------------------------------------------+
|                     Channels 状态通道系统                              |
|                                                                       |
|  +------------------------------------------------------------------+ |
|  |               BaseChannel<V, U, C>  (抽象基类)                    | |
|  |  + fromCheckpoint(checkpoint?: C): this                          | |
|  |  + update(values: U[]): boolean                                  | |
|  |  + get(): V                                                      | |
|  |  + checkpoint(): C | undefined                                   | |
|  |  + consume(): boolean                                            | |
|  +----+-------------------+-------------------+--------------------+  |
|       |                   |                   |                       |
|       v                   v                   v                       |
|  +---------+    +------------------+   +-----------------------+      |
|  |LastValue|    | Topic<V>         |   |BinaryOperatorAggregate|      |
|  |---------|    |------------------|   |-----------------------|      |
|  |单值存储  |    |values: V[]       |   |reducer: (a,b)=>a      |      |
|  |每步一值  |    |seen: Set<V>      |   |value: V               |      |
|  |最后写入  |    |unique: bool      |   |支持自定义归约函数       |      |
|  |胜出     |    |accumulate: bool   |   |如: (x,y)=>x.concat(y) |      |
|  +----+----+    +--------+---------+   +-----------+-----------+      |
|       |                  |                         |                  |
|       v                  v                         v                  |
|  +---------+    +------------------+   +-----------------------+      |
|  |AnyValue |    |NamedBarrierValue |   |DynamicBarrierValue    |      |
|  |---------|    |------------------|   |-----------------------|      |
|  |任意类型  |    |等待指定节点集合   |    |等待动态确定的节点集合   |      |
|  |不做限制  |    |全部完成后可读     |    |用于并行任务同步         |      |
|  +---------+    +------------------+   +-----------------------+      |
|                                                                       |
|  +------------------+    +------------------+                         |
|  | EphemeralValue   |    |LastValueAfterFinish|                       |
|  |------------------|    |---------------------|                       |
|  | 临时值,不持久化  |    | 超步完成后才可读     |                      |
|  | 每次超步重置      |    | 读取后自动清空       |                      |
|  +------------------+    +---------------------+                      |
+-----------------------------------------------------------------------+

模块三:Graph 定义与编译

设计说明:Graph 层是用户直接交互的 API 层。StateGraph 是最核心的类,它通过声明式 API 让用户定义节点(addNode)、边(addEdge)、条件边(addConditionalEdges),然后通过 compile() 方法生成 CompiledStateGraph。编译过程将用户定义的图结构转换为 Pregel 可执行的内部表示。Annotation API 提供类型安全的状态定义,每个字段可指定 reducer 和默认值。MessageGraph 是专门为消息状态优化的简化接口。

内部结构图

text
+-----------------------------------------------------------------------+
|                    Graph 定义与编译流程                                |
|                                                                       |
|  用户代码:                                                             |
|  const workflow = new StateGraph(MyAnnotation)                        |
|    .addNode("nodeA", myFunc)                                          |
|    .addNode("nodeB", otherFunc)                                       |
|    .addEdge(START, "nodeA")                                           |
|    .addConditionalEdges("nodeA", router, {x: "nodeB", y: END})        |
|    .compile({ checkpointer: mySaver })                                |
|                                                                       |
|  +------------------------------------------------------------------+ |
|  | StateGraph.compile()                                             | |
|  |   |                                                              | |
|  |   v                                                              | |
|  | 1. 验证图结构 (无孤立节点、无循环依赖)                              | |
|  | 2. 为每个状态字段创建对应 Channel                                   | |
  |    - Annotation.spec[key] ──> Channel 实例                        | |
|  | 3. 创建内部通道:                                                   | |
|  |    - __start__: EphemeralValue                                   | |
|  |    - __end__: LastValue                                          | |
|  |    - branch:to:nodeX: EphemeralValue (条件路由)                   | |
|  |    - 每个状态字段: LastValue / Topic (根据 reducer)               | |
|  | 4. 构建 Pregel 节点 (PregelNode):                                 | |
  |    - 每个用户节点包装为 PregelNode                                 | |
|  |    - 注入 ChannelRead (输入通道) 和 ChannelWrite (输出通道)        | |
|  | 5. 构建 trigger_to_nodes 映射:                                    | |
|  |    - channel_name ──> [被触发的节点列表]                           | |
|  | 6. 生成 CompiledStateGraph (持有 Pregel 实例)                      | |
|  +------------------------------------------------------------------+ |
|       |                                                               |
|       v                                                               |
|  CompiledStateGraph.invoke(state, config)                             |
|       |                                                               |
|       v                                                               |
|  Pregel.invoke(input, config)  ──> 进入 Pregel 执行引擎               |
+-----------------------------------------------------------------------+

模块四:Checkpoint 持久化系统

设计说明:Checkpoint 系统负责 Agent 状态的持久化和恢复。系统核心是 Checkpoint 数据结构(包含 channel_values 和 channel_versions)和 BaseCheckpointSaver 抽象接口。每次超步结束后,框架自动调用 checkpointer.put() 保存当前快照。BaseStore 提供独立的长期键值存储(与检查点正交),用于跨会话的持久记忆。Cache 层提供热点数据缓存,减少数据库查询。序列化层(Serde)处理复杂类型的序列化/反序列化,支持自定义序列化器。

内部结构图

text
+-----------------------------------------------------------------------+
|                    Checkpoint 持久化系统                               |
|                                                                       |
|  +------------------------------------------------------------------+ |
  |  Checkpoint 结构:                                                 | |
  |  {                                                                | |
  |    v: 4,                  // 格式版本                              | |
  |    id: "uuid6",           // 检查点 ID                             | |
  |    ts: "ISO timestamp",   // 时间戳                                | |
|  |    channel_values: {     // 各通道当前值                           | |
  |      "messages": [...],                                           | |
  |      "agent_status": "idle",                                      | |
  |    },                                                             | |
|  |    channel_versions: {   // 各通道版本号 (用于冲突检测)            | |
  |      "messages": 5,                                               | |
  |      "agent_status": 3,                                           | |
  |    },                                                             | |
|  |    versions_seen: {      // 每个节点看到的版本 (用于触发判断)      | |
  |      "nodeA": { "messages": 3, "agent_status": 1 },              | |
  |    }                                                              | |
  |  }                                                                | |
  +------------------------------------------------------------------+ |
                                                                       |
  +------------------------------------------------------------------+ |
|  |  BaseCheckpointSaver (抽象接口)                                  | |
  |  + getTuple(config): CheckpointTuple | undefined                  | |
  |  + list(config, options): AsyncGenerator<CheckpointTuple>         | |
|  |  + put(config, checkpoint, metadata, newVersions):               | |
  | |      Promise<{configurable: ...}>                               | |
  | |  + putWrites(config, writes, taskId): Promise<void>             | |
  | |  + deleteThread(threadId): Promise<void>                        | |
  +--+---------------------------------------------------------------+ |
     |                                                                 |
     v                  v                 v                 v          |
  +------+    +-------------+    +----------+    +--------+            |
|  |Memory|    |SqliteSaver  |    |PostgresSaver|  |MongoDBSaver|       |
|  |Saver |    |(better-     |    |(pg driver)  |  |(mongodb)   |       |
  |------|    | sqlite3)    |    |            |    |           |       |
|  |内存  |    |本地文件     |    |远程数据库   |    |文档数据库  |       |
|  +------+    +-------------+    +----------+    +--------+           |
                                                                       |
  +------------------------------------------------------------------+ |
|  |  BaseStore (长期键值存储)                                        | |
  |  + put(namespace, key, value): Promise<void>                      | |
  |  + get(namespace, key): Promise<Item | null>                      | |
|  |  + search(namespace, filter): AsyncGenerator<Item>               | |
  |  + listNamespaces(): AsyncGenerator<NamespacePath>                | |
  +------------------------------------------------------------------+ |
                                                                       |
  +------------------------------------------------------------------+ |
|  |  Cache (缓存层)                                                  | |
|  |  - 缓存热点检查点和 PendingWrite                                  | |
|  |  - 减少数据库往返                                                | |
|  |  - 支持 TTL 和 LRU 淘汰策略                                       | |
  +------------------------------------------------------------------+ |
+-----------------------------------------------------------------------+

模块五:SDK 客户端与流式传输

设计说明@langchain/langgraph-sdk 是独立于核心库的客户端包,用于与 LangGraph API 服务端通信。ThreadStream 是核心抽象,封装了 SSE/WebSocket 协议的流式交互。MessageAssembler 将碎片化的流式消息组装为完整结构。SDK 提供 React(useStream/useStreamState)、Svelte、Vue 的多框架 Hook。内部使用 p-queue 控制并发,p-retry 实现自动重试。

内部结构图

text
+-----------------------------------------------------------------------+
|                    SDK 客户端架构                                      |
|                                                                       |
  +------------------------------------------------------------------+ |
  |  Client (HTTP 客户端)                                             | |
|  |  + assistants: AssistantsClient ──> CRUD 助手配置                 | |
|  |  + threads: ThreadsClient ────────> CRUD 会话线程                 | |
|  |  + runs: RunsClient ──────────────> 创建/监控执行                  | |
|  |  + crons: CronsClient ────────────> 定时任务管理                   | |
|  |  + store: StoreClient ────────────> 长期存储操作                   | |
  +---------------------------+--------------------------------------+ |
                              |                                        |
                              v                                        |
  +------------------------------------------------------------------+ |
  |  ThreadStream (流式客户端)                                        | |
  |  + subscribe(options): ──> 订阅流式事件                            | |
  |    - TransportAdapter (SSE / WebSocket)                           | |
  |    - MultiCursorBuffer (多光标缓冲)                                | |
  |    - MessageAssembler (消息组装)                                  | |
  |    - MediaAssembler (多媒体组装)                                   | |
  |    - ToolCallAssembler (工具调用组装)                              | |
  |    - SubgraphDiscoveryHandle (子图发现)                            | |
  |    - StreamingMessageHandle (流式消息句柄)                         | |
  +---------------------------+--------------------------------------+ |
                              |                                        |
                              v                                        |
  +------------------------------------------------------------------+ |
  |  框架适配层                                                       | |
  |  + react/  ──> useStream() Hook, useStreamState() Hook           | |
  |  + react-ui/ ──> 服务端渲染 UI 组件                               | |
  |  + sdk-react/ ──> React 包装 (npm: @langchain/langgraph-sdk-react)| |
  |  + sdk-svelte/ ─> Svelte 包装                                    | |
  |  + sdk-vue/ ────> Vue 包装                                       | |
  |  + sdk-angular/ > Angular 包装                                   | |
  +------------------------------------------------------------------+ |
+-----------------------------------------------------------------------+

5. 关键数据流程

场景一:StateGraph 完整执行流程

场景说明:用户构建一个包含两个节点(agent → tools)的 StateGraph,编译后调用 invoke()。展示从输入到输出的完整执行流程,包括通道初始化、节点调度、检查点持久化、中断处理等关键环节。

场景二:检查点恢复流程(Human-in-the-Loop)

场景说明:Agent 执行到人工审批节点时中断,用户检查状态后通过 Command 恢复执行。展示中断→检查状态→恢复→继续执行的完整流程。

场景三:流式传输与 SDK 交互

场景说明:前端通过 SDK 的 ThreadStream 订阅 LangGraph API 的流式事件,展示 SSE 连接建立、事件流式接收、消息组装和 UI 更新的完整链路。

6. 接口与契约规范

6.1 核心内部模块契约

typescript
/**
 * 状态通道基础契约 - 所有通道类型的抽象基类
 */
export abstract class BaseChannel<
  ValueType = unknown,
  UpdateType = unknown,
  CheckpointType = unknown,
> {
  abstract lc_graph_name: string;
  lg_is_channel: true;

  // 从检查点恢复通道状态
  abstract fromCheckpoint(checkpoint?: CheckpointType): this;

  // 应用一组更新到通道
  // @throws {InvalidUpdateError} 当更新序列无效时
  abstract update(values: UpdateType[]): boolean;

  // 获取当前通道值
  // @throws {EmptyChannelError} 当通道为空时
  abstract get(): ValueType;

  // 导出可序列化的检查点快照
  abstract checkpoint(): CheckpointType | undefined;

  // 标记当前值为已消费 (默认 no-op)
  consume(): boolean;
}
typescript
/**
 * 检查点持久化契约
 */
export interface BaseCheckpointSaver<
  N extends string = string,
  C extends string = string,
> {
  // 获取指定配置对应的检查点
  getTuple(config: RunnableConfig): Promise<CheckpointTuple | undefined>;

  // 列出某线程的检查点历史
  list(
    config: RunnableConfig,
    options?: CheckpointListOptions
  ): AsyncGenerator<CheckpointTuple>;

  // 保存检查点
  put(
    config: RunnableConfig,
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata,
    newVersions: ChannelVersions
  ): Promise<{ configurable: Record<string, unknown> }>;

  // 保存待处理的写入
  putWrites(
    config: RunnableConfig,
    writes: PendingWrite[],
    taskId: string
  ): Promise<void>;

  // 删除线程的所有检查点
  deleteThread(threadId: string): Promise<void>;
}
typescript
/**
 * 长期存储契约
 */
export abstract class BaseStore {
  // 存储键值
  abstract put(
    namespace: string[],
    key: string,
    value: Record<string, unknown>
  ): Promise<void>;

  // 读取键值
  abstract get(
    namespace: string[],
    key: string
  ): Promise<Item | null>;

  // 搜索(支持过滤、分页)
  abstract search(
    namespace: string[],
    filter?: SearchFilter
  ): AsyncGenerator<Item>;

  // 列出命名空间
  abstract listNamespaces(
    options?: ListNamespacesOptions
  ): AsyncGenerator<NamespacePath>;
}
typescript
/**
 * Pregel 执行参数
 */
export interface PregelParams {
  channels: Record<string, BaseChannel>;
  nodes: Record<string, PregelNode>;
  triggerToNodes: Record<string, string[]>;
  inputChannels: string | string[];
  outputChannels: string | string[];
  streamChannels: string | string[];
  interruptAfterNodes?: string[];
  interruptBeforeNodes?: string[];
  checkpointer?: BaseCheckpointSaver;
  store?: BaseStore;
  cache?: BaseCache;
  debug?: boolean;
}

/**
 * 图编译选项
 */
export interface StateGraphOptions {
  checkpointer?: BaseCheckpointSaver;
  store?: BaseStore;
  cache?: BaseCache;
  interruptBefore?: string[];
  interruptAfter?: string[];
  streamTransformers?: StreamTransformer[];
}

6.2 对外 API 契约(SDK 协议 - OpenSpec 风格)

yaml
components:
  schemas:
    Run:
      type: object
      required: [run_id, thread_id, assistant_id, status]
      properties:
        run_id:
          type: string
          description: 运行唯一标识符
        thread_id:
          type: string
          description: 所属会话线程 ID
        assistant_id:
          type: string
          description: 助手配置 ID
        status:
          type: string
          enum: [pending, running, completed, failed, interrupted, cancelled]
        result:
          type: object
          description: 执行结果 (status=completed 时)
        error:
          type: string
          description: 错误信息 (status=failed 时)

    ThreadCreate:
      type: object
      properties:
        thread_id:
          type: string
          description: 可选,自定义线程 ID
        metadata:
          type: object
          description: 附加元数据

    StreamEvent:
      type: object
      properties:
        event:
          type: string
          enum: [values, messages, updates, events, debug, error, end]
        data:
          type: object
          description: 事件数据,类型取决于 event 字段

paths:
  /threads:
    post:
      summary: 创建新线程
      requestBody:
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/ThreadCreate'
      responses:
        '200':
          description: 线程创建成功
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Thread'

  /threads/{thread_id}/runs:
    post:
      summary: 在指定线程中创建执行
      parameters:
        - name: thread_id
          in: path
          required: true
          schema:
            type: string
      requestBody:
        content:
          application/json:
            schema:
              type: object
              required: [assistant_id]
              properties:
                assistant_id:
                  type: string
                input:
                  type: object
                config:
                  type: object
      responses:
        '200':
          description: 执行已创建并启动
        '409':
          description: 线程已有正在进行的执行

  /threads/{thread_id}/runs/stream:
    post:
      summary: 流式执行 (SSE)
      parameters:
        - name: thread_id
          in: path
          required: true
      responses:
        '200':
          description: SSE 事件流
          content:
            text/event-stream:
              schema:
                $ref: '#/components/schemas/StreamEvent'

  /threads/{thread_id}/state:
    get:
      summary: 获取线程当前状态
      parameters:
        - name: thread_id
          in: path
          required: true
      responses:
        '200':
          description: 当前状态快照
    post:
      summary: 更新线程状态(用于人工干预)

7. 快速开始

7.1 环境配置

bash
# 要求 Node.js >=18
node --version

# 安装 pnpm (如未安装)
npm install -g pnpm

# 安装依赖
pnpm install

7.2 安装与使用

bash
npm install @langchain/langgraph @langchain/core

7.3 最小示例

typescript
import { StateGraph, Annotation, END, START } from "@langchain/langgraph";
import { BaseMessage, HumanMessage, AIMessage } from "@langchain/core/messages";

// 1. 定义状态
const State = Annotation.Root({
  messages: Annotation<BaseMessage[]>({
    reducer: (left, right) => left.concat(right),
    default: () => [],
  }),
});

// 2. 构建图
const graph = new StateGraph(State)
  .addNode("agent", async (state) => {
    return { messages: [new AIMessage("Hello! How can I help?")] };
  })
  .addEdge(START, "agent")
  .addEdge("agent", END)
  .compile();

// 3. 执行
const result = await graph.invoke({
  messages: [new HumanMessage("Hi!")],
});
console.log(result.messages);

7.4 项目构建

bash
# 构建所有包
pnpm build

# 运行测试
pnpm test

# 运行单个测试
pnpm test:single /path/to/yourtest.test.ts

# 代码检查与格式化
pnpm lint
pnpm format