簡介
Celery 是一款非常簡單、靈活、可靠的分佈式系統,可用於處理大量消息,Celery架構如下圖,由消息隊列、任務執行單元、結果存儲三部分組成。
user:任務的生產者,可以是用戶(觸發任務)或者celery beat(產生週期任務)
broker:消息中間件,Redis或RabbitMQ,生產者產生的任務會先存放到broker
worker:任務執行單元,執行broker中的任務
store(backend):存儲任務結果
安裝
pip install -U Celery
目錄結構
使用下面簡單的目錄結構做演示,正式項目中的使用一般為多目錄結構,不同的任務放到不同的task文件。
proj
- task.py
- app.py
- result.py
基本使用
創建任務
# task.py
import celery
broker = 'redis://127.0.0.1:6379/1' # redis://:[email protected]:6379/1
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('test', backend=backend, broker=broker)
# app = celery.Celery('test') # 效果同上
# app.conf.broker_url = broker
# app.conf.result_backend = backend
@app.task
def add(a, b):
print(f'{a}+{b} = {a + b}')
return a + b
• 實例化Celery對象
• 定義函數
• Celery對象task方法作為裝飾器
如果要使用RabbitMQ作為消息中間件,只需修改broker,無需關心如何操作Redis或RabbitMQ。
調用任務
# app.py
from task import add
result = add.delay(1, 2)
print(result.id) # 2a3d96fe-c3e9-4846-8237-24fc28d9ad2b
task裝飾器返回一個celery Task對象,賦予了原函數Task的方法,delay方法用於調用異步任務,異步任務返回celery.result.AsyncResult對象,在執行任務的時候最主要的就是獲取ID,以後可以用ID去查任務執行狀態、結果等。
任務狀態
# result.py
from celery.result import AsyncResult
from task import app
async_result = AsyncResult(
id='2a3d96fe-c3e9-4846-8237-24fc28d9ad2b', app=app
)
print(async_result.status)
print(async_result.result)
print(async_result.get())
• status:任務執行狀態(PENDING、STARTED、RETRY、FAILURE、SUCCESS)
• result:任務的返回值或錯誤信息
• get():同步的方式查結果
以上所有操作基於broker能連接上
執行add.delay後去查任務狀態,一直處於PENDING,因為worker沒啟動,任務就存放在broker中一直沒有被執行。
broker
broker選擇的是Redis的數據庫1,前面觸發的任務存儲在key為celery的list中。
啟動worker
celery worker -A task -l info -P eventlet
- -A:指定Celery對象的位置
- -l:日誌級別
- -P:默認使用prefork管理併發,windows不支持prefork
worker啟動後,可以看到部分配置信息、隊列、任務,然後就會執行broker中堆積的任務,並將結果保存到backend
- ** ---------- [config]
- ** ---------- .> app: test:0x1ade6ea95f8
- ** ---------- .> transport: redis://127.0.0.1:6379/1
- ** ---------- .> results: redis://127.0.0.1:6379/2
- *** --- * --- .> concurrency: 4 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. task.add
...
[2020-08-11 01:01:02,616: INFO/MainProcess] Received task: task.add[2a3d96fe-c3e9-4846-8237-24fc28d9ad2b]
[2020-08-11 01:01:08,796: WARNING/MainProcess] 1+2 = 3
[2020-08-11 01:01:08,801: INFO/MainProcess] Task task.add[2a3d96fe-c3e9-4846-8237-24fc28d9ad2b] succeeded in 6.172000000002299: 3
值得注意的是tasks,列表展示所有的celery任務,後面celery-beat還會用到。
[tasks]
. task.add
backend
到Redis查看執行結果
監控
監控使用flower,自帶可視化web界面,使用pip安裝
pip install -U flower
啟動命令:
celery -A task flower (--port=5555)
或
celery flower --broker=redis://127.0.0.1:6379/1
重試
可以設置任務執行失敗後是否進行重試,對哪些錯誤進行重試,重試次數、時間間隔等。
# task
@app.task(
autoretry_for=(Exception,), # 指定錯誤碼,Exception表示對所有錯誤進行重試
max_retries=2, # 重試次數
retry_backoff=4, # 重試時間間隔,retry_jitter為True時,時間間隔為1-retry_backoff之間隨機數
# retry_jitter=False, # 默認為True,retry_jitter=False時,第n次重試時間為上一次重試時間retry_backoff**n秒後
)
def send_msg(msg):
return msg[5]
# app
res1 = send_msg.delay('abcdef')
res2 = send_msg.delay('abc')
res1正常執行,res2 IndexError重試
定時任務
普通函數使用task裝飾器後被封裝成celery任務,可以z作為異步任務調用,也可以作為定時任務調用,具體看調用方式。
調用定時任務的兩種方式:
countdown
add.apply_async(args=(1, 2), countdown=3, expires=5)
• args:函數的參數
• countdown:幾秒後執行
• expires:過期時間
eta
eta:datetime、utc時間,可以使用timedelta做時間運算,設置時間上更為靈活。
add.apply_async(
args=(1, 2),
eta=datetime.datetime.utcnow() + datetime.timedelta(seconds=10),
expires=20
)
週期任務:celery beat
如果定義了beat_schedule,在啟動celery-beat後就會週期性的產生任務放到broker。
# task.py
app.conf.beat_schedule = {
'test_cycle_task': { # 任務name
# 執行task下的add函數
'task': 'task.add', # 啟動worker時監控到的任務 -> [tasks]
# 'schedule': 5.0, # 幾秒執行一次
'schedule': timedelta(seconds=6), # 多久執行一次
# 'schedule': crontab(hour=0, minute=55), # 每天定時執行
'args': (2, 3) # 傳遞參數
},
'task2': {}
}
celery-beat啟動命令:
celery beat -A task
子任務
任務執行成功或失敗後執行一個回調函數。
task1.apply_async((1, 2), link=task2.s(3), link_error=task3.s())
task1執行成功,返回值傳遞給task2並且作為task2的第一個參數
task1出錯,ID傳遞給task3並且作為task3的第一個參數
執行:add.apply_async((1, 2), link=add.s(3))
[2020-08-11 01:42:18,592: INFO/MainProcess] Received task: task.add[a7be35fd-c932-46e1-970d-0c520caecc32]
[2020-08-11 01:42:18,598: WARNING/MainProcess] 1+2 = 3
[2020-08-11 01:42:18,609: INFO/MainProcess] Received task: task.add[2e72a107-e833-4da4-a532-90e6039cc32e]
[2020-08-11 01:42:18,614: INFO/MainProcess] Task task.add[a7be35fd-c932-46e1-970d-0c520caecc32] succeeded in 0.01499999
9999417923s: 3
[2020-08-11 01:42:18,618: WARNING/MainProcess] 3+3 = 6
[2020-08-11 01:42:18,625: INFO/MainProcess] Task task.add[2e72a107-e833-4da4-a532-90e6039cc32e] succeeded in 0.0s: 6