偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

從零到一實(shí)現(xiàn) Rust 的 Channel 并發(fā)處理模型

開(kāi)發(fā) 前端
這篇文章我們介紹 Rust 中并發(fā)的基礎(chǔ)概念,包括 Mutex、Condvar、Arc、Atomic 等,然后我們實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的 MPSC channel,即多生產(chǎn)者單消費(fèi)者模型,理解了 channel 內(nèi)部的實(shí)現(xiàn)原理,其內(nèi)部也是基于 Mutex 和 Condvar 這些基礎(chǔ)的原語(yǔ)來(lái)實(shí)現(xiàn)的。

隨著 SWC、NAPI-RS、Rspack 等等 Rust 前端工具鏈的出現(xiàn),Rust 正在逐步成為前端工程化的一種新的選擇,無(wú)論是在性能、安全性還是開(kāi)發(fā)體驗(yàn)上都有著很大的優(yōu)勢(shì)。筆者在工作中也在使用 Rust 進(jìn)行一些前端工具鏈的開(kāi)發(fā)工作,對(duì)于 Rust 的一些特性也在不斷的學(xué)習(xí)和探索,最近也會(huì)不定期的分享一些 Rust 的相關(guān)內(nèi)容,比如: 如何用 napi-rs 搭建一個(gè) Node.js 可以調(diào)用的 Rust 庫(kù)、Rust 并發(fā)和異步模型、Rust 宏編程 等等話題。

這篇文章將會(huì)圍繞 Rust 的并發(fā)模型展開(kāi),首先會(huì)介紹并發(fā)的基本概念,然后會(huì)對(duì) Rust 中一些重要的并發(fā)工具進(jìn)行介紹,比如 Atomic、Mutex、Condvar 等等,最后會(huì)實(shí)現(xiàn)一個(gè) channel 并發(fā)處理模型。

注: 關(guān)于基礎(chǔ)的環(huán)境搭建和語(yǔ)法內(nèi)容不會(huì)進(jìn)行講解,可以參考 《Rust 語(yǔ)言圣經(jīng)》這本書(shū),相信對(duì)于初學(xué)者是一個(gè)不錯(cuò)的選擇,地址: https://course.rs/about-book.html。

什么是并發(fā)?

要理解并發(fā),我們繞不開(kāi)另外一個(gè)相似的概念——并行,這兩個(gè)概念也是計(jì)算機(jī)科學(xué)中經(jīng)常被提到的兩個(gè)概念,它們之間到底有什么區(qū)別?

這里引入非常經(jīng)典的解釋,來(lái)自 Golang 之父 Rob Pike 的一段話:

Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once.

翻譯過(guò)來(lái)就是: 并發(fā)是指同時(shí)處理很多事情,而并行是指同時(shí)做很多事情。

在并發(fā)的場(chǎng)景中,對(duì)于正在處理的一些任務(wù),雖然看起來(lái)好像它們?cè)谕瑫r(shí)執(zhí)行,但實(shí)際上是通過(guò)在單個(gè)處理器上交替輪流運(yùn)行,某個(gè)時(shí)刻只有一個(gè)任務(wù)在運(yùn)行,而其他任務(wù)都處于等待狀態(tài)。

而在并行的場(chǎng)景中,對(duì)于正在處理的一些任務(wù),它們是真正的同時(shí)執(zhí)行。

而兩者也并不是相互排斥的,并發(fā)和并行可以同時(shí)存在,比如在多核的 CPU 中,我們可以同時(shí)運(yùn)行多個(gè)并發(fā)的任務(wù),這樣就可以充分利用多核 CPU 的優(yōu)勢(shì),提高程序的執(zhí)行效率。

Rust 中的并發(fā)原語(yǔ)

我們通??梢酝ㄟ^(guò)把任務(wù)放到多線程,或者多個(gè)異步任務(wù)來(lái)實(shí)現(xiàn)并發(fā),在這個(gè)過(guò)程中,其實(shí)真正的難點(diǎn)不在于如何創(chuàng)建多個(gè)線程或者異步任務(wù),而在于如何處理這些并發(fā)任務(wù)的同步和競(jìng)態(tài)問(wèn)題。

在 Rust 中,提供了一些并發(fā)原語(yǔ),來(lái)幫助我們處理并發(fā)任務(wù)的同步和競(jìng)態(tài)問(wèn)題,這些原語(yǔ)包括: Atomic、Mutex、Condvar、Arc 等等,下面我們來(lái)逐一介紹一下。

Atomic

