Skip to content

流式 HTTP 与 SSE 协议:从概念辨析到生产实践

摘要总结: 针对开发者容易混淆流式 HTTP 与 SSE 协议的问题,从概念层次出发厘清二者本质区别与协作关系,指出流式 HTTP 是底层传输机制而 SSE 是应用层数据格式约定。文章依次介绍原生 EventSource API、fetch + ReadableStream 进阶方案、@microsoft/fetch-event-source 工业级实现等三种客户端方案,并以 NestJS 为例展示服务端 SSE 响应的标准实现与 POST 请求扩展,最后涵盖指数退避重试、事件分类处理、连接状态追踪、资源清理等生产实践要点,为 AI 时代的流式响应场景提供完整技术方案。

1. 引言

Web 通信协议全景:从 HTTP 到 WebSocket 的技术选型指南中,我们对比分析了多种通信技术。在本文我们将重点探讨 SSE(Server-Sent Events)流式 HTTP(HTTP Streaming) 的实践。

1.1 概念澄清:两种技术,两个层次

很多开发者容易将 SSE 与流式 HTTP 混为一谈,实际上它们处于不同的技术层次:

维度流式 HTTPSSE
定位底层传输机制应用层协议
核心原理HTTP Chunked Transfer Encoding,数据分块连续传输基于 HTTP 的事件流协议,定义了标准化的数据格式
浏览器原生支持无,需自行解析数据流有,EventSource API
协议规范无标准格式约定W3C 标准(text/event-stream

简而言之,流式 HTTP 是一种“传输手段”,而 SSE 是一种“数据格式约定”流式 HTTP 关注的是如何把数据“边发边传”,而 SSE 关注的是如何让数据“按照固定格式组织”,便于客户端解析处理

1.2 关联与协作

尽管处于不同层次,但二者完全可以协同工作:

  • SSE 必须基于流式 HTTP:SSE 的标准格式需要通过 HTTP 流式传输来实现,因此 SSE 协议的数据流底层必然依赖流式 HTTP
  • 流式 HTTP 不必局限于 SSE:纯流式 HTTP 可以自定义数据格式(如 JSON Lines、NDJSON),不一定要遵循 SSE 的 event:id:data: 等字段约定

正是由于这种关系,我们可以利用 fetch + ReadableStream 突破 SSE 的一些局限

  1. 请求方式受限:原生 SSE 仅支持 GET 请求,当需要携带较大 body 数据(如文件上传上下文、复杂查询参数)时会受到限制,而 fetch API 不受此约束。
  2. 身份验证不便:原生 SSE 无法灵活地在请求头中添加 Authorization: Bearer <token> 等认证信息,fetch API 则可以像调用普通接口一样自定义 Header。

因此,基于流式 HTTP,采用 fetch + ReadableStream 的方案,不仅保留了 fetch 的全部能力(POST 方法、自定义 Headers、超时控制等),同时具备了流式传输的实时性,并且在数据格式层面依然遵循 SSE 协议,保持良好的可解析性和通用性。相比原生 SSE,使用 Fetch API + ReadableStream 能突破 GET 请求限制,可携带任意大小的 body 数据,同时支持灵活的身份验证方式;相比 WebSocket,则避免了维护长连接和心跳保活的额外资源损耗。

这种“取两家之长”的方案,正是 @microsoft/fetch-event-source 这类库在 AI 社区盛行的底层逻辑:既能像调用普通接口一样编写业务逻辑,又能享受流式传输的实时性,数据格式依然遵循 SSE 协议,客户端解析毫无额外成本

特性标准 EventSource (SSE)Fetch + Stream
HTTP 方法仅限 GET支持 POST、PUT 等所有方法
自定义 Header不支持(无法传 Token 等)完全支持
**请求体 **(Body)不支持支持发送 JSON Payload
重连机制浏览器内置,难以精细控制插件实现,支持高度自定义重连策略
跨域 (CORS)比较死板,需特定配置遵循标准的 Fetch CORS 规则
底层 API浏览器原生 EventSource 对象基于 fetch 和 ReadableStream

1.3 SSE 协议数据格式

SSE 协议对数据流格式有明确规定,服务器返回的响应头和数据体需遵循以下规范:

markdown
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

event: message
id: 1
data: {"text": "你好"}

event: message
id: 2
data: {"text": "世界"}

每个事件由可选的 eventid 字段和必需的 data 字段组成,事件之间以空行分隔。event 字段指定事件类型,id 字段用于断线重连时的位置标记,data 字段承载实际数据内容,可以是多行文本。

2. 原生 SSE 客户端 API

浏览器原生提供了 EventSource API 来实现 SSE 协议的数据格式规范,是 SSE 在客户端的标准方案。

2.1 EventSource API 概览

EventSource 是浏览器原生封装的 SSE 客户端实现,它负责解析 SSE 协议格式、自动重连、管理连接状态,让你无需关心底层细节。以下是一个完整的示例:

javascript
const url = 'https://example.com/api/sse';
// 通过 withCredentials 选项发送 Cookie 等凭证信息到服务器端
const eventSource = new EventSource(url, { withCredentials: true });

// onopen:连接建立成功时触发,对应 SSE 协议中 Connection: keep-alive 的完成
eventSource.onopen = (event) => {
  console.log('SSE 连接已打开:', event);
};

// onmessage:收到服务器发送的通用消息(无 event 字段或 event: message)时触发
// 对应 SSE 协议中的 data: <content>,event.data 即为 data 字段的值
eventSource.onmessage = (event) => {
  console.log('收到消息:', event.data);
};

// 通过 addEventListener 监听指定类型的消息,对应 SSE 协议中的 event: <type> 字段
// 例如服务器发送 "event: progress\ndata: 50",则触发此回调
eventSource.addEventListener('progress', (event) => {
  console.log('进度更新:', event.data);
});

// onerror:连接出错或中断时触发,EventSource 会自动尝试重连
eventSource.onerror = (event) => {
  console.error('SSE 连接错误:', event);
};

API 与 SSE 协议字段的对应关系

EventSource API触发的时机/内容对应的 SSE 协议字段
onopen连接建立成功Connection: keep-alive
onmessage收到通用消息data: <content>(无 event:event: message
addEventListener('xxx')收到指定类型消息event: xxx + data: <content>
event.lastEventId当前事件的标识id: <number>
onerror连接出错,自动重连

2.2 服务端实现示例

服务端只需返回符合 SSE 协议格式的响应即可,以下是 NodeJS 原生实现示例:

javascript
const http = require('http');

const server = http.createServer((req, res) => {
  // 设置 SSE 必需的响应头
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
  });

  // 每秒发送一条消息
  const interval = setInterval(() => {
    // id 字段:事件唯一标识,用于断线重连
    res.write('id: 1\n');
    // event 字段:事件类型,客户端通过 addEventListener 监听
    res.write('event: progress\n');
    // data 字段:实际承载的数据内容,可多行
    res.write('data: {"percent": 50}\n\n');
  }, 1000);

  // 清理资源
  req.on('close', () => {
    clearInterval(interval);
    res.end();
  });
});

