偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

深度解讀:C++線程池設(shè)計原理與應(yīng)用實踐

開發(fā) 前端
C++ 作為一門強(qiáng)大的編程語言,其標(biāo)準(zhǔn)庫雖提供了多線程支持,但倘若直接使用std::thread進(jìn)行大規(guī)模并發(fā)編程,線程創(chuàng)建與銷毀所帶來的開銷不容小覷。此時,線程池這一高效管理線程的機(jī)制應(yīng)運(yùn)而生。

C++ 作為一門強(qiáng)大的編程語言,其標(biāo)準(zhǔn)庫雖提供了多線程支持,但倘若直接使用std::thread進(jìn)行大規(guī)模并發(fā)編程,線程創(chuàng)建與銷毀所帶來的開銷不容小覷。此時,線程池這一高效管理線程的機(jī)制應(yīng)運(yùn)而生。簡單來說,線程池是一種多線程處理形式,內(nèi)部維護(hù)著一定數(shù)量的工作線程,并借助任務(wù)隊列管理執(zhí)行任務(wù)。它就像一位智能管家,通過重用已有的線程,顯著減少了對象創(chuàng)建與銷毀的開銷,進(jìn)而提升性能。

而且,當(dāng)任務(wù)抵達(dá)時,無需等待線程創(chuàng)建即可立即執(zhí)行,大大消除了延遲,讓應(yīng)用程序響應(yīng)更為迅速。此外,線程池還能對線程進(jìn)行統(tǒng)一分配、調(diào)優(yōu)與監(jiān)控,極大地提高了線程的可管理性。接下來,讓我們深入探討 C++ 線程池的設(shè)計原理,結(jié)合實際案例展示其在不同場景中的應(yīng)用實踐,一同領(lǐng)略線程池在提升程序性能與資源利用率方面的強(qiáng)大魅力 。

Part1.線程池是什么

1.1線程池概述

在并發(fā)編程的世界里,線程池是一種非常重要的多線程處理技術(shù)。它的核心思想就像是一個精心管理的工人團(tuán)隊,在這個團(tuán)隊里,預(yù)先創(chuàng)建了一定數(shù)量的線程,這些線程就如同待命的工人,時刻準(zhǔn)備接受任務(wù)并執(zhí)行。當(dāng)有新的任務(wù)到來時,線程池不會像傳統(tǒng)方式那樣每次都去創(chuàng)建新的線程,而是直接從這個 “池子” 中挑選一個空閑的線程來處理任務(wù)。當(dāng)任務(wù)執(zhí)行完畢后,線程也不會被銷毀,而是重新回到線程池中,等待下一次任務(wù)的到來 ,就像工人完成一項工作后,不會離開團(tuán)隊,而是繼續(xù)留在團(tuán)隊里等待下一個工作安排。

為什么要使用這樣的方式呢?這是因為線程的創(chuàng)建和銷毀是比較 “昂貴” 的操作,會消耗一定的系統(tǒng)資源和時間。就好比每次有工作來臨時,都重新招聘和解雇工人,這不僅需要花費(fèi)時間和精力去招聘、培訓(xùn)新工人,還可能會因為頻繁的人員變動而影響工作效率。而線程池通過復(fù)用線程,就避免了這種頻繁創(chuàng)建和銷毀線程帶來的開銷,大大提高了系統(tǒng)的性能和資源利用率。同時,線程池還能夠有效地控制并發(fā)的線程數(shù),避免因為線程過多而導(dǎo)致系統(tǒng)資源競爭激烈、上下文切換頻繁等問題,從而保證系統(tǒng)的穩(wěn)定性和高效運(yùn)行 。

圖片圖片

1.2為什么要用 C++ 線程池

在 C++ 編程中,多線程是提升程序性能和處理能力的重要手段。但是,如果每次有任務(wù)就創(chuàng)建新線程,任務(wù)完成就銷毀線程,會帶來諸多問題 。比如,線程創(chuàng)建和銷毀的過程涉及操作系統(tǒng)資源的分配與回收,這個過程需要消耗一定的時間和系統(tǒng)資源。想象一下,你要舉辦一場活動,每次有嘉賓來參加活動,你都要重新搭建一個活動場地,嘉賓離開后又馬上拆除場地,這顯然是非常低效且浪費(fèi)資源的。在程序中,頻繁創(chuàng)建和銷毀線程就類似這種情況,會導(dǎo)致程序的運(yùn)行效率降低,尤其是在處理大量短時間任務(wù)時,這種開銷可能會成為性能瓶頸。

線程數(shù)量過多也會占用大量系統(tǒng)資源,如內(nèi)存、CPU 時間片等。過多的線程同時競爭這些資源,會導(dǎo)致上下文切換頻繁發(fā)生。上下文切換是指當(dāng) CPU 從一個線程切換到另一個線程執(zhí)行時,需要保存當(dāng)前線程的執(zhí)行狀態(tài),然后加載另一個線程的執(zhí)行狀態(tài),這個過程同樣會消耗 CPU 時間和系統(tǒng)資源。就像一個服務(wù)員要同時服務(wù)太多客人,不斷在不同客人之間來回切換,導(dǎo)致每個客人都不能得到及時有效的服務(wù),程序也會因為頻繁的上下文切換而降低整體性能 。