Atomic 是原子操作,它提供了一些原子操作的方法,比如 fetch_add、fetch_sub 等等,這些方法都是原子化的,也就是說(shuō),這些方法在執(zhí)行的過(guò)程中,不會(huì)被其他線程打斷,也不會(huì)被其他線程修改,這樣就可以保證這些方法的執(zhí)行是安全的。比如:

use std::sync::atomic::{AtomicUsize, Ordering};

let a = AtomicUsize::new(0);
a.fetch_add(1, Ordering::SeqCst);

Ordering::SeqCst 代表嚴(yán)格控制操作順序的一致性,可以參考: https://doc.rust-lang.org/std/sync/atomic/enum.Ordering.html

上面的代碼中,我們創(chuàng)建了一個(gè) AtomicUsize 類型的變量 a,然后調(diào)用了 fetch_add 方法,這個(gè)方法會(huì)將 a 的值加 1,這個(gè)過(guò)程是原子化的。

為什么這里要突出強(qiáng)調(diào)一下原子化呢?這里我們來(lái)舉個(gè)例子:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

let counter = AtomicUsize::new(0);

let t1 = thread::spawn(|| {
for _ in 0..100 {
counter.fetch_add(1, Ordering::Relaxed);
}
});

let t2 = thread::spawn(|| {
for _ in 0..100 {
counter.fetch_add(1, Ordering::Relaxed);
}
});

t1.join().unwrap();
t2.join().unwrap();

assert_eq!(counter.load(Ordering::Relaxed), 200);

如果 fetch_add 方法執(zhí)行不是原子化的,那么就可能出現(xiàn)競(jìng)態(tài)問(wèn)題。例如,當(dāng)線程 t1 和 t2 同時(shí)運(yùn)行時(shí),它們可能讀取相同的計(jì)數(shù)器值,然后各自將其增加,并將結(jié)果存回計(jì)數(shù)器中,從而導(dǎo)致丟失一次增加的操作。這樣就會(huì)導(dǎo)致最終結(jié)果小于預(yù)期值 200。

所以所謂的原子化,實(shí)際上是將某些步驟合并成一個(gè)原子操作,不能中斷,拿這里的 fetch_add 來(lái)說(shuō):

  1. 讀取 counter 的值。
  2. 將 counter 的值加 1。

這兩個(gè)步驟不能中斷,如果中斷了,那么就會(huì)導(dǎo)致競(jìng)態(tài)問(wèn)題。

Mutex

Mutex 是常用的一種互斥鎖,它可以保證在同一時(shí)刻,只有一個(gè)線程可以訪問(wèn)某個(gè)數(shù)據(jù),其他線程必須等待,直到鎖被釋放。

Mutex 有兩種狀態(tài): 鎖定和未鎖定,當(dāng) Mutex 處于鎖定狀態(tài)時(shí),其他線程就無(wú)法再次獲取鎖,直到 Mutex 處于未鎖定狀態(tài)。

舉一個(gè)例子:

use std::sync::Mutex;
use std::thread;

let counter = Mutex::new(0);

let mut handles = vec![];

for _ in 0..10 {
let handle = thread::spawn(move || {
let mut value = counter.lock().unwrap();
*value += 1;
});
handles.push(handle);
}

for handle in handles {
handle.join().unwrap();
}

println!("Result: {}", *counter.lock().unwrap());

這段代碼會(huì)有編譯問(wèn)題,后續(xù)會(huì)分析。

這里我們通過(guò)循環(huán)創(chuàng)建了 10 個(gè)線程來(lái)增加計(jì)數(shù)器的值。每個(gè)線程都獲取了 Mutex 鎖,并修改了計(jì)數(shù)器的值。當(dāng)某個(gè)線程完成時(shí),它會(huì)釋放互斥鎖,允許其他線程進(jìn)行修改。

最后,我們使用 join() 方法等待所有線程完成,并打印出最終結(jié)果。

但這里的代碼涉及到所有權(quán)轉(zhuǎn)移的問(wèn)題,我們知道,在 Rust 中,同一時(shí)間一個(gè)變量只能有一個(gè)所有者,當(dāng)我們將 counter 傳遞給線程時(shí),就會(huì)發(fā)生所有權(quán)轉(zhuǎn)移,這樣就會(huì)導(dǎo)致其它的線程無(wú)法獲取 counter 的所有權(quán),導(dǎo)致編譯報(bào)錯(cuò)。

我們需要使用 Arc 來(lái)解決這個(gè)問(wèn)題。

Arc

