Skip to content

fetch-event-source 源码解读

第一部分: 业务价值篇

项目定位

@microsoft/fetch-event-source 是微软开源的一个基于 Fetch API 的 Server-Sent Events (SSE) 客户端库,为浏览器提供了更强大、更灵活的服务器推送事件接收能力。

解决的核心痛点

原生浏览器的 EventSource API 存在以下限制:

限制说明
仅支持 GET 请求无法使用 POST、PUT 等其他 HTTP 方法
无法自定义请求头无法传递认证令牌、内容类型等自定义头部
无法发送请求体所有参数必须编码在 URL 中,受限于浏览器 URL 长度限制(通常 2000 字符)
重试策略不可控连接断开时浏览器自动重试但无法自定义重试逻辑
错误处理能力有限无法优雅处理 API 网关返回的错误响应

技术亮点

技术亮点说明
基于 Fetch API充分利用 Fetch API 的所有特性(自定义方法、头部、请求体等)
完全兼容 SSE 格式与标准 Event Stream 格式 100% 兼容
灵活的重试策略完全掌控连接断开时的重试逻辑
页面可见性集成自动在页面隐藏时断开连接、显示时重连,减少服务器负载
可替换的 fetch 实现支持注入自定义 fetch 函数
TypeScript 原生支持完整的类型定义

适用场景

场景说明
实时数据推送股票行情、实时新闻、即时消息
进度更新长时间运行任务的进度追踪
日志流实时日志查看和监控
协作应用多人协作时的实时状态同步
AI 应用大语言模型的流式响应输出

典型使用 Case

Case 1: 基础 SSE 连接

typescript
import { fetchEventSource } from '@microsoft/fetch-event-source';

await fetchEventSource('/api/sse', {
    onmessage(ev) {
        console.log(ev.data);
    }
});

Case 2: 带认证和自定义请求

typescript
const ctrl = new AbortController();
fetchEventSource('/api/sse', {
    method: 'POST',
    headers: {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer token123'
    },
    body: JSON.stringify({ query: 'data' }),
    signal: ctrl.signal,
    onmessage(msg) {
        // 处理消息
    }
});

Case 3: 自定义错误处理和重试

typescript
class RetriableError extends Error { }
class FatalError extends Error { }

fetchEventSource('/api/sse', {
    async onopen(response) {
        if (response.ok && response.headers.get('content-type') === 'text/event-stream') {
            return;
        } else if (response.status >= 400 && response.status < 500 && response.status !== 429) {
            throw new FatalError();
        } else {
            throw new RetriableError();
        }
    },
    onerror(err) {
        if (err instanceof FatalError) {
            throw err; // 停止操作
        }
        // 返回重试间隔(毫秒),undefined 使用默认 1 秒
        return 2000;
    }
});

技术栈分析

技术版本用途
TypeScript^4.2.4开发语言
Jasmine^4.3.1测试框架
ES2017-目标 ECMAScript 版本

选型决策分析

1、为什么选择 TypeScript?

  • 类型安全: 为 API 提供完整的类型定义,提升开发体验
  • 生态友好: TypeScript 在前端生态中已成为主流
  • 维护性: 类型系统有助于长期维护和重构

2、为什么基于 Fetch API 而非 XMLHttpRequest?

  • 现代标准: Fetch API 是现代浏览器的标准 API
  • Promise 支持: 更优雅的异步处理方式
  • 可扩展性: Request/Response 对象设计更灵活
  • 流式处理: 原生支持 ReadableStream

3、为什么不直接使用 WebSocket?

  • SSE 兼容性: 现有 SSE 服务器无需修改即可使用
  • 单向通信: 对于仅需服务器推送的场景,SSE 更轻量
  • HTTP 友好: 使用标准 HTTP,更容易通过防火墙和代理
  • 自动重连: SSE 本身就有重连机制,此库进一步增强

项目结构初览

shell
src/
├── fetch.ts        # 核心实现: fetchEventSource 函数
├── parse.ts        # 解析器: SSE 格式解析
├── index.ts        # 入口: 导出公共 API
└── parse.spec.ts   # 测试: 解析器单元测试

