基于 SSE 实现打字机效果输出

在 AI 应用开发中,用户体验至关重要。当 LLM 需要较长时间生成回答时,传统的”等待-返回”模式会让用户感到焦虑。SSE(Server-Sent Events)流式输出技术能够实现”边生成边展示”,显著提升用户体验。 本文将以本项目的知识库问答和 RAG 聊天功能为例,详细讲解 SSE 流式输出的实现原理、与 WebSocket 的对比,以及在实际项目中的最佳实践。

SSE 简介

SSE(Server-Sent Events) 是一种基于 HTTP 的服务器推送技术,允许服务器向客户端单向发送事件流。它是 HTML5 标准的一部分,与传统轮询相比,SSE 具有实时性强、延迟低、资源消耗少等优势。

图片[1]-基于 SSE 实现打字机效果输出-MacFun is an interesting website.

SSE 的核心特点:

● 单向通信:服务器向客户端推送数据

● 自动重连:浏览器断开时会自动尝试重连

● 文本格式:使用 UTF-8 编码的文本数据

● 标准协议:基于 HTTP/HTTPS,无需额外协议升级

SSE 和 WebSocket 对比

WebSocket 是一种在 TCP 连接上进行全双工通信的协议,建立客户端和服务器之间的通信渠道。浏览器和服务器仅需一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

图片[2]-基于 SSE 实现打字机效果输出-MacFun is an interesting website.

SSE 与 WebSocket 作用相似,都可以建立服务端与浏览器之间的通信,实现服务端向客户端推送消息,但还是有些许不同:

● SSE 是基于 HTTP 协议的,它们不需要特殊的协议或服务器实现即可工作;WebSocket 需单独服务器来处理协议。

● SSE 单向通信,只能由服务端向客户端单向通信;WebSocket 全双工通信,即通信的双方可以同时发送和接受信息。

● SSE 实现简单开发成本低,无需引入其他组件;WebSocket 传输数据需做二次解析,开发门槛高一些。

● SSE 默认支持断线重连;WebSocket 则需要自己实现。

● SSE 只能传送文本消息,二进制数据需要经过编码后传送;WebSocket 默认支持传送二进制数据。

SSE 与 WebSocket 该如何选择?

● 选择 SSE:服务器单向推送数据,如实时通知、AI 流式生成。

● 选择 WebSocket:需要双向通信,如聊天室、协作编辑。

SSE 协议格式

SSE 是一组基于 UTF-8 编码的纯文本流。每一个事件(Message)由一行或多行设置组成,最后必须以一个**空行(两个换行符 \n\n)**作为该事件的结束标志。

在同一个事件块内,各个字段(data, event, id, retry)的顺序没有严格要求,但它们必须紧挨着,中间不能有空行:

data: 第一行内容
data: 第二行内容
data: 第三行内容

id: 事件ID
event: 自定义事件类型
retry: 3000

data: 下一个事件

字段说明:

字段说明
字段说明
data具体的业务数据。若有多行,自动拼接。
event自定义事件类型(默认为 message)。
id事件 ID,用于断线重连后恢复。
retry告诉客户端如果连接掉线,等待多少毫秒后再尝试重连。
\n\n事件分隔符(两个换行符分隔不同事件)。

Spring Boot SSE 实现

Spring Boot 支持通过 MediaType.TEXT_EVENT_STREAM_VALUE 和 Flux<ServerSentEvent<T>> 实现流式输出。本项目使用的是 WebMVC + Reactor 混合模式:依赖 spring-boot-starter-webmvc,但通过 Spring AI 传递依赖的 reactor-core 来使用 Flux 进行 SSE 输出。

WebMVC vs WebFlux:WebFlux 是完全响应式的非阻塞架构,而 WebMVC + Flux 只是在 Controller 层使用响应式类型做 SSE 输出,底层仍是阻塞式 Servlet 模型。对于 AI 流式输出场景,这种混合模式已足够使用。

基础用法

@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamData() {
    return Flux.interval(Duration.ofMillis(500))
        .map(seq -> "data: 消息 " + seq + "\n\n");
}

代码说明:

●MediaType.TEXT_EVENT_STREAM_VALUE:声明响应类型为 SSE 事件流,值为 text/event-stream。

●Flux<String>:响应式流类型,返回多个字符串事件。