server.listen(8080, () => {
  console.log('SSE 服务端已启动: http://localhost:8080');
});

可以看到,服务端只需要按 SSE 协议格式写入 ideventdata 字段即可,客户端的 EventSource 会自动解析这些字段并触发相应回调。

3. Fetch + ReadableStream 实现方案

如上所述,尽管原生 EventSource 已经实现了 SSE 协议解析,但其局限性(如仅支持 GET、无法自定义 Header)在实际业务中往往成为瓶颈。因此,社区基于 fetch + ReadableStream 探索出了一条更灵活的路径:既保留了流式传输的实时性,又拥有 fetch 的全部能力。

3.1 原生 Fetch + ReadableStream 实现

fetch API 返回的 Response 对象自带 .body 属性,它是一个 ReadableStream,我们可以自行解析这个数据流。以下是核心实现:

javascript
/**
 * 基于 fetch + ReadableStream 的简化 SSE 客户端
 * 
 * @param url 请求地址
 * @param options 配置选项
 * @param options.method HTTP 方法,默认 GET
 * @param options.headers 请求头对象
 * @param options.body 请求体对象(会自动 JSON.stringify)
 * @param options.onmessage 收到消息时的回调,参数为 { data }
 */
async function fetchSSE(url, options = {}) {
  // 1. 通过 fetch 发起请求,支持 POST 方法和自定义 Header
  const response = await fetch(url, {
    method: options.method || 'GET',                    // HTTP 方法
    headers: options.headers || {},                     // 自定义请求头
    body: options.body ? JSON.stringify(options.body) : undefined,  // 请求体
  });

  // 2. 获取响应体的流读取器
  // response.body 是 ReadableStream 类型,用于逐块读取响应数据
  const reader = response.body.getReader();

  // 3. 创建文本解码器,将字节块转换为字符串
  // TextDecoder 支持 stream 模式,逐块解码而非等待全部接收
  const decoder = new TextDecoder();

  // 4. 维护一个缓冲区,用于处理跨块边界的行分割
  // 由于数据可能被分割到多个 chunk 中,需要缓存不完整的行
  let buffer = '';

  // 5. 循环读取流数据,直到流结束
  while (true) {
    // read() 返回一个 Promise,resolve 为 { done, value }
    // done: true 表示流已结束,value: Uint8Array 为数据块
    const { done, value } = await reader.read();
    if (done) break;  // 流结束,退出循环

    // 将读取到的字节块解码为字符串
    // stream: true 表示这是部分数据,后面还有数据,持续累积到 buffer
    buffer += decoder.decode(value, { stream: true });

    // 按换行符分割,提取所有完整的行
    const lines = buffer.split('\n');

    // 保留最后一行(可能是不完整的,等待下次数据补充)
    // 因为最后一行后面可能还没有换行符,所以不能当作完整行处理
    buffer = lines.pop();

    // 遍历所有完整行,解析 SSE 格式
    for (const line of lines) {
      // SSE 协议中,数据行以 "data: " 开头
      if (line.startsWith('data: ')) {
        // 提取 data: 后面的内容(去掉 "data: " 前缀)
        const data = line.slice(6);
        // 触发回调,传入解析后的数据
        options.onmessage?.({ data });
      }
    }
  }
}

