在做项目的时候遇到过一个问题就是同一个worker无法并行的执行多个任务队列的任务,我想看看源码里是什么原因导致出现这个结果。

启动Worker

需要在命令行内输入rq worker

rq/cli.py的worker函数执行命令行参数解析等操作,关键代码在try…catch中。

  1. 首先执行cleanup_ghosts,当rq被突然终止时,会没有时间清理注册的worker,这个函数会在开始时清除ttl超时的worker,不然会在worker info里面还显示。
  2. 然后将自定义的异常处理导入
  3. 创建相连的queue对象
  4. 创建worker对象
  5. 调用worker的work方法,开始循环执行任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def worker(cli_config, burst, logging_level, name, results_ttl,
worker_ttl, job_monitoring_interval, verbose, quiet, sentry_dsn,
exception_handler, pid, queues, log_format, date_format, **options):
"""Starts an RQ worker."""

settings = read_config_file(cli_config.config) if cli_config.config else {}
# Worker specific default arguments
queues = queues or settings.get('QUEUES', ['default'])
name = name or settings.get('NAME')

if pid:
with open(os.path.expanduser(pid), "w") as fp:
fp.write(str(os.getpid()))

setup_loghandlers_from_args(verbose, quiet, date_format, log_format)

try:

cleanup_ghosts(cli_config.connection)
exception_handlers = []
for h in exception_handler:
exception_handlers.append(import_attribute(h))

if is_suspended(cli_config.connection):
click.secho('RQ is currently suspended, to resume job execution run "rq resume"', fg='red')
sys.exit(1)

queues = [cli_config.queue_class(queue,
connection=cli_config.connection,
job_class=cli_config.job_class)
for queue in queues]
worker = cli_config.worker_class(queues,
name=name,
connection=cli_config.connection,
default_worker_ttl=worker_ttl,
default_result_ttl=results_ttl,
job_monitoring_interval=job_monitoring_interval,
job_class=cli_config.job_class,
queue_class=cli_config.queue_class,
exception_handlers=exception_handlers or None)

worker.work(burst=burst, logging_level=logging_level, date_format=date_format, log_format=log_format)
except ConnectionError as e:
print(e)
sys.exit(1)

任务获取

任务执行的入口是worker.py的work方法。整个worker方法是一个不终止的循环,不断的从队列中取出任务,然后执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def work(self, burst=False, logging_level="INFO", date_format=DEFAULT_LOGGING_DATE_FORMAT,
log_format=DEFAULT_LOGGING_FORMAT):
self._install_signal_handlers() # 注册信号处理函数
did_perform_work = False
self.register_birth() # 向redis注册日期
self.set_state(WorkerStatus.STARTED)
qnames = self.queue_names()

try:
while True:
try:
self.check_for_suspension(burst)

if self.should_run_maintenance_tasks:
self.clean_registries()

if self._stop_requested:
self.log.info('Stopping on request')
break

timeout = None if burst else max(1, self.default_worker_ttl - 15)

result = self.dequeue_job_and_maintain_ttl(timeout)
if result is None:
if burst:
self.log.info("RQ worker %r done, quitting", self.key)
break

job, queue = result
self.execute_job(job, queue) # 执行任务
self.heartbeat()

did_perform_work = True

except StopRequested:
break
finally:
if not self.is_horse:
self.register_death()
return did_perform_work
  1. 首先注册信号处理函数,处理中断进程和终止进程函数。

  2. 然后在register_birth()中,将当前worker的key(比如rq:worker:shaoshuaideMacBook-Pro.21131)作为一个hashmap保存,这个hashmap里保存了worker的创建时间、worker的last heart beat、依赖的队列们(默认是default队列名)。最后通过worker_registration.register(self, p)将这个worker的key放入队列所拥有的set中。这样能够快速查到当前队列有哪些worker在使用。

  3. 在worker对应的hashmap里添加状态{state: "started"}

  4. 进入超大while True循环中

    • 首先check_for_suspension, 判断是否收到暂停命令。暂停标志是保存在redis中key为rq:suspended,如果需要暂停,则在函数check_for_suspension中不断休眠。

    • 从队列中获取一个任务,使用redis BLPOP 队列1 队列2 ... 队列N左边出队一个Job,这里使用BLPOP所以设定了超时时间,同时redis保证了检查顺序,按照队列1->队列2->队列3。

    • 将任务交给worker执行,self.execute_job(job, queue)

任务执行

  1. 首先将任务设置为BUSY,此时worker的状态如下
1
2
3
127.0.0.1:6379> HGETALL rq:worker:shaoshuaideMacBook-Pro.21131
1) "state"
2) "busy"
  1. 调用fork_work_horse(job, queue),在这个函数中调用os.fork(),因此在windows中python rq是用不了的。

    • 对于子进程,继续执行self.main_work_horse(job, queue)

    • 对于父进程,保存子进程的pid就退出了fork_work_horse

1
2
3
4
5
def execute_job(self, job, queue):
self.set_state(WorkerStatus.BUSY)
self.fork_work_horse(job, queue) # 这里是子进程执行
self.monitor_work_horse(job) # 这里是父进程执行
self.set_state(WorkerStatus.IDLE)

子进程执行

