Spring Boot + JavaScript 實時數(shù)據(jù)流:兩種完整實現(xiàn)
環(huán)境:SpringBoot3.4.2
1. 簡介
在構(gòu)建實時 Web 應(yīng)用時,傳統(tǒng)的請求-響應(yīng)模式已無法滿足數(shù)據(jù)持續(xù)推送的需求。Spring Boot 提供了多種服務(wù)端流式方案:StreamingResponseBody 基于原始輸出流,適用于大文件下載或自定義流傳輸,靈活性高但需手動 flush;SseEmitter 遵循 Server-Sent Events(SSE)標準,專為瀏覽器單向推送設(shè)計,支持事件類型、消息 ID 和自動重連,適合實時通知、日志流等場景;而 WebSocket 則提供全雙工通信,適用于聊天、協(xié)同編輯等高頻雙向交互場景。三者各有定位:SSE 簡單輕量、基于 HTTP、瀏覽器原生支持;WebSocket 功能強大但開銷較大;StreamingResponseBody 更偏向底層流控制。
本篇文章將重點介紹 StreamingResponseBody 與 SseEmitter 的前后端實現(xiàn),對比其在實時數(shù)據(jù)流中的應(yīng)用差異,幫助開發(fā)者按需選型。
2.實戰(zhàn)案例
2.1 方案1(StreamingResponseBody)
StreamingResponseBody 是 Spring MVC 提供的一個接口,用于異步流式傳輸響應(yīng)數(shù)據(jù)。它允許服務(wù)器在處理過程中逐步將數(shù)據(jù)寫入輸出流,適用于大文件下載、日志推送或?qū)崟r數(shù)據(jù)流場景。常與 ResponseEntity<StreamingResponseBody> 結(jié)合使用,需在子線程中執(zhí)行流式寫入,并通過 flush() 主動推送數(shù)據(jù)到客戶端。
private final ObjectMapper objectMapper ;
public UserStreamingController(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
@GetMapping("/stream")
public ResponseEntity<StreamingResponseBody> streamUsers() {
StreamingResponseBody responseBody = os -> {
datas.forEach(user -> {
try {
String json = this.objectMapper.writeValueAsString(user) + "\n";
os.write(json.getBytes());
os.flush() ;
// 模擬延遲
TimeUnit.MILLISECONDS.sleep(500) ;
} catch (Exception e) {
throw new RuntimeException(e);
}
}) ;
} ;
return ResponseEntity.ok()
.header("Content-Type", "text/plain;charset=utf-8")
.body(responseBody);
}前端讀取實現(xiàn)
async function fetchStream() {
const response = await fetch('/users/stream');
if (!response.body) {
console.error('ReadableStream not supported');
return;
}
let container = document.querySelector('#stream_data')
// 清空所有子元素
// 方案1
// container.innerHTML = ""
// 方案2
// while (container.firstChild) {
// container.removeChild(container.firstChild);
// }
// 方案3
// 專為"替換/清空子元素"設(shè)計,比上面2個方案性能更好
container.replaceChildren()
const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8');
let buffer = ''; // 用于拼接不完整的文本片段
while (true) {
const {done, value} = await reader.read();
if (done) {
console.log('Stream complete');
break;
}
// 將 Uint8Array 解碼為字符串
const chunk = decoder.decode(value, {stream: true});
buffer += chunk;
// 按行處理
const lines = buffer.split('\n');
buffer = lines.pop(); // 最后一行可能不完整,留在 buffer 中
lines.forEach(line => {
if (line) {
appendData(container, line);
}
});
}
// 處理最后可能殘留的不完整行
if (buffer) {
appendData(container, buffer)
}
}
function appendData(container, data) {
let liNode = document.createElement("li");
let dataNode = document.createTextNode(data)
liNode.appendChild(dataNode)
container.appendChild(liNode)
}HTML頁面
<body>
<button type="button" onclick="fetchStream()">獲取數(shù)據(jù)</button>
<button type="button" onclick="fetchSse()">SSE獲取數(shù)據(jù)</button>
<hr />
<ul id="stream_data"></ul>
</body>效果
2.2 方案2(SSE)
SseEmitter是Spring用于實現(xiàn)服務(wù)器推送(Server-Sent Events, SSE)的類,允許服務(wù)器通過HTTP連接主動向客戶端推送實時數(shù)據(jù)。它支持單向通信、事件流格式傳輸,適用于實時通知、數(shù)據(jù)流、聊天應(yīng)用等場景,能顯著提升用戶體驗。
@GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter sseUsers() {
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
// 在子線程中發(fā)送數(shù)據(jù),避免阻塞請求線程
Executors.newSingleThreadExecutor().submit(() -> {
try {
for (User user : datas) {
try {
// 序列化為 JSON
String json = objectMapper.writeValueAsString(user);
// 發(fā)送 SSE 消息
emitter.send(SseEmitter.event().id(UUID.randomUUID().toString()) // 可選:消息ID
.name("user-data") // 可選:事件類型
.data(json) // 數(shù)據(jù)體
.reconnectTime(5000) // 可選:重連時間
);
// 模擬延遲
Thread.sleep(300);
} catch (JsonProcessingException e) {
// JSON 序列化失敗,發(fā)送錯誤事件
emitter.send(SseEmitter.event().name("error").data("JSON error: " + e.getMessage()));
} catch (IOException e) {
// 客戶端斷開等 IO 異常
emitter.completeWithError(e);
return;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
emitter.completeWithError(e);
return;
}
}
// 先發(fā)送完成事件
emitter.send(SseEmitter.event()
.name("complete")
.data("done")
.id(UUID.randomUUID().toString()));
// 所有數(shù)據(jù)發(fā)送完成
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
});
return emitter;
}前端讀取實現(xiàn)
async function fetchSse() {
const container = document.querySelector('#stream_data');
container.replaceChildren(); // 清空
// 使用 EventSource
const eventSource = new EventSource('/users/sse');
eventSource.onopen = (event) => {
console.log('SSE 連接已建立');
};
// 監(jiān)聽自定義事件(name = "user-data")
eventSource.addEventListener('user-data', function (event) {
appendData(container, `${event.data}`);
});
// ? 監(jiān)聽完成事件
eventSource.addEventListener('complete', function (event) {
appendData(container, '? 所有數(shù)據(jù)加載完畢');
// 主動關(guān)閉連接,避免繼續(xù)重連
eventSource.close();
});
// 監(jiān)聽錯誤
eventSource.onerror = function (event) {
console.error('SSE 錯誤:', event);
if (eventSource.readyState === EventSource.CLOSED) {
console.log('連接已關(guān)閉');
}
};
}效果

圖片
























