流式输出 SSE:用服务器推送事件实现逐 Token 返回

FreeGuideOnline 最新 2026-06-29

```

运行后,页面将每秒显示一条新消息,直到收到 [DONE] 后关闭连接。SSE 的 onerror 会触发自动重连(浏览器默认行为),非常适合需要稳定推送的场景。

逐 Token 返回:AI 对话的典型实现

现在大语言模型(LLM)的回复通常采用 流式输出,即每生成一个词(token)就立即发送给前端,提升交互体验。我们可以借助 SSE 轻松实现这种效果。

下面模拟一个 LLM 流式回复的完整示例。

服务端:模拟 token 生成

app.get('/chat-stream', (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  res.flushHeaders();

  // 模拟的回复文本,按字符拆分作为“token”
  const responseText = '你好!我是由 SSE 驱动的 AI 助手。我能逐字输出回复内容,带来更流畅的对话体验。';
  const tokens = responseText.split('');   // 实际应用中可能是 tokenizer 的结果

  let index = 0;

  const sendToken = () => {
    if (index < tokens.length) {
      const token = tokens[index];
      // 构造 SSE 数据,可以包装成 JSON 携带更多信息
      const data = JSON.stringify({
        token: token,
        index: index,
        timestamp: Date.now()
      });
      res.write(`data: ${data}\n\n`);
      index++;
      // 模拟生成间隔
      setTimeout(sendToken, 50);
    } else {
      // 结束信号
      res.write('data: [DONE]\n\n');
      res.end();
    }
  };

  sendToken();

  req.on('close', () => {
    res.end();
  });
});

客户端:逐字显示

const eventSource = new EventSource('/chat-stream');
const chatBox = document.getElementById('chat-box');
let fullAnswer = '';

eventSource.onmessage = (e) => {
  if (e.data === '[DONE]') {
    eventSource.close();
    return;
  }

  try {
    const chunk = JSON.parse(e.data);
    fullAnswer += chunk.token;
    chatBox.textContent = fullAnswer;
  } catch (err) {
    // 非 JSON 格式的简单文本
    fullAnswer += e.data;
    chatBox.textContent = fullAnswer;
  }
};

这样,AI 回复就像真实打字效果一样,逐 token 出现在屏幕上。

进阶:自定义事件与错误处理

1. 自定义事件类型

除了默认的 message 事件,你还可以定义具名事件来区分不同种类的消息。

服务端发送:

res.write(`event: user-typing\ndata: 对方正在输入...\n\n`);
res.write(`event: message\ndata: ${JSON.stringify({text: '你好'})}\n\n`);

客户端监听:

eventSource.addEventListener('user-typing', (e) => {
  showTypingIndicator(e.data);
});

eventSource.addEventListener('message', (e) => {
  handleMessage(e.data);
});

2. 设置重连间隔

SSE 支持在服务端指定重连时间(毫秒),浏览器断线后将按该时间间隔自动重连。

res.write(`retry: 3000\n\n`); // 3 秒重连一次

3. Last-Event-ID 断点续传

客户端可以在重连时自动带上 Last-Event-ID 请求头,服务器可以根据它推送丢失的事件。

服务端设置事件 ID:

res.write(`id: ${eventId}\ndata: 消息内容\n\n`);

客户端断线重连后,浏览器会自动发送 Last-Event-ID: ${eventId},服务端读取后从对应位置继续推送。

生产环境注意事项

  1. 负载均衡与反向代理
    确保 Nginx 等代理不会缓冲响应,需要关闭缓冲:

    proxy_buffering off;
    proxy_cache off;
    chunked_transfer_encoding on;
    
  2. 连接数限制
    浏览器对同域名的 SSE 连接数通常限制为 6,合理分配或使用 HTTP/2 可以突破。

  3. 安全性

    • 始终验证用户身份(可在建立连接前通过 Cookie 或 Token 鉴权)。
    • 避免 Access-Control-Allow-Origin: *,限定允许的源。
  4. 心跳保活
    在长时间无数据推送时,可以定时发送注释行 : heartbeat\n\n 防止连接被中间代理关闭。

SSE 与 Fetch 搭配的流式读取

如果不想使用 EventSource,也可以利用 Fetch API 和 ReadableStream 处理 SSE 流:

const response = await fetch('/chat-stream');
const reader = response.body.getReader();
const decoder = new TextDecoder();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  const chunk = decoder.decode(value);
  // 解析 SSE 格式并逐行处理
}