完整目录树:

shell
fetch-event-source/
├── .github/                    # GitHub 配置目录
   └── workflows/              # GitHub Actions 工作流
       └── node.js.yml         # Node.js CI 配置
├── src/                        # 源代码主目录
   ├── fetch.ts                # fetchEventSource 主实现 - 连接管理、重试、页面可见性
   ├── index.ts                # 模块入口文件 - 统一导出公共 API
   ├── parse.spec.ts           # 解析器单元测试
   └── parse.ts                # 事件流解析器 - SSE 格式解析流水线
├── .gitignore                  # Git 忽略文件
├── .npmignore                  # NPM 发布忽略文件
├── .nycrc                      # 测试覆盖率配置
├── CHANGELOG.md                # 变更日志
├── CODE_OF_CONDUCT.md          # 行为准则  
├── CONTRIBUTING.md             # 贡献指南
├── LICENSE                     # MIT 许可证
├── README.md                   # 项目说明文档(英文)
├── SECURITY.md                 # 安全政策
├── jasmine.json                # Jasmine 测试框架配置
├── package-lock.json           # NPM 依赖锁定文件
├── package.json                # NPM 项目配置 - dual-package (CJS+ESM)
├── tsconfig.esm.json           # TypeScript ESM 构建配置
└── tsconfig.json               # TypeScript 主配置 - strict 模式

核心模块职责

模块职责
fetch.ts主 API 实现,连接管理、重试逻辑、页面可见性处理
parse.tsSSE 协议解析,字节流 → 行 → 消息的转换
index.ts模块入口,统一导出

依赖关系

外部依赖

  • 无运行时依赖: 这是一个零依赖的轻量级库
  • 仅开发依赖: TypeScriptJasminerimrafsource-map-support

内部依赖

shell
index.ts fetch.ts parse.ts

         parse.ts (直接导出类型)

构建输出

  • CJS: CommonJS 格式 (lib/cjs/index.js)
  • ESM: ES Module 格式 (lib/esm/index.js)
  • 类型: TypeScript 类型定义 (lib/cjs/index.d.ts)

支持 dual-package 发布,同时兼容 CommonJS 和 ES Module。

第二部分: 架构设计篇

系统整体架构

markdown
                        ┌─────────────────────────────────────────────────────────────┐
                        │                    fetchEventSource API                     │
                        │                     (入口: src/index.ts)                     │
                        └────────────────────────────┬────────────────────────────────┘


                        ┌─────────────────────────────────────────────────────────────┐
                        │                   核心控制器 (src/fetch.ts)                 │
                        │  ┌───────────────────────────────────────────────────────┐  │
                        │  │  create()           - 创建/重试连接的主循环            │  │
                        │  │  dispose()          - 清理资源                          │  │
                        │  │  onVisibilityChange() - 页面可见性变化处理              │  │
                        │  └───────────────────────────────────────────────────────┘  │
                        └────────────────────────────┬────────────────────────────────┘

              ┌──────────────────────────────────────┼──────────────────────────────────────┐
              │                                      │                                      │
              ▼                                      ▼                                      ▼
┌─────────────────────────┐        ┌─────────────────────────┐        ┌─────────────────────────┐
│   页面可见性监听        │        │   Fetch 请求处理          │        │   重试策略控制          │
│  - visibilitychange     │        │  - 自定义方法/头部/体    │        │  - 可配置重试间隔       │
│  - 自动断开/重连        │        │  - Last-Event-ID 头部     │        │  - 错误分类处理         │
└─────────────────────────┘        └─────────────────────────┘        └─────────────────────────┘
              │                                      │                                      │
              └──────────────────────────────────────┼──────────────────────────────────────┘


                        ┌─────────────────────────────────────────────────────────────┐
                        │                  事件流解析器 (src/parse.ts)                 │
                        │  ┌───────────────────────────────────────────────────────┐  │
                        │  │  getBytes()    - 读取字节流                           │  │
                        │  │  getLines()    - 解析行缓冲                           │  │
                        │  │  getMessages() - 组装完整消息                          │  │
                        │  └───────────────────────────────────────────────────────┘  │
                        └─────────────────────────────────────────────────────────────┘


                        ┌─────────────────────────────────────────────────────────────┐
                        │                    用户回调函数                              │
                        │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐  │
                        │  │ onopen   │ │ onmessage│ │ onclose  │ │ onerror  │  │
                        │  └──────────┘ └──────────┘ └──────────┘ └──────────┘  │
                        └─────────────────────────────────────────────────────────────┘