使用 C++ 線程池,可以很好地解決這些問題。線程池通過預(yù)先創(chuàng)建一定數(shù)量的線程,讓這些線程復(fù)用,避免了頻繁的線程創(chuàng)建和銷毀開銷,就像提前搭建好一個固定的活動場地,所有嘉賓都在這個場地里活動,不需要每次都重新搭建和拆除。同時,線程池可以有效控制并發(fā)線程的數(shù)量,避免線程過多導(dǎo)致資源競爭和上下文切換的問題,保證系統(tǒng)資源的合理利用,讓程序能夠更加穩(wěn)定、高效地運(yùn)行 。

Part2.C++線程池的原理剖析

要深入理解 C++ 實現(xiàn)線程池的過程,首先得剖析其核心原理。線程池主要由幾個關(guān)鍵部分協(xié)同工作,包括線程隊列、任務(wù)隊列、互斥鎖、條件變量等,它們各自承擔(dān)著獨(dú)特的職責(zé),共同構(gòu)建起線程池高效運(yùn)行的基礎(chǔ) 。

2.1線程隊列

線程隊列,就像是一個隨時待命的團(tuán)隊,其中包含了預(yù)先創(chuàng)建好的多個線程。這些線程在創(chuàng)建后并不會立即執(zhí)行具體的任務(wù),而是進(jìn)入一種等待狀態(tài),隨時準(zhǔn)備接受任務(wù)的分配。它們就像訓(xùn)練有素的士兵,在軍營中等待著出征的命令。在 C++ 中,我們可以使用std::vector<std::thread>來創(chuàng)建和管理這個線程隊列。例如:

std::vector<std::thread> threads;
for (size_t i = 0; i < threadCount; ++i) {
    threads.emplace_back([this] { this->worker(); });
}

在這段代碼中,threadCount表示我們希望創(chuàng)建的線程數(shù)量,通過循環(huán)創(chuàng)建了threadCount個線程,并將它們添加到threads向量中。每個線程都執(zhí)行worker函數(shù),這個函數(shù)就是線程的工作邏輯所在。

任務(wù)隊列

任務(wù)隊列則是存儲待執(zhí)行任務(wù)的地方,它像是一個任務(wù)倉庫。當(dāng)有新的任務(wù)到來時,就會被添加到這個隊列中等待處理。任務(wù)隊列可以使用std::queue來實現(xiàn),為了確保在多線程環(huán)境下的安全訪問,還需要配合互斥鎖和條件變量。比如:

std::queue<std::function<void()>> tasks;
std::mutex queueMutex;
std::condition_variable condition;

這里定義了一個tasks任務(wù)隊列,用于存儲類型為std::function<void()>的任務(wù),也就是可以調(diào)用且無返回值的函數(shù)對象。queueMutex是互斥鎖,用于保護(hù)任務(wù)隊列,防止多個線程同時訪問導(dǎo)致數(shù)據(jù)不一致。condition是條件變量,用于線程間的同步,當(dāng)有新任務(wù)添加到隊列時,通過條件變量通知等待的線程。

2.2互斥鎖

互斥鎖的作用至關(guān)重要,它就像一把鎖,用來保護(hù)共享資源,確保同一時間只有一個線程能夠訪問任務(wù)隊列。當(dāng)一個線程想要訪問任務(wù)隊列(比如添加任務(wù)或取出任務(wù))時,它必須先獲取互斥鎖。如果此時互斥鎖已經(jīng)被其他線程持有,那么這個線程就會被阻塞,直到互斥鎖被釋放。在 C++ 中,使用std::mutex來實現(xiàn)互斥鎖,例如:

std::mutex mutex;
mutex.lock();
// 訪問任務(wù)隊列的代碼
mutex.unlock();

在這段代碼中,mutex.lock()用于獲取互斥鎖,當(dāng)獲取到鎖后,就可以安全地訪問任務(wù)隊列。訪問完成后,通過mutex.unlock()釋放互斥鎖,讓其他線程有機(jī)會獲取鎖并訪問任務(wù)隊列。為了避免忘記解鎖導(dǎo)致死鎖,更推薦使用std::lock_guard或std::unique_lock,它們會在作用域結(jié)束時自動釋放鎖,例如:

{
    std::unique_lock<std::mutex> lock(mutex);
    // 訪問任務(wù)隊列的代碼
} // lock自動析構(gòu),釋放鎖

2.3條件變量

條件變量主要用于線程間的同步和通信。它與互斥鎖配合使用,當(dāng)任務(wù)隊列中沒有任務(wù)時,工作線程可以通過條件變量進(jìn)入等待狀態(tài),釋放互斥鎖,讓出 CPU 資源。當(dāng)有新任務(wù)添加到任務(wù)隊列時,就可以通過條件變量通知等待的線程,讓它們醒來并獲取互斥鎖,從任務(wù)隊列中取出任務(wù)執(zhí)行。例如:

std::condition_variable condition;
std::unique_lock<std::mutex> lock(mutex);
while (tasks.empty()) {
    condition.wait(lock);
}
auto task = std::move(tasks.front());
tasks.pop();

在這段代碼中,condition.wait(lock)會使線程進(jìn)入等待狀態(tài),并釋放lock鎖。當(dāng)其他線程調(diào)用condition.notify_one()或condition.notify_all()通知時,等待的線程會被喚醒,重新獲取lock鎖,然后繼續(xù)執(zhí)行后續(xù)代碼,從任務(wù)隊列中取出任務(wù)。

2.4協(xié)同工作流程

