聊聊 Python 中的同步原語,為什么有了 GIL 還需要同步原語
前言
? 在前面的文章中我們介紹了 Python 中的全局解釋器鎖 GIL,我們知道 GIL 可以保證在多線程場景下同一時刻只有一個線程運行,但是并不能保證線程安全(所謂線程安全簡單來說就是程序在多線程環(huán)境中運行時,線程在交替運行時能正常的訪問共享資源,不會造成數(shù)據(jù)不一致或者死鎖,最后都能達到預期的結果),比如我們看下面的兩個例子:
對 counter 進行累加
import threading
import time
counter = 0
temp_count = 0
def increment():
global counter, temp_count
for _ in range(1000):
counter += 1
temp = temp_count
time.sleep(0.0001)
temp_count = temp + 1
start = time.time()
threads = []
for _ in range(10):
t = threading.Thread(target=increment)
threads.append(t)
t.start()
for t in threads:
t.join()
end = time.time()
print("Final counter value:", counter)
print("Final temp_count value:", temp_count)
print(f"總共耗時:{end - start}")
# 運行結果
Final counter value: 10000
Final temp_count value: 1001
總共耗時:0.5465419292449951
? 上面我們對 counter 做多線程累積時,盡管 counter += 1 是非原子操作,但是由于 CPU 執(zhí)行太快,因此我們很難復現(xiàn)線程不安全的情況,因此我們使用 temp_count 寫法進行手動模擬。
賬戶取款
import threading
class BankAccount:
def __init__(self, balance):
self.balance = balance
def withdraw(self, amount):
if self.balance >= amount:
# 發(fā)生線程切換
self.balance -= amount
print(f"Withdrawal successful. Balance: {self.balance}")
else:
print("Insufficient funds")
def deposit(self, amount):
self.balance += amount
print(f"Deposit successful. Balance: {self.balance}")
if __name__ == "__main__":
account = BankAccount(1000)
# 創(chuàng)建多個線程進行取款存款操作
threads = []
for _ in range(5):
t = threading.Thread(target=account.withdraw, args=(account, 200))
threads.append(t)
t.start()
for t in threads:
t.join()
? 上面的代碼同樣是線程不安全的,考慮這個場景,如果此時賬戶余額中剩余200,線程1執(zhí)行完 self.balance >= amount 后切換到線程2,線程2正常取款200,然后切換回線程1,導致此時余額為-2200。
使用同步原語保證線程安全
? 從上面的兩個案例中我們可以看出,GIL 并不能保證線程安全,我們需要使用同步原語來進行線程同步保證線程安全。
locked、release 顯式獲取鎖和釋放鎖
? 在一些比較老的 python 代碼中,我們可以看到很多使用 locked、release 顯式獲取鎖和釋放鎖 的用法。
import threading
class BankAccount:
def __init__(self, balance):
self.balance = balance
self.lock = threading.Lock()
def withdraw(self, amount):
self.lock.locked()
if self.balance >= amount:
self.balance -= amount
print(f"Withdrawal successful. Balance: {self.balance}")
else:
print("Insufficient funds")
self.lock.release()
def deposit(self, amount):
self.lock.locked()
self.balance += amount
print(f"Deposit successful. Balance: {self.balance}")
self.lock.release()
if __name__ == "__main__":
account = BankAccount(1000)
# 創(chuàng)建多個線程進行取款存款操作
threads = []
for _ in range(5):
t = threading.Thread(target=account.withdraw, args=(account, 200))
threads.append(t)
t.start()
for t in threads:
t.join()
使用 with 語句同步原理
? 相比于這種顯式調(diào)用的方法,with 語句更加優(yōu)雅,也更不容易出錯,特別是程序員可能會忘記調(diào)用 release() 方法或者程序在獲得鎖之后產(chǎn)生異常這兩種情況(使用 with 語句可以保證在這兩種情況下仍能正確釋放鎖)。
import threading
class BankAccount:
def __init__(self, balance):
self.balance = balance
self.lock = threading.Lock()
def withdraw(self, amount):
with self.lock:
if self.balance >= amount:
self.balance -= amount
print(f"Withdrawal successful. Balance: {self.balance}")
else:
print("Insufficient funds")
def deposit(self, amount):
with self.lock:
self.balance += amount
print(f"Deposit successful. Balance: {self.balance}")
if __name__ == "__main__":
account = BankAccount(1000)
# 創(chuàng)建多個線程進行取款存款操作
threads = []
for _ in range(5):
t = threading.Thread(target=account.withdraw, args=(account, 200))
threads.append(t)
t.start()
for t in threads:
t.join()
其它支持同步原語:RLock 和 Semaphore
RLock
? 一個 RLock (可重入鎖)可以被同一個線程多次獲取,主要用來實現(xiàn)基于監(jiān)測對象模式的鎖定和同步。在使用這種鎖的情況下,當鎖被持有時,只有一個線程可以使用完整的函數(shù)或者類中的方法。
import threading
class SharedCounter:
'''
A counter object that can be shared by multiple threads.
'''
_lock = threading.RLock()
def __init__(self, initial_value = 0):
self._value = initial_value
def incr(self,delta=1):
'''
Increment the counter with locking
'''
with SharedCounter._lock:
self._value += delta
def decr(self,delta=1):
'''
Decrement the counter with locking
'''
with SharedCounter._lock:
self.incr(-delta)
? 在上邊這個例子中,沒有對每一個實例中的可變對象加鎖,取而代之的是一個被所有實例共享的類級鎖。這個鎖用來同步類方法,具體來說就是,這個鎖可以保證一次只有一個線程可以調(diào)用這個類方法。不過,與一個標準的鎖不同的是,已經(jīng)持有這個鎖的方法在調(diào)用同樣使用這個鎖的方法時,無需再次獲取鎖。比如 decr 方法。 這種實現(xiàn)方式的一個特點是,無論這個類有多少個實例都只用一個鎖。因此在需要大量使用計數(shù)器的情況下內(nèi)存效率更高。不過這樣做也有缺點,就是在程序中使用大量線程并頻繁更新計數(shù)器時會有爭用鎖的問題。
Semaphore
? 信號量對象是一個建立在共享計數(shù)器基礎上的同步原語。如果計數(shù)器不為0,with 語句將計數(shù)器減1,線程被允許執(zhí)行。with 語句執(zhí)行結束后,計數(shù)器加1。如果計數(shù)器為0,線程將被阻塞,直到其他線程結束將計數(shù)器加1。
import urllib.request
from threading import Semaphore
# At most, five threads allowed to run at once
_fetch_url_sema = Semaphore(5)
def fetch_url(url):
with _fetch_url_sema:
return urllib.request.urlopen(url)