核心模块设计

1. fetch.ts - 主控制器模块

职责:

  • 提供 fetchEventSource 主 API 函数
  • 管理连接生命周期
  • 处理重试逻辑
  • 集成页面可见性 API
  • 协调请求/响应处理

核心数据结构:

typescript
interface FetchEventSourceInit extends RequestInit {
    headers?: Record<string, string>;
    onopen?: (response: Response) => Promise<void>;
    onmessage?: (ev: EventSourceMessage) => void;
    onclose?: () => void;
    onerror?: (err: any) => number | null | undefined | void;
    openWhenHidden?: boolean;
    fetch?: typeof fetch;
}

关键函数流程:

text
                    fetchEventSource() 入口


┌─────────────────────────────────────────────────────────┐
│  初始化阶段:                                             │
│  1. 复制并设置默认请求头 (Accept: text/event-stream)    │
│  2. 设置页面可见性监听器 (如果 openWhenHidden=false)    │
│  3. 初始化重试间隔 (默认 1000ms)                       │
│  4. 监听输入 AbortSignal                                 │
└────────────────────────────┬────────────────────────────┘


                    ┌───────────────┐
                    │  create() 函数 │ ←────────┐
                    └───────┬───────┘          │
                            │                    │
        ┌───────────────────┼───────────────────┐
        │                   │                   │
        ▼                   ▼                   ▼
┌──────────────┐   ┌──────────────┐   ┌──────────────┐
│  发起请求    │   │  验证响应     │   │  解析事件流   │
│  fetch()     │   │  onopen()     │   │  getBytes()  │
└──────┬───────┘   └──────┬───────┘   └──────┬───────┘
       │                   │                   │
       └───────────────────┼───────────────────┘


                ┌──────────────────┐
                │  成功/错误处理   │
                └────────┬─────────┘

              ┌──────────┴──────────┐
              │                     │
              ▼                     ▼
        正常完成              异常处理
        (resolve)         (onerror 决定是否重试)


                    需要重试? ──是──→ 重试 create()



                        终止 (reject)

2. parse.ts - 事件流解析模块

职责:

  • 解析 SSE 格式的字节流
  • 处理分块传输编码
  • 组装完整的 EventSourceMessage 对象

解析流水线:

text
  ReadableStream<Uint8Array>


    ┌───────────────┐
    │   getBytes()  │  - 从流中读取字节块
    └───────┬───────┘


    ┌───────────────┐
    │   getLines()  │  - 解析行缓冲,处理 \r, \n, \r\n
    └───────┬───────┘  - 识别字段名位置 (第一个冒号)


    ┌───────────────┐
    │ getMessages() │  - 组装完整消息
    └───────┬───────┘  - 处理多字段: id, event, data, retry
            │          - 空行触发消息回调

  EventSourceMessage

核心数据结构:

typescript
interface EventSourceMessage {
    id: string;           // 事件 ID
    event: string;        // 事件类型
    data: string;         // 事件数据
    retry?: number;       // 重试间隔(毫秒)
}

行解析状态机:

