python任务调度模块celery(二)

关于celery的的基础介绍及安装使用参见python任务调度模块celery

多worker和多队列

首先是多worker和多队列的原理及流程图。

celery-multi-queue
一般情况下对于多worker和多队列的配置文件单独写在一个配置文件,方便管理和配置。

定义任务列表

multique.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from celery import Celery

app = Celery()
app.config_from_object("celeryconfig")

@app.task
def taskA(x, y):
return x*y

@app.task
def taskB(x, y, z):
return x+y+z

@app.task
def add(x, y):
return x+y

配置文件

celeryconfig.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from kombu import Queue, Exchange

BROKER_URL = "redis://118.24.18.158:6380/1"
CELERY_RESULT_BACKEND = "redis://118.24.18.158:6380/2"

CELERY_QUEUES = {
Queue("default", Exchange("default"), routing_key="default"),
Queue("for_task_A", Exchange("for_task_A"), routing_key="for_task_A"),
Queue("for_task_B", Exchange("for_task_B"), routing_key="for_task_B")
}

CELERY_ROUTES = {
"multique.taskA": {"queue": "for_task_A", "routing_key": "for_task_A"},
"multique.taskB": {"queue": "for_task_B", "routing_key": "for_task_B"}
}

启动celery worker监听

1
2
celery -A multique worker -l=info -n workerA.%h -Q for_task_A
celery -A multique worker -l=info -n workerB.%h -Q for_task_B

调用任务

multicelery.py

1
2
3
4
5
6
7
8
9
10
11
import time
from queue1.multique import *

re1 = taskA.delay(10, 20)
re2 = taskB.delay(100, 200, 300)
re3 = add.delay(1000, 2000)
time.sleep(2)
print(re1.result) #输出结果:200
print(re2.result) #输出结果:600
print(re3.status) #输出结果:PENDING
print(re3.result) #输出结果:None

我们看到状态是PENDING,表示没有执行,这个是因为没有celeryconfig.py文件中指定改route到哪一个Queue中,所以会被发动到默认的名字celery的Queue中,但是我们还没有启动worker执行celery中的任务。下面,我们来启动一个worker来执行celery队列中的任务。

1
celery -A multique worker -l info -n worker.%h -Q celery

再次调用任务,状态应该为SUCCESS,结果为3000。

celery定时任务

celery定时任务,Celery Beat进程通过读取配置文件的内容,周期性的将定时任务发往任务队列。
以上面多worker的异步任务为例,配置文件celeryconfig.py 中添加CELERYBEAT_SCHEDULE变量,添加内容如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CELERY_TIMEZONE = 'UTC' #指定时区,默认为UTC
CELERYBEAT_SCHEDULE = {
'taskA_schedule': {
'task': 'multique.taskA',
'schedule': 2, #每2s执行一次
'args': (5, 6) #传递函数参数
},
'taskB_scheduler': {
'task': "multique.taskB",
"schedule": 10,
"args":(10, 20, 30)
},
'add_schedule': {
"task": "multique.add",
"schedule": 5,
"args": (1, 2)
}
}

参数说明

  • task
    指定任务的名字
  • schedule
    设定任务的调度方式(设定任务如何重复执行),可以是一个表示秒的整数,也可以是一个 timedelta 对象,或者是一个 crontab 对象
  • args
    任务的参数列表
  • kwargs
    任务的参数字典
  • options
    所有 apply_async 所支持的参数

    启动celery worker进程

    在项目根目录执行命令
    1
    celery -A celeryapp worker -l=info    #celeryapp为项目文件所在的package名称

启动celery beat进程

启动Celery Beat进程,定时将任务发送到Broker,在项目根目录执行下面命令

1
celery beat -A celeryapp

之后在启动的worker窗口可以看到任务定时执行的情况。
启动worker和beat进程也可以放在同一个命令中执行

1
celery -B -A celeryapp worker --loglevel=info

更多celery定时任务相关内容点击Periodic Tasks查看官方介绍。

Recommended Posts