線程池的工作流程是一個有序且高效的協(xié)作過程。當(dāng)有新任務(wù)到來時,任務(wù)會被添加到任務(wù)隊列中。這個過程中,需要先獲取互斥鎖,以保證任務(wù)隊列的線程安全。添加任務(wù)后,通過條件變量通知等待的線程有新任務(wù)到來。

template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
    using return_type = typename std::result_of<F(Args...)>::type;
    auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        if (stop) throw std::runtime_error("enqueue on stopped ThreadPool");
        tasks.emplace([task]() { (*task)(); });
    }
    condition.notify_one();
    return res;
}

在這段代碼中,enqueue函數(shù)用于將任務(wù)添加到任務(wù)隊列。首先,通過std::bind和std::make_shared創(chuàng)建一個包裝了任務(wù)的std::packaged_task,并獲取其對應(yīng)的std::future用于獲取任務(wù)執(zhí)行結(jié)果。然后,在臨界區(qū)內(nèi)(通過std::unique_lock自動管理鎖)將任務(wù)添加到任務(wù)隊列tasks中。最后,通過condition.notify_one()通知一個等待的線程有新任務(wù)。

而工作線程在啟動后,會不斷地嘗試從任務(wù)隊列中獲取任務(wù)并執(zhí)行。它們首先獲取互斥鎖,檢查任務(wù)隊列是否為空。如果為空,就通過條件變量等待,直到有新任務(wù)被添加。當(dāng)獲取到任務(wù)后,線程會執(zhí)行任務(wù),執(zhí)行完成后再次回到獲取任務(wù)的循環(huán)中。如果線程池停止,且任務(wù)隊列為空,線程就會退出。

void ThreadPool::worker() {
    while (true) {
        std::function<void()> task;
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            condition.wait(lock, [this] { return stop ||!tasks.empty(); });
            if (stop && tasks.empty()) return;
            task = std::move(tasks.front());
            tasks.pop();
        }
        task();
    }
}

在worker函數(shù)中,線程首先創(chuàng)建一個std::function<void()>類型的變量task用于存儲從任務(wù)隊列中取出的任務(wù)。然后,在臨界區(qū)內(nèi)(通過std::unique_lock自動管理鎖),使用condition.wait等待條件滿足,條件是stop為true(表示線程池停止)或者任務(wù)隊列不為空。當(dāng)條件滿足且stop為true且任務(wù)隊列為空時,線程返回退出。否則,從任務(wù)隊列中取出任務(wù)并移動到task中。最后,在臨界區(qū)外執(zhí)行任務(wù)task()。這樣,通過線程隊列、任務(wù)隊列、互斥鎖和條件變量的緊密協(xié)作,線程池實現(xiàn)了高效的任務(wù)管理和并發(fā)執(zhí)行 。

Part3.線程池實現(xiàn)方式

若線程池配置了 3 個線程,且任務(wù)隊列不設(shè)容量上限,其運(yùn)行時會出現(xiàn)哪些典型情況?

具體來說,可能包括:當(dāng)任務(wù)數(shù)量少于或等于 3 時,所有任務(wù)可被立即分配給線程執(zhí)行;當(dāng)任務(wù)數(shù)量超過 3 時,超出部分會進(jìn)入隊列等待;若任務(wù)持續(xù)涌入且執(zhí)行速度慢于提交速度,隊列會不斷膨脹;當(dāng)所有任務(wù)執(zhí)行完畢后,3 個線程會處于空閑狀態(tài)等待新任務(wù);此外,還可能出現(xiàn)部分線程因任務(wù)阻塞而暫時閑置,其他線程仍在工作的情況。

3.1情況

①:無事可做,集體待命

線程池已啟動,三個工作線程全部就緒,但主線程尚未提交任何任務(wù)。此時任務(wù)隊列空無一人,三個線程只能原地阻塞等待,如同剛放長假時的你,一身力氣沒處使,只能閑著發(fā)呆。

圖片圖片

3.2情況

②:任務(wù)抵達(dá),恰好分配

主線程忽然送來三個任務(wù),像投遞包裹般接連放入隊列。三個線程隨即反應(yīng):“有任務(wù)了!” 立刻從等待狀態(tài)中蘇醒,依次取出任務(wù)。任務(wù)隊列轉(zhuǎn)眼被取空,三個線程各自帶著任務(wù)投入執(zhí)行。此時主線程也毫無壓力,因為它提交的任務(wù)數(shù)量剛好能被線程池容納,不多不少。

圖片圖片

3.3情況

③:任務(wù)過剩,排隊等候

三個線程正全力處理任務(wù)時,主線程又新增了一個任務(wù)。由于此時線程池已無空閑線程,這個新任務(wù)只能進(jìn)入隊列排隊等候。待三個線程中任意一個完成手頭工作,便會主動從隊列中取出下一個任務(wù)繼續(xù)執(zhí)行。這正是線程池的 “先處理,后排隊” 機(jī)制,形成了 “線程等任務(wù),任務(wù)等線程” 的循環(huán)。

圖片圖片

3.4情況

④:任務(wù)爆滿,主線程被迫停滯

這種情形較為極端,我們先假設(shè)任務(wù)隊列設(shè)有最大容量限制(例如最多只能存放 5 個任務(wù))。此時線程池的三個線程都在忙碌,隊列也已處于滿員狀態(tài)。當(dāng)主線程再想提交新任務(wù)時,會發(fā)現(xiàn)既沒有空閑位置可存放,也沒有線程能接手,只能原地等待,直到隊列中有任務(wù)被取走、騰出空位為止。

圖片圖片