此时子进程进入main_work_horse

  1. 首先设置忽略SIGINT中断信号(也就是ctrl+c),也就是说当前的work会完成,再退出rq。同时设置SIGTERM信号为默认处理函数(也就是直接退出),即cold shutdown。

  2. self.perform_job(job, queue)中执行任务。

    • 首先更新worker状态:
    1
    2
    3
    4
    5
    6
    7
    127.0.0.1:6379> HGETALL rq:worker:shaoshuaideMacBook-Pro.21131
    1) "state"
    2) "busy"
    3) "current_job"
    4) "8407214b-7cab-4fd4-b409-acfdc5676920"
    5) "last_heartbeat"
    6) "2019-05-30T07:36:58.802853Z"
    • redis的rq:wip:default中保存了当前运行的任务ID。

    • 然后在timeout的时间内运行

    1
    2
    with self.death_penalty_class(timeout, JobTimeoutException, job_id=job.id):
    rv = job.perform()
    • 将该任务的ttl去除,然后调用job的self._execute() => 调用self.func ,其中self.func是一个属性装饰器的函数,返回一个需要执行的函数=> 调用import_attribute(self.func_name)=>导入相关函数后执行(*self.args, **self.kwargs)

    • 执行结束后在redis中删除对应的依赖job信息,修改worker的一些统计信息,比如将当前的任务对象设为None、将成功的任务数加一、增加任务总执行时间、保存任务执行的信息,下面是当前完成任务的信息。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    127.0.0.1:6379> hgetall rq:job:4d73fa5a-562a-4761-a3ca-393c66b2fe49
    1) "status"
    2) "started"
    3) "created_at"
    4) "2019-05-30T07:51:53.446755Z"
    5) "data"
    6) "x\x9ck`\x99*\xce\x00\x01\x1a=<%\xa9\xc5%\xf1E\xa5yz\x05\x05\x05S\xfc4k\xa7\x94L\xd1\x03\x00\x8f\x11\n5"
    7) "origin"
    8) "default"
    9) "description"
    10) "test_run.ppp()"
    11) "enqueued_at"
    12) "2019-05-30T07:51:53.447650Z"
    13) "timeout"
    14) "180"
    15) "started_at"
    16) "2019-05-30T07:53:36.856964Z"
    • 将job放入rq:finished:default中,同时为任务结果设置超时时间。

    • 最后os._exit(0),子进程完美执行结束。

父进程执行

父进程在获取子进程pid后,开始执行monitor_work_horse(job)。父进程会调用os.waitpid(self._horse_pid, 0)来等待子进程完成。

处理超时

worker心跳机制

对于worker来说,每次获取任务前会调用heartbeat(timeout, pipeline)来更新当前worker这个key的ttl,来防止worker超时。默认的ttl是420秒。同时在prepare_job_execution前也会更新heartbeat(),这个主要是用于dashboard判断这个worker是不是无响应。

job超时处理

在子进程执行目标任务时(main_work_horse=>perform_job=>job.perform中),会在前面加上下文管理器

1
2
with self.death_penalty_class(timeout, JobTimeoutException, job_id=job.id):
rv = job.perform()

self.death_penalty_classvu 为UnixSignalDeathPenalty继承自BaseDeathPenalty,主要是在enter方法中注册针对SIGALRM信号的处理函数,使用signal.alarm(self._timeout)在默认的180秒后发出SIGALRM信号,之后抛出异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class UnixSignalDeathPenalty(BaseDeathPenalty):

def handle_death_penalty(self, signum, frame):
raise self._exception('Task exceeded maximum timeout value '
'({0} seconds)'.format(self._timeout))

def setup_death_penalty(self):
signal.signal(signal.SIGALRM, self.handle_death_penalty)
signal.alarm(self._timeout)

def cancel_death_penalty(self):
signal.alarm(0)
signal.signal(signal.SIGALRM, signal.SIG_DFL)

class BaseDeathPenalty(object):
def __init__(self, timeout, exception=JobTimeoutException, **kwargs):
self._timeout = timeout
self._exception = exception

def __enter__(self):
self.setup_death_penalty()

def __exit__(self, type, value, traceback):
try:
self.cancel_death_penalty()
except BaseTimeoutException:
pass
return False

def setup_death_penalty(self):
raise NotImplementedError()

def cancel_death_penalty(self):
raise NotImplementedError()

如果job运行出现异常将任务设为Failed、从started队列中删除任务、增加任务失败次数、将错误信息输出。

结束worker

在worker的work方法中使用self._install_signal_handlers()注册了针对SIGINT、SIGTERM的handler self.request_stop。此时收到了一个Ctrl+C,会进入request_stop来处理信号。

1
2
3
4
5
6
7
8
9
10
11
12
13
def request_stop(self, signum, frame):
signal.signal(signal.SIGINT, self.request_force_stop)
signal.signal(signal.SIGTERM, self.request_force_stop)

self.handle_warm_shutdown_request()

if self.get_state() == WorkerStatus.BUSY:
self._stop_requested = True
self.set_shutdown_requested_date()
self.log.debug('Stopping after current horse is finished. '
'Press Ctrl+C again for a cold shutdown.')
else:
raise StopRequested()

在这个方法中,会重新为SIGINT、SIGTERM绑定新的处理方法self.request_force_stop。

  1. 如果再次Ctrl+C,会cold shutdown,如果子进程还在运行,会直接kill它。
  2. 否则进行warn shutdown,如果子进程还在运行,设置self._stop_reqested,则不会继续从队列获取任务,同时等待子进程运行完毕。

总结

通过上面的分析,虽然我指定当前worker从几个queue中获取任务,但是rq的worker只在一个进程中执行,每次只执行一个Job。为了能够并行,对需要并行的任务多开worker。