5 工作进程

启动

# 在前台启动celery
celery -A proj worker -l info

# 可在一台机器上启动多个工作进程
# 注意:设置不同的 --hostname(-n)
celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1@%h
celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2@%h
celery -A proj worker --loglevel=INFO --concurrency=10 -n worker3@%h

--hostname (-n) 可用的参数变量
# %h: Hostname, including domain name.
# %n: Hostname only.
# %d: Domain name only.
注意如果是supervisor则需要转义%

停止

可以使用TERM信号来停止,工作进程会完成它当前执行的程序,你需要等待它完成。如果想立即停止则可发送KILL信号,但是会造成任务丢失,除非该任务启用了acks_late选项。此外不能正常终止它的子进程,所以需要使用:

pkill -9 -f 'celery worker'
或
ps auxww | grep 'celery worker' | a wk '{print $2}' | xargs kill -9

重启

celery multi start 1 -A proj -l info -c4 --pidfile=/var/run/celery/%n.pid
celery multi restart 1 --pidfile=/var/run/celery/%n.pid

也可以发送HUP信号来重启:

kill -HUP $pid
# 在mac下不可用

信号

TERM Warm shutdown, wait for tasks to complete.

QUIT Cold shutdown, terminate ASAP

USR1 Dump traceback for all active threads.

USR2 Remote debug, see

文件类参数 选项

[--logfile] [--pidfile] [--statedb]

%p: Full node name.
%h: Hostname, including domain name.
%n: Hostname only.
%d: Domain name only.
%i: Prefork pool process index or 0 if MainProcess.
%I: Prefork pool process index with separator.

多进程 选项

%i – Pool process index or 0 if MainProcess.
%I – Pool process index with separator.

并发

默认是使用多进程方式来处理,可用Eventlet。

远程控制

远程控制是通过使用高优先级别的广播队列,可发送给所有也可指定工作进程。命令可以有响应,调用者可以等待并收集回复。因为无法知道集群中有多少个工作进程,所以也没办法知道有多少个工作进程会发送回复,所有有一个等待回复的超时时间,默认是1秒钟,可以根据需求更改这个值。另可指定收到回复的最大数量,可指定接收者。

broadcast

用于把命令发送到工作进程,其他的一些命令可能也是在后台使用broadcast。

app.control.broadcast('rate_limit', 
    arguments={'task_name': 'myapp.mytask',  
    'rate_limit': '200/m'})

# 这样默认是异步的 不等待回执,可指定reply=True

app.control.broadcast('rate_limit', {
    'task_name': 'myapp.mytask', 'rate_limit': '200/m'}, reply=True)

# 可指定接收的工作进程 destination

app.control.broadcast('rate_limit', {
    'task_name': 'myapp.mytask',
    'rate_limit': '200/m'}, reply=True,
                           destination=['worker1@example.com'])

命令

revoke 取消

当一个工作进程收到了取消命令,会跳过执行该任务,但是如果已经开始执行了则不会终止,除非开启了terminate选项(terminate关闭的是进程,而该进程可能已经开始处理其他任务了,所以应该在任务卡住的情况下使用该选项)。

例子:

result.revoke()

AsyncResult(id).revoke()

app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed')

app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed', terminate=True)

app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed', terminate=True, signal='SIGKILL')

一次取消多个任务:
app.control.revoke([
    '7993b0aa-1f0b-4780-9af0-c47c0858b3f2',
    'f565793e-b041-4b2b-9ca4-dca22762a55d',
    'd9d35e03-2997-42d0-a13e-64a66b88a618',
])

持久化取消请求

取消任务的方式是广播消息给所有工作进程,工作进程会在内存中维护一个取消状态的任务列表,当工作进程启动时会和集群中的其他工作进程同步。由于是在内存中保存的,所以如果所有的工作进程都重启了那么这个列表就消失了。可以使用-statedb选项来指定一个文件来存储。

celery -A proj worker -l info --statedb=/var/run/celery/worker.state

如果是多进程的话需得标示节点

celery multi start 2 -l info --statedb=/var/run/celery/%n.state

Time Limits

一个任务默认是可以一直运行的,如果有很多任务在等待,则会阻塞工作进程,无法处理新的任务。最好的方式是指定时间限制。

时间限制分为两种soft hard,soft会抛出异常,任务中可以捕获它并进行处理,hard则是不能捕获的,强制结束该任务。
-time-limit -soft-time-limit

from myapp import app
from celery.exceptions import SoftTimeLimitExceeded

@app.task
def mytask():
    try:
        do_work()
    except SoftTimeLimitExceeded:
        clean_up_in_a_hurry()

app.control.time_limit('tasks.crawl_the_web',
                       soft=60, hard=120, reply=True)

Rate Limits

# 每分钟最多执行200个任务
app.control.rate_limit('myapp.mytask', '200/m')

app.control.rate_limit('myapp.mytask', '200/m',
          destination=['celery@worker1.example.com'])

如果设置了worker_disable_rate_limits则不会受影响

Max tasks per child setting

设置一个工作进程可执行的最大任务数量,可以处理有资源泄露的情况

Max memory per child setting

设置一个工作进程可使用的最大内存大小,可以处理有资源泄露的情况

Autoscaling

自动伸缩,可设置worker的最大数量和最小数量
–autoscale=10,3

Queues

指定工作进程只在某个队列中获取任务:

celery -A proj worker -l info -Q foo,bar,baz

如果队列之前没有创建过则可以使用task_create_missing_queues选项指定生成新队列。也可以在运行时通过add_consumer、cancel_consumer来控制。

celery -A proj control add_consumer foo
celery -A proj control add_consumer foo -d celery@worker1.local
app.control.add_consumer('foo', reply=True)
app.control.add_consumer('foo', reply=True,
                        destination=['worker1@example.com'])
>>> app.control.add_consumer(
...     queue='baz',
...     exchange='ex',
...     exchange_type='topic',
...     routing_key='media.*',
...     options={
...         'queue_durable': False,
...         'exchange_durable': False,
...     },
...     reply=True,
...     destination=['w1@example.com', 'w2@example.com'])

告知worker处理foo队列

celery -A proj control cancel_consumer foo
celery -A proj control cancel_consumer foo -d celery@worker1.local
app.control.cancel_consumer('foo', reply=True)

告知worker取消处理foo队列

celery -A proj inspect active_queues
celery -A proj inspect active_queues -d celery@worker1.local
app.control.inspect().active_queues()
app.control.inspect(['worker1.local']).active_queues()

查看

Inspecting workers

i = app.control.inspect()
i = app.control.inspect(['worker1.example.com', 'worker2.example.com'])
i = app.control.inspect('worker1.example.com')

i.registered()
i.active()
i.scheduled()
i.reserved()
celery -A proj inspect stats

Additional Commands

关闭

app.control.broadcast('shutdown')
app.control.broadcast('shutdown', destination='worker1@example.com')

ping

app.control.ping(timeout=0.5)

监控管理

app.control.enable_events()
app.control.disable_events()

Writing your own remote control commands

转载自:https://blog.csdn.net/weixin_33973609/article/details/88248218

You may also like...