●Flux.interval():每 500 毫秒发射一个递增序列。 ●手动拼接 SSE 格式:data: 前缀 + \n\n 分隔符。

使用 ServerSentEvent 包装

@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> streamData() {
    return Flux.interval(Duration.ofMillis(500))
        .map(seq -> ServerSentEvent.<String>builder()
            .id(String.valueOf(seq))           // 事件 ID
            .event("custom-event")              // 自定义事件类型
            .data("消息 " + seq)                // 事件数据
            .retry(3000L)                       // 重连间隔
            .build());
}

代码说明:

●ServerSentEvent<T>:Spring 提供的 SSE 事件包装类,自动处理 SSE 格式。

●字段说明:

id():设置事件唯一标识,客户端断线重传时可作为断点恢复的依据。

event():自定义事件类型,客户端可通过 addEventListener(‘custom-event’) 监听。

retry():告诉客户端连接断开后等待多久重连(毫秒)。

Spring AI 流式调用

Spring AI 的 ChatClient 提供了流式调用接口:

Flux<String> responseFlux = chatClient.prompt()
    .system(systemPrompt)
    .user(userPrompt)
    .stream()  // 关键:使用 stream() 方法启用流式输出
    .content();

流式调用特点:

● 响应式编程:返回 Flux<String>,支持背压控制。

● 增量返回:每个 token 立即返回,不等待完整生成。

● 自动重试:可配置重试策略。

● 超时控制:可设置超时时间。

SSE 格式转义处理

SSE 协议使用 \n\n 作为事件分隔符,如果数据中包含换行符,会破坏协议格式:

# 错误示例:数据中的换行符破坏了 SSE 格式
data: 第一行内容
仍然在第一行           ← 被误认为是新的事件
data: 第二行内容

# 正确示例:转义换行符
data: 第一行内容\n仍然在第一行
data: 第二行内容\n

后端转义实现:

.map(chunk -> ServerSentEvent.<String>builder()
    .data(chunk.replace("\n", "\\n").replace("\r", "\\r"))
    .build())

前端反向转义:

const text = chunk.replace(/\\n/g, '\n').replace(/\\r/g, '\r');

错误处理与性能优化

超时控制

return responseFlux
    .timeout(Duration.ofSeconds(30))
    .onErrorResume(TimeoutException.class, e -> {
        log.error("流式输出超时", e);
        return Flux.just("【错误】回答生成超时,请缩短问题或稍后重试。");
    });

背压控制

在 WebMVC + Reactor 混合模式下,Flux 的背压机制仍然有效,可以控制数据流速度:

Flux<String> responseFlux = chatClient.prompt()
    .stream()
    .content()
    .onBackpressureBuffer();  // 缓冲背压

注意:虽然 Reactor 的背压机制可用,但 WebMVC 底层仍是阻塞式 Servlet 线程模型,连接数受限于 Servlet 容器的线程池配置(如 Tomcat 的 server.tomcat.threads.max)。

Servlet 容器配置

本项目使用 虚拟线程(Java 21+),无需担心 SSE 长连接占用线程池:

spring:
  threads:
    virtual:
      enabled: true  # 启用虚拟线程

虚拟线程 vs 平台线程:

场景平台线程虚拟线程
场景平台线程虚拟线程
200 并发 SSE线程池满,排队轻松处理
AI 调用等待 3 秒线程阻塞,占用资源自动挂起,让出资源
10000 并发请求拒绝服务正常处理

为什么不用 WebFlux? 虚拟线程只需一行配置,无需重写代码;而 WebFlux 需要将 JPA 换成 R2DBC,改造成本极高。对于 I/O 密集型的 AI 应用,虚拟线程是更务实的选择。 关于虚拟线程的详细介绍可以看这篇文章:虚拟线程常见问题总结

超时设置

spring:
  ai:
    openai:
      chat:
        options:
          duration: 30s  # AI 调用超时时间

最佳实践

SSE 格式规范

SSE 对格式极其敏感,任何微小的格式偏差都可能导致客户端解析器挂起。