Part4.C++實現(xiàn)線程池的步驟

4.1創(chuàng)建任務(wù)類

任務(wù)類在整個線程池體系中扮演著關(guān)鍵的角色,它主要負(fù)責(zé)封裝任務(wù)函數(shù)以及與之相關(guān)的參數(shù),使得任務(wù)能夠以一種統(tǒng)一、規(guī)范的形式被線程池管理和調(diào)度。

在 C++ 中,我們可以通過以下方式來定義一個任務(wù)類:

class Task {
public:
    // 使用模板來接受任意可調(diào)用對象及其參數(shù)
    template<class F, class... Args>
    Task(F&& f, Args&&... args) : func(std::bind(std::forward<F>(f), std::forward<Args>(args)...)) {}

    // 定義任務(wù)的執(zhí)行函數(shù)
    void execute() {
        if (func) {
            func();
        }
    }

private:
    std::function<void()> func;
};

在上述代碼中,我們利用了 C++ 的模板特性和std::function、std::bind來實現(xiàn)任務(wù)的封裝。std::function<void()>類型的成員變量func用于存儲可調(diào)用對象,通過std::bind將傳入的函數(shù)f和參數(shù)args綁定成一個無參的可調(diào)用對象,賦值給func。execute函數(shù)則是任務(wù)的執(zhí)行入口,當(dāng)調(diào)用execute時,會執(zhí)行綁定好的函數(shù)。

例如,假設(shè)有一個簡單的加法函數(shù):

int add(int a, int b) {
    return a + b;
}

我們可以創(chuàng)建一個任務(wù)對象來執(zhí)行這個加法操作:

Task task(add, 3, 5);
task.execute(); // 執(zhí)行任務(wù),相當(dāng)于調(diào)用add(3, 5)

這樣,通過任務(wù)類的封裝,我們可以將各種不同的任務(wù)以統(tǒng)一的方式進(jìn)行管理和執(zhí)行,為線程池的任務(wù)調(diào)度提供了基礎(chǔ)。

4.2構(gòu)建線程池類

線程池類是整個線程池實現(xiàn)的核心部分,它負(fù)責(zé)管理線程隊列、任務(wù)隊列以及協(xié)調(diào)線程的工作。下面是線程池類的基本框架:

class ThreadPool {
public:
    // 構(gòu)造函數(shù),初始化線程池
    ThreadPool(size_t numThreads);

    // 析構(gòu)函數(shù),清理線程池資源
    ~ThreadPool();

    // 添加任務(wù)到任務(wù)隊列
    template<class F, class... Args>
    void enqueue(F&& f, Args&&... args);

private:
    // 線程執(zhí)行的函數(shù)
    void worker();

    // 線程隊列
    std::vector<std::thread> threads;

    // 任務(wù)隊列
    std::queue<std::unique_ptr<Task>> tasks;

    // 互斥鎖,保護(hù)任務(wù)隊列
    std::mutex queueMutex;

    // 條件變量,用于線程同步
    std::condition_variable condition;

    // 線程池停止標(biāo)志
    bool stop;
};

在這個線程池類中:

成員變量

threads是一個std::vector<std::thread>類型的線程隊列,用于存儲線程對象,每個線程都將執(zhí)行worker函數(shù)。

tasks是一個std::queue<std::unique_ptr<Task>>類型的任務(wù)隊列,用于存儲任務(wù)對象,這里使用std::unique_ptr來管理任務(wù)對象的生命周期,確保內(nèi)存安全。

queueMutex是一個互斥鎖,用于保護(hù)任務(wù)隊列,防止多個線程同時訪問任務(wù)隊列導(dǎo)致數(shù)據(jù)不一致。

condition是一個條件變量,與互斥鎖配合使用,用于線程間的同步。當(dāng)任務(wù)隊列中沒有任務(wù)時,工作線程可以通過條件變量進(jìn)入等待狀態(tài),當(dāng)有新任務(wù)添加到任務(wù)隊列時,通過條件變量通知等待的線程。

stop是一個布爾類型的標(biāo)志,用于控制線程池的停止。當(dāng)stop為true時,線程池將停止接受新任務(wù),并在處理完現(xiàn)有任務(wù)后關(guān)閉所有線程。

構(gòu)造函數(shù)

ThreadPool::ThreadPool(size_t numThreads) : stop(false) {
    for (size_t i = 0; i < numThreads; ++i) {
        threads.emplace_back([this] { this->worker(); });
    }
}

構(gòu)造函數(shù)接受一個參數(shù)numThreads,表示線程池中的線程數(shù)量。在構(gòu)造函數(shù)中,通過循環(huán)創(chuàng)建numThreads個線程,并將它們添加到threads隊列中。每個線程都執(zhí)行worker函數(shù),[this] { this->worker(); }是一個 lambda 表達(dá)式,它捕獲了this指針,使得線程能夠訪問線程池類的成員函數(shù)和變量。

析構(gòu)函數(shù)

ThreadPool::~ThreadPool() {
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        stop = true;
    }
    condition.notify_all();
    for (std::thread& thread : threads) {
        thread.join();
    }
}

析構(gòu)函數(shù)用于清理線程池的資源。首先,在一個臨界區(qū)內(nèi)(通過std::unique_lock自動管理鎖)將stop標(biāo)志設(shè)置為true,表示線程池要停止。然后,通過condition.notify_all()通知所有等待的線程,讓它們有機(jī)會檢查stop標(biāo)志并退出。最后,通過循環(huán)調(diào)用thread.join()等待所有線程執(zhí)行完畢,釋放線程資源。

