关于celery的的基础介绍及安装使用参见python任务调度模块celery。
多worker和多队列
首先是多worker和多队列的原理及流程图。
一般情况下对于多worker和多队列的配置文件单独写在一个配置文件,方便管理和配置。
定义任务列表
multique.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16from celery import Celery
app = Celery()
app.config_from_object("celeryconfig")
def taskA(x, y):
return x*y
def taskB(x, y, z):
return x+y+z
def add(x, y):
return x+y
配置文件
celeryconfig.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15from kombu import Queue, Exchange
BROKER_URL = "redis://118.24.18.158:6380/1"
CELERY_RESULT_BACKEND = "redis://118.24.18.158:6380/2"
CELERY_QUEUES = {
Queue("default", Exchange("default"), routing_key="default"),
Queue("for_task_A", Exchange("for_task_A"), routing_key="for_task_A"),
Queue("for_task_B", Exchange("for_task_B"), routing_key="for_task_B")
}
CELERY_ROUTES = {
"multique.taskA": {"queue": "for_task_A", "routing_key": "for_task_A"},
"multique.taskB": {"queue": "for_task_B", "routing_key": "for_task_B"}
}
启动celery worker监听
1 | celery -A multique worker -l=info -n workerA.%h -Q for_task_A |
调用任务
multicelery.py
1
2
3
4
5
6
7
8
9
10
11import time
from queue1.multique import *
re1 = taskA.delay(10, 20)
re2 = taskB.delay(100, 200, 300)
re3 = add.delay(1000, 2000)
time.sleep(2)
print(re1.result) #输出结果:200
print(re2.result) #输出结果:600
print(re3.status) #输出结果:PENDING
print(re3.result) #输出结果:None
我们看到状态是PENDING,表示没有执行,这个是因为没有celeryconfig.py文件中指定改route到哪一个Queue中,所以会被发动到默认的名字celery的Queue中,但是我们还没有启动worker执行celery中的任务。下面,我们来启动一个worker来执行celery队列中的任务。1
celery -A multique worker -l info -n worker.%h -Q celery
再次调用任务,状态应该为SUCCESS,结果为3000。
celery定时任务
celery定时任务,Celery Beat
进程通过读取配置文件的内容,周期性的将定时任务发往任务队列。
以上面多worker的异步任务为例,配置文件celeryconfig.py
中添加CELERYBEAT_SCHEDULE
变量,添加内容如下1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18CELERY_TIMEZONE = 'UTC' #指定时区,默认为UTC
CELERYBEAT_SCHEDULE = {
'taskA_schedule': {
'task': 'multique.taskA',
'schedule': 2, #每2s执行一次
'args': (5, 6) #传递函数参数
},
'taskB_scheduler': {
'task': "multique.taskB",
"schedule": 10,
"args":(10, 20, 30)
},
'add_schedule': {
"task": "multique.add",
"schedule": 5,
"args": (1, 2)
}
}
参数说明
- task
指定任务的名字 - schedule
设定任务的调度方式(设定任务如何重复执行),可以是一个表示秒的整数,也可以是一个 timedelta 对象,或者是一个 crontab 对象 - args
任务的参数列表 - kwargs
任务的参数字典 - options
所有 apply_async 所支持的参数启动celery worker进程
在项目根目录执行命令1
celery -A celeryapp worker -l=info #celeryapp为项目文件所在的package名称
启动celery beat进程
启动Celery Beat进程,定时将任务发送到Broker,在项目根目录执行下面命令1
celery beat -A celeryapp
之后在启动的worker窗口可以看到任务定时执行的情况。
启动worker和beat进程也可以放在同一个命令中执行1
celery -B -A celeryapp worker --loglevel=info
更多celery定时任务相关内容点击Periodic Tasks查看官方介绍。