// 使用示例:发起 POST 请求,携带 JSON body
fetchSSE('/api/sse', {
  method: 'POST',                                              // 使用 POST 方法
  headers: {                                                  // 自定义请求头
    'Content-Type': 'application/json',                       // 内容类型
    'Authorization': 'Bearer token123'                        // 认证令牌
  },
  body: { query: '你好' },                                    // 请求体对象
  onmessage: (ev) => {                                        // 消息回调
    console.log('收到消息:', ev.data);
  }
});

这段代码演示了 fetch + ReadableStream 的核心原理:

  1. 通过 fetch 发起请求,支持 POST 方法和自定义 Header
  2. 通过 response.body.getReader() 获取流读取器
  3. 持续读取流中的数据块,用 TextDecoder 解码
  4. 按行分割数据,匹配 data: 前缀提取消息内容
  5. 通过缓冲区处理跨块边界的行分割问题

但这个简化实现存在明显问题:它只能处理单行 data: 字段,无法处理多行数据、无法解析 event:id: 字段、断行边界处理粗糙。要完整实现 SSE 协议解析,需要更精细的状态管理。

3.2 fetch-event-source:更优雅的解决方案

正是为了解决上述问题,微软开源了 @microsoft/fetch-event-source 库。它封装了完整的 SSE 协议解析逻辑,同时保留了 fetch 的全部能力,是目前 AI 社区最流行的 SSE 客户端方案。

该库解决的核心痛点

痛点原生 EventSourcefetch-event-source
HTTP 方法仅 GET支持 POST、PUT 等所有方法
自定义 Header不支持完全支持
请求体不支持支持发送 JSON Payload
重连机制浏览器内置,难以精细控制支持高度自定义的重连策略
错误处理有限可区分可重试错误与致命错误
页面可见性无感知自动在页面隐藏时断开、显示时重连

