Python線程安全之鎖、信號量
在Realpython看到一篇關(guān)于線程安全的文章,覺得非常哇塞,分享給大家,今天先講前半部分。
提到線程必須了解兩個術(shù)語:
? 并發(fā)Concurrency,系統(tǒng)具備處理多個任務(wù)的能力,它們的在執(zhí)行在時間上重疊,但不一定同時發(fā)生。
? 并行Parallelism:多個任務(wù)利用多核CPU真正同時執(zhí)行。
Python的線程是一個并發(fā)框架,線程并行運行的時候,每個線程執(zhí)行代碼的一部分,Python解釋器在它們之間切換,將執(zhí)行控制權(quán)交給每個線程。
理解線程并行
先簡單舉個例子:
import threading
import time
from concurrent.futures import ThreadPoolExecutor
def threaded_function():
for number in range(3):
print(f"Printing from {threading.current_thread().name}. {number=}")
time.sleep(0.1)
with ThreadPoolExecutor(max_workers=4, thread_name_prefix="Worker") as executor:
for _ in range(4):
executor.submit(threaded_function)先打印輸出:
Printing from Worker_0. number=0
Printing from Worker_1. number=0
Printing from Worker_2. number=0
Printing from Worker_3. number=0
Printing from Worker_0. number=1
Printing from Worker_2. number=1
Printing from Worker_1. number=1
Printing from Worker_3. number=1
Printing from Worker_0. number=2
Printing from Worker_2. number=2
Printing from Worker_1. number=2
Printing from Worker_3. number=2啟動了四個線程,可以觀察到在Worker_0打印number=0之后 ,并不會立刻打印number=1,原因就在于要切換給其他線程運行,而且各個線程的運行順序是不一樣的。
如何做到這個的呢?因為python解析器會進行上下文切換,默認的間隔時間如下列代碼:
import sys
sys.getswitchinterval()
0.0055毫秒的間隔并不意味著線程會精確地每5毫秒切換一次,而是意味著解釋器會在這些間隔內(nèi)考慮切換到另一個線程,而代碼中的sleep()是為了增加了在此期間發(fā)生上下文切換的可能性。
什么是線程安全
由于上下文切換,程序在多線程環(huán)境中運行時可能會表現(xiàn)出意外行為,這就導致線程不安全問題,而如果代碼在多線程環(huán)境中運行時表現(xiàn)出確定性并產(chǎn)生期望的輸出,那么它就被認為是線程安全的。
線程安全問題通常源于兩個原因:
? 共享可變數(shù)據(jù):線程共享父進程的內(nèi)存,因此所有變量和數(shù)據(jù)結(jié)構(gòu)在各線程之間是共享的。對這些共享數(shù)據(jù)進行修改時可能會引發(fā)錯誤。
? 非原子操作:多線程環(huán)境中,涉及多個步驟的操作可能會被上下文切換中斷,尤其是在執(zhí)行過程中切換到其他線程時,容易導致意外結(jié)果。
1:GIL
在討論Python線程時,Python 3.12之前的GIL(Global Interpreter Lock)不可避免要提到。GIL是一個互斥鎖,目的是保護Python對象的訪問,防止多個線程同時執(zhí)行Python字節(jié)碼。它阻止了真正的線程并行,尤其在CPU密集型任務(wù)中,多線程性能會受到嚴重限制。不過,這意味著對于I/O密集型任務(wù),線程并行仍然適用。
由于GIL的存在,當某個操作能在單個字節(jié)碼指令中完成時,它是原子的。那么,Python是否因此天然線程安全?并非如此。因為I/O操作仍然可以并行執(zhí)行,因此即使有GIL,訪問共享可變數(shù)據(jù)時依然需要鎖等同步機制確保線程安全。
GIL是否完全消除了Python的多線程并發(fā)能力?并沒有,Python支持通過多進程來實現(xiàn)真正的并行。
值得關(guān)注的是,從Python 3.13開始,Python提供了無GIL的解釋器,實現(xiàn)了真正的線程并行。但無論是否有GIL,編寫代碼時始終建議合理地保護線程安全——也就是說,不依賴GIL,主動保證線程安全。
2:競爭
現(xiàn)在來看看第二個核心概念,競爭條件發(fā)生在程序的結(jié)果依賴于不可控事件的順序或時間,如線程執(zhí)行順序時,如果沒有適當?shù)耐綍е鲁绦虺霈F(xiàn)不可預測的錯誤。
下面的例子就來模擬這種情況,兩個線程同時修改一個屬性:
import time
from concurrent.futures import ThreadPoolExecutor
class BankAccount:
def __init__(self, balance=0):
self.balance = balance
def withdraw(self, amount):
if self.balance >= amount:
new_balance = self.balance - amount
time.sleep(0.1)
self.balance = new_balance
else:
raise ValueError("Insufficient balance")
account = BankAccount(1000)
with ThreadPoolExecutor(max_workers=2) as executor:
executor.submit(account.withdraw, 500)
executor.submit(account.withdraw, 700)
print(f"Final account balance: {account.balance}")先猜可能的結(jié)果,代碼可能會輸出:
Final account balance: 300也可能會輸出:
Final account balance: 500為什么會出現(xiàn)這樣的情況呢?這就是由于線程執(zhí)行順序不一致導致的,如果先扣700,而第二個線程突然切換過來了,檢查余額夠,最終就扣了500,余額就變成500了,當然結(jié)果是錯誤的。
同步原語
為了解決線程不安全問題,Python的threading模塊提供了各種同步原語,以防止競爭條件并允許線程之間的協(xié)調(diào)。
同步原語會:
? 控制線程同時執(zhí)行代碼塊
? 使多個代碼語句對線程來說是原子的
? 限制線程的并發(fā)訪問
? 在線程之間進行協(xié)調(diào),并根據(jù)其他線程的狀態(tài)執(zhí)行操作
接下去使用Python線程鎖和信號實現(xiàn)互斥。
使用Python線程鎖實現(xiàn)互斥
鎖是一種同步原語,可用于獨占訪問資源,一旦一個線程獲取了鎖,其他線程就不能再獲取它并繼續(xù)執(zhí)行,直到鎖被釋放,可以使用鎖來封裝應(yīng)該原子執(zhí)行的語句或語句組。
python提供兩個lock相關(guān)的函數(shù):
? 當一個線程調(diào)用.acquire()方法時,如果Lock對象已經(jīng)被另一個線程鎖定,那么調(diào)用的線程會被阻塞,直到持有鎖的線程釋放鎖。
? release() 會釋放一個被線程獲取的鎖,如果嘗試釋放一個未鎖定的鎖,會引發(fā)RuntimeError。
如果使用with語句,Lock 對象可用作上下文管理器,可以自動獲取和釋放鎖。
為了解決上面代碼存在的問題,可以:
import threading
import time
from concurrent.futures import ThreadPoolExecutor
class BankAccount:
def __init__(self, balance=0):
self.balance = balance
self.account_lock = threading.Lock()
def withdraw(self, amount):
with self.account_lock:
if self.balance >= amount:
new_balance = self.balance - amount
print(f"Withdrawing {amount}...")
time.sleep(0.1) # Simulate a delay
self.balance = new_balance
else:
raise ValueError("Insufficient balance")
def deposit(self, amount):
with self.account_lock:
new_balance = self.balance + amount
print(f"Depositing {amount}...")
time.sleep(0.1) # Simulate a delay
self.balance = new_balance
account = BankAccount(1000)
with ThreadPoolExecutor(max_workers=3) as executor:
executor.submit(account.withdraw, 700)
executor.submit(account.deposit, 1000)
executor.submit(account.withdraw, 300)
print(f"Final account balance: {account.balance}")上述代碼通過鎖成功保證了線性安全。
如果由于代碼中的錯誤或疏忽導致鎖未正確釋放,可能會導致死鎖,即線程無限期地等待鎖被釋放。
死鎖的原因包括:
? 嵌套鎖獲取:如果一個線程嘗試獲取它已經(jīng)持有的鎖,可能會發(fā)生死鎖,同一線程嘗試多次獲取相同的鎖會導致線程阻塞自身,這種情況在沒有外部干預的情況下無法解決。
? 多重鎖獲?。寒斒褂枚鄠€鎖時,如果線程以不一致的順序獲取這些鎖,可能會發(fā)生死鎖,如果兩個線程各自持有一個鎖并等待對方釋放鎖,那么兩個線程都無法繼續(xù),從而導致死鎖。
對于多重鎖可以使用可重入鎖RLock解決,當持有線程再次請求鎖時,它不會阻塞,允許線程在釋放鎖之前多次獲取鎖,這在遞歸函數(shù)或線程需要重新進入已鎖定資源的情況下非常有用,相對來說,RLock因為要跟蹤同一線程獲取鎖的次數(shù),會有性能開銷。
Semaphores信號量
在資源數(shù)量有限且多個線程嘗試訪問這些有限資源時非常有用,它使用一個計數(shù)器來限制多個線程對臨界區(qū)的訪問,每次調(diào)用.acquire() 都會將信號量的計數(shù)器減少一個,當計數(shù)器達到零時,再.acquire() 調(diào)用將被阻塞。
舉一個例子,多個客戶在銀行等待有限數(shù)量的柜員服務(wù):
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor
# Semaphore with a maximum of 2 resources (tellers)
teller_semaphore = threading.Semaphore(2)
def now():
return time.strftime("%H:%M:%S")
def serve_customer(name):
print(f"{now()}: {name} is waiting for a teller.")
with teller_semaphore:
print(f"{now()}: {name} is being served by a teller.")
time.sleep(random.randint(1, 3))
print(f"{now()}: {name} is done being served.")
customers = [
"Customer 1",
"Customer 2",
"Customer 3",
"Customer 4",
"Customer 5",
]
with ThreadPoolExecutor(max_workers=3) as executor:
for customer_name in customers:
thread = executor.submit(serve_customer, customer_name)
print(f"{now()}: All customers have been served.")代碼很好理解,當某個線程達到計數(shù)器上限后,它會被阻塞,直到其他線程在with語句中因為完成服務(wù)而釋放,但不管怎么樣,每次只有三個客戶被服務(wù)。
參考:https://realpython.com/python-thread-lock/#using-python-threading-locks-for-mutual-exclusion
























