直播必備!Spring Boot 實(shí)現(xiàn)高并發(fā)觀眾統(tǒng)計(jì)功能的技術(shù)方案揭秘
隨著直播平臺(tái)用戶數(shù)量激增,實(shí)時(shí)觀眾統(tǒng)計(jì)功能成為衡量直播間熱度和服務(wù)質(zhì)量的核心指標(biāo)之一。然而,在高并發(fā)場景下,如何保證統(tǒng)計(jì)數(shù)據(jù)的準(zhǔn)確性與系統(tǒng)性能,是一個(gè)關(guān)鍵技術(shù)挑戰(zhàn)。本文將深入探討如何基于 Spring Boot 框架,構(gòu)建一個(gè)支持高并發(fā)、低延遲、可擴(kuò)展的觀眾統(tǒng)計(jì)系統(tǒng)。
系統(tǒng)需求分析
功能需求
- 實(shí)時(shí)統(tǒng)計(jì)每個(gè)直播間的在線觀眾人數(shù)。
- 支持觀眾進(jìn)入、退出時(shí)的即時(shí)更新。
- 提供 REST API 接口供前端定時(shí)拉取或后端推送使用。
- 支持 WebSocket 實(shí)時(shí)推送觀眾人數(shù)變化。
技術(shù)挑戰(zhàn)
- 并發(fā)量高:直播高峰期間同一直播間可能存在上萬并發(fā)連接。
- 數(shù)據(jù)一致性:進(jìn)入、退出直播間事件頻繁,需精準(zhǔn)計(jì)算。
- 系統(tǒng)性能:避免頻繁操作數(shù)據(jù)庫導(dǎo)致性能瓶頸。
系統(tǒng)架構(gòu)設(shè)計(jì)
+----------------+ +------------------+ +------------------+
| Web Frontend | <-----> | Spring Boot API | <-----> | Redis In-Memory |
| (WebSocket) | | + WebSocket | | Store |
+----------------+ +------------------+ +------------------+
|
v
+---------------+
| MySQL (持久化) |
+---------------+
核心組件
- Spring Boot API:提供 REST 接口與 WebSocket 服務(wù)。
- Redis:作為高性能計(jì)數(shù)器,用于記錄各直播間實(shí)時(shí)觀眾數(shù)。
- MySQL:作為歷史數(shù)據(jù)的持久化存儲(chǔ)。
技術(shù)實(shí)現(xiàn)方案(含代碼)
Redis 結(jié)構(gòu)設(shè)計(jì)
- 使用 Hash + Set 結(jié)構(gòu)保存直播間觀眾信息。
Key: live:room:{roomId}:viewers
Type: Set
Value: 用戶唯一標(biāo)識(shí)(userId 或 sessionId)
Key: live:room:{roomId}:viewerCount
Type: String
Value: 當(dāng)前觀眾人數(shù)
用戶進(jìn)入/退出直播間邏輯
我們通過 Redis 的 Set 來記錄唯一用戶 ID,同時(shí)通過計(jì)數(shù)器確保人數(shù)準(zhǔn)確。
@Service
public class LiveViewerService {
@Autowired
private StringRedisTemplate redisTemplate;
private static final String VIEWER_SET_KEY = "live:room:%s:viewers";
private static final String COUNT_KEY = "live:room:%s:viewerCount";
/**
* 用戶進(jìn)入直播間
*/
public void userEnter(String roomId, String userId) {
String viewerSetKey = String.format(VIEWER_SET_KEY, roomId);
String countKey = String.format(COUNT_KEY, roomId);
// 添加用戶到觀眾集合
Boolean isNew = redisTemplate.opsForSet().add(viewerSetKey, userId) == 1;
if (isNew) {
// 若是新用戶,人數(shù) +1
redisTemplate.opsForValue().increment(countKey);
}
}
/**
* 用戶退出直播間
*/
public void userExit(String roomId, String userId) {
String viewerSetKey = String.format(VIEWER_SET_KEY, roomId);
String countKey = String.format(COUNT_KEY, roomId);
// 移除用戶
Boolean removed = redisTemplate.opsForSet().remove(viewerSetKey, userId) == 1;
if (removed) {
redisTemplate.opsForValue().decrement(countKey);
}
}
/**
* 獲取直播間當(dāng)前觀眾人數(shù)
*/
public Long getViewerCount(String roomId) {
String countKey = String.format(COUNT_KEY, roomId);
String count = redisTemplate.opsForValue().get(countKey);
return count == null ? 0 : Long.parseLong(count);
}
}
REST 接口示例
@RestController
@RequestMapping("/api/live")
public class LiveController {
@Autowired
private LiveViewerService viewerService;
@PostMapping("/enter")
public ResponseEntity<String> enter(@RequestParam String roomId, @RequestParam String userId) {
viewerService.userEnter(roomId, userId);
return ResponseEntity.ok("entered");
}
@PostMapping("/exit")
public ResponseEntity<String> exit(@RequestParam String roomId, @RequestParam String userId) {
viewerService.userExit(roomId, userId);
return ResponseEntity.ok("exited");
}
@GetMapping("/count")
public ResponseEntity<Long> getCount(@RequestParam String roomId) {
return ResponseEntity.ok(viewerService.getViewerCount(roomId));
}
}
WebSocket 實(shí)時(shí)推送實(shí)現(xiàn)
@ServerEndpoint("/ws/viewerCount/{roomId}")
@Component
public class ViewerCountWebSocket {
private static Map<String, Set<Session>> roomSessions = new ConcurrentHashMap<>();
@OnOpen
public void onOpen(Session session, @PathParam("roomId") String roomId) {
roomSessions.computeIfAbsent(roomId, k -> ConcurrentHashMap.newKeySet()).add(session);
}
@OnClose
public void onClose(Session session, @PathParam("roomId") String roomId) {
Set<Session> sessions = roomSessions.get(roomId);
if (sessions != null) {
sessions.remove(session);
}
}
public static void broadcastViewerCount(String roomId, Long count) {
Set<Session> sessions = roomSessions.get(roomId);
if (sessions != null) {
for (Session session : sessions) {
session.getAsyncRemote().sendText(count.toString());
}
}
}
}
在 LiveViewerService
中用戶每次進(jìn)入或退出后,調(diào)用:
ViewerCountWebSocket.broadcastViewerCount(roomId, getViewerCount(roomId));
即可實(shí)時(shí)推送更新。
前端集成示例 (Thymeleaf + JavaScript)
<script>
const roomId = "10001";
const socket = new WebSocket("ws://localhost:8080/ws/viewerCount/" + roomId);
socket.onmessage = function(event) {
document.getElementById("viewerCount").innerText = event.data;
};
</script>
<span>當(dāng)前觀眾人數(shù):<strong id="viewerCount">0</strong></span>
系統(tǒng)優(yōu)化建議
- Redis 原子性保障:使用 Lua 腳本處理計(jì)數(shù)與集合操作,避免競態(tài)。
- 斷開連接處理:使用心跳機(jī)制或定時(shí)任務(wù)清理無效連接。
- Redis 過期策略:為每個(gè) Set 設(shè)置過期時(shí)間,避免垃圾數(shù)據(jù)堆積。
- 數(shù)據(jù)持久化:定期將統(tǒng)計(jì)數(shù)據(jù)寫入數(shù)據(jù)庫,供歷史分析使用。
總結(jié)
通過 Redis 的高并發(fā)處理能力,結(jié)合 Spring Boot 提供的 REST 接口和 WebSocket 推送機(jī)制,我們可以實(shí)現(xiàn)一個(gè)既輕量又高效的直播觀眾統(tǒng)計(jì)系統(tǒng)。該方案不僅能應(yīng)對大量用戶同時(shí)在線的場景,還具備良好的可擴(kuò)展性與實(shí)時(shí)性,是現(xiàn)代直播平臺(tái)不可或缺的基礎(chǔ)能力。