Arc 是原子引用計(jì)數(shù),它可以在多個(gè)線程之間共享數(shù)據(jù),它的內(nèi)部實(shí)現(xiàn)是通過(guò)原子操作來(lái)實(shí)現(xiàn)的,所以它是線程安全的。

我們可以通過(guò) Arc::new 來(lái)創(chuàng)建一個(gè) Arc 對(duì)象,然后通過(guò) Arc::clone 來(lái)克隆一個(gè) Arc 對(duì)象,這樣就可以在多個(gè)線程之間共享數(shù)據(jù)了。

use std::sync::{Arc, Mutex};
use std::thread;

let counter = Arc::new(Mutex::new(0));

let mut handles = vec![];

for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut value = counter.lock().unwrap();
*value += 1;
});
handles.push(handle);
}

for handle in handles {
handle.join().unwrap();
}

println!("Result: {}", *counter.lock().unwrap());

Condvar

Condvar 是一個(gè)條件變量,它可以讓線程等待某個(gè)條件滿足,然后再執(zhí)行。比如:

use std::sync::{Arc, Condvar, Mutex};

let pair = Arc::new((Mutex::new(false), Condvar::new()));

let pair2 = Arc::clone(&pair);

let thread1 = std::thread::spawn(move || {
let (lock, cvar) = &*pair2;
let mut started = lock.lock().unwrap();
*started = true;
cvar.notify_one();
});

let (lock, cvar) = &*pair;

let mut started = lock.lock().unwrap();

while !*started {
started = cvar.wait(started).unwrap();
}

thread1.join().unwrap();

上面的代碼中,我們創(chuàng)建了一個(gè) pair,它是一個(gè)元組,第一個(gè)元素是一個(gè) Mutex,第二個(gè)元素是一個(gè) Condvar。然后我們創(chuàng)建了一個(gè)線程 thread1,它會(huì)將 Mutex 中的值設(shè)置為 true,然后調(diào)用 Condvar 的 notify_one 方法,通知 Condvar 等待的線程。

而在主線程中,我們會(huì)調(diào)用 Condvar 的 wait 方法,等待 Condvar 的通知,當(dāng)主線程收到通知后,就會(huì)繼續(xù)執(zhí)行。

使用 Channel 處理并發(fā)

讀到這里,你可能會(huì)說(shuō)了,我們使用 Mutex、Arc、Condvar 等方式來(lái)處理并發(fā),看起來(lái)很麻煩呀?其實(shí),Rust 中還有一種更簡(jiǎn)單的方式來(lái)處理并發(fā),那就是通過(guò) Channel。

Channel 的本質(zhì)是一個(gè)消息隊(duì)列,它可以讓多個(gè)線程之間進(jìn)行消息通信,把讀者和寫者分離。根據(jù)讀者和寫者的數(shù)量,Channel 可以分為下面的幾個(gè)類型:

  • 單生產(chǎn)者單消費(fèi)者(Single Producer, Single Consumer, SPSC)
  • 單生產(chǎn)者多消費(fèi)者(Single Producer, Multiple Consumer, SPMC)
  • 多生產(chǎn)者單消費(fèi)者(Multiple Producer, Single Consumer, MPSC)
  • 多生產(chǎn)者多消費(fèi)者(Multiple Producer, Multiple Consumer, MPMC)

其中 MPSC 是最常用的,在 Rust 中,它是通過(guò) std::sync::mpsc 模塊來(lái)實(shí)現(xiàn)的。我們來(lái)看看它是如何使用的。

use std::sync::mpsc;

let (s, r) = mpsc::channel();

let s1 = mpsc::Sender::clone(&s);

std::thread::spawn(move || {
let val = String::from("hi");
s1.send(val).unwrap();
});

let received = r.recv().unwrap();

println!("Got: {}", received);

上面的代碼中,我們創(chuàng)建了一個(gè) Channel,它是一個(gè)元組,第一個(gè)元素是一個(gè) Sender,第二個(gè)元素是一個(gè) Receiver。Sender 用來(lái)發(fā)送消息,Receiver 用來(lái)接收消息。

我們通過(guò) mpsc::Sender::clone 方法來(lái)克隆一個(gè) Sender,然后將克隆的 Sender 傳遞給線程,線程中通過(guò) Sender 的 send 發(fā)送消息。而在主線程中,我們通過(guò) Receiver 的 recv 方法來(lái)接收消息。

實(shí)現(xiàn)一個(gè) Channel

