SpringBoot + ResponseBodyEmitter 實(shí)時異步流式推送,優(yōu)雅!
兄弟們,今天咱們來聊個挺酷的技術(shù) —— 用 SpringBoot 結(jié)合 ResponseBodyEmitter 搞實(shí)時異步流式推送。估計不少人看到 “流式推送” 這四個字,腦子里已經(jīng)開始浮現(xiàn)各種復(fù)雜的場景了,比如實(shí)時聊天、數(shù)據(jù)監(jiān)控啥的。別急,咱們今天就用大白話,配上點(diǎn)小幽默,把這玩意兒給盤明白。
先來說說為啥需要這東西。咱們平時寫的 Web 接口,大多是 “一問一答” 的模式??蛻舳税l(fā)個請求,服務(wù)器哐哐哐處理完,打包成一個響應(yīng)扔回去,完事。這就好比你去便利店買瓶可樂,付了錢拿了貨,轉(zhuǎn)身就走,整個過程干凈利落。
但有些場景就不一樣了。比如說,你在看直播的時候,主播那邊的畫面和聲音是一幀一幀、一秒一秒不斷傳過來的,總不能等直播結(jié)束了,一次性給你整個視頻文件吧?再比如,你在用某個數(shù)據(jù)分析工具,想實(shí)時看到數(shù)據(jù)處理的進(jìn)度,總不能每隔兩秒就手動刷新一次頁面吧?這時候,“流式推送” 就派上用場了 —— 服務(wù)器可以像流水一樣,一點(diǎn)一點(diǎn)把數(shù)據(jù)推給客戶端,不用等所有事情都做完。
在 ResponseBodyEmitter 出現(xiàn)之前,咱們想搞這種實(shí)時推送,要么用 WebSocket,要么用輪詢。WebSocket 是個好東西,全雙工通信,但是配置起來有點(diǎn)麻煩,對于一些簡單的場景來說,有點(diǎn) “殺雞用牛刀” 的感覺。輪詢就更別說了,客戶端傻乎乎地每隔一段時間就問服務(wù)器 “有新數(shù)據(jù)嗎?有新數(shù)據(jù)嗎?”,不僅效率低,還浪費(fèi)資源,服務(wù)器估計都想拉黑這種客戶端。
那 ResponseBodyEmitter 是啥呢?簡單說,它就是 SpringBoot 提供的一個工具,能讓服務(wù)器一邊處理數(shù)據(jù),一邊把處理好的部分一點(diǎn)點(diǎn)推給客戶端,不用等全部處理完。就像你點(diǎn)了一份小龍蝦,廚師不是等所有蝦都做好了才端上來,而是炒好一盤先給你端一盤,讓你先吃著,他繼續(xù)炒下一盤。這體驗(yàn),是不是比干等著強(qiáng)多了?
咱們先來看個簡單的例子,感受一下 ResponseBodyEmitter 的用法。
首先,得在 SpringBoot 項(xiàng)目里引入 Web 依賴,這個不用多說,搞 SpringBoot 的都懂。然后,寫一個 Controller:
@RestController
public class EmitterController {
@GetMapping("/stream")
public ResponseBodyEmitter streamData() {
// 創(chuàng)建一個ResponseBodyEmitter對象,這里可以設(shè)置超時時間,比如30秒
ResponseBodyEmitter emitter = new ResponseBodyEmitter(30000L);
// 開個線程處理數(shù)據(jù),模擬耗時操作
new Thread(() -> {
try {
for (int i = 0; i < 5; i++) {
// 每隔1秒推送一次數(shù)據(jù)
Thread.sleep(1000);
// 推送數(shù)據(jù),這里可以是字符串、對象等
emitter.send("這是第" + (i + 1)次推送的數(shù)據(jù)");
}
// 推送完成,關(guān)閉emitter
emitter.complete();
} catch (Exception e) {
// 發(fā)生異常時,通知客戶端
emitter.completeWithError(e);
}
}).start();
return emitter;
}
}就這么幾行代碼,一個簡單的流式推送接口就搞定了??蛻舳苏埱?stream接口后,不會馬上收到一個完整的響應(yīng),而是會每隔 1 秒收到一段數(shù)據(jù),一共收到 5 段,最后連接關(guān)閉。是不是很簡單?可能有兄弟會問,這玩意兒是怎么實(shí)現(xiàn)的呢?其實(shí)原理也不復(fù)雜。當(dāng)服務(wù)器返回 ResponseBodyEmitter 對象時,Spring 并不會馬上把響應(yīng)發(fā)給客戶端,而是會保持這個連接。然后,當(dāng)我們調(diào)用 emitter.send () 方法時,Spring 就會把數(shù)據(jù)一點(diǎn)點(diǎn)寫到響應(yīng)流里,客戶端就能實(shí)時收到了。直到調(diào)用 emitter.complete (),或者發(fā)生異常調(diào)用 emitter.completeWithError (),這個連接才會被關(guān)閉。
這里有個小細(xì)節(jié),就是 ResponseBodyEmitter 的超時時間。如果服務(wù)器在規(guī)定時間內(nèi)沒做任何操作(既沒 send 數(shù)據(jù),也沒 complete),這個連接就會超時關(guān)閉。所以在實(shí)際使用中,得根據(jù)業(yè)務(wù)場景合理設(shè)置超時時間,別太短也別太長。太短了,可能數(shù)據(jù)還沒推完就斷了;太長了,會占用服務(wù)器連接資源。
咱們再來說說客戶端怎么接收這些流式數(shù)據(jù)。如果是瀏覽器端,可以用 JavaScript 的 Fetch API 或者 XMLHttpRequest 來接收。比如用 Fetch:
fetch('/stream')
.then(response => {
const reader = response.body.getReader();
const decoder = new TextDecoder();
function read() {
return reader.read().then(({ done, value }) => {
if (done) {
console.log('推送完成');
return;
}
// 解碼收到的二進(jìn)制數(shù)據(jù)為字符串
const data = decoder.decode(value, { stream: true });
console.log('收到數(shù)據(jù):', data);
return read();
});
}
return read();
})
.catch(error => {
console.error('發(fā)生錯誤:', error);
});這樣,瀏覽器控制臺就會每隔 1 秒打印出服務(wù)器推送的數(shù)據(jù),體驗(yàn)還是挺不錯的。不過,上面那個例子還是太簡單了,實(shí)際項(xiàng)目中用起來,還得考慮不少問題。比如說,怎么管理多個 Emitter 實(shí)例?如果一個客戶端打開了多個頁面,或者多個客戶端同時連接,總不能每個請求都 new 一個 Emitter 吧?這時候,咱們就需要一個 Emitter 的管理器了。
咱們可以搞一個 EmitterManager 類,用來保存和管理所有的 Emitter 實(shí)例:
@Component
public class EmitterManager {
// 用一個Map來保存Emitter,key可以是用戶標(biāo)識,value是對應(yīng)的Emitter
private final Map<String, ResponseBodyEmitter> emitters = new ConcurrentHashMap<>();
// 添加Emitter
public void addEmitter(String userId, ResponseBodyEmitter emitter) {
// 當(dāng)Emitter完成或出錯時,自動從Map中移除
emitter.onCompletion(() -> emitters.remove(userId));
emitter.onError(e -> emitters.remove(userId));
emitters.put(userId, emitter);
}
// 根據(jù)用戶標(biāo)識獲取Emitter
public ResponseBodyEmitter getEmitter(String userId) {
return emitters.get(userId);
}
// 移除Emitter
public void removeEmitter(String userId) {
emitters.remove(userId);
}
// 向所有用戶推送數(shù)據(jù)
public void sendToAll(Object data) throws IOException {
for (ResponseBodyEmitter emitter : emitters.values()) {
emitter.send(data);
}
}
// 向指定用戶推送數(shù)據(jù)
public void sendToUser(String userId, Object data) throws IOException {
ResponseBodyEmitter emitter = emitters.get(userId);
if (emitter != null) {
emitter.send(data);
}
}
}有了這個管理器,咱們就可以在 Controller 里根據(jù)用戶標(biāo)識來管理 Emitter 了。比如,用戶登錄后,建立一個 Emitter 連接,服務(wù)器可以向這個用戶單獨(dú)推送數(shù)據(jù),也可以向所有用戶廣播數(shù)據(jù)。
@RestController
public class UserEmitterController {
@Autowired
private EmitterManager emitterManager;
@GetMapping("/user/stream/{userId}")
public ResponseBodyEmitter userStream(@PathVariable String userId) {
ResponseBodyEmitter emitter = new ResponseBodyEmitter(30000L);
emitterManager.addEmitter(userId, emitter);
return emitter;
}
@PostMapping("/send/to/user")
public String sendToUser(@RequestParam String userId, @RequestParam String message) {
try {
emitterManager.sendToUser(userId, message);
return "發(fā)送成功";
} catch (IOException e) {
return "發(fā)送失敗:" + e.getMessage();
}
}
@PostMapping("/send/to/all")
public String sendToAll(@RequestParam String message) {
try {
emitterManager.sendToAll(message);
return "發(fā)送成功";
} catch (IOException e) {
return "發(fā)送失?。? + e.getMessage();
}
}
}這樣一來,就實(shí)現(xiàn)了針對用戶的實(shí)時推送功能。比如在一個在線聊天系統(tǒng)里,A 用戶給 B 用戶發(fā)消息,服務(wù)器就可以通過 B 用戶的 Emitter 把消息推過去;如果有系統(tǒng)公告,就可以通過 sendToAll 推給所有在線用戶。不過,這里有個潛在的問題:如果用戶長時間不操作,Emitter 超時關(guān)閉了,這時候再給這個用戶推送消息,就會失敗。所以,咱們得想個辦法,讓客戶端在連接超時前,自動重新建立連接。
客戶端可以這么搞:當(dāng)檢測到連接關(guān)閉后,隔一小會兒就重新發(fā)起請求,建立新的 Emitter 連接。比如在 JavaScript 里:
function connect(userId) {
fetch(`/user/stream/${userId}`)
.then(response => {
const reader = response.body.getReader();
const decoder = new TextDecoder();
function read() {
return reader.read().then(({ done, value }) => {
if (done) {
console.log('連接關(guān)閉,重新連接...');
// 連接關(guān)閉后,1秒后重新連接
setTimeout(() => connect(userId), 1000);
return;
}
const data = decoder.decode(value, { stream: true });
console.log('收到消息:', data);
return read();
});
}
return read();
})
.catch(error => {
console.error('連接出錯,重新連接...', error);
setTimeout(() => connect(userId), 1000);
});
}
// 頁面加載時,建立連接
connect('當(dāng)前登錄用戶的ID');這樣就能保證客戶端和服務(wù)器之間一直有一個活躍的連接了。再來說說數(shù)據(jù)格式的問題。上面的例子里,咱們推送的都是字符串,實(shí)際項(xiàng)目中,可能需要推送 JSON 格式的數(shù)據(jù)。沒關(guān)系,ResponseBodyEmitter 支持發(fā)送各種類型的對象,Spring 會自動幫我們進(jìn)行序列化。
比如,咱們定義一個消息類:
public class Message {
private String type; // 消息類型
private String content; // 消息內(nèi)容
private long timestamp; // 時間戳
// 構(gòu)造方法、getter、setter省略
}然后在發(fā)送的時候,直接 send 這個對象就行:
Message message = new Message();
message.setType("chat");
message.setContent("你好啊,老鐵");
message.setTimestamp(System.currentTimeMillis());
emitter.send(message);Spring 會默認(rèn)把它序列化成 JSON 格式的字符串推送給客戶端,客戶端收到后再解析成 JSON 對象就行,非常方便??赡苡屑?xì)心的兄弟發(fā)現(xiàn)了,咱們在發(fā)送數(shù)據(jù)的時候,用的是 emitter.send () 方法,那這個方法是線程安全的嗎?答案是:是的。ResponseBodyEmitter 的 send () 方法是線程安全的,所以我們可以在多個線程里同時調(diào)用 send () 方法推送數(shù)據(jù),不用擔(dān)心并發(fā)問題。這一點(diǎn)還是挺貼心的,省得咱們自己加鎖了。
不過,雖然 send () 方法是線程安全的,但也不能瞎用。如果推送的數(shù)據(jù)量特別大,或者推送頻率特別高,還是可能會造成性能問題。這時候,咱們可能需要考慮用線程池來管理發(fā)送線程,避免創(chuàng)建過多的線程。
比如,可以在 EmitterManager 里注入一個線程池:
@Component
public class EmitterManager {
private final ExecutorService executor = Executors.newFixedThreadPool(10);
// 其他代碼省略...
public void sendToUser(String userId, Object data) {
ResponseBodyEmitter emitter = emitters.get(userId);
if (emitter != null) {
executor.submit(() -> {
try {
emitter.send(data);
} catch (IOException e) {
// 處理異常
emitter.completeWithError(e);
emitters.remove(userId);
}
});
}
}
}這樣,發(fā)送數(shù)據(jù)的操作就會交給線程池里的線程去處理,不會阻塞主線程,也能控制并發(fā)量。另外,咱們還得考慮一個問題:如果服務(wù)器重啟了,或者 Emitter 因?yàn)槟承┰蛞馔怅P(guān)閉了,客戶端怎么知道?除了客戶端自動重連之外,服務(wù)器也可以在推送一些 “心跳” 數(shù)據(jù),告訴客戶端 “我還活著”。比如,每隔一段時間,給所有客戶端推送一個空消息或者特定格式的心跳消息。
@Component
publicclass HeartbeatTask {
@Autowired
private EmitterManager emitterManager;
@Scheduled(fixedRate = 10000) // 每10秒執(zhí)行一次
public void sendHeartbeat() {
try {
Message heartbeat = new Message();
heartbeat.setType("heartbeat");
heartbeat.setContent("");
heartbeat.setTimestamp(System.currentTimeMillis());
emitterManager.sendToAll(heartbeat);
} catch (IOException e) {
// 處理異常
}
}
}客戶端收到心跳消息后,就知道連接還是正常的,不用重新連接。如果超過一定時間沒收到心跳,就主動重新連接。說到這里,可能有兄弟會拿 ResponseBodyEmitter 和 WebSocket 做比較。其實(shí),這倆各有各的適用場景。WebSocket 適合那種需要雙向通信、實(shí)時性要求特別高的場景,比如在線游戲、實(shí)時協(xié)作編輯等。而 ResponseBodyEmitter 更適合那種服務(wù)器單向推送數(shù)據(jù),客戶端主要是接收的場景,比如實(shí)時數(shù)據(jù)監(jiān)控、消息通知等。而且,ResponseBodyEmitter 用起來比 WebSocket 簡單多了,不需要額外的協(xié)議支持,直接在 HTTP 協(xié)議上就能玩。
還有一個和 ResponseBodyEmitter 類似的東西,叫 SseEmitter,它是 ResponseBodyEmitter 的子類,專門用來支持 Server-Sent Events(SSE)。SSE 是一種 HTML5 規(guī)范,專門用于服務(wù)器向客戶端推送事件流。如果你的客戶端是瀏覽器,用 SseEmitter 可能更合適,因?yàn)闉g覽器原生支持 SSE 的接收。
SseEmitter 的用法和 ResponseBodyEmitter 差不多,就是 send 方法可以發(fā)送 SseEventBuilder 對象,能設(shè)置事件 ID、事件類型等信息:
@GetMapping("/sse/stream")
public SseEmitter sseStream() {
SseEmitter emitter = new SseEmitter(30000L);
new Thread(() -> {
try {
for (int i = 0; i < 5; i++) {
Thread.sleep(1000);
// 構(gòu)建一個SSE事件
SseEmitter.SseEventBuilder event = SseEmitter.event()
.id(String.valueOf(i)) // 事件ID
.name("message") // 事件類型
.data("這是第" + (i + 1)次SSE推送的數(shù)據(jù)");
emitter.send(event);
}
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
}).start();
return emitter;
}客戶端用瀏覽器原生的 EventSource 來接收:
const source = new EventSource('/sse/stream');
// 監(jiān)聽message類型的事件
source.addEventListener('message', function(event) {
console.log('收到SSE數(shù)據(jù):', event.data);
});
// 監(jiān)聽錯誤
source.onerror = function(error) {
console.error('SSE錯誤:', error);
source.close();
};是不是也挺簡單的?所以,如果你的應(yīng)用主要面向?yàn)g覽器客戶端,SseEmitter 可能是個更好的選擇,畢竟是瀏覽器原生支持的。咱們再來說說 ResponseBodyEmitter 在實(shí)際項(xiàng)目中可能遇到的一些坑。
第一個坑就是超時問題。如果推送數(shù)據(jù)的時間間隔比較長,超過了設(shè)置的超時時間,Emitter 就會自動關(guān)閉。這時候,要么把超時時間設(shè)置長一點(diǎn),要么在超時前發(fā)送一個心跳消息,重置超時計時器。
第二個坑是內(nèi)存泄漏。如果 Emitter 關(guān)閉后沒有從管理器中移除,就會造成內(nèi)存泄漏。所以,一定要在 Emitter 的 onCompletion 和 onError 回調(diào)里,把它從 Map 中刪掉,就像咱們在 EmitterManager 里做的那樣。
第三個坑是數(shù)據(jù)亂序。因?yàn)?send () 方法可以在多個線程中調(diào)用,如果多個線程同時發(fā)送數(shù)據(jù),客戶端收到的數(shù)據(jù)可能是亂序的。如果你的業(yè)務(wù)要求數(shù)據(jù)必須有序,那就要自己想辦法保證,比如給數(shù)據(jù)加個序號,客戶端收到后再排序。
第四個坑是大數(shù)據(jù)量推送。如果一次性推送的數(shù)據(jù)量特別大,可能會導(dǎo)致客戶端處理不過來,或者服務(wù)器內(nèi)存占用過高。這時候,可以把大數(shù)據(jù)拆分成小塊,分多次推送,客戶端收到后再拼接起來。
除了這些坑,還有一些性能優(yōu)化的點(diǎn)可以說說。
首先,Emitter 的數(shù)量不能太多。每個 Emitter 都會占用一個服務(wù)器連接,如果并發(fā)用戶特別多,Emitter 數(shù)量就會很多,會消耗大量的服務(wù)器資源。這時候,可能需要考慮其他方案,比如結(jié)合消息隊(duì)列,或者用 WebSocket 的廣播功能。
其次,推送的數(shù)據(jù)要盡量精簡。別把沒用的字段都往客戶端推,不僅浪費(fèi)帶寬,還增加了序列化和反序列化的開銷。
再者,可以考慮使用壓縮。如果推送的文本數(shù)據(jù)比較多,可以開啟 Gzip 壓縮,能大大減少數(shù)據(jù)傳輸量。在 SpringBoot 里,開啟 Gzip 壓縮很簡單,在 application.properties 里配置一下就行:
server.compression.enabled=true
server.compression.mime-types=application/json,application/xml,text/html,text/plain最后,要做好監(jiān)控和告警。比如監(jiān)控 Emitter 的數(shù)量、發(fā)送數(shù)據(jù)的頻率、連接超時的次數(shù)等,一旦出現(xiàn)異常,及時告警,方便排查問題。咱們再來舉個實(shí)際點(diǎn)的例子,看看 ResponseBodyEmitter 在項(xiàng)目中具體能怎么用。
假設(shè)咱們要做一個實(shí)時日志查看功能,用戶在網(wǎng)頁上可以實(shí)時看到服務(wù)器的日志輸出。這時候,就可以用 ResponseBodyEmitter 來實(shí)現(xiàn)。
服務(wù)器端可以這樣搞:
@RestController
publicclass LogController {
@Autowired
private EmitterManager emitterManager;
@GetMapping("/logs/stream/{userId}")
public ResponseBodyEmitter logStream(@PathVariableString userId) {
ResponseBodyEmitter emitter = new ResponseBodyEmitter(60000L);
emitterManager.addEmitter(userId, emitter);
// 啟動一個線程,實(shí)時讀取日志文件,并推送給客戶端
new Thread(() -> {
try {
// 這里只是示例,實(shí)際中可以用FileTailer之類的工具實(shí)時讀取日志
BufferedReader reader = new BufferedReader(new FileReader("application.log"));
String line;
while ((line = reader.readLine()) != null) {
// 如果Emitter已經(jīng)關(guān)閉,就退出循環(huán)
if (emitter.isCompleted() || emitter.isClosed()) {
break;
}
emitter.send(line);
// 稍微休眠一下,避免推送太快
Thread.sleep(100);
}
reader.close();
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
}).start();
return emitter;
}
}客戶端頁面上,就可以用 JavaScript 實(shí)時接收日志,并顯示在頁面上:
<div id="logContainer" style="white-space: pre;"></div>
<script>
function connectLogStream(userId) {
fetch(`/logs/stream/${userId}`)
.then(response => {
const reader = response.body.getReader();
const decoder = new TextDecoder();
const logContainer = document.getElementById('logContainer');
function read() {
return reader.read().then(({ done, value }) => {
if (done) {
console.log('日志流結(jié)束,重新連接...');
setTimeout(() => connectLogStream(userId), 1000);
return;
}
const line = decoder.decode(value, { stream: true });
// 把新的日志行添加到頁面上
logContainer.textContent += line + '\n';
// 自動滾動到底部
logContainer.scrollTop = logContainer.scrollHeight;
return read();
});
}
return read();
})
.catch(error => {
console.error('日志流出錯,重新連接...', error);
setTimeout(() => connectLogStream(userId), 1000);
});
}
// 假設(shè)當(dāng)前用戶ID是123
connectLogStream('123');
</script>這樣,用戶打開頁面后,就能實(shí)時看到服務(wù)器日志不斷地在頁面上刷新,就像在服務(wù)器上用 tail -f 命令看日志一樣,體驗(yàn)還是挺不錯的。再比如,在一個數(shù)據(jù)導(dǎo)入的場景中,用戶上傳一個大文件,服務(wù)器在后臺異步處理,同時可以通過 ResponseBodyEmitter 實(shí)時向客戶端推送處理進(jìn)度,比如 “已處理 10%”、“已處理 50%”、“處理完成” 等。客戶端收到這些進(jìn)度信息后,就可以在頁面上顯示一個進(jìn)度條,讓用戶知道當(dāng)前的處理狀態(tài),不至于以為頁面卡了。
說了這么多,總結(jié)一下 ResponseBodyEmitter 的優(yōu)點(diǎn):
- 用法簡單,集成在 SpringBoot 里,不用引入額外的依賴。
- 基于 HTTP 協(xié)議,兼容性好,客戶端不用做太多特殊處理。
- 支持異步流式推送,能提升用戶體驗(yàn)。
- 線程安全,多個線程可以同時發(fā)送數(shù)據(jù)。
當(dāng)然,它也有一些局限性:
- 主要是服務(wù)器單向推送,客戶端不能通過這個連接向服務(wù)器發(fā)送數(shù)據(jù)(除非再發(fā)一個 HTTP 請求)。
- 并發(fā)量大的時候,會占用較多的服務(wù)器連接資源。
- 依賴 HTTP 長連接,可能會被代理服務(wù)器、防火墻等中途斷開。
所以,在選擇技術(shù)方案的時候,要根據(jù)實(shí)際的業(yè)務(wù)場景來決定。如果是簡單的實(shí)時推送場景,ResponseBodyEmitter 是個不錯的選擇,足夠優(yōu)雅,也足夠好用。如果是復(fù)雜的雙向?qū)崟r通信場景,可能還是 WebSocket 更合適。
最后,再給大家提幾個在實(shí)際項(xiàng)目中使用 ResponseBodyEmitter 的小建議:
- 做好異常處理。無論是服務(wù)器發(fā)送數(shù)據(jù)出錯,還是客戶端連接斷開,都要妥善處理,避免程序崩潰,或者出現(xiàn)資源泄漏。
- 合理設(shè)置超時時間。根據(jù)業(yè)務(wù)的實(shí)際情況,設(shè)置一個合適的超時時間,既保證連接的穩(wěn)定性,又不會浪費(fèi)服務(wù)器資源。
- 考慮斷線重連機(jī)制??蛻舳撕头?wù)器都要做好斷線重連的準(zhǔn)備,保證推送的連續(xù)性。
- 進(jìn)行充分的測試。特別是在高并發(fā)場景下,要測試 Emitter 的性能和穩(wěn)定性,看看它能不能扛住壓力。
- 不要過度使用。不是所有場景都需要流式推送,簡單的 “一問一答” 模式能解決的問題,就別用這么復(fù)雜的東西。
總的來說,SpringBoot 的 ResponseBodyEmitter 是一個非常實(shí)用的工具,用好了能給咱們的項(xiàng)目帶來很大的便利,提升用戶體驗(yàn)。它不像有些技術(shù)那樣晦澀難懂,只要理解了它的基本原理和用法,上手還是挺容易的。


































