大數據

Celery基本使用

簡介

Celery 是一款非常簡單、靈活、可靠的分佈式系統,可用於處理大量消息,Celery架構如下圖,由消息隊列、任務執行單元、結果存儲三部分組成。

image.png

user:任務的生產者,可以是用戶(觸發任務)或者celery beat(產生週期任務)
broker:消息中間件,Redis或RabbitMQ,生產者產生的任務會先存放到broker
worker:任務執行單元,執行broker中的任務
store(backend):存儲任務結果

安裝

pip install -U Celery

目錄結構

使用下面簡單的目錄結構做演示,正式項目中的使用一般為多目錄結構,不同的任務放到不同的task文件。

proj
  - task.py
  - app.py
  - result.py

基本使用

創建任務

# task.py

import celery

broker = 'redis://127.0.0.1:6379/1'  # redis://:[email protected]:6379/1
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('test', backend=backend, broker=broker)

# app = celery.Celery('test')  # 效果同上
# app.conf.broker_url = broker
# app.conf.result_backend = backend

@app.task
def add(a, b):
    print(f'{a}+{b} = {a + b}')
    return a + b

• 實例化Celery對象
• 定義函數
• Celery對象task方法作為裝飾器
如果要使用RabbitMQ作為消息中間件,只需修改broker,無需關心如何操作Redis或RabbitMQ。

調用任務

# app.py

from task import add

result = add.delay(1, 2)
print(result.id)  # 2a3d96fe-c3e9-4846-8237-24fc28d9ad2b

task裝飾器返回一個celery Task對象,賦予了原函數Task的方法,delay方法用於調用異步任務,異步任務返回celery.result.AsyncResult對象,在執行任務的時候最主要的就是獲取ID,以後可以用ID去查任務執行狀態、結果等。

任務狀態

# result.py

from celery.result import AsyncResult
from task import app

async_result = AsyncResult(
    id='2a3d96fe-c3e9-4846-8237-24fc28d9ad2b', app=app
)
print(async_result.status)
print(async_result.result)
print(async_result.get())

• status:任務執行狀態(PENDING、STARTED、RETRY、FAILURE、SUCCESS)
• result:任務的返回值或錯誤信息
• get():同步的方式查結果

以上所有操作基於broker能連接上
執行add.delay後去查任務狀態,一直處於PENDING,因為worker沒啟動,任務就存放在broker中一直沒有被執行。

broker

broker選擇的是Redis的數據庫1,前面觸發的任務存儲在key為celery的list中。

image.png

啟動worker

celery worker -A task -l info -P eventlet
  • -A:指定Celery對象的位置
  • -l:日誌級別
  • -P:默認使用prefork管理併發,windows不支持prefork

worker啟動後,可以看到部分配置信息、隊列、任務,然後就會執行broker中堆積的任務,並將結果保存到backend

- ** ---------- [config]
- ** ---------- .> app:         test:0x1ade6ea95f8
- ** ---------- .> transport:   redis://127.0.0.1:6379/1
- ** ---------- .> results:     redis://127.0.0.1:6379/2
- *** --- * --- .> concurrency: 4 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . task.add

...

[2020-08-11 01:01:02,616: INFO/MainProcess] Received task: task.add[2a3d96fe-c3e9-4846-8237-24fc28d9ad2b]
[2020-08-11 01:01:08,796: WARNING/MainProcess] 1+2 = 3
[2020-08-11 01:01:08,801: INFO/MainProcess] Task task.add[2a3d96fe-c3e9-4846-8237-24fc28d9ad2b] succeeded in 6.172000000002299: 3

值得注意的是tasks,列表展示所有的celery任務,後面celery-beat還會用到。

[tasks]
  . task.add

backend

到Redis查看執行結果

image.png

監控

監控使用flower,自帶可視化web界面,使用pip安裝

pip install -U flower

啟動命令:

celery -A task flower (--port=5555) 
或
celery flower --broker=redis://127.0.0.1:6379/1

image.png

image.png

重試

可以設置任務執行失敗後是否進行重試,對哪些錯誤進行重試,重試次數、時間間隔等。

# task

@app.task(
    autoretry_for=(Exception,),  # 指定錯誤碼,Exception表示對所有錯誤進行重試
    max_retries=2,  # 重試次數
    retry_backoff=4,  # 重試時間間隔,retry_jitter為True時,時間間隔為1-retry_backoff之間隨機數
    # retry_jitter=False,  # 默認為True,retry_jitter=False時,第n次重試時間為上一次重試時間retry_backoff**n秒後
)
def send_msg(msg):
    return msg[5]
# app

res1 = send_msg.delay('abcdef')
res2 = send_msg.delay('abc')

res1正常執行,res2 IndexError重試

image.png

定時任務

普通函數使用task裝飾器後被封裝成celery任務,可以z作為異步任務調用,也可以作為定時任務調用,具體看調用方式。
調用定時任務的兩種方式:

countdown

add.apply_async(args=(1, 2), countdown=3, expires=5)

• args:函數的參數
• countdown:幾秒後執行
• expires:過期時間

eta

eta:datetime、utc時間,可以使用timedelta做時間運算,設置時間上更為靈活。

add.apply_async(
    args=(1, 2), 
    eta=datetime.datetime.utcnow() + datetime.timedelta(seconds=10),
    expires=20
)

週期任務:celery beat

如果定義了beat_schedule,在啟動celery-beat後就會週期性的產生任務放到broker。

# task.py
app.conf.beat_schedule = {
    'test_cycle_task': {  # 任務name
        # 執行task下的add函數
        'task': 'task.add',  # 啟動worker時監控到的任務 -> [tasks]
        # 'schedule': 5.0,  # 幾秒執行一次
        'schedule': timedelta(seconds=6),  # 多久執行一次
        # 'schedule': crontab(hour=0, minute=55),  # 每天定時執行
        'args': (2, 3)  # 傳遞參數
    },
    'task2': {}
}

celery-beat啟動命令:

celery beat -A task

子任務

任務執行成功或失敗後執行一個回調函數。

task1.apply_async((1, 2), link=task2.s(3), link_error=task3.s())
task1執行成功,返回值傳遞給task2並且作為task2的第一個參數
task1出錯,ID傳遞給task3並且作為task3的第一個參數

執行:add.apply_async((1, 2), link=add.s(3))
[2020-08-11 01:42:18,592: INFO/MainProcess] Received task: task.add[a7be35fd-c932-46e1-970d-0c520caecc32]
[2020-08-11 01:42:18,598: WARNING/MainProcess] 1+2 = 3
[2020-08-11 01:42:18,609: INFO/MainProcess] Received task: task.add[2e72a107-e833-4da4-a532-90e6039cc32e]
[2020-08-11 01:42:18,614: INFO/MainProcess] Task task.add[a7be35fd-c932-46e1-970d-0c520caecc32] succeeded in 0.01499999
9999417923s: 3
[2020-08-11 01:42:18,618: WARNING/MainProcess] 3+3 = 6
[2020-08-11 01:42:18,625: INFO/MainProcess] Task task.add[2e72a107-e833-4da4-a532-90e6039cc32e] succeeded in 0.0s: 6

Leave a Reply

Your email address will not be published. Required fields are marked *