3.3 解析流水线详解

fetch-event-source 的核心在于将 ReadableStream 经过多层转换,最终输出结构化的 EventSourceMessage。其解析流水线如下:

markdown
ReadableStream(原始字节流)

getBytes()(字节读取)

getLines()(按行分割)

getMessages()(按事件组装)

EventSourceMessage(结构化消息)

Stage 1: getBytes() — 字节读取

ReadableStream 提供的是 Uint8Array 字节块,需要持续读取直到流结束:

javascript
/**
 * 将 ReadableStream 转换为字节块迭代器
 * 使用生成器函数,方便异步逐块读取
 */
async function* getBytes(readableStream) {
  // 获取流的读取器
  const reader = readableStream.getReader();
  try {
    // 循环读取,直到流结束
    while (true) {
      // read() 返回 Promise,done=true 时流已结束
      const { done, value } = await reader.read();
      if (done) break;
      // yield 产出每个字节块 Uint8Array
      yield value;
    }
  } finally {
    // 确保流读取器被释放,防止内存泄漏
    reader.releaseLock();
  }
}

Stage 2: getLines() — 按行分割

SSE 协议以换行符分隔字段,但数据可能跨越多个读取块,需要维护一个缓冲区:

javascript
/**
 * 将字节块流转换为行迭代器
 * 处理跨块边界的行分割问题
 */
async function* getLines(bytesIterable) {
  // TextDecoder 将 Uint8Array 解码为字符串
  const decoder = new TextDecoder();
  // 缓存不完整的行(末尾可能缺少换行符)
  let partialLine = '';

  // 遍历所有字节块
  for await (const chunk of bytesIterable) {
    // 解码当前块,stream: true 表示持续解码
    partialLine += decoder.decode(chunk, { stream: true });
    // 按换行符分割提取完整行
    const lines = partialLine.split('\n');
    // 最后一行可能是未完成的,保留到下次处理
    partialLine = lines.pop();

    // 产出所有完整行
    for (const line of lines) {
      yield line;
    }
  }
  // 处理最后一行(如果有)
  if (partialLine) yield partialLine;
}

Stage 3: getMessages() — 按事件组装

每条 SSE 消息由 event:id:data: 等字段组成,字段间以空行分隔。getMessages() 需要按行读取并组装完整事件:

javascript
/**
 * 将行流组装为完整事件
 * SSE 协议规定:事件之间以空行(连续两个换行)分隔
 */
async function* getMessages(linesIterable) {
  // 当前正在组装的事件对象
  let event = { id: '', event: 'message', data: '' };

  for await (const line of linesIterable) {
    // 空行表示一个事件结束,产出后重置
    if (line === '') {
      yield { ...event };  // 浅拷贝,防止对象被后续修改影响
      event = { id: '', event: 'message', data: '' };  // 重置为默认值
      continue;
    }

    // 找到冒号位置,分割字段名和字段值
    // SSE 格式:field: value,冒号后有一个空格
    const colonIndex = line.indexOf(':');
    if (colonIndex === -1) continue;  // 无效行,跳过

    // 提取字段名(如 "id", "event", "data")
    const field = line.slice(0, colonIndex);
    // 提取字段值(跳过 "field: " 中的空格,即冒号后两个字符)
    const value = line.slice(colonIndex + 2);

    // 根据字段名填充事件对象
    switch (field) {
      case 'id': event.id = value; break;
      case 'event': event.event = value; break;
      // data 字段可能多行,用换行连接
      case 'data': event.data += value + '\n'; break;
    }
  }
}

经过这三层处理,最终输出的 EventSourceMessage 包含了 ideventdata 字段,与原生 EventSource 的事件对象完全一致。

3.4 典型使用示例

typescript
// 1. 导入 fetchEventSource
import { fetchEventSource } from '@microsoft/fetch-event-source';

// 2. 创建 AbortController,用于主动取消请求
const ctrl = new AbortController();