text
    ┌─────────────────────────────────────────────────────────────────┐
    │                    getLines() 状态机                             │
    └─────────────────────────────────────────────────────────────────┘

            初始状态: buffer=undefined, position=0, fieldLength=-1

                ┌───────────┐
                │ 新字节块  │
                └─────┬─────┘

                ┌─────▼─────────────────────────────────┐
                │ buffer 存在?                            │
                ├───────────┬───────────────────────────┤
                │   否      │           是               │
                ▼           ▼                           │
            buffer = arr  buffer = concat(buffer, arr)  │
            position = 0                                 │
            fieldLength = -1                             │
                │                                        │
                └──────────────┬─────────────────────────┘

                ┌──────────────▼─────────────────────────┐
                │  while position < bufLength:           │
                │                                         │
                │  ┌──────────────────────────────────┐ │
                │  │ 处理 discardTrailingNewline      │ │
                │  │ (跳过前一个 \r 后面的 \n)        │ │
                │  └──────────────┬───────────────────┘ │
                │                 │                       │
                │  ┌──────────────▼───────────────────┐ │
                │  │ 查找行尾 (\r 或 \n)               │ │
                │  │ 同时记录第一个冒号位置 (fieldLength)│ │
                │  └──────────────┬───────────────────┘ │
                │                 │                       │
                │  ┌──────────────▼───────────────────┐ │
                │  │  找到行尾?                         │ │
                │  ├───────────┬───────────────────────┤ │
                │  │   否      │           是           │ │
                │  ▼           ▼                       │ │
                │  break       onLine(line, fieldLength)│ │
                │  (等待下一块) lineStart = position    │ │
                │               fieldLength = -1        │ │
                │  └──────────────┬───────────────────┘ │
                └─────────────────┼─────────────────────┘

                ┌─────────────────▼─────────────────────┐
                │  处理剩余 buffer:                       │
                │  - 全部处理完: buffer = undefined      │
                │  - 部分处理: buffer = subarray(lineStart)│
                └───────────────────────────────────────┘

核心数据流转时序

正常消息接收流程

mermaid
sequenceDiagram
    participant U as 用户代码
    participant F as fetchEventSource
    participant R as Response
    participant B as getBytes
    participant L as getLines
    participant M as getMessages
    participant C as 用户回调

    U->>F: fetchEventSource(url, options)
    F->>F: create()
    F->>R: fetch(input, init)
    R-->>F: Response
    F->>C: onopen(response)
    C-->>F: void
    F->>B: getBytes(response.body, onChunk)
    loop 字节流循环
        B->>B: reader.read()
        B-->>L: onChunk(arr)
        loop 行解析循环
            L-->>M: onLine(line, fieldLength)
            alt 非空行
                M->>M: 解析字段 (id/event/data/retry)
            else 空行
                M-->>C: onmessage(msg)
                M->>M: 创建新消息
            end
        end
    end
    B-->>F: 流关闭
    F->>C: onclose()
    F->>F: dispose()
    F-->>U: resolve()

重试流程

mermaid
sequenceDiagram
    participant U as 用户代码
    participant F as fetchEventSource
    participant R as Response
    participant C as 用户回调

    U->>F: fetchEventSource(...)
    F->>F: create()
    F->>R: fetch(...)

    alt 请求失败
        R-->>F: Error
        F->>C: onerror(err)
        alt 需要重试
            C-->>F: 返回重试间隔
            F->>F: setTimeout(create, interval)
            Note over F: 等待指定时间后重试
            F->>F: create()
            F->>R: fetch(..., Last-Event-ID: id)
        else 致命错误
            C-->>F: 抛出异常
            F->>F: dispose()
            F-->>U: reject(err)
        end
    else 响应成功但连接断开
        R-->>F: Response
        F->>C: onopen()
        F->>F: 解析事件流...
        F->>C: onclose()
        F->>C: onerror(undefined)
        alt 需要重试
            C-->>F: 返回重试间隔
            F->>F: setTimeout(create, interval)
        else 停止
            C-->>F: 抛出异常
            F->>F: dispose()
            F-->>U: reject()
        end
    end

页面可见性变化流程

mermaid
sequenceDiagram
    participant D as Document
    participant F as fetchEventSource
    participant R as Request

    Note over F: openWhenHidden = false (默认)
    F->>D: addEventListener('visibilitychange')

    D->>F: visibilitychange (hidden)
    F->>F: curRequestController.abort()
    F->>R: 取消请求

    D->>F: visibilitychange (visible)
    F->>F: create()
    F->>R: fetch(..., Last-Event-ID: id)

