手把手教你在Windows下設(shè)置分布式隊(duì)列Celery的心跳輪詢
1 前言
大家好,我是吳老板。用Celery 官方的話來說,Celery 是一個(gè)非常優(yōu)秀的分布式隊(duì)列,可應(yīng)用于分布式共享中間隊(duì)列和定時(shí)任務(wù)等等。
2 版本的差異
Celery 有很多個(gè)版本,各版本之間的差異可謂不小,比如最新的 Celery6.0 版本在穩(wěn)定性遠(yuǎn)不如 Celery4.0,所以在使用不同版本的時(shí)候,系統(tǒng)給到我們的反饋可能并不能如我們所愿。
3 服務(wù)
在 windows 下掛在 Celery 服務(wù)有時(shí)候會(huì)出現(xiàn)不穩(wěn)定的情況(unix中暫時(shí)未發(fā)現(xiàn)這種情況),比如在執(zhí)行定時(shí)任務(wù)的時(shí)候,過了一段時(shí)間之后,Celery 出現(xiàn)了假死狀態(tài),以至于不能按照我們指定的時(shí)間點(diǎn)去執(zhí)行任務(wù)。
這些任務(wù)只是加入到待運(yùn)行隊(duì)列中(堆積在 Redis 中),只能人為重啟 Celery 服務(wù)之后才能將堆積的任務(wù)釋放出來運(yùn)行。
這樣一來,第一是定時(shí)任務(wù)在指定時(shí)間點(diǎn)沒有正常運(yùn)行,其二是在其他時(shí)間運(yùn)行了這些任務(wù),很可能會(huì)產(chǎn)生更新數(shù)據(jù)不及時(shí),時(shí)間節(jié)點(diǎn)混亂的問題,不僅達(dá)不到業(yè)務(wù)需求,還會(huì)反受其害。
4 設(shè)置心跳
為了解決 Celery 在 windows 中的這種弊端,可以為 Celery 任務(wù)隊(duì)列設(shè)置一個(gè)心跳時(shí)間,比如每一分鐘或者每五分鐘向 Redis 數(shù)據(jù)庫發(fā)送一次數(shù)據(jù)以保證隊(duì)列始終是活躍的狀態(tài),這樣只要你的電腦不關(guān)機(jī)并保持網(wǎng)絡(luò)暢通(如果是遠(yuǎn)程 Redis),Celery 任務(wù)隊(duì)列服務(wù)就不會(huì)出現(xiàn)假死狀態(tài)。
5 舉個(gè)栗子
我總是很喜歡用示例來說話,前些時(shí)間在對(duì)某平臺(tái)的商家后臺(tái)進(jìn)行數(shù)據(jù)采集的時(shí)候,為了使用時(shí)能自動(dòng)獲取該網(wǎng)站的 cookie ,
用Pyppeteer 寫了一個(gè)自動(dòng)化登陸的腳本,和往常一樣仍在 Celery 隊(duì)列中并迅速的啟動(dòng)服務(wù)。
腳本是這樣的(非常接近實(shí)際的偽代碼,沒辦法,保命要緊)
- # -*- coding: utf-8 -*-
 - from db.redisCurd import RedisQueue
 - import asyncio
 - import random
 - import tkinter
 - from pyppeteer.launcher import launch
 - from platLogin.config import USERNAME, PASSWORD, LOGIN_URL
 - class Login():
 - def __init__(self, shopId):
 - self.shopId = shopId
 - self.RedisQueue = RedisQueue("cookie")
 - def screen_size(self):
 - tk = tkinter.Tk()
 - width = tk.winfo_screenwidth()
 - height = tk.winfo_screenheight()
 - tk.quit()
 - return {'width': width, 'height': height}
 - async def login(self, username, password, url):
 - browser = await launch(
 - {
 - 'headless': False,
 - 'dumpio': True
 - },
 - args=['--no-sandbox', '--disable-infobars', '--user-data-dir=./userData'],
 - )
 - page = await browser.newPage() # 啟動(dòng)新的瀏覽器頁面
 - try:
 - await page.setViewport(viewport=self.screen_size())
 - await page.setJavaScriptEnabled(enabled=True) # 啟用js
 - await page.setUserAgent(
 - 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 Edge/16.16299'
 - )
 - await self.page_evaluate(page)
 - await page.goto(url)
 - await asyncio.sleep(2)
 - # 輸入用戶名,密碼
 - await page.evaluate(f'document.querySelector("#userName").value=""')
 - await page.type('#userName', username, {'delay': self.input_time_random() - 50}) # delay是限制輸入的時(shí)間
 - await page.evaluate('document.querySelector("#passWord").value=""')
 - await page.type('#passWord', password, {'delay': self.input_time_random()})
 - await page.waitFor(6000)
 - loginImgVcode = await page.waitForSelector('#checkCode')
 - await loginImgVcode.screenshot({'path': './loginImg.png'})
 - await page.waitFor(6000)
 - res = use_cjy("./loginImg.png")
 - pic_str = res.get("pic_str") if res.get("err_str") == "OK" else "1234"
 - await page.waitFor(6000)
 - await page.type('#checkWord', pic_str, {'delay': self.input_time_random() - 50})
 - await page.waitFor(6000)
 - await page.click('#subMit')
 - await page.waitFor(6000)
 - await asyncio.sleep(2)
 - await self.get_cookie(page)
 - await page.waitFor(3000)
 - await self.page_close(browser)
 - return {'code': 200, 'msg': '登陸成功'}
 - except:
 - return {'code': -1, 'msg': '出錯(cuò)'}
 - finally:
 - await page.waitFor(3000)
 - await self.page_close(browser)
 - # 獲取登錄后cookie
 - async def get_cookie(self, page):
 - cookies_list = await page.cookies()
 - cookies = ''
 - for cookie in cookies_list:
 - str_cookie = '{0}={1}; '
 - str_cookie = str_cookie.format(cookie.get('name'), cookie.get('value'))
 - cookies += str_cookie
 - # 將cookie 放入 cookie 池
 - self.RedisQueue.put_hash(self.shopId, cookies)
 - return cookies
 - async def page_evaluate(self, page):
 - await page.evaluate('''() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => undefined } }) }''')
 - await page.evaluate('''() =>{ window.navigator.chrome = { runtime: {}, }; }''')
 - await page.evaluate(
 - '''() =>{ Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] }); }''')
 - await page.evaluate(
 - '''() =>{ Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5,6], }); }''')
 - await page.waitFor(3000)
 - async def page_close(self, browser):
 - for _page in await browser.pages():
 - await _page.close()
 - await browser.close()
 - def input_time_random(self):
 - return random.randint(100, 151)
 - def run(self, username=USERNAME, password=PASSWORD, url=LOGIN_URL):
 - loop = asyncio.get_event_loop()
 - i_future = asyncio.ensure_future(self.login(username, password, url))
 - loop.run_until_complete(i_future)
 - return i_future.result()
 - if __name__ == '__main__':
 - Z = Login(shopId="001")
 - Z.run()
 