// 3. 调用 fetchEventSource,建立 SSE 连接
fetchEventSource('/api/chat', {
  // HTTP 方法,支持 POST 可携带请求体
  method: 'POST',
  // 自定义请求头,可添加认证信息
  headers: {
    'Content-Type': 'application/json',            // 内容类型
    'Authorization': 'Bearer token123'              // 认证令牌
  },
  // 请求体数据
  body: JSON.stringify({ message: '你好,请介绍一下自己' }),

  // 4. onopen:HTTP 连接建立成功后的回调
  //    用于验证响应状态和 Content-Type
  async onopen(response) {
    // 检查响应是否成功且内容类型为 SSE 格式
    if (response.ok && response.headers.get('content-type') === 'text/event-stream') {
      return;  // 验证通过,连接建立成功
    }
    // 4xx 客户端错误(除 429 防抖限制外)为致命错误,不重试
    if (response.status >= 400 && response.status < 500 && response.status !== 429) {
      throw new FatalError();
    }
    // 其他错误(如 5xx 服务器错误)为可重试错误
    throw new RetriableError();
  },

  // 5. onmessage:收到 SSE 消息时的回调
  //    msg 对象包含 { id, event, data } 字段
  onmessage(msg) {
    console.log('收到消息:', msg.data);
  },

  // 6. onerror:连接出错时的回调
  //    返回值控制是否重试
  onerror(err) {
    if (err instanceof FatalError) {
      // 致命错误:主动取消并抛出,停止重试
      ctrl.abort();
      throw err;
    }
    // 可重试错误:返回等待时间(毫秒),底层会自动重连
    return 2000;  // 等待 2 秒后重试
  }
});

可以看到,fetchEventSource 的接口设计与原生 EventSource 类似(onopenonmessageonerror),但多了 methodheadersbody 等 fetch 原生参数,上手成本极低。

4. 生产实践要点与完整示例

理论说完,该落地了。在实际项目中使用 fetch-event-source 构建 SSE 客户端时,有几个关键问题需要考虑:

4.1 重试策略:指数退避

网络不稳定时 SSE 连接可能中断,fetch-event-sourceonerror 回调可以通过返回值控制重试间隔。切勿立即重试,否则会造成瞬时并发压力。推荐使用指数退避策略:

javascript
let retryCount = 0;

onerror(err) {
  if (err instanceof FatalError || ctrl.signal.aborted) {
    ctrl.abort();
    throw err;
  }

  retryCount++;
  if (retryCount > MAX_RETRY_COUNT) {
    throw new FatalError('重试次数超限,停止重试');
  }

  // 指数退避:1s → 2s → 4s → 8s → ...,上限 30s
  const delay = Math.min(1000 * Math.pow(2, retryCount - 1), 30000);
  // onerror 在接收到 delay 数值后,会在内部自动重试,无需手动处理。
  return delay;
}

何时重试 vs 何时放弃:对于 4xx 客户端错误(如认证失效),应立即抛出 FatalError 终止重试;对于 5xx 或网络错误,可以重试。

4.2 事件分类处理:区分业务事件与协议控制事件

SSE 流中包含两类消息:

  1. 协议层控制消息ping(保活)、stream_end(流结束)、error(协议错误)
  2. 业务层数据消息progress(进度更新)、thinking(思考中)、complete(完成)等

需要在 onmessage 中区分处理:

javascript
onmessage(msg) {
  // 跳过空消息和 ping 消息
  if (!msg?.data || msg.event === 'ping') {
    return;
  }

  // 处理令牌失效等致命错误
  if (msg.event === 'error' && msg.data === 'TOKEN_EXPIRED') {
    throw new FatalError('令牌失效,请重新登录');
  }

  const data = JSON.parse(msg.data);
  if (data.type === 'ping') return;

  // 分发给具体业务处理器
  switch (data.type) {
    case 'progress': /* ... */ break;
    case 'complete': /* ... */ break;
    case 'error': /* ... */ break;
    case 'stream_end': /* ... */ break;
  }
}

4.3 连接状态追踪:防止重复回调