添加任務(wù)函數(shù)

template<class F, class... Args>
void ThreadPool::enqueue(F&& f, Args&&... args) {
    auto task = std::make_unique<Task>(std::forward<F>(f), std::forward<Args>(args)...);
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        if (stop) {
            throw std::runtime_error("enqueue on stopped ThreadPool");
        }
        tasks.push(std::move(task));
    }
    condition.notify_one();
}

enqueue函數(shù)是一個模板函數(shù),用于將任務(wù)添加到任務(wù)隊列中。它接受一個可調(diào)用對象f和一系列參數(shù)args,通過std::make_unique創(chuàng)建一個Task對象,并將其添加到任務(wù)隊列tasks中。在添加任務(wù)時,先獲取互斥鎖,確保任務(wù)隊列的線程安全。如果線程池已經(jīng)停止(stop為true),則拋出異常。添加任務(wù)后,釋放互斥鎖,并通過condition.notify_one()通知一個等待的線程有新任務(wù)到來。

4.3實現(xiàn)關(guān)鍵函數(shù)

在上述線程池類中,worker函數(shù)和enqueue函數(shù)是實現(xiàn)線程池功能的關(guān)鍵。

worker函數(shù)

void ThreadPool::worker() {
    while (true) {
        std::unique_ptr<Task> task;
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            condition.wait(lock, [this] { return stop ||!tasks.empty(); });
            if (stop && tasks.empty()) {
                return;
            }
            task = std::move(tasks.front());
            tasks.pop();
        }
        task->execute();
    }
}

worker函數(shù)是線程執(zhí)行的主體函數(shù)。它在一個無限循環(huán)中運(yùn)行,不斷嘗試從任務(wù)隊列中獲取任務(wù)并執(zhí)行。

具體步驟如下:

  1. 首先創(chuàng)建一個std::unique_ptr<Task>類型的變量task,用于存儲從任務(wù)隊列中取出的任務(wù)。
  2. 使用std::unique_lock<std::mutex>來獲取互斥鎖,進(jìn)入臨界區(qū),保護(hù)任務(wù)隊列的訪問。
  3. 使用condition.wait等待條件滿足,條件是stop為true(表示線程池停止)或者任務(wù)隊列不為空。condition.wait會自動釋放互斥鎖,使線程進(jìn)入等待狀態(tài),直到被condition.notify_one()或condition.notify_all()喚醒。當(dāng)線程被喚醒時,會重新獲取互斥鎖,繼續(xù)執(zhí)行后續(xù)代碼。
  4. 檢查stop標(biāo)志和任務(wù)隊列是否為空,如果stop為true且任務(wù)隊列為空,說明線程池已經(jīng)停止且沒有任務(wù)了,此時線程返回,結(jié)束執(zhí)行。
  5. 從任務(wù)隊列中取出第一個任務(wù),并將其移動到task變量中,然后將任務(wù)從任務(wù)隊列中移除。
  6. 退出臨界區(qū),釋放互斥鎖,執(zhí)行任務(wù)的execute函數(shù),完成任務(wù)的執(zhí)行。
  7. 循環(huán)回到開始,繼續(xù)等待下一個任務(wù)。

enqueue函數(shù)

template<class F, class... Args>
void ThreadPool::enqueue(F&& f, Args&&... args) {
    auto task = std::make_unique<Task>(std::forward<F>(f), std::forward<Args>(args)...);
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        if (stop) {
            throw std::runtime_error("enqueue on stopped ThreadPool");
        }
        tasks.push(std::move(task));
    }
    condition.notify_one();
}

enqueue函數(shù)用于將任務(wù)添加到任務(wù)隊列中。具體步驟如下:

  1. 使用std::make_unique創(chuàng)建一個Task對象,將傳入的可調(diào)用對象f和參數(shù)args封裝到任務(wù)對象中。這里使用std::forward來實現(xiàn)完美轉(zhuǎn)發(fā),確保參數(shù)的左值 / 右值特性不變。
  2. 使用std::unique_lock<std::mutex>獲取互斥鎖,進(jìn)入臨界區(qū),保護(hù)任務(wù)隊列的訪問。
  3. 檢查線程池是否已經(jīng)停止,如果stop為true,說明線程池已經(jīng)停止,此時拋出std::runtime_error異常,提示不能在停止的線程池中添加任務(wù)。
  4. 將創(chuàng)建好的任務(wù)對象通過std::move移動到任務(wù)隊列tasks中,std::move用于將一個對象的所有權(quán)轉(zhuǎn)移給另一個對象,避免不必要的拷貝。
  5. 退出臨界區(qū),釋放互斥鎖。
  6. 通過condition.notify_one()通知一個等待的線程有新任務(wù)到來,被通知的線程會在condition.wait處被喚醒,然后嘗試從任務(wù)隊列中獲取任務(wù)并執(zhí)行。

通過以上關(guān)鍵函數(shù)的實現(xiàn),線程池能夠有效地管理線程和任務(wù),實現(xiàn)任務(wù)的并發(fā)執(zhí)行,提高程序的性能和效率。

Part5.線程池代碼示例與解析

5.1完整代碼展示

#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>

class ThreadPool {
public:
    ThreadPool(size_t numThreads);
    ~ThreadPool();

    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;

private:
    void worker();

    std::vector<std::thread> threads;
    std::queue<std::function<void()>> tasks;

