版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡介
第python周期任務(wù)調(diào)度工具Schedule使用詳解目錄1.準(zhǔn)備2.基本使用參數(shù)傳遞獲取目前所有的作業(yè)取消所有作業(yè)標(biāo)簽功能設(shè)定作業(yè)截止時間3.高級使用裝飾器安排作業(yè)并行執(zhí)行日志記錄異常處理如果你想周期性地執(zhí)行某個Python腳本,最出名的選擇應(yīng)該是Crontab腳本,但是Crontab具有以下缺點(diǎn):
1.不方便執(zhí)行秒級任務(wù)。
2.當(dāng)需要執(zhí)行的定時任務(wù)有上百個的時候,Crontab的管理就會特別不方便。
還有一個選擇是Celery,但是Celery的配置比較麻煩,如果你只是需要一個輕量級的調(diào)度工具,Celery不會是一個好選擇。
在你想要使用一個輕量級的任務(wù)調(diào)度工具,而且希望它盡量簡單、容易使用、不需要外部依賴,最好能夠容納Crontab的所有基本功能,那么Schedule模塊是你的不二之選。
使用它來調(diào)度任務(wù)可能只需要幾行代碼,感受一下:
importschedule
importtime
defjob():
print("I'mworking...")
schedule.every(10).minutes.do(job)
whileTrue:
schedule.run_pending()
time.sleep(1)
上面的代碼表示每10分鐘執(zhí)行一次job函數(shù),非常簡單方便。你只需要引入schedule模塊,通過調(diào)用scedule.every(時間數(shù)).時間類型.do(job)發(fā)布周期任務(wù)。
發(fā)布后的周期任務(wù)需要用run_pending函數(shù)來檢測是否執(zhí)行,因此需要一個While循環(huán)不斷地輪詢這個函數(shù)。
下面具體講講Schedule模塊的安裝和初級、進(jìn)階使用方法。
1.準(zhǔn)備
開始之前,你要確保Python和pip已經(jīng)成功安裝在電腦上,請選擇以下任一種方式輸入命令安裝依賴:
Windows環(huán)境打開Cmd(開始-運(yùn)行-CMD)。
MacOS環(huán)境打開Terminal(command+空格輸入Terminal)。
如果你用的是VSCode編輯器或Pycharm,可以直接使用界面下方的Terminal.
pipinstallschedule
2.基本使用
最基本的使用在文首已經(jīng)提到過,下面給大家展示更多的調(diào)度任務(wù)例子:
importschedule
importtime
defjob():
print("I'mworking...")
#每十分鐘執(zhí)行任務(wù)
schedule.every(10).minutes.do(job)
#每個小時執(zhí)行任務(wù)
schedule.every().hour.do(job)
#每天的10:30執(zhí)行任務(wù)
schedule.every().day.at("10:30").do(job)
#每個月執(zhí)行任務(wù)
schedule.every().monday.do(job)
#每個星期三的13:15分執(zhí)行任務(wù)
schedule.every().wednesday.at("13:15").do(job)
#每分鐘的第17秒執(zhí)行任務(wù)
schedule.every().minute.at(":17").do(job)
whileTrue:
schedule.run_pending()
time.sleep(1)
可以看到,從月到秒的配置,上面的例子都覆蓋到了。不過如果你想只運(yùn)行一次任務(wù)的話,可以這么配
importschedule
importtime
defjob_that_executes_once():
#此處編寫的任務(wù)只會執(zhí)行一次...
returnschedule.CancelJob
schedule.every().day.at('22:30').do(job_that_executes_once)
whileTrue:
schedule.run_pending()
time.sleep(1)
參數(shù)傳遞
如果你有參數(shù)需要傳遞給作業(yè)去執(zhí)行,你只需要這么做:
importschedule
defgreet(name):
print('Hello',name)
#do()將額外的參數(shù)傳遞給job函數(shù)
schedule.every(2).seconds.do(greet,name='Alice')
schedule.every(4).seconds.do(greet,name='Bob')
獲取目前所有的作業(yè)
如果你想獲取目前所有的作業(yè):
importschedule
defhello():
print('Helloworld')
schedule.every().second.do(hello)
all_jobs=schedule.get_jobs()
取消所有作業(yè)
如果某些機(jī)制觸發(fā)了,你需要立即清除當(dāng)前程序的所有作業(yè):
importschedule
defgreet(name):
print('Hello{}'.format(name))
schedule.every().second.do(greet)
schedule.clear()
標(biāo)簽功能
在設(shè)置作業(yè)的時候,為了后續(xù)方便管理作業(yè),你可以給作業(yè)打個標(biāo)簽,這樣你可以通過標(biāo)簽過濾獲取作業(yè)或取消作業(yè)。
importschedule
defgreet(name):
print('Hello{}'.format(name))
#.tag打標(biāo)簽
schedule.every().day.do(greet,'Andrea').tag('daily-tasks','friend')
schedule.every().hour.do(greet,'John').tag('hourly-tasks','friend')
schedule.every().hour.do(greet,'Monica').tag('hourly-tasks','customer')
schedule.every().day.do(greet,'Derek').tag('daily-tasks','guest')
#get_jobs(標(biāo)簽):可以獲取所有該標(biāo)簽的任務(wù)
friends=schedule.get_jobs('friend')
#取消所有daily-tasks標(biāo)簽的任務(wù)
schedule.clear('daily-tasks')
設(shè)定作業(yè)截止時間
如果你需要讓某個作業(yè)到某個時間截止,你可以通過這個方法:
importschedule
fromdatetimeimportdatetime,timedelta,time
defjob():
print('Boo')
#每個小時運(yùn)行作業(yè),18:30后停止
schedule.every(1).hours.until("18:30").do(job)
#每個小時運(yùn)行作業(yè),2030-01-0118:33today
schedule.every(1).hours.until("2030-01-0118:33").do(job)
#每個小時運(yùn)行作業(yè),8個小時后停止
schedule.every(1).hours.until(timedelta(hours=8)).do(job)
#每個小時運(yùn)行作業(yè),11:32:42后停止
schedule.every(1).hours.until(time(11,33,42)).do(job)
#每個小時運(yùn)行作業(yè),2025-5-1711:36:20后停止
schedule.every(1).hours.until(datetime(2025,5,17,11,36,20)).do(job)
截止日期之后,該作業(yè)將無法運(yùn)行。
立即運(yùn)行所有作業(yè),而不管其安排如何
如果某個機(jī)制觸發(fā)了,你需要立即運(yùn)行所有作業(yè),可以調(diào)用schedule.run_all():
importschedule
defjob_1():
print('Foo')
defjob_2():
print('Bar')
schedule.every().monday.at("12:40").do(job_1)
schedule.every().tuesday.at("16:40").do(job_2)
schedule.run_all()
#立即運(yùn)行所有作業(yè),每次作業(yè)間隔10秒
schedule.run_all(delay_seconds=10)
3.高級使用
裝飾器安排作業(yè)
如果你覺得設(shè)定作業(yè)這種形式太啰嗦了,也可以使用裝飾器模式:
fromscheduleimportevery,repeat,run_pending
importtime
#此裝飾器效果等同于schedule.every(10).minutes.do(job)
@repeat(every(10).minutes)
defjob():
print("Iamascheduledjob")
whileTrue:
run_pending()
time.sleep(1)
并行執(zhí)行
默認(rèn)情況下,Schedule按順序執(zhí)行所有作業(yè)。其背后的原因是,很難找到讓每個人都高興的并行執(zhí)行模型。
不過你可以通過多線程的形式來運(yùn)行每個作業(yè)以解決此限制:
importthreading
importtime
importschedule
defjob1():
print("I'mrunningonthread%s"%threading.current_thread())
defjob2():
print("I'mrunningonthread%s"%threading.current_thread())
defjob3():
print("I'mrunningonthread%s"%threading.current_thread())
defrun_threaded(job_func):
job_thread=threading.Thread(target=job_func)
job_thread.start()
schedule.every(10).seconds.do(run_threaded,job1)
schedule.every(10).seconds.do(run_threaded,job2)
schedule.every(10).seconds.do(run_threaded,job3)
whileTrue:
schedule.run_pending()
time.sleep(1)
日志記錄
Schedule模塊同時也支持logging日志記錄,這么使用:
importschedule
importlogging
logging.basicConfig()
schedule_logger=logging.getLogger('schedule')
#日志級別為DEBUG
schedule_logger.setLevel(level=logging.DEBUG)
defjob():
print("Hello,Logs")
schedule.every().second.do(job)
schedule.run_all()
schedule.clear()
效果如下:
DEBUG:schedule:Running*all*1jobswith0sdelayinbetween
DEBUG:schedule:RunningjobJob(interval=1,unit=seconds,do=job,args=(),kwargs={})
Hello,Logs
DEBUG:schedule:Deleting*all*jobs
異常處理
Schedule不會自動捕捉異常,它遇到異常會直接拋出,這會導(dǎo)致一個嚴(yán)重的問題:后續(xù)所有的作業(yè)都會被中斷執(zhí)行,因此我們需要捕捉到這些異常。
你可以手動捕捉,但是某些你預(yù)料不到的情況需要程序進(jìn)行自動捕獲,加一個裝飾器就能做到了:
importfunctools
defcatch_exceptions(cancel_on_failure=False):
defcatch_exceptions_d
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 制造企業(yè)節(jié)能降耗改造方案
- 企業(yè)數(shù)據(jù)治理制度與流程實(shí)施指南
- 制造型企業(yè)3S管理實(shí)施方案詳解
- 年吉林省經(jīng)濟(jì)管理干部學(xué)院單招職業(yè)適應(yīng)性考試題庫含答案詳解新
- 安全員A證考試題庫【培優(yōu)a卷】附答案詳解
- 安全員A證考試試卷附答案詳解(完整版)
- 安全員A證考試每日一練及一套完整答案詳解
- 安全員A證考試考前沖刺訓(xùn)練試卷及完整答案詳解(奪冠系列)
- 安全員A證考試練習(xí)題含完整答案詳解【名師系列】
- 安全員A證考試通關(guān)檢測卷完整版附答案詳解
- 移動式壓力容器安全技術(shù)監(jiān)察規(guī)程(TSG R0005-2011)
- 高速液壓夯實(shí)地基技術(shù)規(guī)程
- 醫(yī)防融合培訓(xùn)課件
- 23G409先張法預(yù)應(yīng)力混凝土管樁
- 2025年公司綜合管理部工作總結(jié)及2025年工作計(jì)劃
- 購買古琴合同范例
- 電力系統(tǒng)調(diào)頻輔助服務(wù)市場交易實(shí)施細(xì)則
- 風(fēng)電、光伏項(xiàng)目前期及建設(shè)手續(xù)辦理流程匯編
- DB41T 1522-2018 可燃?xì)怏w和有毒氣體報(bào)警儀檢查檢測技術(shù)規(guī)范
- QBT 1815-2002 指甲鉗行業(yè)標(biāo)準(zhǔn)
- 醫(yī)療機(jī)構(gòu)崗位聘用合同
評論
0/150
提交評論