SSE 事件流可能因网络波动触发多次 onerror/onclose,需要用标志位防止重复触发:

javascript
const streamEnded = { value: false };
const hadError = { value: false };

onclose() {
  if (streamEnded.value) return; // 防止重复触发
  streamEnded.value = true;
  callbacks.onClose?.();
}

onerror(err) {
  if (err instanceof FatalError || ctrl.signal.aborted) {
    hadError.value = true;
    onError?.(err);
    throw err;
  }
  // ... 重试逻辑
}

业务层面也需要追踪状态,避免在 stream_end 后又收到 error 时重复更新状态。

4.4 资源清理:AbortController 与主动取消

页面切换或用户主动取消时,需要主动断开连接、清理资源:

javascript
const ctrl = new AbortController();

fetchEventSource(url, {
  signal: ctrl.signal,
  // ...
});

return () => {
  ctrl.abort(); // 通知底层断开连接
};

fetch-event-source 内部会监听 signal.aborted,立即终止请求并触发 onerror(判断为 FatalError)。

4.5 完整示例:任务流订阅客户端

综合以上要点,以下是一个简化后的任务流订阅客户端实现:

typescript
import { fetchEventSource, EventStreamContentType } from '@microsoft/fetch-event-source';

// 最大重试次数,防止无限重试
const MAX_RETRY_COUNT = 3;

/**
 * 任务流订阅客户端
 * 用于订阅服务端的任务执行进度流事件
 */
class TaskStreamClient {
  private baseUrl: string;

  constructor(baseUrl: string) {
    // 规范化 baseUrl,移除末尾斜杠以避免拼接时出现双斜杠
    this.baseUrl = baseUrl.replace(/\/$/, '');
  }

  /**
   * 订阅任务流事件
   * @param taskId 任务 ID
   * @param callbacks 事件回调集合
   * @returns 取消订阅函数,调用后会断开 SSE 连接
   */
  subscribe(
    taskId: string,
    callbacks: {
      onProgress?: (data: unknown) => void;  // 进度更新回调
      onComplete?: (data: unknown) => void;   // 任务完成回调
      onError?: (err: Error) => void;         // 错误回调
      onClose?: () => void;                   // 连接关闭回调
    }
  ): () => void {
    // AbortController 用于主动取消请求
    const ctrl = new AbortController();
    // 标记流是否已正常结束,防止 onclose 重复触发
    const streamEnded = { value: false };
    // 当前重试次数计数器
    let retryCount = 0;

    // 建立 SSE 连接
    fetchEventSource(`${this.baseUrl}/tasks/${taskId}/stream`, {
      // AbortSignal,用于主动取消请求
      signal: ctrl.signal,
      // 使用 POST 方法,可携带请求体
      method: 'POST',
      // 设置内容类型
      headers: { 'Content-Type': 'application/json' },
      // 即使页面隐藏也保持连接,防止手机浏览器进入后台时断开
      openWhenHidden: true,

      /**
       * onopen:HTTP 连接建立成功后的回调
       * 用于验证服务端返回的 Content-Type 是否为 text/event-stream
       */
      async onopen(response) {
        const contentType = response.headers.get('content-type');
        // 检查响应是否成功且内容类型匹配 SSE 格式
        if (response.ok && contentType?.includes(EventStreamContentType)) {
          // 连接成功时重置重试计数器,下次退避时间从 1s 重新开始
          retryCount = 0;
          return;
        }
        // 内容类型不匹配或响应失败,抛出致命错误,停止重试
        throw new FatalError();
      },

      /**
       * onmessage:收到 SSE 消息时的回调
       * msg 对象的结构:{ id, event, data }
       */
      onmessage(msg) {
        // 跳过空数据消息和 ping 消息(ping 由协议层处理)
        if (!msg.data || msg.event === 'ping') return;

        // 解析 JSON 数据
        const data = JSON.parse(msg.data);

        // 根据业务事件类型分发到对应回调
        switch (data.type) {
          case 'progress':
            // 进度更新事件
            callbacks.onProgress?.(data);
            break;
          case 'complete':
            // 任务完成事件
            callbacks.onComplete?.(data);
            break;
          case 'stream_end':
            // 流结束事件,标记并触发关闭回调
            streamEnded.value = true;
            callbacks.onClose?.();
            break;
          case 'error':
            // 业务错误事件,透传错误信息
            callbacks.onError?.(new Error(data.error));
            break;
        }
      },

      /**
       * onerror:连接出错时的回调
       * 返回值控制是否重试:
       * - 返回数字:等待指定毫秒后重试
       * - 抛出 FatalError:停止重试
       */
      onerror(err) {
        // 如果是致命错误或用户主动取消,直接终止
        if (err instanceof FatalError || ctrl.signal.aborted) {
          ctrl.abort();  // 确保 abort 已调用
          throw err;     // 抛出错误阻止重试
        }

        // 非致命错误,增加重试计数器
        retryCount++;
        // 超过最大重试次数,抛出致命错误停止重试
        if (retryCount > MAX_RETRY_COUNT) {
          throw new FatalError();
        }

        // 计算指数退避延迟时间:1s → 2s → 4s → 8s ...,上限 30s
        return Math.min(1000 * Math.pow(2, retryCount - 1), 30000);
      },
    });

    // 返回取消订阅函数,调用者可通过执行此函数主动断开连接
    return () => ctrl.abort();
  }
}

