在 Python 中接管鍵盤(pán)中斷信號(hào)
假設(shè)有這樣一個(gè)需求,你需要從 Redis 中持續(xù)不斷讀取數(shù)據(jù),并把這些數(shù)據(jù)寫(xiě)入到 MongoDB 中。你可能會(huì)這樣寫(xiě)代碼:
- import json
- import redis
- import pymongo
- client = redis.Redis()
- handler = pymongo.MongoClient().example.col
- while True:
- data_raw = client.blpop('data', timeout=300)
- if not data_raw:
- continue
- data = json.loads(data_raw[1].decode())
- handler.insert_one(data)
但這樣寫(xiě)有一個(gè)問(wèn)題,就是每來(lái)一條數(shù)據(jù)都要連接一次 MongoDB,大量時(shí)間浪費(fèi)在了網(wǎng)絡(luò) I/O上。
于是大家會(huì)把代碼改成下面這樣:
- import json
- import redis
- import pymongo
- client = redis.Redis()
- handler = pymongo.MongoClient().example.col
- to_be_insert = []
- while True:
- data_raw = client.blpop('data', timeout=300)
- if not data_raw:
- continue
- data = json.loads(data_raw[1].decode())
- to_be_insert.append(data)
- if len(to_be_insert) >= 1000:
- handler.insert_many(to_be_insert)
- to_be_insert = []
每湊夠1000條數(shù)據(jù),批量寫(xiě)入到 MongoDB 中。
現(xiàn)在又面臨另外一個(gè)問(wèn)題。假設(shè)因?yàn)槟撤N原因,我需要更新這個(gè)程序,于是我按下了鍵盤(pán)上的Ctrl + C強(qiáng)制關(guān)閉了這個(gè)程序。而此時(shí)to_be_insert列表里面有999條數(shù)據(jù)將會(huì)永久丟失——它們已經(jīng)被從 Redis 中刪除了,但又沒(méi)有來(lái)得及寫(xiě)入 MongoDB 中。
我想實(shí)現(xiàn),當(dāng)我按下 Ctrl + C 時(shí),程序不再?gòu)?Redis 中讀取數(shù)據(jù),但會(huì)先把to_be_insert中的數(shù)據(jù)(無(wú)論有幾條)都插入 MongoDB 中。最后再關(guān)閉程序。
要實(shí)現(xiàn)這個(gè)需求,就必須在我們按下Ctrl + C時(shí),程序還能繼續(xù)運(yùn)行一段代碼??蓡?wèn)題是按下Ctrl + C時(shí),程序就直接結(jié)束了,如何還能再運(yùn)行一段代碼?
實(shí)際上,當(dāng)我們按下鍵盤(pán)上的Ctrl + C時(shí),Python 收到一個(gè)名為SIGINT的信號(hào)。具體規(guī)則可以閱讀官方文檔。收到信號(hào)以后,Python 會(huì)調(diào)用一個(gè)信號(hào)回調(diào)函數(shù)。只不過(guò)默認(rèn)的回調(diào)函數(shù)就是讓程序拋出一個(gè) KeyboardInterrupt異常導(dǎo)致程序關(guān)閉?,F(xiàn)在,我們可以設(shè)法讓 Python 使用我們自定義的一段函數(shù)來(lái)作為信號(hào)回調(diào)函數(shù)。
要使用信號(hào),我們需用導(dǎo)入 Python 的signal庫(kù)。然后自定義一個(gè)信號(hào)回調(diào)函數(shù),當(dāng) Python 收到某個(gè)信號(hào)時(shí),調(diào)用這個(gè)函數(shù)。
所以我們修改一下上面的代碼:
- import signal
- import json
- import redis
- import pymongo
- client = redis.Redis()
- handler = pymongo.MongoClient().example.col
- stop = False
- def keyboard_handler(signum, frame):
- global stop
- stop = True
- signal.signal(signal.SIGINT, keyboard_handler)
- to_be_insert = []
- while not stop:
- data_raw = client.blpop('data', timeout=300)
- if not data_raw:
- continue
- data = json.loads(data_raw[1].decode())
- to_be_insert.append(data)
- if len(to_be_insert) >= 1000:
- handler.insert_many(to_be_insert)
- to_be_insert = []
- if to_be_insert:
- handler.insert_many(to_be_insert)
我們定義了一個(gè)全局變量stop,默認(rèn)為 False,所以默認(rèn)情況下,while not stop所在的循環(huán)體會(huì)持續(xù)運(yùn)行。
我們定義了一個(gè)函數(shù)keyboard_handler,它的作用是修改全局變量stop為 True。需要注意的是,在函數(shù)里面修改全局變量,必須先使用global 變量名聲明這個(gè)變量為全局變量。否則無(wú)法修改。
修改以后,while not stop循環(huán)停止,于是程序進(jìn)入:
- if to_be_insert:
- handler.insert_many(to_be_insert)
只要列表里面有數(shù)據(jù),就會(huì)批量插入 MongoDB 中。然后程序結(jié)束。
整段代碼的關(guān)鍵就在signal.signal(signal.SIGINT, keyboard_handler)這里把信號(hào)SIGINT與函數(shù)keyboard_handler關(guān)聯(lián)上了,于是,在上面這段代碼運(yùn)行的任何時(shí)候,只要按下鍵盤(pán)的Ctrl + C,程序就會(huì)進(jìn)入keyboard_handler函數(shù)里面,優(yōu)先執(zhí)行這個(gè)函數(shù)里面的代碼。執(zhí)行完成以后,回到之前中斷的地方,繼續(xù)執(zhí)行之前沒(méi)有完成的代碼。而由于在函數(shù)里面我已經(jīng)修改了stop的值,所以原來(lái)的循環(huán)不能繼續(xù)執(zhí)行,于是進(jìn)入最后的收尾工作。
需要注意的是,如果你的整個(gè)代碼全都是使用 Python 寫(xiě)的,那么 signal可以在你程序的任何階段觸發(fā),只要你按下 Ctrl + C,立刻就會(huì)進(jìn)入設(shè)置好的信號(hào)回調(diào)函數(shù)中。
但如果你的代碼中,有一部分代碼是使用 C 語(yǔ)言寫(xiě)的,那么當(dāng)你按下Ctrl + C以后,可能需要等這段C 語(yǔ)言的代碼運(yùn)行完成以后,才會(huì)進(jìn)入你設(shè)置的信號(hào)回調(diào)函數(shù)中。