python celery组件使用

Prepare

install:

pip install celery

选择broker,安装,这里假设使用Redis:

apt-get install redis-server

Configure

首先认真阅读官方celery文档的get start部分,如果有时间的话,最好全部看一遍…

然后参考阅读别人的best practices,基本就可以干活了。

几个要点

  1. task相关的文件,最好都是用绝对导入;否则,应该在task function上面指定name;
  2. 如果需要root权限执行,需要在相关文件中加入platforms.C_FORCE_ROOT=True,但是最好别用root;
  3. 可以根据需要消除pickle的警告,设置CELERY_ACCEPT_CONTENT=['pickle',]
  4. 默认不发心跳,需要加上BROKER_HEARTBEAT=10,来消除心跳相关警告;
  5. Router

    router是不支持通配符的,如果需要,可以自己写一个自定义Router类。下面是一个celery.py的例子:
    from __future__ import absolute_import

    from celery import Celery, platforms
    from settings import CELERY_BROKER
    from kombu import Queue, Exchange


    class MyRouter(object):

    '''router for tasks using wildcard'''

    def route_for_task(self, task, *args, **kwargs):
    if task.startswith('writer'):
    return {'queue': 'async_writer', 'routing_key': 'async_writer'}
    elif task.startswith('caller'):
    return {'queue': 'async_caller', 'routing_key': 'async_caller'}
    else:
    return {'queue': 'default', 'routing_key': 'default'}


    QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('async_writer', Exchange('async_writer'),
    routing_key='async_writer'),
    Queue('async_caller', Exchange('async_caller'),
    routing_key='async_caller'),
    )

    platforms.C_FORCE_ROOT = True

    app = Celery('async',
    broker=CELERY_BROKER,
    include=['async.writer', 'async.caller', 'async.checker', ])

    app.conf.update(CELERY_ACCEPT_CONTENT=['pickle', ],
    CELERY_IGNORE_RESULT=True,
    CELERY_DISABLE_RATE_LIMITS=True,
    CELERY_DEFAULT_EXCHANGE='default',
    CELERY_DEFAULT_QUEUE='default',
    CELERY_DEFAULT_ROUTING_KEY='default',
    CELERY_DEFAULT_EXCHANGE_TYPE='topic',
    CELERY_TASK_SERIALIZER='pickle',
    CELERY_RESULT_SERIALIZER='pickle',
    BROKER_HEARTBEAT=10,
    CELERY_QUEUES=QUEUES,
    CELERY_ROUTES=(MyRouter(),),
    )

    if __name__ == "__main__":
    app.start()

Start

官方给出的init.d脚本不是很好用,下面是一个自己写的参考:

#!/bin/bash
#
# PushserverCore init script: manages the Core thrift server and its
# celery workers.
#
### BEGIN INIT INFO
# Provides: PushserverCore
# Required-Start: $remote_fs $network $syslog
# Required-Stop: $remote_fs $network $syslog
# Default-Start: 2 3 4 5
# Default-Stop: 0 1 6
# Short-Description: Start PushserverCore Service for generic init daemon
# Description: PushserverCore Service thrift Server backend.
### END INIT INFO

NAME="Core Thrift Server"
PROJECT=PushserverCore
# Append the app directory so its executables resolve without full paths.
PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/var/app/enabled/$PROJECT
DESC="PushserverCore"
APP_DIR=/var/app/enabled/$PROJECT/Core
APP_PATH=$APP_DIR/CoreServer.py
CELERY_LOG_PATH=/var/app/log/PushserverCore/celery.log

# Print a bold green "DONE" marker to stdout.
print_succ()
{
  local green bold reset
  green=$(tput setaf 2)
  bold=$(tput bold)
  reset=$(tput sgr0)
  echo "${green}${bold}DONE${reset}"
}

# Print a bold red "FAILED" marker to stdout.
print_fail()
{
  local red bold reset
  red=$(tput setaf 1)
  bold=$(tput bold)
  reset=$(tput sgr0)
  echo "${red}${bold}FAILED${reset}"
}

# Stop the thrift server by matching its full command line.
stop_service()
{
  echo "stopping $NAME..."
  # Quote the pattern: pkill -f matches the full command line (SC2086).
  if pgrep -f "$APP_PATH" > /dev/null 2>&1; then
    pkill -f "$APP_PATH"
  fi
  print_succ
}

# Start the thrift server in the background unless it is already running,
# then verify after a short grace period that it stayed up.
start_service()
{
  if pgrep -f "$APP_PATH" > /dev/null 2>&1; then
    echo "$NAME service is already running."
    return
  fi
  echo "starting $NAME service..."
  nohup python "$APP_PATH" >/dev/null 2>&1 &
  sleep 3
  if pgrep -f "$APP_PATH" > /dev/null 2>&1; then
    print_succ
  else
    print_fail
  fi
}

# Stop the celery workers.
# NOTE(review): 'pkill -f celery' kills ANY process whose command line
# contains "celery" — overly broad; consider 'pkill -f "celery -A async"'.
stop_worker()
{
  echo "stopping celery worker..."
  if pgrep -f celery > /dev/null 2>&1; then
    pkill -f celery
  fi
  print_succ
}

# Start the celery workers unless already running, then verify they
# survived a short grace period.
start_worker()
{
  if pgrep -f celery > /dev/null 2>&1; then
    echo "celery is already running"
    return
  fi
  echo "starting celery worker..."
  # Three named nodes (writer/caller/default), each bound to its own
  # queue via -Q:<node>, each with 7 worker processes (-c 7).
  celery -A async multi start writer caller default \
    -Q:writer async_writer -Q:caller async_caller -Q:default default \
    -c 7 -l INFO --workdir="$APP_DIR" --logfile="$CELERY_LOG_PATH"
  sleep 3
  if pgrep -f celery > /dev/null 2>&1; then
    print_succ
  else
    print_fail
  fi
}

# Report whether the thrift server and the celery workers are running.
check_status()
{
  # Quote the pattern to avoid word-splitting (SC2086).
  if pgrep -f "$APP_PATH" > /dev/null 2>&1; then
    echo "$NAME is running"
  else
    echo "$NAME is not running"
  fi

  if pgrep -f celery > /dev/null 2>&1; then
    echo "celery worker is running"
  else
    echo "celery worker is not running"
  fi
}

# Abort on any unhandled command failure from here on.
set -e

# LSB helper functions (log_daemon_msg etc.).
. /lib/lsb/init-functions

# Dispatch on the init verb passed as $1.
case "$1" in
  start)
    echo "Starting $DESC..."
    start_service
    start_worker
    ;;
  stop)
    echo "Stopping $DESC..."
    stop_service
    stop_worker
    ;;
  restart)
    echo "Restarting $DESC..."
    stop_service
    stop_worker
    sleep 3
    start_service
    start_worker
    echo "Checking..."
    check_status
    ;;
  status)
    check_status
    ;;
  *)
    echo "Usage: $NAME {start|stop|restart|status}" >&2
    exit 1
    ;;
esac

exit 0

重点需要关注的是celery multi start的用法:注意start后面跟的是worker节点的名字(从队列取任务的worker),也可以简单地写成数字3;然后用-Q:worker_name queue_name把队列绑定到worker;最后-c指定每个节点实际干活的进程数。例子中的语句,意思是启动3个worker节点,分别命名为writer, caller和default,对应3个队列,名字分别是async_writer, async_caller和default,每个worker节点分配7个进程用来干活。