流式输出 SSE:用服务器推送事件实现逐 Token 返回
FreeGuideOnline
最新
2026-06-29
运行后,页面将每秒显示一条新消息,直到收到 [DONE] 后关闭连接。SSE 的 onerror 会触发自动重连(浏览器默认行为),非常适合需要稳定推送的场景。
逐 Token 返回:AI 对话的典型实现
现在大语言模型(LLM)的回复通常采用 流式输出,即每生成一个词(token)就立即发送给前端,提升交互体验。我们可以借助 SSE 轻松实现这种效果。
下面模拟一个 LLM 流式回复的完整示例。
服务端:模拟 token 生成
app.get('/chat-stream', (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.flushHeaders();
// 模拟的回复文本,按字符拆分作为“token”
const responseText = '你好!我是由 SSE 驱动的 AI 助手。我能逐字输出回复内容,带来更流畅的对话体验。';
const tokens = responseText.split(''); // 实际应用中可能是 tokenizer 的结果
let index = 0;
const sendToken = () => {
if (index < tokens.length) {
const token = tokens[index];
// 构造 SSE 数据,可以包装成 JSON 携带更多信息
const data = JSON.stringify({
token: token,
index: index,
timestamp: Date.now()
});
res.write(`data: ${data}\n\n`);
index++;
// 模拟生成间隔
setTimeout(sendToken, 50);
} else {
// 结束信号
res.write('data: [DONE]\n\n');
res.end();
}
};
sendToken();
req.on('close', () => {
res.end();
});
});
客户端:逐字显示
const eventSource = new EventSource('/chat-stream');
const chatBox = document.getElementById('chat-box');
let fullAnswer = '';
eventSource.onmessage = (e) => {
if (e.data === '[DONE]') {
eventSource.close();
return;
}
try {
const chunk = JSON.parse(e.data);
fullAnswer += chunk.token;
chatBox.textContent = fullAnswer;
} catch (err) {
// 非 JSON 格式的简单文本
fullAnswer += e.data;
chatBox.textContent = fullAnswer;
}
};
这样,AI 回复就像真实打字效果一样,逐 token 出现在屏幕上。
进阶:自定义事件与错误处理
1. 自定义事件类型
除了默认的 message 事件,你还可以定义具名事件来区分不同种类的消息。
服务端发送:
res.write(`event: user-typing\ndata: 对方正在输入...\n\n`);
res.write(`event: message\ndata: ${JSON.stringify({text: '你好'})}\n\n`);
客户端监听:
eventSource.addEventListener('user-typing', (e) => {
showTypingIndicator(e.data);
});
eventSource.addEventListener('message', (e) => {
handleMessage(e.data);
});
2. 设置重连间隔
SSE 支持在服务端指定重连时间(毫秒),浏览器断线后将按该时间间隔自动重连。
res.write(`retry: 3000\n\n`); // 3 秒重连一次
3. Last-Event-ID 断点续传
客户端可以在重连时自动带上 Last-Event-ID 请求头,服务器可以根据它推送丢失的事件。
服务端设置事件 ID:
res.write(`id: ${eventId}\ndata: 消息内容\n\n`);
客户端断线重连后,浏览器会自动发送 Last-Event-ID: ${eventId},服务端读取后从对应位置继续推送。
生产环境注意事项
-
负载均衡与反向代理
确保 Nginx 等代理不会缓冲响应,需要关闭缓冲:proxy_buffering off; proxy_cache off; chunked_transfer_encoding on; -
连接数限制
浏览器对同域名的 SSE 连接数通常限制为 6,合理分配或使用 HTTP/2 可以突破。 -
安全性
- 始终验证用户身份(可在建立连接前通过 Cookie 或 Token 鉴权)。
- 避免
Access-Control-Allow-Origin: *,限定允许的源。
-
心跳保活
在长时间无数据推送时,可以定时发送注释行: heartbeat\n\n防止连接被中间代理关闭。
SSE 与 Fetch 搭配的流式读取
如果不想使用 EventSource,也可以利用 Fetch API 和 ReadableStream 处理 SSE 流:
const response = await fetch('/chat-stream');
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
// 解析 SSE 格式并逐行处理
}