Celery 任務(wù)文件是這樣的
- # -*- coding: utf-8 -*-
 - from __future__ import absolute_import
 - import os
 - import sys
 - import time
 - from db.redisCurd import RedisQueue
 - from send_msg.weinxin import Send_msg
 - base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 - sys.path.append(base_dir)
 - from logger.logger import log_v
 - from celery import Task
 - from platLogin.login import Login # 登陸類
 - from celery import Celery
 - randomQueue = RedisQueue("cookie")
 - celery_app = Celery('task')
 - celery_app.config_from_object('celeryConfig')
 - S = Send_msg()
 - dl_dict = {
 - 'demo': {
 - 'cookie': '',
 - 'loginClass': 'Login',
 - }
 - }
 - # todo 這是三種運(yùn)行的狀態(tài)
 - class task_status(Task):
 - def on_success(self, retval, task_id, args, kwargs):
 - log_v.info('任務(wù)信息 -> id:{} , arg:{} , successful ..... Done'.format(task_id, args))
 - def on_failure(self, exc, task_id, args, kwargs, einfo):
 - log_v.error('task id:{} , arg:{} , failed ! error : {}'.format(task_id, args, exc))
 - def on_retry(self, exc, task_id, args, kwargs, einfo):
 - log_v.warning('task id:{} , arg:{} , retry ! info: {}'.format(task_id, args, exc))
 - # todo 隨便找個(gè)hash key作為輪詢對(duì)象, celery在win10系統(tǒng)可能不太穩(wěn)定,有時(shí)候會(huì)有連接斷開的情況
 - @celery_app.task(base=task_status)
 - def get_cookie_status(platName="demo"):
 - try:
 - # log_v.debug(f'[+] 輪詢 {platName} 定時(shí)器啟動(dòng) ..... Done')
 - randomQueue.get_hash(platName).decode()
 - log_v.debug(f'[+] 輪詢 {platName} 成功 ..... Done')
 - return "Erp 輪詢成功"
 - except:
 - return "Erp 輪詢失敗"
 - @celery_app.task(base=task_status)
 - def set_plat_cookie(platName="demo", shopId=None):
 - log_v.debug(f"[+] {platName} 正在登陸")
 - core = eval(dl_dict[platName]['loginClass'])(shopId=shopId)
 - result = core.run()
 - return result
 