    std::mutex queueMutex;
    std::condition_variable condition;

    bool stop;
};


ThreadPool::ThreadPool(size_t numThreads) : stop(false) {
    for (size_t i = 0; i < numThreads; ++i) {
        threads.emplace_back([this] {
            while (true) {
                std::function<void()> task;
                {
                    std::unique_lock<std::mutex> lock(queueMutex);
                    condition.wait(lock, [this] { return stop ||!tasks.empty(); });
                    if (stop && tasks.empty())
                        return;
                    task = std::move(tasks.front());
                    tasks.pop();
                }
                task();
            }
        });
    }
}


ThreadPool::~ThreadPool() {
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        stop = true;
    }
    condition.notify_all();
    for (std::thread& thread : threads) {
        thread.join();
    }
}


template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
    using return_type = typename std::result_of<F(Args...)>::type;
    auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        if (stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");
        tasks.emplace([task]() { (*task)(); });
    }
    condition.notify_one();
    return res;
}

5.2代碼逐行解析

包含頭文件

#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>

這些頭文件提供了實現(xiàn)線程池所需的各種工具和數(shù)據(jù)結(jié)構(gòu)。vector用于存儲線程隊列,queue用于實現(xiàn)任務(wù)隊列,thread用于線程操作,mutex和condition_variable用于線程同步,functional用于處理可調(diào)用對象,future用于獲取異步任務(wù)的結(jié)果。

線程池類定義

class ThreadPool {
public:
    ThreadPool(size_t numThreads);
    ~ThreadPool();

    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;

private:
    void worker();

    std::vector<std::thread> threads;
    std::queue<std::function<void()>> tasks;

    std::mutex queueMutex;
    std::condition_variable condition;

    bool stop;
};

公有成員函數(shù)

  • ThreadPool(size_t numThreads):構(gòu)造函數(shù),用于初始化線程池,接受線程數(shù)量作為參數(shù)。
  • ~ThreadPool():析構(gòu)函數(shù),用于清理線程池資源,停止所有線程并等待它們結(jié)束。
  • template<class F, class... Args> auto enqueue(F&& f, Args&&... args) ->std::future<typename std::result_of<F(Args...)>::type>:模板函數(shù),用于將任務(wù)添加到任務(wù)隊列中,并返回一個std::future對象,以便獲取任務(wù)的執(zhí)行結(jié)果。

私有成員函數(shù):void worker():線程執(zhí)行的函數(shù),每個線程都會調(diào)用這個函數(shù),從任務(wù)隊列中獲取任務(wù)并執(zhí)行。

私有成員變量

  • std::vector<std::thread> threads:線程隊列,存儲線程對象。
  • std::queue<std::function<void()>> tasks:任務(wù)隊列,存儲可調(diào)用對象,即任務(wù)。
  • std::mutex queueMutex:互斥鎖,用于保護(hù)任務(wù)隊列,確保線程安全。
  • std::condition_variable condition:條件變量,用于線程同步,當(dāng)任務(wù)隊列有新任務(wù)時通知等待的線程。
  • bool stop:線程池停止標(biāo)志,當(dāng)stop為true時,線程池停止接受新任務(wù),并在處理完現(xiàn)有任務(wù)后關(guān)閉所有線程。

構(gòu)造函數(shù)實現(xiàn)

ThreadPool::ThreadPool(size_t numThreads) : stop(false) {
    for (size_t i = 0; i < numThreads; ++i) {
        threads.emplace_back([this] {
            while (true) {
                std::function<void()> task;
                {
                    std::unique_lock<std::mutex> lock(queueMutex);
                    condition.wait(lock, [this] { return stop ||!tasks.empty(); });
                    if (stop && tasks.empty())
                        return;
                    task = std::move(tasks.front());
                    tasks.pop();
                }
                task();
            }
        });
    }
}
  • ThreadPool::ThreadPool(size_t numThreads) : stop(false):構(gòu)造函數(shù)初始化列表,將stop標(biāo)志初始化為false。
  • for (size_t i = 0; i < numThreads; ++i):循環(huán)創(chuàng)建numThreads個線程。
  • threads.emplace_back([this] {... });:使用emplace_back將新線程添加到threads隊列中,每個線程執(zhí)行一個 lambda 表達(dá)式。
  • while (true):線程的主循環(huán),不斷嘗試從任務(wù)隊列中獲取任務(wù)并執(zhí)行。
  • std::function<void()> task;:定義一個變量task,用于存儲從任務(wù)隊列中取出的任務(wù)。
  • std::unique_lock<std::mutex> lock(queueMutex);:創(chuàng)建一個std::unique_lock對象,自動管理queueMutex鎖,進(jìn)入臨界區(qū)。
  • condition.wait(lock, [this] { return stop ||!tasks.empty(); });:線程等待條件變量condition,當(dāng)stop為true或者任務(wù)隊列不為空時,線程被喚醒。condition.wait會自動釋放lock鎖,使線程進(jìn)入等待狀態(tài),直到被通知喚醒,喚醒后會重新獲取lock鎖。
  • if (stop && tasks.empty()) return;:如果stop為true且任務(wù)隊列為空,說明線程池已經(jīng)停止且沒有任務(wù)了,線程返回,結(jié)束執(zhí)行。
  • task = std::move(tasks.front()); tasks.pop();:從任務(wù)隊列中取出第一個任務(wù),并將其移動到task變量中,然后將任務(wù)從任務(wù)隊列中移除。
  • task();:執(zhí)行任務(wù)。