接下來(lái)我們基于 Arc、Mutex、Condvar 來(lái)實(shí)現(xiàn)一個(gè) Channel,它的功能和 std::sync::mpsc 中的 channel 類似,支持多生產(chǎn)者單消費(fèi)者。

1、創(chuàng)建項(xiàng)目

首先我們通過(guò) cargo new my-channel --lib 來(lái)創(chuàng)建一個(gè)庫(kù)項(xiàng)目,然后在 Cargo.toml 中添加依賴:

[dependencies]
anyhow="1.0.40"

anyhow 是一個(gè)錯(cuò)誤處理庫(kù),它可以讓我們更方便的處理錯(cuò)誤。

2、整體設(shè)計(jì)

對(duì)外暴露一個(gè) channel 函數(shù),它返回一個(gè) Sender 和 Receiver,Sender 用來(lái)發(fā)送消息,Receiver 用來(lái)接收消息。

pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
todo!()
}

因此關(guān)鍵的數(shù)據(jù)結(jié)構(gòu)就是 Sender 和 Receiver,它們都需要持有一個(gè)共享的內(nèi)部數(shù)據(jù)結(jié)構(gòu),我們將其命名為 Inner,它的定義如下:

// src/lib.rs
use anyhow::{anyhow, Ok, Result};
use std::{
collections::VecDeque,
sync::{atomic::AtomicUsize, Arc, Condvar, Mutex},
};

struct Inner<T> {
// 共享的數(shù)據(jù)
data: Mutex<VecDeque<T>>,
// 條件變量
condvar: Condvar,
// 發(fā)送者數(shù)量,使用原子操作
senders: AtomicUsize,
// 接收者數(shù)量,使用原子操作
receivers: AtomicUsize,
}

pub struct Sender<T> {
inner: Arc<Inner<T>>,
}

pub struct Receiver<T> {
inner: Arc<Inner<T>>,
}

OK,確定了數(shù)據(jù)結(jié)構(gòu)之后,我們來(lái)實(shí)現(xiàn) Sender 和 Receiver 的行為。

3、實(shí)現(xiàn) Sender

首先我們來(lái)實(shí)現(xiàn) Sender:

impl<T> Sender<T> {
pub fn send(&self, value: T) -> Result<()> {
todo!()
}

pub fn get_receivers_count(&self) -> usize {
todo!()
}
}

我們需要實(shí)現(xiàn)下面的方法:

  • send 方法,用來(lái)發(fā)送消息。
  • get_receivers_count 方法,用來(lái)獲取接收者的數(shù)量。

具體實(shí)現(xiàn)如下:

impl<T> Sender<T> {
pub fn send(&self, value: T) -> Result<()> {
// 如果沒(méi)有接收者了,就拋錯(cuò)
if self.get_receivers_count() == 0 {
return Err(anyhow!("no more receivers"));
}
let mut data = self.inner.data.lock().unwrap();
data.push_back(value);
// 通知接收者
self.inner.condvar.notify_one();
Ok(())
}

pub fn get_receivers_count(&self) -> usize {
self.inner
.receivers
.load(std::sync::atomic::Ordering::SeqCst)
}
}

上面的代碼中,我們通過(guò) get_receivers_count 方法來(lái)獲取接收者的數(shù)量,如果沒(méi)有接收者了,就拋錯(cuò)。然后我們通過(guò) Mutex 的 lock 方法來(lái)獲取鎖,然后將消息放入隊(duì)列中,最后通過(guò) Condvar 的 notify_one 方法來(lái)通知接收者。

4、實(shí)現(xiàn) Receiver

接下來(lái)我們來(lái)實(shí)現(xiàn) Receiver:

impl<T> Receiver<T> {
pub fn recv(&self) -> Result<T> {
todo!()
}

pub fn get_senders_count(&self) -> usize {
todo!()
}
}

我們需要實(shí)現(xiàn)下面的方法:

  • recv 方法,用來(lái)接收消息。
  • get_senders_count 方法,用來(lái)獲取發(fā)送者的數(shù)量。

具體實(shí)現(xiàn)如下:

impl<T> Receiver<T> {
pub fn recv(&self) -> Result<T> {
let mut data = self.inner.data.lock().unwrap();
loop {
// 如果沒(méi)有發(fā)送者了,就拋錯(cuò)
if self.get_senders_count() == 0 {
return Err(anyhow!("no more senders"));
}
// 如果隊(duì)列中有消息,就返回
if let Some(value) = data.pop_front() {
return Ok(value);
}
// 如果隊(duì)列中沒(méi)有消息,就等待
data = self.inner.condvar.wait(data).unwrap();
}
}

pub fn get_senders_count(&self) -> usize {
self.inner
.senders
.load(std::sync::atomic::Ordering::SeqCst)
}
}