规范项说明项目实践
规范项说明项目实践
数据转义data 字段内的原始数据若包含 \n,必须转义为 \nRAG 聊天使用 chunk.replace(“\n”, “\\n”) 转义
事件定界严格使用 \n\n(双换行)作为每个事件块的结束标志使用 ServerSentEvent 自动处理
字符编码强制使用 UTF-8Spring Boot 默认使用 UTF-8
心跳机制每隔 15-30 秒发送冒号开头的注释行(如 : heartbeat\n\n)本项目暂未实现,生产环境建议添加

稳定性与异常处理

SSE 是长连接,资源释放和状态一致性是重中之重。

策略维度核心操作项目实践
策略维度核心操作项目实践
消息占位流式开始前先创建消息占位,完成后更新内容prepareStreamMessage() + completeStreamMessage()
状态管理使用 completed 字段区分”生成中”和”已完成”前端根据此字段显示 loading 状态
异常保存无论成功或失败,都保存已接收的内容doOnError 中保存 fullContent
超时降级设置合理的超时时间,超时后返回友好提示.timeout(Duration.ofSeconds(30)) + onErrorResume

RAG 聊天流式处理示例:

return sessionService.getStreamAnswer(sessionId, request.question())
    .doOnNext(fullContent::append)  // 累积完整内容
    .map(chunk -> ServerSentEvent.<String>builder()
        .data(chunk.replace("\n", "\\n").replace("\r", "\\r"))  // 转义换行符
        .build())
    .doOnComplete(() -> {
        // 成功:更新消息内容并标记为已完成
        sessionService.completeStreamMessage(messageId, fullContent.toString());
    })
    .doOnError(e -> {
        // 失败:保存已接收的部分内容
        String content = !fullContent.isEmpty()
            ? fullContent.toString()
            : "【错误】回答生成失败:" + e.getMessage();
        sessionService.completeStreamMessage(messageId, content);
    });

前端性能优化

流式数据高频更新会导致页面卡顿,需要优化渲染策略。

// 使用 requestAnimationFrame + React Transition 优化渲染
let fullContent = '';
const rafRef = useRef<number>();

onMessage((chunk: string) => {
  fullContent += chunk;
  if (rafRef.current) {
    cancelAnimationFrame(rafRef.current);  // 取消上一次未执行的渲染
  }
  rafRef.current = requestAnimationFrame(() => {
    startTransition(() => {
      updateAssistantMessage(fullContent);
    });
  });
});

优化要点:

● requestAnimationFrame:将渲染推迟到下一帧,避免高频更新

● cancelAnimationFrame:取消上一次未执行的渲染,只保留最新一次

● startTransition:React 18 并发特性,将渲染标记为低优先级

Nginx 代理配置

如果使用 Nginx 反向代理,必须关闭缓冲配置:

location /api/ {
    proxy_pass http://backend;
    proxy_buffering off;      # 关闭代理缓冲
    proxy_cache off;          # 关闭代理缓存
    proxy_read_timeout 300s;  # 读取超时时间
    proxy_set_header Connection '';     # 清空 Connection 头
    proxy_set_header Cache-Control 'no-cache';  # 禁止缓存
}

配置说明:

● proxy_buffering off:禁用缓冲,确保数据实时转发

● proxy_cache off:禁用缓存,避免旧数据被缓存

● proxy_read_timeout:设置合理的读取超时时间

● Connection ”:清空 Connection 头,保持长连接

总结

SSE 流式输出是提升 AI 应用用户体验的关键技术。核心要点:

要点说明
要点说明
协议选择单向推送选 SSE,双向通信选 WebSocket
协议格式data: 前缀 + \n\n 分隔符
格式转义换行符 \n → \n,避免破坏事件分隔符
消息持久化先创建占位 (completed=false),完成后更新内容
限流保护使用 @RateLimit 注解防止 AI 服务被刷爆
异常处理无论成功失败都保存已接收内容,避免数据丢失
前端优化使用 RAF + React Transition 优化高频渲染

项目核心文件:

● modules/knowledgebase/KnowledgeBaseController.java – 知识库控制器

● modules/knowledgebase/RagChatController.java – RAG 聊天控制器

● modules/knowledgebase/service/KnowledgeBaseQueryService.java – 查询服务

● modules/knowledgebase/service/RagChatSessionService.java – 会话管理服务

● frontend/src/api/knowledgebase.ts – 知识库前端 API

● frontend/src/api/ragChat.ts – RAG 聊天前端 API

© 版权声明
THE END
喜欢就支持一下吧
点赞12 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容