python rq worker
在做项目的时候遇到过一个问题就是同一个worker无法并行的执行多个任务队列的任务,我想看看源码里是什么原因导致出现这个结果。
启动Worker
需要在命令行内输入rq worker
在rq/cli.py
的worker函数执行命令行参数解析等操作,关键代码在try…catch
中。
- 首先执行
cleanup_ghosts
,当rq被突然终止时,会没有时间清理注册的worker,这个函数会在开始时清除ttl超时的worker,不然会在worker info里面还显示。 - 然后将自定义的异常处理导入
- 创建相连的queue对象
- 创建worker对象
- 调用worker的work方法,开始循环执行任务
1 | def worker(cli_config, burst, logging_level, name, results_ttl, |
任务获取
任务执行的入口是worker.py的work方法。整个worker方法是一个不终止的循环,不断的从队列中取出任务,然后执行。
1 | def work(self, burst=False, logging_level="INFO", date_format=DEFAULT_LOGGING_DATE_FORMAT, |
首先注册信号处理函数,处理中断进程和终止进程函数。
然后在register_birth()中,将当前worker的key(比如rq:worker:shaoshuaideMacBook-Pro.21131)作为一个hashmap保存,这个hashmap里保存了worker的创建时间、worker的last heart beat、依赖的队列们(默认是default队列名)。最后通过
worker_registration.register(self, p)
将这个worker的key放入队列所拥有的set中。这样能够快速查到当前队列有哪些worker在使用。在worker对应的hashmap里添加状态
{state: "started"}
进入超大
while True
循环中首先
check_for_suspension
, 判断是否收到暂停命令。暂停标志是保存在redis中key为rq:suspended
,如果需要暂停,则在函数check_for_suspension
中不断休眠。从队列中获取一个任务,使用redis
BLPOP 队列1 队列2 ... 队列N
左边出队一个Job,这里使用BLPOP
所以设定了超时时间,同时redis保证了检查顺序,按照队列1->队列2->队列3。将任务交给worker执行,
self.execute_job(job, queue)
任务执行
- 首先将任务设置为BUSY,此时worker的状态如下
1 | 127.0.0.1:6379> HGETALL rq:worker:shaoshuaideMacBook-Pro.21131 |
调用
fork_work_horse(job, queue)
,在这个函数中调用os.fork()
,因此在windows中python rq
是用不了的。对于子进程,继续执行
self.main_work_horse(job, queue)
对于父进程,保存子进程的pid就退出了
fork_work_horse
了
1 | def execute_job(self, job, queue): |
子进程执行
此时子进程进入main_work_horse
。
首先设置忽略SIGINT中断信号(也就是ctrl+c),也就是说当前的work会完成,再退出rq。同时设置SIGTERM信号为默认处理函数(也就是直接退出),即cold shutdown。
在
self.perform_job(job, queue)
中执行任务。- 首先更新worker状态:
1
2
3
4
5
6
7
8127.0.0.1:6379> HGETALL rq:worker:shaoshuaideMacBook-Pro.21131
1) "state"
2) "busy"
3) "current_job"
4) "8407214b-7cab-4fd4-b409-acfdc5676920"
5) "last_heartbeat"
6) "2019-05-30T07:36:58.802853Z"redis的
rq:wip:default
中保存了当前运行的任务ID。然后在timeout的时间内运行
1
2with self.death_penalty_class(timeout, JobTimeoutException, job_id=job.id):
rv = job.perform()将该任务的ttl去除,然后调用job的
self._execute()
=> 调用self.func
,其中self.func是一个属性装饰器的函数,返回一个需要执行的函数=> 调用import_attribute(self.func_name)
=>导入相关函数后执行(*self.args, **self.kwargs)
。执行结束后在redis中删除对应的依赖job信息,修改worker的一些统计信息,比如将当前的任务对象设为None、将成功的任务数加一、增加任务总执行时间、保存任务执行的信息,下面是当前完成任务的信息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17127.0.0.1:6379> hgetall rq:job:4d73fa5a-562a-4761-a3ca-393c66b2fe49
1) "status"
2) "started"
3) "created_at"
4) "2019-05-30T07:51:53.446755Z"
5) "data"
6) "x\x9ck`\x99*\xce\x00\x01\x1a=<%\xa9\xc5%\xf1E\xa5yz\x05\x05\x05S\xfc4k\xa7\x94L\xd1\x03\x00\x8f\x11\n5"
7) "origin"
8) "default"
9) "description"
10) "test_run.ppp()"
11) "enqueued_at"
12) "2019-05-30T07:51:53.447650Z"
13) "timeout"
14) "180"
15) "started_at"
16) "2019-05-30T07:53:36.856964Z"将job放入
rq:finished:default
中,同时为任务结果设置超时时间。最后
os._exit(0)
,子进程完美执行结束。
父进程执行
父进程在获取子进程pid后,开始执行monitor_work_horse(job)
。父进程会调用os.waitpid(self._horse_pid, 0)
来等待子进程完成。
处理超时
worker心跳机制
对于worker来说,每次获取任务前会调用heartbeat(timeout, pipeline)
来更新当前worker这个key的ttl,来防止worker超时。默认的ttl是420秒。同时在prepare_job_execution
前也会更新heartbeat()
,这个主要是用于dashboard判断这个worker是不是无响应。
job超时处理
在子进程执行目标任务时(main_work_horse
=>perform_job
=>job.perform
中),会在前面加上下文管理器
1 | with self.death_penalty_class(timeout, JobTimeoutException, job_id=job.id): |
self.death_penalty_class
vu 为UnixSignalDeathPenalty
继承自BaseDeathPenalty
,主要是在enter方法中注册针对SIGALRM信号的处理函数,使用signal.alarm(self._timeout)
在默认的180秒后发出SIGALRM信号,之后抛出异常。
1 | class UnixSignalDeathPenalty(BaseDeathPenalty): |
如果job运行出现异常将任务设为Failed、从started队列中删除任务、增加任务失败次数、将错误信息输出。
结束worker
在worker的work方法中使用self._install_signal_handlers()
注册了针对SIGINT、SIGTERM的handler self.request_stop。此时收到了一个Ctrl+C,会进入request_stop来处理信号。
1 | def request_stop(self, signum, frame): |
在这个方法中,会重新为SIGINT、SIGTERM绑定新的处理方法self.request_force_stop。
- 如果再次Ctrl+C,会cold shutdown,如果子进程还在运行,会直接kill它。
- 否则进行warn shutdown,如果子进程还在运行,设置self._stop_reqested,则不会继续从队列获取任务,同时等待子进程运行完毕。
总结
通过上面的分析,虽然我指定当前worker从几个queue中获取任务,但是rq的worker只在一个进程中执行,每次只执行一个Job。为了能够并行,对需要并行的任务多开worker。
- 本文链接:https://dowob.cn/2019/05/29/python-rq-worker/
- 版权声明:本站所有文章除特别声明外,均采用 CC BY-NC-SA 3.0 CN 许可协议。转载请注明出处!