接口契约定义

公共 API

typescript
/**
 * 发起一个 Server-Sent Events (SSE) 请求
 * @param input 请求 URL 或 Request 对象
 * @param init 配置选项
 * @returns Promise,在连接正常关闭时 resolve,出错时 reject
 */
export function fetchEventSource(
    input: RequestInfo,
    init: FetchEventSourceInit
): Promise<void>;

/**
 * SSE 响应的 Content-Type
 */
export const EventStreamContentType = 'text/event-stream';

/**
 * 配置选项接口,继承自 RequestInit
 */
export interface FetchEventSourceInit extends RequestInit {
    /**
     * 请求头,仅支持 Record<string, string> 格式
     */
    headers?: Record<string, string>;

    /**
     * 收到响应时调用,用于验证响应是否符合预期
     * 如果未提供,默认验证 content-type 为 text/event-stream
     * @throws 如果响应不符合预期,抛出异常
     */
    onopen?: (response: Response) => Promise<void>;

    /**
     * 收到消息时调用
     * 注意:与默认 EventSource.onmessage 不同,此回调会被所有事件调用,
     * 包括带有自定义 event 字段的事件
     */
    onmessage?: (ev: EventSourceMessage) => void;

    /**
     * 响应结束时调用
     * 如果不期望服务器关闭连接,可以在这里抛出异常并通过 onerror 重试
     */
    onclose?: () => void;

    /**
     * 发生任何错误时调用(请求/处理消息/回调等)
     * 用于控制重试策略:
     * - 如果错误是致命的,在回调内重新抛出错误以停止整个操作
     * - 否则,可以返回一个间隔(毫秒),请求将在该间隔后自动重试
     * - 如果未指定此回调或返回 undefined,所有错误都被视为可重试,1秒后重试
     */
    onerror?: (err: any) => number | null | undefined | void;

    /**
     * 如果为 true,即使文档隐藏也保持请求打开
     * 默认情况下,fetchEventSource 会在文档隐藏时关闭请求,
     * 并在文档再次可见时自动重新打开
     */
    openWhenHidden?: boolean;

    /**
     * 使用的 Fetch 函数,默认为 window.fetch
     */
    fetch?: typeof fetch;
}

/**
 * 表示事件流中的一条消息
 * https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#Event_stream_format
 */
export interface EventSourceMessage {
    /** 要设置到 EventSource 对象的最后事件 ID 值的事件 ID */
    id: string;
    /** 标识所描述事件类型的字符串 */
    event: string;
    /** 事件数据 */
    data: string;
    /** 重试连接前等待的重连间隔(毫秒) */
    retry?: number;
}

内部解析 API

typescript
/**
 * 将 ReadableStream 转换为回调模式
 * @param stream 输入 ReadableStream
 * @param onChunk 对每个新字节块调用的函数
 * @returns Promise,在流关闭时 resolve
 */
export async function getBytes(
    stream: ReadableStream<Uint8Array>,
    onChunk: (arr: Uint8Array) => void
): Promise<void>;

/**
 * 解析任意字节块为 EventSource 行缓冲
 * 每行格式应为 "field: value",以 \r、\n 或 \r\n 结尾
 * @param onLine 对每个新 EventSource 行调用的函数
 * @returns 应该对每个输入字节块调用的函数
 */
export function getLines(
    onLine: (line: Uint8Array, fieldLength: number) => void
): (arr: Uint8Array) => void;

/**
 * 解析行缓冲为 EventSourceMessages
 * @param onId 对每个 id 字段调用的函数
 * @param onRetry 对每个 retry 字段调用的函数
 * @param onMessage 对每条消息调用的函数
 * @returns 应该对每个输入行缓冲调用的函数
 */
export function getMessages(
    onId: (id: string) => void,
    onRetry: (retry: number) => void,
    onMessage?: (msg: EventSourceMessage) => void
): (line: Uint8Array, fieldLength: number) => void;