析構(gòu)函數(shù)實現(xiàn)

ThreadPool::~ThreadPool() {
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        stop = true;
    }
    condition.notify_all();
    for (std::thread& thread : threads) {
        thread.join();
    }
}
  • std::unique_lock<std::mutex> lock(queueMutex); stop = true;:在臨界區(qū)內(nèi),將stop標(biāo)志設(shè)置為true,表示線程池要停止。
  • condition.notify_all();:通知所有等待的線程,讓它們有機(jī)會檢查stop標(biāo)志并退出。
  • for (std::thread& thread : threads) { thread.join(); }:通過循環(huán)調(diào)用thread.join()等待所有線程執(zhí)行完畢,釋放線程資源。

添加任務(wù)函數(shù)實現(xiàn)

template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
    using return_type = typename std::result_of<F(Args...)>::type;
    auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        if (stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");
        tasks.emplace([task]() { (*task)(); });
    }
    condition.notify_one();
    return res;
}
  • using return_type = typename std::result_of<F(Args...)>::type;:使用std::result_of獲取函數(shù)F的返回值類型,并將其命名為return_type。
  • auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));:使用std::make_shared創(chuàng)建一個std::packaged_task對象,將傳入的函數(shù)f和參數(shù)args綁定在一起,并包裝成一個可調(diào)用對象,用于異步執(zhí)行任務(wù)。std::forward用于完美轉(zhuǎn)發(fā),確保參數(shù)的左值 / 右值特性不變。
  • std::future<return_type> res = task->get_future();:獲取std::packaged_task對象的std::future,用于獲取任務(wù)的執(zhí)行結(jié)果。
  • std::unique_lock<std::mutex> lock(queueMutex);:創(chuàng)建一個std::unique_lock對象,自動管理queueMutex鎖,進(jìn)入臨界區(qū)。
  • if (stop) throw std::runtime_error("enqueue on stopped ThreadPool");:檢查線程池是否已經(jīng)停止,如果stop為true,說明線程池已經(jīng)停止,拋出std::runtime_error異常,提示不能在停止的線程池中添加任務(wù)。
  • tasks.emplace([task]() { (*task)(); });:將任務(wù)添加到任務(wù)隊列中,[task]() { (*task)(); }是一個 lambda 表達(dá)式,用于執(zhí)行std::packaged_task對象。
  • condition.notify_one();:通知一個等待的線程有新任務(wù)到來,被通知的線程會在condition.wait處被喚醒,然后嘗試從任務(wù)隊列中獲取任務(wù)并執(zhí)行。
  • return res;:返回std::future對象,以便調(diào)用者獲取任務(wù)的執(zhí)行結(jié)果。

Part6.線程池的測試與優(yōu)化

6.1測試線程池

為了驗證我們實現(xiàn)的線程池是否正確且穩(wěn)定,編寫測試代碼是必不可少的環(huán)節(jié)。通過測試,我們可以檢查線程池的各項功能,如添加任務(wù)、執(zhí)行任務(wù)、線程復(fù)用等是否符合預(yù)期。以下是一個簡單的測試示例:

#include <iostream>
#include <chrono>
#include <thread>
#include "ThreadPool.h"  // 假設(shè)線程池定義在這個頭文件中

// 定義一個簡單的任務(wù)函數(shù)
void simpleTask(int num) {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Task " << num << " executed by thread " << std::this_thread::get_id() << std::endl;
}

int main() {
    ThreadPool pool(4);  // 創(chuàng)建一個包含4個線程的線程池

    // 添加多個任務(wù)到線程池
    for (int i = 0; i < 10; ++i) {
        pool.enqueue(simpleTask, i);
    }

    // 等待一段時間,讓任務(wù)有足夠時間執(zhí)行
    std::this_thread::sleep_for(std::chrono::seconds(3));

    return 0;
}

在這個測試代碼中:

  • 首先定義了一個simpleTask函數(shù),它接受一個整數(shù)參數(shù)num,在函數(shù)內(nèi)部,線程會休眠 1 秒鐘,然后輸出任務(wù)編號和執(zhí)行該任務(wù)的線程 ID。
  • 在main函數(shù)中,創(chuàng)建了一個包含 4 個線程的線程池pool。
  • 通過循環(huán)調(diào)用pool.enqueue方法,向線程池中添加 10 個任務(wù),每個任務(wù)都執(zhí)行simpleTask函數(shù),并傳入不同的參數(shù)i。
  • 最后,主線程休眠 3 秒鐘,這是為了給線程池中的線程足夠的時間來執(zhí)行任務(wù)。在實際應(yīng)用中,可能需要更復(fù)雜的同步機(jī)制來確保所有任務(wù)都執(zhí)行完畢。

運(yùn)行這個測試代碼后,可以觀察輸出結(jié)果,確認(rèn)每個任務(wù)是否都被正確執(zhí)行,以及任務(wù)是否是由線程池中的不同線程執(zhí)行的,從而驗證線程池的功能是否正常。

6.2性能優(yōu)化

盡管我們已經(jīng)實現(xiàn)了一個基本的線程池,但在實際應(yīng)用中,還需要對其性能進(jìn)行優(yōu)化,以滿足不同場景的需求。下面分析一些常見的性能瓶頸,并提出相應(yīng)的優(yōu)化建議。