这个示例涵盖了前文所述的核心要点:指数退避重试、事件分类处理、状态追踪防止重复回调、以及 AbortController 资源清理。实际项目中,我们可以根据业务需求扩展 callbacks、增加数据解析逻辑、以及对接身份认证系统。

为应对各种网络异常与业务场景,构建一个健壮的 SSE 客户端,我们需要掌握这些要点:

  • 重试策略:采用指数退避避免瞬时并发压力,区分可重试错误与致命错误。
  • 事件处理:区分协议控制消息(pingstream_end)与业务消息,防止无效消息干扰。
  • 状态追踪:用标志位防止 onerror/onclose 重复触发,确保回调只执行一次
  • 资源清理:通过 AbortController 实现主动取消,页面切换时及时释放连接

5. 流式响应的服务端

客户端说完,该服务端登场了。服务端的核心任务只有一个:将数据按照 SSE 协议格式持续写入响应流,让客户端能够"边收边处理"。

5.1 服务端要点

与普通 HTTP 响应不同,SSE 服务端需要在响应头和数据格式上遵循特定规范:

响应头设置

响应头作用
Content-Typetext/event-stream声明这是 SSE 响应
Cache-Controlno-cache禁止缓存,确保实时推送
Connectionkeep-alive保持长连接
X-Accel-Bufferingno(可选)关闭 Nginx 等反向代理的缓冲

数据格式:服务端需要按照 SSE 协议规范写入数据:

id: <event_id>\n
event: <event_type>\n
data: <json_data>\n\n

注意:每行以 \n 结尾,事件之间以空行分隔(即连续两个 \n)。

5.2 NestJS 实现示例

NestJS 提供了 @Sse() 装饰器和 Observable + map 的响应模式,非常适合实现 SSE。以下是完整示例:

typescript
import {
  Controller,
  Post,
  Sse,
  Req,
  Param,
  Query,
} from '@nestjs/common';
import { Request } from 'express';
import { Observable, interval, map } from 'rxjs';
import { EventMessage } from './interfaces/event-message.interface';

/**
 * SSE 事件消息格式
 * id: 事件唯一标识,客户端可用于断线重连
 * event: 事件类型,客户端通过 addEventListener 监听
 * data: 事件数据内容
 */
interface SseEvent {
  id: string;
  event: string;
  data: unknown;
}

@Controller('tasks')
export class TasksController {