上面的代碼中,我們通過(guò) get_senders_count 方法來(lái)獲取發(fā)送者的數(shù)量,如果沒(méi)有發(fā)送者了,就拋錯(cuò)。

然后我們通過(guò) Mutex 的 lock 方法來(lái)獲取鎖,通過(guò) Condvar 的 wait 方法來(lái)等待消息,如果隊(duì)列中有消息,就返回,如果隊(duì)列中沒(méi)有消息,就繼續(xù)等待,直到有消息為止。

當(dāng)然,我們還需要實(shí)現(xiàn) Drop trait,當(dāng) Sender 或者 Receiver 被釋放時(shí),我們需要更新發(fā)送者數(shù)量或者接收者數(shù)量:

impl<T> Drop for Sender<T> {
fn drop(&mut self) {
self.inner
.senders
.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
}
}

impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
self.inner
.receivers
.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
}
}

5、實(shí)現(xiàn) channel 函數(shù)

最后我們來(lái)實(shí)現(xiàn) channel 函數(shù):

pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let inner = Arc::new(Inner {
data: Mutex::new(VecDeque::new()),
condvar: Condvar::new(),
senders: AtomicUsize::new(1),
receivers: AtomicUsize::new(1),
});
(
Sender {
inner: inner.clone(),
},
Receiver { inner },
)
}

我們通過(guò) Arc 來(lái)包裝 Inner,然后創(chuàng)建一個(gè) Sender 和一個(gè) Receiver,最后返回。

6、測(cè)試

我們來(lái)測(cè)試一下目前的 channel 能否正常工作:

#[test]
fn test_channel() {
let (mut s, r) = channel();
let mut s1 = s.clone();
let mut s2 = s.clone();
let t = thread::spawn(move || {
s.send(1).unwrap();
});
let t1 = thread::spawn(move || {
s1.send(10).unwrap();
});
let t2 = thread::spawn(move || {
s2.send(100).unwrap();
});
for handle in [t, t1, t2] {
handle.join().unwrap();
}

let mut result = [r.recv().unwrap(), r.recv().unwrap(), r.recv().unwrap()];
// 保證順序的穩(wěn)定
result.sort();

assert_eq!(result, [1, 10, 100]);
}

#[test]
fn with_no_senders() {
let (s, r) = channel::<i32>();
drop(s);
assert!(r.recv().is_err());
}

#[test]
fn with_no_receivers() {
let (mut s, _) = channel::<i32>();
assert!(s.send(1).is_err());
}

OK,目前的 channel 已經(jīng)可以正常工作了。

總結(jié)

這篇文章中,我們介紹了 Rust 中并發(fā)的基礎(chǔ)概念,包括 Mutex、Condvar、Arc、Atomic 等,然后我們實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的 MPSC channel,即多生產(chǎn)者單消費(fèi)者模型,理解了 channel 內(nèi)部的實(shí)現(xiàn)原理,其內(nèi)部也是基于 Mutex 和 Condvar 這些基礎(chǔ)的原語(yǔ)來(lái)實(shí)現(xiàn)的。

責(zé)任編輯:姜華 來(lái)源: 三元同學(xué)
相關(guān)推薦

2023-08-01 09:00:00

高并發(fā)性能優(yōu)化

2021-10-28 07:10:21

rollupPlugin插件編寫

2024-06-27 07:56:49

2020-09-08 18:37:49

TypeScript開(kāi)發(fā)前端

2022-09-20 14:04:09

SSD

2020-09-24 11:46:03

Promise

2017-08-03 08:34:54

gRPCCRust

2022-03-04 10:07:45

Go語(yǔ)言字節(jié)池

2021-08-15 22:52:30

前端H5拼圖

2024-04-01 09:24:39

2021-07-12 07:33:31

Nacos微服務(wù)管理

2024-11-25 09:10:03

2025-01-16 10:46:31

2013-12-18 13:30:19

Linux運(yùn)維Linux學(xué)習(xí)Linux入門

2024-02-28 10:13:25

Rust語(yǔ)言開(kāi)發(fā)

2021-08-07 21:51:17

服務(wù)器網(wǎng)站部署

2023-01-12 22:00:48

2021-12-12 18:15:06

Python并發(fā)編程

2024-04-26 08:17:09

GoGoogle項(xiàng)目
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)