python celery组件使用
Prepare
install:
1 | pip install celery |
选择broker,安装,这里假设使用Redis:
1 | apt-get install redis-server |
Configure
首先认真阅读官方celery文档的get start部分,如果有时间的话,最好全部看一边…
然后参考阅读别人的best practices,基本就可以干活了。
几个要点
- task相关的文件,最好都是用绝对导入;否则,应该在task function上面指定name;
- 如果需要root权限执行,需要在相关文件中加入
platforms.C_FORCE_ROOT=True
,但是最好别用root; - 可以根据需要消除
pickle
的警告,设置CELERY_ACCEPT_CONTENT=['pickle',]
; - 默认不发心跳,需要加上
BROKER_HEARTBEAT=10
,来消除心跳相关警告; Router
router是不支持通配符的,如果需要,可以自己写一个自定义Router类。下面是一个celery.py
的例子:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50from __future__ import absolute_import
from celery import Celery, platforms
from settings import CELERY_BROKER
from kombu import Queue, Exchange
class MyRouter(object):
'''router for tasks using wildcard'''
def route_for_task(self, task, *args, **kwargs):
if task.startswith('writer'):
return {'queue': 'async_writer', 'routing_key': 'async_writer'}
elif task.startswith('caller'):
return {'queue': 'async_caller', 'routing_key': 'async_caller'}
else:
return {'queue': 'default', 'routing_key': 'default'}
QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('async_writer', Exchange('async_writer'),
routing_key='async_writer'),
Queue('async_caller', Exchange('async_caller'),
routing_key='async_caller'),
)
platforms.C_FORCE_ROOT = True
app = Celery('async',
broker=CELERY_BROKER,
include=['async.writer', 'async.caller', 'async.checker', ])
app.conf.update(CELERY_ACCEPT_CONTENT=['pickle', ],
CELERY_IGNORE_RESULT=True,
CELERY_DISABLE_RATE_LIMITS=True,
CELERY_DEFAULT_EXCHANGE='default',
CELERY_DEFAULT_QUEUE='default',
CELERY_DEFAULT_ROUTING_KEY='default',
CELERY_DEFAULT_EXCHANGE_TYPE='topic',
CELERY_TASK_SERIALIZER='pickle',
CELERY_RESULT_SERIALIZER='pickle',
BROKER_HEARTBEAT=10,
CELERY_QUEUES=QUEUES,
CELERY_ROUTES=(MyRouter(),),
)
if __name__ == "__main__":
app.start()
Start
官方给出的init.d脚本不是很好用,下面是一个自己写的参考:
1 |
|
重点需要关注的是celery multi start的用法,注意start后面跟的是worker的名字(取数据的worker),也可以简单的写3,然后-Q:worker_name queue_name,最后-c是实际的worker(干活的worker)的数目,-Q是给队列指定worker。例子中的语句,意思是启动3个worker,分别命名为writer, caller和default,然后启动3个队列,名字分别是async_writer, async_caller和default,每个worker分配7个进程用来干活。