  /**
   * 订阅任务流事件
   * GET /tasks/:taskId/stream
   */
  @Sse(':taskId/stream')
  streamTask(@Param('taskId') taskId: string): Observable<MessageEvent> {
    // 使用 interval 模拟任务进度推送,实际项目中替换为真实任务观察
    return interval(1000).pipe(
      map((count) => {
        // 构造 SSE 格式数据
        const event: SseEvent = {
          // 事件 ID,使用计数器作为简单示例
          id: String(count),
          // 事件类型
          event: 'progress',
          // 数据内容,必须是字符串(如果是对象需要 JSON.stringify)
          data: { status: 'running', progress: (count % 100) },
        };

        // NestJS 的 @Sse 装饰器期望返回 MessageEvent 对象
        // data 字段即为 SSE 协议中的 data: <content> 部分
        // type 字段对应 SSE 的 event: <type>
        // lastEventId 对应 SSE 的 id: <number>
        return {
          data: JSON.stringify(event.data),
          type: event.event,
          lastEventId: event.id,
        } as MessageEvent;
      }),
    );
  }
}

关键点说明

  1. @Sse() 装饰器:NestJS 封装的 SSE 响应装饰器,内部会自动设置正确的响应头

  2. Observable 返回:NestJS 支持 RxJS 响应模式,interval(1000) 表示每秒推送一次

  3. MessageEvent 对象

    • data: SSE 协议的 data: 部分(需要字符串)
    • type: SSE 协议的 event: 部分
    • lastEventId: SSE 协议的 id: 部分

5.3 支持 POST 请求的 SSE

如果需要像 fetchEventSource 那样通过 POST 携带请求体,NestJS 中可以这样实现:

typescript
import { Controller, Post, Body, Sse, Query } from '@nestjs/common';
import { Observable, Subject } from 'rxjs';
import { map, takeUntil } from 'rxjs/operators';

@Controller('tasks')
export class TasksController {

  /**
   * 通过 POST 请求订阅任务流,支持携带请求体
   * POST /tasks/:taskId/stream
   */
  @Post(':taskId/stream')
  @Sse()
  streamTaskPost(
    @Param('taskId') taskId: string,
    @Body() body: { filter?: string; options?: Record<string, unknown> },
  ): Observable<MessageEvent> {
    const destroy$ = new Subject<void>();

    // 监听请求关闭事件,清理资源
    // 当客户端断开连接时,NestJS 会触发 complete 或 error
    // 通过 takeUntil 自动取消订阅,防止内存泄漏
    return interval(1000).pipe(
      takeUntil(destroy$),
      map((count) => {
        // 根据请求参数过滤数据(示例)
        const filtered = this.applyFilter(body.filter, count);

        return {
          data: JSON.stringify({ progress: filtered }),
          type: 'progress',
          lastEventId: String(count),
        } as MessageEvent;
      }),
    );
  }

  /**
   * 根据 filter 参数过滤数据
   */
  private applyFilter(filter: string | undefined, count: number): number {
    if (filter === 'even') {
      return count % 2 === 0 ? count : count - 1;
    }
    return count;
  }
}

6. 结语

回顾全文,我们从概念辨析出发,历经客户端到服务端的完整技术链条:

  • 概念层:厘清了流式 HTTP(传输机制)与 SSE(数据格式)的本质区别与协作关系
  • 客户端层:从原生 EventSource API,到 fetch + ReadableStream 的进阶方案,再到 @microsoft/fetch-event-source 的工业级实现
  • 服务端层:以 NestJS 为例,展示了 SSE 响应的标准实现与 POST 请求扩展

或许在阅读之前,你对 SSE 的认知还停留在"浏览器原生的 EventSource"。但现在你应该已经掌握了一套完整的解决方案:既能像调用普通接口一样携带请求体、自定义 Header,又能享受流式传输的实时性;数据格式依然遵循 SSE 协议,客户端解析零成本。

这套方案并非银弹——它本质上仍是单向通信。如果你的业务需要双向实时交互,WebSocket 仍是更合适的选择。但在 AI 时代,大量"请求-流式响应"的场景(如 LLM 对话、任务进度推送、实时日志等),fetch + SSE 的组合已经足够强大且优雅。下次遇到需要实时数据推送的场景时,不妨先问自己:真的需要 WebSocket 吗?