C++多線(xiàn)程網(wǎng)絡(luò)編程:助力高并發(fā)服務(wù)器性能提升
在數(shù)字化時(shí)代,高并發(fā)是互聯(lián)網(wǎng)服務(wù)的常態(tài) —— 電商購(gòu)物節(jié)的海量訂單、社交網(wǎng)絡(luò)的熱門(mén)話(huà)題討論、在線(xiàn)游戲的萬(wàn)人同服,都需要強(qiáng)大的并發(fā)處理能力。高并發(fā)服務(wù)器作為核心支柱,其性能與穩(wěn)定性直接影響用戶(hù)體驗(yàn)和業(yè)務(wù)成敗。C++ 憑借卓越性能、高效執(zhí)行效率和對(duì)系統(tǒng)資源的精準(zhǔn)掌控,在高并發(fā)服務(wù)器開(kāi)發(fā)中地位關(guān)鍵。多線(xiàn)程網(wǎng)絡(luò)編程更是其核心優(yōu)勢(shì),能充分利用多核 CPU 算力,讓服務(wù)器同時(shí)處理多個(gè)任務(wù),大幅提升并發(fā)處理能力和響應(yīng)速度。
想象一下,在一場(chǎng)激烈的電商大促中,C++ 多線(xiàn)程網(wǎng)絡(luò)編程驅(qū)動(dòng)的服務(wù)器如同一位不知疲倦的超級(jí)英雄,輕松應(yīng)對(duì)數(shù)百萬(wàn)用戶(hù)同時(shí)下單的請(qǐng)求,快速準(zhǔn)確地處理每一筆交易,讓購(gòu)物車(chē)秒變快遞盒,讓用戶(hù)暢享絲滑的購(gòu)物體驗(yàn)。在這篇文章中,我們將一同深入 C++ 多線(xiàn)程網(wǎng)絡(luò)編程的奇妙世界,從基礎(chǔ)概念到高級(jí)技巧,從理論知識(shí)到實(shí)戰(zhàn)演練,揭開(kāi)高并發(fā)服務(wù)器開(kāi)發(fā)的神秘面紗,為你打造一把開(kāi)啟高性能編程大門(mén)的鑰匙。
Part1.C++ 多線(xiàn)程編程
1.1線(xiàn)程和進(jìn)程是什么?
線(xiàn)程,作為操作系統(tǒng)中一個(gè)至關(guān)重要的概念,是程序執(zhí)行流的最小單元,也被稱(chēng)為輕量級(jí)進(jìn)程。如果把進(jìn)程看作是一個(gè)正在運(yùn)行的工廠,那么線(xiàn)程就是工廠里的各個(gè)生產(chǎn)線(xiàn),每個(gè)生產(chǎn)線(xiàn)都可以獨(dú)立運(yùn)作,同時(shí)又共享著工廠的資源,如場(chǎng)地、設(shè)備等 。線(xiàn)程自己基本不擁有系統(tǒng)資源,僅持有一些在運(yùn)行中必不可少的資源,像程序計(jì)數(shù)器、一組寄存器和棧,但它能夠與同屬一個(gè)進(jìn)程的其他線(xiàn)程共享進(jìn)程所擁有的全部資源。
進(jìn)程是操作系統(tǒng)進(jìn)行資源分配和調(diào)度的獨(dú)立單位,擁有獨(dú)立的地址空間、代碼和數(shù)據(jù)空間。而線(xiàn)程則是進(jìn)程中的實(shí)際運(yùn)作單位,是 CPU 調(diào)度和分派的基本單位。一個(gè)進(jìn)程可以包含多個(gè)線(xiàn)程,這些線(xiàn)程共享進(jìn)程的資源,使得它們之間的通信和數(shù)據(jù)共享更加高效,但也帶來(lái)了線(xiàn)程安全的問(wèn)題。就好比一個(gè)辦公室里有多個(gè)員工(線(xiàn)程),他們共享辦公室的辦公用品(進(jìn)程資源),如果多個(gè)員工同時(shí)爭(zhēng)搶使用同一臺(tái)打印機(jī)(共享資源),就可能出現(xiàn)混亂,這就需要一些規(guī)則(同步機(jī)制)來(lái)協(xié)調(diào)。
在 Linux 系統(tǒng)中,我們可以使用 fork () 函數(shù)來(lái)創(chuàng)建新的進(jìn)程。fork () 函數(shù)的作用是復(fù)制當(dāng)前進(jìn)程,生成一個(gè)子進(jìn)程。這個(gè)子進(jìn)程幾乎是父進(jìn)程的一個(gè)副本,它擁有與父進(jìn)程相同的代碼、數(shù)據(jù)和文件描述符等。
fork () 函數(shù)的原理并不復(fù)雜。當(dāng)父進(jìn)程調(diào)用 fork () 時(shí),操作系統(tǒng)會(huì)為子進(jìn)程分配一個(gè)新的進(jìn)程控制塊(PCB),用于管理子進(jìn)程的相關(guān)信息。子進(jìn)程會(huì)繼承父進(jìn)程的大部分資源,包括內(nèi)存空間的映射(但有寫(xiě)時(shí)復(fù)制機(jī)制,后面會(huì)詳細(xì)介紹)、打開(kāi)的文件描述符等。
fork () 函數(shù)有一個(gè)獨(dú)特的返回值特性:在父進(jìn)程中,它返回子進(jìn)程的進(jìn)程 ID(PID);而在子進(jìn)程中,它返回 0。通過(guò)這個(gè)返回值,我們可以區(qū)分當(dāng)前是父進(jìn)程還是子進(jìn)程在執(zhí)行,從而讓它們執(zhí)行不同的代碼邏輯。下面是一個(gè)簡(jiǎn)單的代碼示例:
#include <stdio.h>
#include <unistd.h>
int main() {
pid_t pid;
// 調(diào)用fork()創(chuàng)建子進(jìn)程
pid = fork();
if (pid < 0) {
// fork()失敗
perror("fork failed");
return 1;
} else if (pid == 0) {
// 子進(jìn)程
printf("I am the child process, my PID is %d, my parent's PID is %d\n", getpid(), getppid());
} else {
// 父進(jìn)程
printf("I am the parent process, my PID is %d, my child's PID is %d\n", getpid(), pid);
}
return 0;
}運(yùn)行這段代碼,你會(huì)看到父進(jìn)程和子進(jìn)程分別輸出不同的信息,證明它們是獨(dú)立運(yùn)行的。
在多核處理器的環(huán)境下,多線(xiàn)程編程能夠充分發(fā)揮硬件的并行處理能力,極大地提高程序的執(zhí)行效率。比如,在一個(gè)圖像編輯軟件中,一個(gè)線(xiàn)程可以負(fù)責(zé)顯示圖像,另一個(gè)線(xiàn)程可以同時(shí)進(jìn)行圖像的處理,這樣用戶(hù)就能感受到更流暢的操作體驗(yàn)。
1.2 C++ 線(xiàn)程庫(kù)初體驗(yàn)
從 C++11 開(kāi)始,標(biāo)準(zhǔn)庫(kù)引入了<thread>頭文件,為我們提供了方便的線(xiàn)程相關(guān)類(lèi)和函數(shù),使得編寫(xiě)跨平臺(tái)的多線(xiàn)程程序變得更加容易。要?jiǎng)?chuàng)建一個(gè)線(xiàn)程,我們可以使用std::thread類(lèi)。這個(gè)類(lèi)的構(gòu)造函數(shù)接受一個(gè)可調(diào)用對(duì)象,比如普通函數(shù)、lambda 表達(dá)式或者成員函數(shù),作為線(xiàn)程執(zhí)行的入口點(diǎn)。
下面是一個(gè)簡(jiǎn)單的示例,展示了如何創(chuàng)建和啟動(dòng)一個(gè)線(xiàn)程:
#include <iostream>
#include <thread>
// 普通函數(shù),作為線(xiàn)程執(zhí)行的任務(wù)
void printHello() {
std::cout << "Hello from thread!" << std::endl;
}
int main() {
// 創(chuàng)建線(xiàn)程對(duì)象,傳入printHello函數(shù)
std::thread t(printHello);
// 主線(xiàn)程繼續(xù)執(zhí)行
std::cout << "Hello from main!" << std::endl;
// 等待子線(xiàn)程完成
t.join();
return 0;
}在這個(gè)例子中,我們首先定義了一個(gè)printHello函數(shù),它將在新線(xiàn)程中執(zhí)行。然后,在main函數(shù)中,我們創(chuàng)建了一個(gè)std::thread對(duì)象t,并將printHello函數(shù)作為參數(shù)傳遞給它,這就啟動(dòng)了一個(gè)新線(xiàn)程。主線(xiàn)程會(huì)繼續(xù)執(zhí)行后續(xù)的代碼,輸出Hello from main!。最后,通過(guò)調(diào)用t.join(),主線(xiàn)程會(huì)阻塞,等待子線(xiàn)程執(zhí)行完畢,確保程序的正常結(jié)束。
除了普通函數(shù),我們還可以使用 lambda 表達(dá)式來(lái)創(chuàng)建線(xiàn)程,讓代碼更加簡(jiǎn)潔和靈活:
#include <iostream>
#include <thread>
int main() {
// 使用lambda表達(dá)式創(chuàng)建線(xiàn)程
std::thread t([]() {
std::cout << "Hello from thread created by lambda!" << std::endl;
});
std::cout << "Hello from main!" << std::endl;
t.join();
return 0;
}在這個(gè)示例中,我們直接在std::thread的構(gòu)造函數(shù)中傳入了一個(gè) lambda 表達(dá)式,這個(gè)表達(dá)式定義了線(xiàn)程要執(zhí)行的任務(wù);通過(guò)這兩個(gè)簡(jiǎn)單的示例,我們對(duì) C++ 線(xiàn)程庫(kù)的基本使用有了初步的了解。創(chuàng)建和啟動(dòng)線(xiàn)程是多線(xiàn)程編程的基礎(chǔ),后續(xù)我們還會(huì)深入探討線(xiàn)程的同步、通信以及更高級(jí)的應(yīng)用場(chǎng)景。
Part2.網(wǎng)絡(luò)編程基礎(chǔ)
2.1套接字(Socket)編程詳解
套接字(Socket)堪稱(chēng)網(wǎng)絡(luò)編程領(lǐng)域中最為基礎(chǔ)且關(guān)鍵的概念,它是一種抽象的數(shù)據(jù)結(jié)構(gòu),為網(wǎng)絡(luò)應(yīng)用程序之間提供了至關(guān)重要的通信接口。從本質(zhì)上講,套接字就像是網(wǎng)絡(luò)通信中的一個(gè)端點(diǎn),承擔(dān)著發(fā)送和接收數(shù)據(jù)的重要職責(zé),使得運(yùn)行在不同機(jī)器上的應(yīng)用程序能夠?qū)崿F(xiàn)信息的交換,進(jìn)而達(dá)成各種網(wǎng)絡(luò)功能 。
套接字主要分為兩種類(lèi)型,分別是基于 TCP 協(xié)議的流式套接字(SOCK_STREAM)和基于 UDP 協(xié)議的數(shù)據(jù)報(bào)套接字(SOCK_DGRAM)。TCP 套接字以其可靠的連接特性著稱(chēng),它能夠確保數(shù)據(jù)的有序傳輸,并且通過(guò)確認(rèn)機(jī)制和重傳機(jī)制來(lái)保證數(shù)據(jù)的完整性,就如同一位嚴(yán)謹(jǐn)?shù)目爝f員,確保每個(gè)包裹都能準(zhǔn)確無(wú)誤地送達(dá)目的地;而 UDP 套接字則提供了無(wú)連接的服務(wù),它具有傳輸速度快、效率高的優(yōu)勢(shì),不過(guò)不保證數(shù)據(jù)包的到達(dá)順序,甚至可能會(huì)出現(xiàn)數(shù)據(jù)包丟失的情況,類(lèi)似于普通郵件的投遞,雖然速度較快,但不能完全保證郵件的送達(dá)。
圖片
基于 TCP 協(xié)議的客戶(hù)端和服務(wù)器:
- 服務(wù)端和客戶(hù)端初始化 socket,得到文件描述符;
- 服務(wù)端調(diào)用 bind,綁定 IP 地址和端口;
- 服務(wù)端調(diào)用 listen,進(jìn)行監(jiān)聽(tīng);
- 服務(wù)端調(diào)用 accept,等待客戶(hù)端連接;
- 客戶(hù)端調(diào)用 connect,向服務(wù)器端的地址和端口發(fā)起連接請(qǐng)求;
- 服務(wù)端 accept 返回 用于傳輸?shù)?socket的文件描述符;
- 客戶(hù)端調(diào)用 write 寫(xiě)入數(shù)據(jù);服務(wù)端調(diào)用 read 讀取數(shù)據(jù);
- 客戶(hù)端斷開(kāi)連接時(shí),會(huì)調(diào)用 close,那么服務(wù)端 read 讀取數(shù)據(jù)的時(shí)候,就會(huì)讀取到了 EOF,待處理完數(shù)據(jù)后,服務(wù)端調(diào)用 close,表示連接關(guān)閉。
這里需要注意的是,服務(wù)端調(diào)用 accept 時(shí),連接成功了會(huì)返回一個(gè)已完成連接的 socket,后續(xù)用來(lái)傳輸數(shù)據(jù);所以,監(jiān)聽(tīng)的 socket 和真正用來(lái)傳送數(shù)據(jù)的 socket,是「兩個(gè)」 socket,一個(gè)叫作監(jiān)聽(tīng) socket,一個(gè)叫作已完成連接 socket;成功連接建立之后,雙方開(kāi)始通過(guò) read 和 write 函數(shù)來(lái)讀寫(xiě)數(shù)據(jù),就像往一個(gè)文件流里面寫(xiě)東西一樣。
在 C++ 中,進(jìn)行套接字編程時(shí),常用的頭文件在 Unix-like 系統(tǒng)(包括 Linux 和 macOS)上是<sys/socket.h>、<netinet/in.h>、<arpa/inet.h>等,而在 Windows 系統(tǒng)上則需要使用<winsock2.h>頭文件,并且還需要進(jìn)行特定的初始化操作 。下面通過(guò)一個(gè)簡(jiǎn)單的服務(wù)器和客戶(hù)端示例代碼,來(lái)詳細(xì)說(shuō)明如何在 C++ 中使用套接字進(jìn)行網(wǎng)絡(luò)通信。
服務(wù)器端代碼:
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <cstring>
#define PORT 8080
int main() {
int server_fd, new_socket;
struct sockaddr_in address;
int opt = 1;
int addrlen = sizeof(address);
char buffer[1024] = {0};
// 創(chuàng)建套接字
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
perror("socket failed");
exit(EXIT_FAILURE);
}
// 綁定套接字
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
perror("setsockopt");
exit(EXIT_FAILURE);
}
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);
if (bind(server_fd, (struct sockaddr *) &address, sizeof(address)) < 0) {
perror("bind failed");
exit(EXIT_FAILURE);
}
// 監(jiān)聽(tīng)連接
if (listen(server_fd, 3) < 0) {
perror("listen");
exit(EXIT_FAILURE);
}
// 接受連接
if ((new_socket = accept(server_fd, (struct sockaddr *) &address, (socklen_t *) &addrlen)) < 0) {
perror("accept");
exit(EXIT_FAILURE);
}
// 接收數(shù)據(jù)
int valread = read(new_socket, buffer, 1024);
std::cout << "Received: " << buffer << std::endl;
// 發(fā)送數(shù)據(jù)
const char *response = "Hello from server";
send(new_socket, response, strlen(response), 0);
std::cout << "Response sent" << std::endl;
// 關(guān)閉套接字
close(new_socket);
close(server_fd);
return 0;
}在這段服務(wù)器端代碼中,首先使用socket函數(shù)創(chuàng)建了一個(gè)套接字,其中AF_INET表示使用 IPv4 協(xié)議,SOCK_STREAM表示使用 TCP 協(xié)議 。接著,通過(guò)setsockopt函數(shù)設(shè)置套接字選項(xiàng),允許地址和端口的重用,以避免端口被占用的問(wèn)題。然后,將套接字綁定到指定的 IP 地址(INADDR_ANY表示接受所有網(wǎng)卡的連接)和端口上。
使用listen函數(shù)將套接字設(shè)置為監(jiān)聽(tīng)模式,等待客戶(hù)端的連接請(qǐng)求,參數(shù)3表示允許的最大連接數(shù)。當(dāng)有客戶(hù)端連接時(shí),accept函數(shù)會(huì)從已連接隊(duì)列中提取第一個(gè)連接請(qǐng)求,并返回一個(gè)新的套接字new_socket,用于與客戶(hù)端進(jìn)行通信。之后,通過(guò)read函數(shù)接收客戶(hù)端發(fā)送的數(shù)據(jù),再使用send函數(shù)向客戶(hù)端發(fā)送響應(yīng)數(shù)據(jù)。最后,關(guān)閉與客戶(hù)端通信的套接字new_socket以及服務(wù)器監(jiān)聽(tīng)的套接字server_fd。
客戶(hù)端代碼:
#include <iostream>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#define PORT 8080
int main() {
int sock = 0;
struct sockaddr_in serv_addr;
char buffer[1024] = {0};
// 創(chuàng)建套接字
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
std::cerr << "Socket creation error" << std::endl;
return -1;
}
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(PORT);
// 將地址轉(zhuǎn)換成二進(jìn)制格式
if (inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr) <= 0) {
std::cerr << "Invalid address/ Address not supported" << std::endl;
return -1;
}
// 連接到服務(wù)器
if (connect(sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
std::cerr << "Connection Failed" << std::endl;
return -1;
}
// 發(fā)送數(shù)據(jù)
const char *message = "Hello from client";
send(sock, message, strlen(message), 0);
std::cout << "Message sent" << std::endl;
// 接收數(shù)據(jù)
int valread = read(sock, buffer, 1024);
std::cout << "Received: " << buffer << std::endl;
// 關(guān)閉套接字
close(sock);
return 0;
}客戶(hù)端代碼同樣先使用socket函數(shù)創(chuàng)建套接字,然后設(shè)置服務(wù)器的地址和端口信息。通過(guò)inet_pton函數(shù)將 IP 地址字符串轉(zhuǎn)換為二進(jìn)制格式,以便套接字能夠正確識(shí)別。接著,使用connect函數(shù)連接到服務(wù)器。連接成功后,通過(guò)send函數(shù)向服務(wù)器發(fā)送數(shù)據(jù),再使用read函數(shù)接收服務(wù)器返回的響應(yīng)數(shù)據(jù)。最后,關(guān)閉套接字。
通過(guò)這兩個(gè)示例代碼,我們可以清晰地看到在 C++ 中使用套接字進(jìn)行 TCP 網(wǎng)絡(luò)通信的基本流程,包括套接字的創(chuàng)建、綁定、監(jiān)聽(tīng)、連接以及數(shù)據(jù)的發(fā)送和接收等操作。在實(shí)際開(kāi)發(fā)中,還需要注意錯(cuò)誤處理,確保程序的穩(wěn)定性和健壯性 。
2.2網(wǎng)絡(luò)協(xié)議與數(shù)據(jù)傳輸
在網(wǎng)絡(luò)通信的廣袤領(lǐng)域中,常見(jiàn)的網(wǎng)絡(luò)協(xié)議猶如交通規(guī)則一般,規(guī)范著數(shù)據(jù)的傳輸方式和流程,其中 TCP/IP 和 UDP/IP 協(xié)議占據(jù)著舉足輕重的地位。
TCP/IP 協(xié)議是網(wǎng)絡(luò)通信的基石,它實(shí)際上是一個(gè)協(xié)議族,其中 TCP(傳輸控制協(xié)議)負(fù)責(zé)數(shù)據(jù)的可靠傳輸,通過(guò)三次握手建立連接,保證數(shù)據(jù)的有序傳輸,并利用確認(rèn)機(jī)制和重傳機(jī)制來(lái)確保數(shù)據(jù)的完整性。就好比在一場(chǎng)重要的文件傳輸任務(wù)中,TCP 協(xié)議會(huì)像一位認(rèn)真負(fù)責(zé)的管家,確保每一頁(yè)文件都能準(zhǔn)確無(wú)誤、按順序地送達(dá)目的地,若有文件丟失或損壞,它會(huì)及時(shí)要求重新發(fā)送。
而 IP(網(wǎng)際協(xié)議)則承擔(dān)著數(shù)據(jù)的路由和尋址工作,實(shí)現(xiàn)數(shù)據(jù)包在網(wǎng)絡(luò)中的傳輸,它如同快遞員手中的地址簿,根據(jù)目標(biāo)地址將數(shù)據(jù)包準(zhǔn)確地投遞到下一個(gè)節(jié)點(diǎn) 。TCP 協(xié)議適用于對(duì)數(shù)據(jù)準(zhǔn)確性和完整性要求極高的場(chǎng)景,比如文件下載、網(wǎng)頁(yè)瀏覽等,因?yàn)樵谶@些場(chǎng)景中,數(shù)據(jù)的錯(cuò)誤或丟失可能會(huì)導(dǎo)致嚴(yán)重的后果,如文件損壞無(wú)法打開(kāi)、網(wǎng)頁(yè)顯示錯(cuò)誤等。
以 TCP 協(xié)議的流式套接字為例,連接建立需要通過(guò)三次握手來(lái)完成 。三次握手的過(guò)程如下:
- 第一次握手:客戶(hù)端向服務(wù)器發(fā)送一個(gè) SYN(同步)報(bào)文段,該報(bào)文段中包含客戶(hù)端的初始序列號(hào)(Sequence Number,簡(jiǎn)稱(chēng) Seq),假設(shè)為 x 。此時(shí),客戶(hù)端進(jìn)入 SYN_SENT 狀態(tài),等待服務(wù)器的響應(yīng)。這個(gè)過(guò)程就好比客戶(hù)端給服務(wù)器打電話(huà)說(shuō):“我想和你建立連接,這是我的初始序號(hào) x”。
- 第二次握手:服務(wù)器接收到客戶(hù)端的 SYN 報(bào)文段后,會(huì)回復(fù)一個(gè) SYN-ACK(同步確認(rèn))報(bào)文段 。該報(bào)文段中包含服務(wù)器的初始序列號(hào),假設(shè)為 y,同時(shí) ACK(確認(rèn))字段的值為 x + 1,表示服務(wù)器已經(jīng)收到客戶(hù)端的 SYN 報(bào)文段,并且確認(rèn)號(hào)為客戶(hù)端的序列號(hào)加 1。此時(shí),服務(wù)器進(jìn)入 SYN_RCVD 狀態(tài)。這就像是服務(wù)器接起電話(huà)回應(yīng)客戶(hù)端:“我收到你的連接請(qǐng)求了,這是我的初始序號(hào) y,我確認(rèn)收到了你的序號(hào) x”。
- 第三次握手:客戶(hù)端接收到服務(wù)器的 SYN-ACK 報(bào)文段后,會(huì)發(fā)送一個(gè) ACK 報(bào)文段給服務(wù)器 。該報(bào)文段的 ACK 字段的值為 y + 1,表示客戶(hù)端已經(jīng)收到服務(wù)器的 SYN-ACK 報(bào)文段,并且確認(rèn)號(hào)為服務(wù)器的序列號(hào)加 1。此時(shí),客戶(hù)端進(jìn)入 ESTABLISHED 狀態(tài),服務(wù)器接收到 ACK 報(bào)文段后也進(jìn)入 ESTABLISHED 狀態(tài),連接建立成功。這相當(dāng)于客戶(hù)端再次回應(yīng)服務(wù)器:“我收到你的回復(fù)了,連接建立成功,我們可以開(kāi)始通信了”。
三次握手的作用在于確保雙方的通信能力正常,并且能夠同步初始序列號(hào),為后續(xù)的數(shù)據(jù)傳輸建立可靠的基礎(chǔ) 。通過(guò)三次握手,客戶(hù)端和服務(wù)器都能確認(rèn)對(duì)方可以正常接收和發(fā)送數(shù)據(jù),避免了舊連接請(qǐng)求的干擾,保證了連接的唯一性和正確性
UDP/IP 協(xié)議中的 UDP(用戶(hù)數(shù)據(jù)報(bào)協(xié)議)則是另一種風(fēng)格,它是一種無(wú)連接的協(xié)議,不保證數(shù)據(jù)傳輸?shù)目煽啃院晚樞蛐?。UDP 協(xié)議在傳輸數(shù)據(jù)時(shí),就像一位追求速度的快遞員,直接將數(shù)據(jù)包發(fā)送出去,不關(guān)心是否能夠準(zhǔn)確送達(dá)以及到達(dá)的順序。雖然 UDP 不提供可靠的傳輸保障,但它具有傳輸速度快、開(kāi)銷(xiāo)小的優(yōu)點(diǎn),適用于對(duì)實(shí)時(shí)性要求較高、數(shù)據(jù)丟失可以容忍的場(chǎng)景,如音視頻傳輸、在線(xiàn)游戲等。在視頻會(huì)議中,偶爾丟失幾個(gè)數(shù)據(jù)包可能只會(huì)導(dǎo)致短暫的畫(huà)面卡頓,但不會(huì)影響整個(gè)會(huì)議的進(jìn)行,而 UDP 協(xié)議的快速傳輸特性能夠保證視頻和音頻的流暢播放,讓參會(huì)者獲得較好的體驗(yàn)。
在數(shù)據(jù)傳輸階段,發(fā)送端和接收端的數(shù)據(jù)流動(dòng)過(guò)程如下:
- 發(fā)送端:應(yīng)用程序調(diào)用write或send函數(shù)將數(shù)據(jù)發(fā)送到 Socket 。這些函數(shù)會(huì)將應(yīng)用程序緩沖區(qū)中的數(shù)據(jù)拷貝到 Socket 的發(fā)送緩沖區(qū)中。然后,內(nèi)核會(huì)根據(jù) Socket 的類(lèi)型和協(xié)議,對(duì)數(shù)據(jù)進(jìn)行封裝。對(duì)于 TCP 套接字,數(shù)據(jù)會(huì)被分割成 TCP 段,并添加 TCP 頭部,包括源端口、目標(biāo)端口、序列號(hào)、確認(rèn)號(hào)等信息;對(duì)于 UDP 套接字,數(shù)據(jù)會(huì)被封裝成 UDP 數(shù)據(jù)報(bào),并添加 UDP 頭部,包含源端口和目標(biāo)端口。接著,數(shù)據(jù)會(huì)被傳遞到網(wǎng)絡(luò)層,添加 IP 頭部,包含源 IP 地址和目標(biāo) IP 地址,形成 IP 數(shù)據(jù)包。最后,IP 數(shù)據(jù)包通過(guò)網(wǎng)絡(luò)接口層發(fā)送到物理網(wǎng)絡(luò)上。
- 接收端:數(shù)據(jù)從物理網(wǎng)絡(luò)進(jìn)入接收端的網(wǎng)絡(luò)接口層 。網(wǎng)絡(luò)接口層接收到 IP 數(shù)據(jù)包后,會(huì)進(jìn)行解包,將 IP 頭部去除,然后將數(shù)據(jù)傳遞到網(wǎng)絡(luò)層。網(wǎng)絡(luò)層根據(jù) IP 頭部中的目標(biāo) IP 地址,判斷該數(shù)據(jù)包是否是發(fā)給本機(jī)的。如果是,則去除 IP 頭部,將數(shù)據(jù)傳遞到傳輸層。傳輸層根據(jù)協(xié)議類(lèi)型(TCP 或 UDP),對(duì)數(shù)據(jù)進(jìn)行相應(yīng)的處理。對(duì)于 TCP 數(shù)據(jù),會(huì)檢查序列號(hào)和確認(rèn)號(hào),進(jìn)行流量控制和錯(cuò)誤重傳等操作;對(duì)于 UDP 數(shù)據(jù),直接去除 UDP 頭部,將數(shù)據(jù)傳遞到 Socket 的接收緩沖區(qū)。最后,應(yīng)用程序調(diào)用read或recv函數(shù)從 Socket 的接收緩沖區(qū)中讀取數(shù)據(jù)到應(yīng)用程序緩沖區(qū)中,完成數(shù)據(jù)的接收。
當(dāng)數(shù)據(jù)在網(wǎng)絡(luò)中傳輸時(shí),它會(huì)經(jīng)歷一系列復(fù)雜的過(guò)程。應(yīng)用層的數(shù)據(jù)首先會(huì)被封裝成數(shù)據(jù)包,然后依次經(jīng)過(guò)傳輸層、網(wǎng)絡(luò)層和數(shù)據(jù)鏈路層,每一層都會(huì)添加相應(yīng)的頭部信息,以實(shí)現(xiàn)不同的功能 。在傳輸過(guò)程中,可能會(huì)出現(xiàn)粘包和分包的問(wèn)題。粘包現(xiàn)象通常發(fā)生在 TCP 協(xié)議中,由于 TCP 是面向流的協(xié)議,數(shù)據(jù)在發(fā)送時(shí)可能會(huì)被合并成一個(gè)大的數(shù)據(jù)包發(fā)送,或者接收方?jīng)]有及時(shí)讀取數(shù)據(jù),導(dǎo)致多個(gè)數(shù)據(jù)包被一起讀取,就像多個(gè)快遞包裹被捆在一起送到了收件人手中,收件人難以區(qū)分每個(gè)包裹的內(nèi)容。分包則是指一個(gè)大的數(shù)據(jù)包由于網(wǎng)絡(luò)傳輸?shù)南拗疲ㄈ?MTU,最大傳輸單元),被分割成多個(gè)小的數(shù)據(jù)包進(jìn)行傳輸,到了接收方再進(jìn)行重組,這就好比一個(gè)大的快遞被拆分成多個(gè)小包裹分別發(fā)送,收件人需要將這些小包裹重新組裝成完整的物品。
為了解決粘包和分包問(wèn)題,常見(jiàn)的方法有定包長(zhǎng)、包尾加分隔符號(hào)、包頭加上包體長(zhǎng)度等。以包頭加上包體長(zhǎng)度的方法為例,在發(fā)送數(shù)據(jù)時(shí),先在數(shù)據(jù)包的頭部添加一個(gè)固定長(zhǎng)度的字段,用于表示包體的長(zhǎng)度,接收方在接收到數(shù)據(jù)后,首先讀取包頭中的長(zhǎng)度字段,然后根據(jù)這個(gè)長(zhǎng)度來(lái)準(zhǔn)確地讀取包體的數(shù)據(jù),這樣就能確保每個(gè)數(shù)據(jù)包都能被正確地解析,避免了粘包和分包帶來(lái)的混亂。
在高并發(fā)服務(wù)器開(kāi)發(fā)中,深入理解網(wǎng)絡(luò)協(xié)議和數(shù)據(jù)傳輸過(guò)程,以及掌握解決粘包、分包等問(wèn)題的方法,是構(gòu)建高性能、穩(wěn)定網(wǎng)絡(luò)應(yīng)用的關(guān)鍵所在。
Part3.C++多線(xiàn)程網(wǎng)絡(luò)編程實(shí)踐
3.1多線(xiàn)程服務(wù)器架構(gòu)設(shè)計(jì)
在構(gòu)建高并發(fā)服務(wù)器時(shí),精心設(shè)計(jì)多線(xiàn)程服務(wù)器架構(gòu)至關(guān)重要,它就如同建造摩天大樓的藍(lán)圖,直接決定了服務(wù)器的性能、可擴(kuò)展性和穩(wěn)定性 。
一種常見(jiàn)且高效的多線(xiàn)程服務(wù)器架構(gòu)模式是主從線(xiàn)程模型。在這個(gè)模型中,主線(xiàn)程猶如一位經(jīng)驗(yàn)豐富的指揮官,承擔(dān)著監(jiān)聽(tīng)端口和接受新連接的重要職責(zé)。一旦捕捉到新的連接請(qǐng)求,它便迅速將這個(gè)連接分配給工作線(xiàn)程池中的某個(gè)工作線(xiàn)程。工作線(xiàn)程則像是一群訓(xùn)練有素的士兵,專(zhuān)注于與客戶(hù)端進(jìn)行數(shù)據(jù)的收發(fā)和處理工作。這種分工明確的模式,使得主線(xiàn)程能夠心無(wú)旁騖地專(zhuān)注于新連接的接納,而工作線(xiàn)程可以全身心地投入到數(shù)據(jù)處理中,極大地提高了服務(wù)器的并發(fā)處理能力 。
線(xiàn)程池作為多線(xiàn)程服務(wù)器架構(gòu)中的關(guān)鍵組件,發(fā)揮著不可或缺的作用。線(xiàn)程池就像是一個(gè)訓(xùn)練有素的團(tuán)隊(duì),預(yù)先創(chuàng)建一定數(shù)量的線(xiàn)程,這些線(xiàn)程在空閑時(shí)處于待命狀態(tài),一旦有任務(wù)下達(dá),它們便能迅速響應(yīng),投入工作。通過(guò)線(xiàn)程池,我們可以有效避免頻繁創(chuàng)建和銷(xiāo)毀線(xiàn)程所帶來(lái)的巨大開(kāi)銷(xiāo)。線(xiàn)程的創(chuàng)建和銷(xiāo)毀就好比是反復(fù)招募和解雇員工,不僅耗時(shí)費(fèi)力,還會(huì)造成資源的浪費(fèi)。而線(xiàn)程池則像是一個(gè)穩(wěn)定的團(tuán)隊(duì),員工們可以持續(xù)工作,大大提高了效率。線(xiàn)程池還能夠?qū)€(xiàn)程數(shù)量進(jìn)行精細(xì)控制,防止因線(xiàn)程過(guò)多而導(dǎo)致系統(tǒng)資源被過(guò)度消耗,從而確保服務(wù)器在高并發(fā)環(huán)境下依然能夠穩(wěn)定運(yùn)行 。
為了更直觀地理解多線(xiàn)程服務(wù)器架構(gòu),我們來(lái)看一個(gè)簡(jiǎn)化的示例代碼,展示如何使用 C++ 和線(xiàn)程池來(lái)構(gòu)建一個(gè)簡(jiǎn)單的多線(xiàn)程服務(wù)器:
#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
// 線(xiàn)程池類(lèi)
class ThreadPool {
public:
ThreadPool(size_t numThreads) {
for (size_t i = 0; i < numThreads; ++i) {
threads.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queueMutex);
this->condition.wait(lock, [this] { return this->stop ||!this->tasks.empty(); });
if (this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
});
}
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queueMutex);
stop = true;
}
condition.notify_all();
for (std::thread& thread : threads) {
thread.join();
}
}
template<class F, class... Args>
void enqueue(F&& f, Args&&... args) {
{
std::unique_lock<std::mutex> lock(queueMutex);
tasks.emplace(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
}
condition.notify_one();
}
private:
std::vector<std::thread> threads;
std::queue<std::function<void()>> tasks;
std::mutex queueMutex;
std::condition_variable condition;
bool stop = false;
};
// 處理客戶(hù)端連接的函數(shù)
void handleClient(int clientSocket) {
char buffer[1024] = {0};
int valread = read(clientSocket, buffer, 1024);
std::cout << "Received: " << buffer << std::endl;
const char *response = "Hello from server";
send(clientSocket, response, strlen(response), 0);
std::cout << "Response sent" << std::endl;
close(clientSocket);
}
int main() {
int serverSocket = socket(AF_INET, SOCK_STREAM, 0);
if (serverSocket == 0) {
perror("socket failed");
exit(EXIT_FAILURE);
}
struct sockaddr_in address;
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(8080);
if (bind(serverSocket, (struct sockaddr *) &address, sizeof(address)) < 0) {
perror("bind failed");
exit(EXIT_FAILURE);
}
if (listen(serverSocket, 3) < 0) {
perror("listen");
exit(EXIT_FAILURE);
}
ThreadPool pool(4); // 創(chuàng)建包含4個(gè)線(xiàn)程的線(xiàn)程池
while (true) {
struct sockaddr_in clientAddress;
socklen_t clientAddressLen = sizeof(clientAddress);
int clientSocket = accept(serverSocket, (struct sockaddr *) &clientAddress, &clientAddressLen);
if (clientSocket < 0) {
perror("accept");
continue;
}
// 將處理客戶(hù)端連接的任務(wù)加入線(xiàn)程池
pool.enqueue([clientSocket] {
handleClient(clientSocket);
});
}
close(serverSocket);
return 0;
}在這段代碼中,ThreadPool類(lèi)實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的線(xiàn)程池,通過(guò)構(gòu)造函數(shù)創(chuàng)建指定數(shù)量的線(xiàn)程,并在析構(gòu)函數(shù)中正確地停止和回收這些線(xiàn)程。enqueue方法用于將任務(wù)添加到任務(wù)隊(duì)列中,并通知一個(gè)等待的線(xiàn)程來(lái)執(zhí)行任務(wù)。handleClient函數(shù)負(fù)責(zé)處理客戶(hù)端的連接,接收客戶(hù)端發(fā)送的數(shù)據(jù)并返回響應(yīng)。在main函數(shù)中,創(chuàng)建了一個(gè)服務(wù)器套接字,監(jiān)聽(tīng)指定端口,并將接收到的客戶(hù)端連接任務(wù)提交給線(xiàn)程池處理 。
通過(guò)這樣的架構(gòu)設(shè)計(jì)和線(xiàn)程池的運(yùn)用,我們能夠構(gòu)建出一個(gè)高效、穩(wěn)定的多線(xiàn)程服務(wù)器,為高并發(fā)場(chǎng)景下的網(wǎng)絡(luò)通信提供堅(jiān)實(shí)的基礎(chǔ) 。
3.2線(xiàn)程同步與互斥
在多線(xiàn)程編程的復(fù)雜世界里,線(xiàn)程同步與互斥是確保程序正確運(yùn)行的關(guān)鍵環(huán)節(jié),就如同交通規(guī)則對(duì)于城市交通的重要性一樣 。當(dāng)多個(gè)線(xiàn)程同時(shí)訪(fǎng)問(wèn)和修改共享資源時(shí),如果沒(méi)有有效的同步機(jī)制,就可能引發(fā)一系列嚴(yán)重的問(wèn)題,如數(shù)據(jù)競(jìng)爭(zhēng)和數(shù)據(jù)不一致。數(shù)據(jù)競(jìng)爭(zhēng)就像是多個(gè)司機(jī)在沒(méi)有交通規(guī)則的路口爭(zhēng)搶通行,導(dǎo)致交通堵塞和混亂;而數(shù)據(jù)不一致則好比是不同的人對(duì)同一份文件進(jìn)行修改,卻沒(méi)有協(xié)調(diào)好,最終使得文件內(nèi)容混亂不堪 。
為了有效解決這些問(wèn)題,C++ 提供了一系列強(qiáng)大的同步工具,其中互斥鎖(std::mutex)和條件變量(std::condition_variable)是最為常用的。
互斥鎖(std::mutex)是一種簡(jiǎn)單而有效的同步機(jī)制,它就像是一扇門(mén),一次只允許一個(gè)線(xiàn)程進(jìn)入臨界區(qū),訪(fǎng)問(wèn)共享資源。當(dāng)一個(gè)線(xiàn)程獲取到互斥鎖時(shí),就相當(dāng)于拿到了這扇門(mén)的鑰匙,其他線(xiàn)程必須等待,直到該線(xiàn)程釋放互斥鎖,將鑰匙歸還,其他線(xiàn)程才有機(jī)會(huì)進(jìn)入。例如,在一個(gè)多線(xiàn)程的銀行賬戶(hù)管理系統(tǒng)中,多個(gè)線(xiàn)程可能同時(shí)嘗試對(duì)賬戶(hù)余額進(jìn)行修改,如果沒(méi)有互斥鎖的保護(hù),就可能出現(xiàn)數(shù)據(jù)不一致的情況,導(dǎo)致賬戶(hù)余額錯(cuò)誤。使用互斥鎖可以確保在任何時(shí)刻,只有一個(gè)線(xiàn)程能夠?qū)~戶(hù)余額進(jìn)行操作,從而保證數(shù)據(jù)的一致性 。
下面是一個(gè)使用互斥鎖的簡(jiǎn)單示例代碼:
#include <iostream>
#include <mutex>
#include <thread>
std::mutex mtx; // 創(chuàng)建一個(gè)互斥鎖
int sharedResource = 0; // 共享資源
void increment() {
for (int i = 0; i < 1000; ++i) {
mtx.lock(); // 加鎖,進(jìn)入臨界區(qū)
++sharedResource; // 訪(fǎng)問(wèn)和修改共享資源
mtx.unlock(); // 解鎖,離開(kāi)臨界區(qū)
}
}
int main() {
std::thread t1(increment);
std::thread t2(increment);
t1.join();
t2.join();
std::cout << "Final value of sharedResource: " << sharedResource << std::endl;
return 0;
}在這個(gè)示例中,std::mutex對(duì)象mtx用于保護(hù)sharedResource的訪(fǎng)問(wèn)。在increment函數(shù)中,通過(guò)調(diào)用mtx.lock()來(lái)獲取鎖,進(jìn)入臨界區(qū),對(duì)sharedResource進(jìn)行操作,操作完成后,調(diào)用mtx.unlock()釋放鎖,離開(kāi)臨界區(qū)。這樣,就保證了在同一時(shí)間只有一個(gè)線(xiàn)程能夠修改sharedResource,避免了數(shù)據(jù)競(jìng)爭(zhēng) 。
條件變量(std::condition_variable)則是一種更高級(jí)的同步工具,它允許線(xiàn)程在特定條件滿(mǎn)足時(shí)被喚醒,從而實(shí)現(xiàn)線(xiàn)程之間的協(xié)作。條件變量通常與互斥鎖配合使用,就像是一個(gè)信號(hào)燈,當(dāng)條件不滿(mǎn)足時(shí),線(xiàn)程可以在條件變量上等待,釋放互斥鎖,進(jìn)入睡眠狀態(tài);當(dāng)條件滿(mǎn)足時(shí),其他線(xiàn)程可以通知條件變量,喚醒等待的線(xiàn)程,使其重新獲取互斥鎖,繼續(xù)執(zhí)行。例如,在一個(gè)生產(chǎn)者 - 消費(fèi)者模型中,生產(chǎn)者線(xiàn)程生產(chǎn)數(shù)據(jù)并放入緩沖區(qū),消費(fèi)者線(xiàn)程從緩沖區(qū)中取出數(shù)據(jù)進(jìn)行處理。當(dāng)緩沖區(qū)為空時(shí),消費(fèi)者線(xiàn)程需要等待生產(chǎn)者線(xiàn)程生產(chǎn)數(shù)據(jù);當(dāng)緩沖區(qū)滿(mǎn)時(shí),生產(chǎn)者線(xiàn)程需要等待消費(fèi)者線(xiàn)程取出數(shù)據(jù)。通過(guò)條件變量,我們可以實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者線(xiàn)程之間的高效協(xié)作 。
下面是一個(gè)使用條件變量的示例代碼:
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
std::mutex mtx;
std::condition_variable cv;
std::queue<int> dataQueue;
// 生產(chǎn)者線(xiàn)程函數(shù)
void producer() {
for (int i = 0; i < 10; ++i) {
std::unique_lock<std::mutex> lock(mtx);
dataQueue.push(i);
std::cout << "Produced: " << i << std::endl;
lock.unlock();
cv.notify_one(); // 通知一個(gè)等待的消費(fèi)者線(xiàn)程
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
// 消費(fèi)者線(xiàn)程函數(shù)
void consumer() {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [] { return!dataQueue.empty(); }); // 等待隊(duì)列中有數(shù)據(jù)
int data = dataQueue.front();
dataQueue.pop();
std::cout << "Consumed: " << data << std::endl;
lock.unlock();
if (data == 9) break; // 當(dāng)消費(fèi)完最后一個(gè)數(shù)據(jù)時(shí)退出
}
}
int main() {
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();
return 0;
}在這個(gè)示例中,std::condition_variable對(duì)象cv用于協(xié)調(diào)生產(chǎn)者和消費(fèi)者線(xiàn)程。生產(chǎn)者線(xiàn)程在生產(chǎn)數(shù)據(jù)后,通過(guò)cv.notify_one()通知等待的消費(fèi)者線(xiàn)程;消費(fèi)者線(xiàn)程在cv.wait(lock, [] { return!dataQueue.empty(); });處等待,直到隊(duì)列中有數(shù)據(jù)時(shí)被喚醒,然后從隊(duì)列中取出數(shù)據(jù)進(jìn)行處理 。
通過(guò)合理使用互斥鎖和條件變量,我們能夠有效地實(shí)現(xiàn)線(xiàn)程同步與互斥,確保多線(xiàn)程程序的正確性和穩(wěn)定性 。
3.3并發(fā)控制策略
在多線(xiàn)程編程的廣袤領(lǐng)域中,并發(fā)控制策略猶如精密儀器中的調(diào)節(jié)裝置,對(duì)于確保程序在高并發(fā)環(huán)境下的高效、穩(wěn)定運(yùn)行起著至關(guān)重要的作用。除了前面介紹的互斥鎖和條件變量等基本同步機(jī)制外,無(wú)鎖數(shù)據(jù)結(jié)構(gòu)和讀寫(xiě)鎖等高級(jí)并發(fā)控制策略,為我們應(yīng)對(duì)復(fù)雜的多線(xiàn)程場(chǎng)景提供了更為強(qiáng)大的工具 。
無(wú)鎖數(shù)據(jù)結(jié)構(gòu),作為一種獨(dú)特的并發(fā)控制方案,通過(guò)采用先進(jìn)的算法和技術(shù),巧妙地避免了傳統(tǒng)鎖機(jī)制帶來(lái)的性能瓶頸和死鎖風(fēng)險(xiǎn)。它就像是一個(gè)高效的自動(dòng)化工廠,各個(gè)生產(chǎn)線(xiàn)(線(xiàn)程)可以在無(wú)需等待鎖的情況下,同時(shí)對(duì)共享資源進(jìn)行操作,極大地提高了系統(tǒng)的并發(fā)性能。無(wú)鎖數(shù)據(jù)結(jié)構(gòu)通常依賴(lài)于原子操作,如比較并交換(CAS,Compare-And-Swap)操作,來(lái)實(shí)現(xiàn)數(shù)據(jù)的安全訪(fǎng)問(wèn)和修改。CAS 操作就像是一位公正的裁判,它會(huì)在修改數(shù)據(jù)之前,先仔細(xì)檢查數(shù)據(jù)的當(dāng)前值是否與預(yù)期值一致,如果一致,才會(huì)進(jìn)行修改,否則就放棄操作。這種方式使得多個(gè)線(xiàn)程能夠在不使用鎖的情況下,安全地并發(fā)訪(fǎng)問(wèn)共享數(shù)據(jù) 。
以無(wú)鎖鏈表為例,它在實(shí)現(xiàn)上摒棄了傳統(tǒng)鏈表中對(duì)鎖的依賴(lài),通過(guò)精心設(shè)計(jì)的節(jié)點(diǎn)結(jié)構(gòu)和原子操作,實(shí)現(xiàn)了高效的并發(fā)插入和刪除操作。在無(wú)鎖鏈表中,每個(gè)節(jié)點(diǎn)都包含一個(gè)指向下一個(gè)節(jié)點(diǎn)的指針,并且這些指針的更新操作都是通過(guò)原子操作來(lái)完成的。當(dāng)一個(gè)線(xiàn)程想要插入一個(gè)新節(jié)點(diǎn)時(shí),它會(huì)首先找到合適的位置,然后使用 CAS 操作將新節(jié)點(diǎn)插入到鏈表中。如果在插入過(guò)程中,其他線(xiàn)程同時(shí)對(duì)鏈表進(jìn)行了修改,導(dǎo)致當(dāng)前線(xiàn)程的預(yù)期值與實(shí)際值不一致,那么當(dāng)前線(xiàn)程會(huì)重新嘗試插入操作,直到成功為止 。
下面是一個(gè)簡(jiǎn)化的無(wú)鎖鏈表實(shí)現(xiàn)示例(僅展示關(guān)鍵部分):
#include <atomic>
template <typename T>
struct Node {
T data;
std::atomic<Node<T>*> next;
Node(const T& value) : data(value), next(nullptr) {}
};
template <typename T>
class LockFreeList {
public:
LockFreeList() : head(nullptr) {}
bool insert(const T& value) {
Node<T> *newNode = new Node<T>(value);
Node<T> *prev = nullptr;
Node<T> *curr = head.load();
while (curr != nullptr && curr->data < value) {
prev = curr;
curr = curr->next.load();
}
if (curr != nullptr && curr->data == value) {
delete newNode;
return false; // 數(shù)據(jù)已存在,插入失敗
}
newNode->next.store(curr);
if (prev == nullptr) {
while (!head.compare_exchange_weak(curr, newNode)) {
if (curr != nullptr && curr->data < value) {
prev = curr;
curr = curr->next.load();
} else {
break;
}
}
} else {
while (!prev->next.compare_exchange_weak(curr, newNode)) {
if (curr != nullptr && curr->data < value) {
prev = curr;
curr = curr->next.load();
} else {
break;
}
}
}
return true;
}
private:
std::atomic<Node<T>*> head;
};在這個(gè)示例中,LockFreeList類(lèi)實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的無(wú)鎖鏈表。insert方法使用 CAS 操作來(lái)插入新節(jié)點(diǎn),確保在多線(xiàn)程環(huán)境下的正確性和高效性 。
讀寫(xiě)鎖(std::shared_mutex)則是另一種重要的并發(fā)控制策略,它專(zhuān)門(mén)針對(duì)讀多寫(xiě)少的場(chǎng)景進(jìn)行了優(yōu)化。讀寫(xiě)鎖就像是一個(gè)智能的門(mén)禁系統(tǒng),當(dāng)多個(gè)線(xiàn)程同時(shí)進(jìn)行讀操作時(shí),它允許這些線(xiàn)程同時(shí)進(jìn)入,共享資源;但當(dāng)有線(xiàn)程進(jìn)行寫(xiě)操作時(shí),它會(huì)立即禁止其他線(xiàn)程的讀寫(xiě)操作,以保證數(shù)據(jù)的一致性。這種機(jī)制大大提高了讀操作的并發(fā)性能,減少了線(xiàn)程之間的競(jìng)爭(zhēng) 。
在 C++ 中,std::shared_mutex提供了讀寫(xiě)鎖的功能。線(xiàn)程可以通過(guò)調(diào)用lock_shared方法來(lái)獲取共享鎖(讀鎖),允許多個(gè)線(xiàn)程同時(shí)持有共享鎖進(jìn)行讀操作;通過(guò)調(diào)用lock方法來(lái)獲取獨(dú)占鎖(寫(xiě)鎖),此時(shí)其他線(xiàn)程無(wú)法獲取任何鎖,直到獨(dú)占鎖被釋放 。
以下是一個(gè)使用讀寫(xiě)鎖的示例代碼:
#include <iostream>
#include <shared_mutex>
#include <thread>
std::shared_mutex rwMutex;
int sharedData = 0;
// 讀操作
void readData() {
rwMutex.lock_shared();
std::cout << "Read data: " << sharedData << std::endl;
rwMutex.unlock_shared();
}
// 寫(xiě)操作
void writeData(int value) {
rwMutex.lock();
sharedData = value;
std::cout << "Write data: " << value << std::endl;
rwMutex.unlock();
}
int main() {
std::thread t1(readData);
std::thread t2(readData);
std::thread t3(writeData, 42);
std::thread t4(readData);
t1.join();
t2.join();
t3.join();
t4.join();
return 0;
}在這個(gè)示例中,std::shared_mutex對(duì)象rwMutex用于保護(hù)sharedData的讀寫(xiě)操作。讀操作通過(guò)lock_shared和unlock_shared來(lái)獲取和釋放共享鎖,允許多個(gè)讀線(xiàn)程同時(shí)訪(fǎng)問(wèn);寫(xiě)操作通過(guò)lock和unlock來(lái)獲取和釋放獨(dú)占鎖,確保在寫(xiě)操作時(shí)沒(méi)有其他線(xiàn)程可以訪(fǎng)問(wèn)sharedData 。
通過(guò)靈活運(yùn)用無(wú)鎖數(shù)據(jù)結(jié)構(gòu)和讀寫(xiě)鎖等并發(fā)控制策略,我們能夠根據(jù)不同的應(yīng)用場(chǎng)景和需求,選擇最合適的方案,提升多線(xiàn)程程序的性能和并發(fā)處理能力 。
3.4 I/O 多路復(fù)用技術(shù)
在編程世界里,I/O 操作(如文件讀寫(xiě)、網(wǎng)絡(luò)通信等)是非常常見(jiàn)的任務(wù)。傳統(tǒng)的 I/O 模型中,一個(gè)線(xiàn)程通常只能處理一個(gè) I/O 操作,如果要處理多個(gè) I/O 操作,就需要?jiǎng)?chuàng)建多個(gè)線(xiàn)程或者進(jìn)程,這會(huì)帶來(lái)資源浪費(fèi)和復(fù)雜度增加的問(wèn)題。
IO 多路復(fù)用(I/O Multiplexing)技術(shù)的出現(xiàn),很好地解決了這個(gè)問(wèn)題。它允許一個(gè)進(jìn)程同時(shí)監(jiān)聽(tīng)多個(gè)文件描述符(File Descriptor,簡(jiǎn)稱(chēng) fd,在 Linux 系統(tǒng)中,一切皆文件,文件描述符是內(nèi)核為了高效管理已被打開(kāi)的文件所創(chuàng)建的索引)的 I/O 事件,當(dāng)某個(gè)文件描述符就緒(有數(shù)據(jù)可讀、可寫(xiě)或有異常發(fā)生)時(shí),進(jìn)程能夠及時(shí)得到通知并進(jìn)行相應(yīng)的處理 。這就好比一個(gè)餐廳服務(wù)員,他可以同時(shí)照顧多桌客人,當(dāng)某一桌客人有需求(比如需要加水、上菜等)時(shí),服務(wù)員能夠及時(shí)響應(yīng),而不是一個(gè)服務(wù)員只服務(wù)一桌客人,造成資源浪費(fèi)。
在 Linux 系統(tǒng)中,常見(jiàn)的 IO 多路復(fù)用方式有 select、poll 和 epoll,它們各自有著不同的特點(diǎn)和適用場(chǎng)景。
①select:select 是最早出現(xiàn)的 IO 多路復(fù)用方式,它通過(guò)一個(gè)select()系統(tǒng)調(diào)用來(lái)監(jiān)視多個(gè)文件描述符的數(shù)組。select()函數(shù)的原型如下:
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);參數(shù)說(shuō)明
- nfds:需要監(jiān)聽(tīng)的文件描述符的最大值加 1。
- readfds:需要監(jiān)聽(tīng)讀事件的文件描述符集合。
- writefds:需要監(jiān)聽(tīng)寫(xiě)事件的文件描述符集合。
- exceptfds:需要監(jiān)聽(tīng)異常事件的文件描述符集合。
- timeout:設(shè)置select函數(shù)的超時(shí)時(shí)間,如果為NULL,則表示一直阻塞等待。
返回值說(shuō)明
- 成功時(shí)返回就緒文件描述符個(gè)數(shù)。
- 超時(shí)時(shí)返回 0。
- 出錯(cuò)時(shí)返回負(fù)值。
使用select時(shí),需要先初始化文件描述符集合,將需要監(jiān)聽(tīng)的文件描述符添加到對(duì)應(yīng)的集合中,然后調(diào)用select函數(shù)。當(dāng)select返回后,通過(guò)檢查返回值和文件描述符集合,判斷哪些文件描述符就緒,進(jìn)而進(jìn)行相應(yīng)的讀寫(xiě)操作。例如:
#include <iostream>
#include <sys/select.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#define PORT 8080
#define MAX_CLIENTS 10
int main() {
int server_socket = socket(AF_INET, SOCK_STREAM, 0);
if (server_socket == -1) {
perror("socket creation failed");
return 1;
}
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(PORT);
server_addr.sin_addr.s_addr = INADDR_ANY;
if (bind(server_socket, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {
perror("bind failed");
close(server_socket);
return 1;
}
if (listen(server_socket, 3) == -1) {
perror("listen failed");
close(server_socket);
return 1;
}
fd_set read_fds;
FD_ZERO(&read_fds);
FD_SET(server_socket, &read_fds);
int max_fd = server_socket;
while (true) {
fd_set temp_fds = read_fds;
int activity = select(max_fd + 1, &temp_fds, NULL, NULL, NULL);
if (activity == -1) {
perror("select error");
break;
} else if (activity > 0) {
if (FD_ISSET(server_socket, &temp_fds)) {
int client_socket = accept(server_socket, NULL, NULL);
if (client_socket != -1) {
FD_SET(client_socket, &read_fds);
if (client_socket > max_fd) {
max_fd = client_socket;
}
}
}
for (int i = 0; i <= max_fd; ++i) {
if (FD_ISSET(i, &temp_fds) && i != server_socket) {
char buffer[1024] = {0};
int valread = read(i, buffer, sizeof(buffer));
if (valread == -1) {
perror("read failed");
close(i);
FD_CLR(i, &read_fds);
} else if (valread == 0) {
close(i);
FD_CLR(i, &read_fds);
} else {
std::cout << "Received: " << buffer << std::endl;
}
}
}
}
}
close(server_socket);
return 0;
}這段代碼創(chuàng)建了一個(gè)簡(jiǎn)單的 TCP 服務(wù)器,使用select監(jiān)聽(tīng)新的客戶(hù)端連接和客戶(hù)端發(fā)送的數(shù)據(jù)。
select 的優(yōu)點(diǎn)是幾乎在所有平臺(tái)上都支持,具有良好的跨平臺(tái)性;缺點(diǎn)是單個(gè)進(jìn)程能夠監(jiān)視的文件描述符數(shù)量有限,在 Linux 上一般為 1024,并且每次調(diào)用select都需要將文件描述符集合從用戶(hù)態(tài)拷貝到內(nèi)核態(tài),隨著文件描述符數(shù)量的增大,其復(fù)制和遍歷的開(kāi)銷(xiāo)也會(huì)線(xiàn)性增長(zhǎng)。
②poll:poll 出現(xiàn)的時(shí)間比 select 稍晚,它和 select 在本質(zhì)上沒(méi)有太大差別,也是通過(guò)輪詢(xún)的方式來(lái)檢查文件描述符是否就緒。poll函數(shù)的原型如下:
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);參數(shù)說(shuō)明—fds:一個(gè)指向struct pollfd結(jié)構(gòu)體數(shù)組的指針,struct pollfd結(jié)構(gòu)體定義如下:
struct pollfd {
int fd; // 文件描述符
short events; // 等待的事件
short revents; // 實(shí)際發(fā)生的事件
};- nfds:指定fds數(shù)組中結(jié)構(gòu)體的個(gè)數(shù)。
- timeout:設(shè)置超時(shí)時(shí)間,單位是毫秒。
返回值說(shuō)明:
- 成功時(shí)返回就緒文件描述符個(gè)數(shù)。
- 超時(shí)時(shí)返回 0。
- 出錯(cuò)時(shí)返回負(fù)值。
與 select 相比,poll 沒(méi)有最大文件描述符數(shù)量的限制,并且它將輸入輸出參數(shù)進(jìn)行了分離,不需要每次都重新設(shè)定。但是,poll 同樣存在包含大量文件描述符的數(shù)組被整體復(fù)制于用戶(hù)態(tài)和內(nèi)核的地址空間之間的問(wèn)題,其開(kāi)銷(xiāo)隨著文件描述符數(shù)量的增加而線(xiàn)性增大。
③epoll:epoll 是在 Linux 2.6 內(nèi)核中引入的,它被公認(rèn)為是 Linux 下性能最好的多路 I/O 就緒通知方法。
epoll 有三個(gè)主要函數(shù):
⑴epoll_create:用于創(chuàng)建一個(gè) epoll 實(shí)例,返回一個(gè) epoll 專(zhuān)用的文件描述符。
#include <sys/epoll.h>
int epoll_create(int size);這里的size參數(shù)在 Linux 2.6.8 版本之后被忽略,但仍需傳入一個(gè)大于 0 的值。
⑵epoll_ctl:用于控制某個(gè) epoll 實(shí)例監(jiān)聽(tīng)的文件描述符,比如添加、刪除或修改監(jiān)聽(tīng)事件。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);參數(shù)說(shuō)明:
- epfd:epoll 實(shí)例的文件描述符。
- op:操作類(lèi)型,有EPOLL_CTL_ADD(添加)、EPOLL_CTL_MOD(修改)、EPOLL_CTL_DEL(刪除)。
- fd:要操作的文件描述符。
event:指向struct epoll_event結(jié)構(gòu)體的指針,用于設(shè)置監(jiān)聽(tīng)的事件和關(guān)聯(lián)的數(shù)據(jù),struct epoll_event結(jié)構(gòu)體定義如下:
struct epoll_event {
uint32_t events; // Epoll事件
epoll_data_t data; // 用戶(hù)數(shù)據(jù)
};其中,events可以是EPOLLIN(可讀事件)、EPOLLOUT(可寫(xiě)事件)等事件的組合;data可以是一個(gè)void*指針,用于關(guān)聯(lián)用戶(hù)自定義的數(shù)據(jù)。
⑶epoll_wait:用于等待 epoll 實(shí)例上的事件發(fā)生,返回就緒的事件列表。
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);參數(shù)說(shuō)明
- epfd:epoll 實(shí)例的文件描述符。
- events:用于存儲(chǔ)就緒事件的數(shù)組。
- maxevents:指定events數(shù)組的大小。
- timeout:設(shè)置超時(shí)時(shí)間,單位是毫秒,若為 - 1 則表示一直阻塞。
返回值說(shuō)明
- 成功時(shí)返回就緒事件的個(gè)數(shù)。
- 超時(shí)時(shí)返回 0。
- 出錯(cuò)時(shí)返回負(fù)值。
epoll 使用一個(gè)文件描述符管理多個(gè)描述符,將用戶(hù)關(guān)心的文件描述符的事件存放到內(nèi)核的一個(gè)事件表中,這樣在用戶(hù)空間和內(nèi)核空間的拷貝只需一次。而且,epoll 采用基于事件的就緒通知方式,當(dāng)某個(gè)文件描述符就緒時(shí),內(nèi)核會(huì)采用類(lèi)似 callback 的回調(diào)機(jī)制,迅速激活這個(gè)文件描述符,當(dāng)進(jìn)程調(diào)用epoll_wait時(shí)便得到通知,大大提高了效率。
綜上所述,select、poll 和 epoll 各有優(yōu)劣,在實(shí)際應(yīng)用中,我們需要根據(jù)具體的需求和場(chǎng)景來(lái)選擇合適的 IO 多路復(fù)用方式。如果需要跨平臺(tái)支持,且文件描述符數(shù)量較少,select 是一個(gè)不錯(cuò)的選擇;如果需要處理大量的文件描述符,且對(duì)性能要求較高,epoll 則是更好的選擇;而 poll 則處于兩者之間,在一些特定場(chǎng)景下也有其用武之地。
Part4.實(shí)戰(zhàn)案例:高并發(fā)聊天服務(wù)器的實(shí)現(xiàn)
4.1功能需求分析
在當(dāng)今數(shù)字化社交高度發(fā)達(dá)的時(shí)代,即時(shí)通訊軟件如微信、QQ 等已成為人們?nèi)粘I钪胁豢苫蛉钡臏贤üぞ?。為了打造一款能夠滿(mǎn)足現(xiàn)代用戶(hù)需求的高并發(fā)聊天服務(wù)器,我們需要全面而細(xì)致地剖析其功能需求。
用戶(hù)注冊(cè)登錄功能是聊天服務(wù)器的基礎(chǔ)門(mén)檻,它就像是進(jìn)入社交大廈的鑰匙,只有通過(guò)注冊(cè)獲得專(zhuān)屬賬號(hào),并成功登錄,用戶(hù)才能開(kāi)啟與他人交流的大門(mén)。在實(shí)現(xiàn)這一功能時(shí),需要確保用戶(hù)賬號(hào)的唯一性,如同每個(gè)人的身份證號(hào)碼獨(dú)一無(wú)二一樣,避免賬號(hào)沖突。同時(shí),要采用安全可靠的密碼加密存儲(chǔ)方式,比如使用強(qiáng)大的哈希算法,如 SHA - 256,將用戶(hù)密碼進(jìn)行加密處理后存儲(chǔ)在數(shù)據(jù)庫(kù)中,防止密碼明文泄露,保障用戶(hù)賬號(hào)的安全 。
群組聊天功能是社交互動(dòng)的核心舞臺(tái),它讓用戶(hù)能夠像參加熱鬧的派對(duì)一樣,與志同道合的人在同一個(gè)群組中暢所欲言。服務(wù)器需要高效地管理群組信息,包括群組的創(chuàng)建、成員的加入和退出等操作。當(dāng)有新消息在群組中發(fā)布時(shí),服務(wù)器要能夠迅速將消息準(zhǔn)確無(wú)誤地廣播給群內(nèi)的每一個(gè)成員,確保信息的實(shí)時(shí)傳遞 。
私聊功能則為用戶(hù)提供了一個(gè)私密的交流空間,就像是在熱鬧的派對(duì)中找一個(gè)安靜的角落,與特定的人進(jìn)行一對(duì)一的深入交談。服務(wù)器需要精準(zhǔn)地處理私聊消息的路由,確保消息能夠準(zhǔn)確無(wú)誤地送達(dá)目標(biāo)用戶(hù),避免消息誤發(fā)或丟失 。
消息推送功能是保持用戶(hù)與服務(wù)器實(shí)時(shí)連接的紐帶,它能讓用戶(hù)在第一時(shí)間收到新消息的通知,就像快遞員及時(shí)將包裹送到收件人手中一樣。服務(wù)器需要支持實(shí)時(shí)推送,采用如 WebSocket 等高效的通信協(xié)議,實(shí)現(xiàn)消息的即時(shí)傳輸,讓用戶(hù)感受到流暢的聊天體驗(yàn) 。
4.2代碼實(shí)現(xiàn)與解析
接下來(lái),讓我們深入探討高并發(fā)聊天服務(wù)器的核心代碼實(shí)現(xiàn),通過(guò)實(shí)際代碼來(lái)感受多線(xiàn)程網(wǎng)絡(luò)編程在構(gòu)建聊天服務(wù)器中的強(qiáng)大魅力 。
服務(wù)器端代碼實(shí)現(xiàn):
#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <unordered_map>
#include <string>
// 線(xiàn)程池類(lèi)
class ThreadPool {
public:
ThreadPool(size_t numThreads) {
for (size_t i = 0; i < numThreads; ++i) {
threads.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queueMutex);
this->condition.wait(lock, [this] { return this->stop ||!this->tasks.empty(); });
if (this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
});
}
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queueMutex);
stop = true;
}
condition.notify_all();
for (std::thread& thread : threads) {
thread.join();
}
}
template<class F, class... Args>
void enqueue(F&& f, Args&&... args) {
{
std::unique_lock<std::mutex> lock(queueMutex);
tasks.emplace(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
}
condition.notify_one();
}
private:
std::vector<std::thread> threads;
std::queue<std::function<void()>> tasks;
std::mutex queueMutex;
std::condition_variable condition;
bool stop = false;
};
// 用戶(hù)信息結(jié)構(gòu)體
struct User {
std::string username;
int socketFd;
};
std::unordered_map<int, User> users; // 存儲(chǔ)在線(xiàn)用戶(hù)
std::mutex usersMutex; // 保護(hù)用戶(hù)信息的互斥鎖
// 處理客戶(hù)端連接的函數(shù)
void handleClient(int clientSocket) {
char buffer[1024] = {0};
int valread = read(clientSocket, buffer, 1024);
if (valread < 0) {
perror("read failed");
close(clientSocket);
return;
}
std::string message(buffer, valread);
// 解析消息,假設(shè)消息格式為 "command|data",例如 "login|username"
size_t pos = message.find('|');
if (pos == std::string::npos) {
std::cerr << "Invalid message format" << std::endl;
close(clientSocket);
return;
}
std::string command = message.substr(0, pos);
std::string data = message.substr(pos + 1);
if (command == "login") {
{
std::lock_guard<std::mutex> lock(usersMutex);
User user = {data, clientSocket};
users[clientSocket] = user;
std::cout << "User " << data << " logged in" << std::endl;
}
const char* response = "Login successful";
send(clientSocket, response, strlen(response), 0);
} else if (command == "send") {
// 解析發(fā)送消息的目標(biāo)用戶(hù)和內(nèi)容,假設(shè)格式為 "target_user|message_content"
pos = data.find('|');
if (pos == std::string::npos) {
std::cerr << "Invalid send message format" << std::endl;
close(clientSocket);
return;
}
std::string targetUser = data.substr(0, pos);
std::string messageContent = data.substr(pos + 1);
{
std::lock_guard<std::mutex> lock(usersMutex);
for (auto& user : users) {
if (user.second.username == targetUser) {
std::string fullMessage = users[clientSocket].username + " says: " + messageContent;
send(user.second.socketFd, fullMessage.c_str(), fullMessage.size(), 0);
break;
}
}
}
} else {
std::cerr << "Unknown command: " << command << std::endl;
close(clientSocket);
return;
}
while (true) {
valread = read(clientSocket, buffer, 1024);
if (valread < 0) {
perror("read failed");
break;
} else if (valread == 0) {
std::cout << "Client disconnected" << std::endl;
break;
}
message.assign(buffer, valread);
// 處理其他消息,如群組消息等
}
{
std::lock_guard<std::mutex> lock(usersMutex);
users.erase(clientSocket);
}
close(clientSocket);
}
int main() {
int serverSocket = socket(AF_INET, SOCK_STREAM, 0);
if (serverSocket == 0) {
perror("socket failed");
exit(EXIT_FAILURE);
}
struct sockaddr_in address;
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(8080);
if (bind(serverSocket, (struct sockaddr *) &address, sizeof(address)) < 0) {
perror("bind failed");
exit(EXIT_FAILURE);
}
if (listen(serverSocket, 3) < 0) {
perror("listen");
exit(EXIT_FAILURE);
}
ThreadPool pool(4); // 創(chuàng)建包含4個(gè)線(xiàn)程的線(xiàn)程池
while (true) {
struct sockaddr_in clientAddress;
socklen_t clientAddressLen = sizeof(clientAddress);
int clientSocket = accept(serverSocket, (struct sockaddr *) &clientAddress, &clientAddressLen);
if (clientSocket < 0) {
perror("accept");
continue;
}
// 將處理客戶(hù)端連接的任務(wù)加入線(xiàn)程池
pool.enqueue([clientSocket] {
handleClient(clientSocket);
});
}
close(serverSocket);
return 0;
}在這段服務(wù)器端代碼中,ThreadPool類(lèi)實(shí)現(xiàn)了一個(gè)線(xiàn)程池,用于高效地處理多個(gè)客戶(hù)端連接。handleClient函數(shù)負(fù)責(zé)處理單個(gè)客戶(hù)端的連接,解析客戶(hù)端發(fā)送的消息,并根據(jù)不同的命令(如登錄、發(fā)送消息等)進(jìn)行相應(yīng)的處理 。users是一個(gè)std::unordered_map,用于存儲(chǔ)在線(xiàn)用戶(hù)的信息,通過(guò)usersMutex互斥鎖來(lái)保證對(duì)用戶(hù)信息的安全訪(fǎng)問(wèn) 。
客戶(hù)端代碼實(shí)現(xiàn):
#include <iostream>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <thread>
void receiveMessages(int socketFd) {
char buffer[1024] = {0};
while (true) {
int valread = read(socketFd, buffer, 1024);
if (valread < 0) {
perror("read failed");
break;
} else if (valread == 0) {
std::cout << "Server disconnected" << std::endl;
break;
}
std::string message(buffer, valread);
std::cout << "Received: " << message << std::endl;
}
}
int main() {
int sock = 0;
struct sockaddr_in serv_addr;
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
std::cerr << "Socket creation error" << std::endl;
return -1;
}
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(8080);
if (inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr) <= 0) {
std::cerr << "Invalid address/ Address not supported" << std::endl;
return -1;
}
if (connect(sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
std::cerr << "Connection Failed" << std::endl;
return -1;
}
std::thread receiveThread(receiveMessages, sock);
char buffer[1024] = {0};
while (true) {
std::cout << "Enter message: ";
std::cin.getline(buffer, 1024);
if (send(sock, buffer, strlen(buffer), 0) < 0) {
std::cerr << "Send failed" << std::endl;
break;
}
if (strcmp(buffer, "exit") == 0) {
break;
}
}
receiveThread.join();
close(sock);
return 0;
}客戶(hù)端代碼中,receiveMessages函數(shù)在一個(gè)單獨(dú)的線(xiàn)程中運(yùn)行,負(fù)責(zé)接收服務(wù)器發(fā)送的消息并輸出到控制臺(tái) 。main函數(shù)中,創(chuàng)建套接字并連接到服務(wù)器,然后啟動(dòng)接收線(xiàn)程,同時(shí)通過(guò)std::cin獲取用戶(hù)輸入的消息,并發(fā)送給服務(wù)器 。
4.3性能優(yōu)化與測(cè)試
為了讓高并發(fā)聊天服務(wù)器在實(shí)際應(yīng)用中能夠穩(wěn)定高效地運(yùn)行,我們需要對(duì)其進(jìn)行一系列的性能優(yōu)化 。
減少鎖的使用是提高性能的關(guān)鍵策略之一。在前面的代碼中,對(duì)users的訪(fǎng)問(wèn)使用了互斥鎖usersMutex,這在高并發(fā)情況下可能會(huì)成為性能瓶頸。可以考慮使用無(wú)鎖數(shù)據(jù)結(jié)構(gòu),如前面介紹的無(wú)鎖鏈表或無(wú)鎖哈希表,來(lái)替換std::unordered_map,以減少鎖競(jìng)爭(zhēng),提高并發(fā)性能 。
優(yōu)化 I/O 操作也至關(guān)重要??梢圆捎?I/O 多路復(fù)用技術(shù),如epoll(在 Linux 系統(tǒng)上),來(lái)替代傳統(tǒng)的阻塞 I/O。epoll能夠在一個(gè)線(xiàn)程中同時(shí)監(jiān)控多個(gè)文件描述符的狀態(tài),當(dāng)有事件發(fā)生時(shí),及時(shí)通知程序進(jìn)行處理,大大提高了 I/O 的效率 。
接下來(lái),我們通過(guò)性能測(cè)試工具來(lái)展示優(yōu)化前后的效果對(duì)比 。使用Webbench等工具對(duì)聊天服務(wù)器進(jìn)行性能測(cè)試,模擬大量并發(fā)用戶(hù)連接到服務(wù)器,發(fā)送和接收消息 。
在優(yōu)化前,當(dāng)并發(fā)用戶(hù)數(shù)達(dá)到 100 時(shí),服務(wù)器的響應(yīng)時(shí)間明顯增加,部分消息的處理出現(xiàn)延遲,甚至出現(xiàn)丟包現(xiàn)象 。這是因?yàn)閭鹘y(tǒng)的阻塞 I/O 和頻繁的鎖競(jìng)爭(zhēng)導(dǎo)致服務(wù)器的處理能力達(dá)到了瓶頸 。
經(jīng)過(guò)性能優(yōu)化后,同樣在100個(gè)并發(fā)用戶(hù)的情況下,服務(wù)器的響應(yīng)時(shí)間大幅縮短,消息能夠及時(shí)準(zhǔn)確地處理和傳輸,丟包現(xiàn)象也基本消失;這充分展示了減少鎖使用和優(yōu)化I/O操作對(duì)提升服務(wù)器性能的顯著效果 。
通過(guò)不斷地優(yōu)化和測(cè)試,我們能夠打造出一個(gè)高性能、穩(wěn)定可靠的高并發(fā)聊天服務(wù)器,為用戶(hù)提供更加流暢、高效的聊天體驗(yàn) 。





