調(diào)整線程數(shù)量:線程池中的線程數(shù)量是一個關(guān)鍵參數(shù),它直接影響著線程池的性能。如果線程數(shù)量過少,可能導(dǎo)致任務(wù)等待時間過長,無法充分利用系統(tǒng)資源;而線程數(shù)量過多,則會增加線程上下文切換的開銷,甚至可能導(dǎo)致系統(tǒng)資源耗盡。因此,需要根據(jù)任務(wù)的類型和系統(tǒng)的硬件配置來合理調(diào)整線程數(shù)量。

  • 對于 CPU 密集型任務(wù):由于這類任務(wù)主要消耗 CPU 資源,過多的線程會導(dǎo)致頻繁的上下文切換,降低性能。一般來說,線程數(shù)量可以設(shè)置為 CPU 核心數(shù)或略小于 CPU 核心數(shù)。例如,在一個具有 4 個 CPU 核心的系統(tǒng)中,對于 CPU 密集型任務(wù),線程池的線程數(shù)量可以設(shè)置為 4 或 3。
  • 對于 I/O 密集型任務(wù):這類任務(wù)在執(zhí)行過程中大部分時間都在等待 I/O 操作完成,CPU 利用率相對較低。因此,可以適當(dāng)增加線程數(shù)量,以充分利用 CPU 資源。通常,線程數(shù)量可以設(shè)置為 CPU 核心數(shù)的 2 - 3 倍。比如,在同樣具有 4 個 CPU 核心的系統(tǒng)中,對于 I/O 密集型任務(wù),線程池的線程數(shù)量可以設(shè)置為 8 - 12。

優(yōu)化任務(wù)隊列的數(shù)據(jù)結(jié)構(gòu):任務(wù)隊列是線程池中的重要組成部分,其數(shù)據(jù)結(jié)構(gòu)的選擇會影響任務(wù)的添加和獲取效率。在前面的實現(xiàn)中,我們使用了std::queue作為任務(wù)隊列,它是一個基于鏈表的隊列,在多線程環(huán)境下,鏈表的操作可能會帶來一定的性能開銷。

  • 可以考慮使用無鎖隊列:無鎖隊列利用原子操作來實現(xiàn)線程安全,避免了傳統(tǒng)鎖機(jī)制帶來的開銷,能夠提高任務(wù)隊列在高并發(fā)場景下的性能。例如,concurrent_queue是一個開源的無鎖隊列實現(xiàn),它基于 CAS(Compare - And - Swap)操作,在多線程環(huán)境下具有較高的性能。
  • 根據(jù)任務(wù)特性選擇合適的隊列:如果任務(wù)具有優(yōu)先級之分,可以使用優(yōu)先級隊列(如std::priority_queue)來存儲任務(wù),這樣可以確保高優(yōu)先級的任務(wù)優(yōu)先被執(zhí)行。在一個實時系統(tǒng)中,可能會有一些緊急任務(wù)需要立即處理,使用優(yōu)先級隊列就能滿足這種需求。

減少鎖的競爭:在多線程環(huán)境下,鎖的競爭是導(dǎo)致性能下降的一個重要因素。在線程池的實現(xiàn)中,互斥鎖用于保護(hù)任務(wù)隊列的訪問,當(dāng)多個線程同時嘗試訪問任務(wù)隊列時,就會產(chǎn)生鎖競爭。

  • 使用細(xì)粒度鎖:可以將任務(wù)隊列按照一定的規(guī)則進(jìn)行劃分,每個部分使用單獨(dú)的互斥鎖進(jìn)行保護(hù)。這樣,不同的線程可以同時訪問不同部分的任務(wù)隊列,減少鎖的競爭。例如,將任務(wù)隊列按照任務(wù)類型劃分為多個子隊列,每個子隊列都有自己的互斥鎖。
  • 采用無鎖數(shù)據(jù)結(jié)構(gòu):除了前面提到的無鎖隊列,還可以使用其他無鎖數(shù)據(jù)結(jié)構(gòu)來替代傳統(tǒng)的加鎖方式。例如,std::atomic類型可以用于實現(xiàn)一些簡單的無鎖數(shù)據(jù)結(jié)構(gòu),如原子計數(shù)器,它可以在多線程環(huán)境下高效地進(jìn)行計數(shù)操作,而不需要使用鎖。

通過以上性能優(yōu)化措施,可以顯著提升線程池的性能和效率,使其能夠更好地適應(yīng)各種復(fù)雜的應(yīng)用場景。

責(zé)任編輯:武曉燕 來源: 深度Linux
相關(guān)推薦

2025-02-24 08:00:00

線程池Java開發(fā)

2010-02-05 16:46:58

C++ TinyXml

2010-02-04 09:33:08

C++指針重載

2010-02-03 15:58:51

C++ timer

2017-05-04 16:33:58

Java線程池實踐

2024-05-06 00:00:00

ThreadPool線程調(diào)度

2010-02-05 17:58:32

C++鏈棧模板

2025-05-06 09:12:46

2025-01-14 00:10:00

Java應(yīng)用程序

2020-03-05 15:34:16

線程池C語言局域網(wǎng)

2025-06-06 02:00:00

2025-03-25 10:29:52

2020-10-19 10:01:12

Nodejs線程池設(shè)計

2009-07-22 09:39:18

CLR線程池

2024-10-12 14:18:21

C++OOP函數(shù)重載

2024-07-15 08:20:24

2024-10-18 16:58:26

2015-11-30 11:14:59

C++對象池自動回收

2021-05-26 11:30:24

Java線程池代碼

2010-01-21 14:07:14

CC++聲明
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號