Celery 配置文件是這樣的
- from __future__ import absolute_import
 - import datetime
 - from kombu import Exchange, Queue
 - from celery.schedules import crontab
 - from urllib import parse
 - BROKER_URL = f'redis://root:{parse.quote("你的不規(guī)則密碼")}@主機(jī):6379/15'
 - # 導(dǎo)入任務(wù),如tasks.py
 - CELERY_IMPORTS = ('monitor.tasks',)
 - # 列化任務(wù)載荷的默認(rèn)的序列化方式
 - CELERY_TASK_SERIALIZER = 'json'
 - # 結(jié)果序列化方式
 - CELERY_RESULT_SERIALIZER = 'json'
 - CELERY_ACCEPT_CONTENT = ['json']
 - CELERY_TIMEZONE = 'Asia/Shanghai' # 指定時(shí)區(qū),不指定默認(rèn)為 'UTC'
 - # CELERY_TIMEZONE='UTC'
 - CELERYBEAT_SCHEDULE = {
 - 'add-every-60-seconds': {
 - 'task': 'tasks.get_cookie_status',
 - 'schedule': datetime.timedelta(minutes=1), # 每 1 分鐘執(zhí)行一次
 - 'args': () # 任務(wù)函數(shù)參數(shù)
 - },
 - }
 
啟動(dòng)服務(wù)
- celery -A tasks beat -l INFO
 - celery -A tasks worker -l INFO -c 2
 
以 2 個(gè)線程啟動(dòng)消費(fèi)者隊(duì)列服務(wù)并啟用定時(shí)任務(wù),當(dāng)發(fā)現(xiàn)當(dāng)前平臺(tái)的 cookie 不可用時(shí),我會(huì)向 Celery 發(fā)送一個(gè)信號(hào)(就是調(diào)用了前面的set_plat_cookie 這個(gè)方法),消費(fèi)者得到這個(gè)任務(wù)這個(gè)就會(huì)執(zhí)行自動(dòng)化腳本以獲取 cookie 并儲(chǔ)存在 Redis 中,使用時(shí)在從 Redis 中獲取就能正常請(qǐng)求到該平臺(tái)的數(shù)據(jù)。
在空閑時(shí)間,Celery中的 get_cookie_status 方法會(huì)每隔一分鐘向 Redis 請(qǐng)求數(shù)據(jù),這就是我們?cè)O(shè)置的 1分鐘心跳。
這樣不管我們的 Celery 是否是后臺(tái)啟動(dòng),都不會(huì)出現(xiàn)假死、卡死的狀態(tài),則萬事大吉矣!!
6 總結(jié)
本文為了解決 Celery 在 windows 中的這種弊端,為 Celery 任務(wù)隊(duì)列設(shè)置一個(gè)心跳時(shí)間,比如每一分鐘或者每五分鐘向 Redis 數(shù)據(jù)庫發(fā)送一次數(shù)據(jù)以保證隊(duì)列始終是活躍的狀態(tài),這樣只要你的電腦不關(guān)機(jī)并保持網(wǎng)絡(luò)暢通(如果是遠(yuǎn)程 Redis),Celery 任務(wù)隊(duì)列服務(wù)都不會(huì)出現(xiàn)假死、卡死的狀態(tài)。















 
 
 








 
 
 
 