Celery parallel distributed tasks with multiprocessing
Problem
I have a CPU-intensive Celery task. I would like to use all the processing power (cores) across lots of EC2 instances to get this job done faster (a celery parallel distributed task with multiprocessing, I think).
Threading, multiprocessing, distributed computing, and distributed parallel processing are all terms I'm trying to understand better.
Example task:
@app.task
def do_work(list_of_millions_of_ids):
    for item in list_of_millions_of_ids:
        id = item  # do some long, complicated equation here -- very CPU heavy!
        database.objects(newid=id).save()
Using the code above (with an example if possible), how would one go about distributing this task using Celery, allowing this one task to be split up to utilise all the computing CPU power across all available machines in the cloud?
Solution
Your goals are:
- Distribute your work to many machines (distributed computing / distributed parallel processing)
- Distribute the work on a given machine across all CPUs (multiprocessing / threading)
Celery can do both of these for you fairly easily. The first thing to understand is that each celery worker is configured by default to run as many tasks as there are CPU cores available on a system:
Concurrency is the number of prefork worker processes used to process your tasks concurrently. When all of these are busy doing work, new tasks will have to wait for one of the tasks to finish before they can be processed.
The default concurrency number is the number of CPUs on that machine (including cores); you can specify a custom number using the -c option. There is no recommended value, as the optimal number depends on a number of factors, but if your tasks are mostly I/O-bound then you can try to increase it. Experimentation has shown that adding more than twice the number of CPUs is rarely effective, and is likely to degrade performance instead.
This means each individual task doesn't need to worry about using multiprocessing/threading to make use of multiple CPUs/cores. Instead, celery will run enough tasks concurrently to use each available CPU.
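As a point of reference, the default a worker picks can be checked with the standard library. This snippet is just an illustration of where the number comes from; the -c flag (e.g. celery -A tasks worker -c 8) overrides it:

```python
import multiprocessing

# Celery's default worker concurrency equals the CPU count the OS reports;
# passing -c to `celery worker` overrides this default.
default_concurrency = multiprocessing.cpu_count()
print(default_concurrency)
```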
With that out of the way, the next step is to create a task that handles processing some subset of your list_of_millions_of_ids. You have a couple of options here. One is to have each task handle a single ID, so you run N tasks, where N == len(list_of_millions_of_ids). This will guarantee that work is evenly distributed amongst all your tasks, since there will never be a case where one worker finishes early and just waits around; if it needs work, it can pull an id off the queue. You can do this (as mentioned by John Doe) using a celery group.
tasks.py:
@app.task
def process_ids(item):
    id = item  # long complicated equation here
    database.objects(newid=id).save()
And to execute the tasks:
from celery import group
from tasks import process_ids

jobs = group(process_ids.s(item) for item in list_of_millions_of_ids)  # .s() builds a signature instead of running the task locally
result = jobs.apply_async()
Another option is to break the list into smaller pieces and distribute the pieces to your workers. This approach runs the risk of wasting some cycles, because you may end up with some workers waiting around while others are still doing work. However, the celery documentation notes that this concern is often unfounded:
Some may worry that chunking your tasks results in a degradation of parallelism, but this is rarely true for a busy cluster and in practice since you are avoiding the overhead of messaging it may considerably increase performance.
So, you may find that chunking the list and distributing the chunks to each task performs better, because of the reduced messaging overhead. You can probably also lighten the load on the database a bit this way, by calculating each id, storing it in a list, and then adding the whole list into the DB once you're done, rather than doing it one id at a time. The chunking approach would look something like this:
tasks.py:
@app.task
def process_ids(items):
    for item in items:
        id = item  # long complicated equation here
        database.objects(newid=id).save()  # still adding one id at a time, but you don't have to
And to start the tasks:
from tasks import process_ids

jobs = process_ids.chunks(list_of_millions_of_ids, 30)  # chunks of 30 items each; experiment with what size works best here
jobs.apply_async()
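The comment in process_ids above points out that you don't have to save one id at a time. Here is a hypothetical sketch of that batching idea, with compute_new_id and bulk_save standing in for your real equation and your ORM's bulk insert (e.g. bulk_create in Django); none of these names are part of the Celery API:

```python
# All names here are illustrative stand-ins, not a real ORM or Celery API.
saved = []

def bulk_save(rows):
    # stand-in for a single bulk DB insert: one round-trip per chunk
    saved.extend(rows)

def compute_new_id(item):
    # placeholder for the long, CPU-heavy equation
    return item * 2

def process_chunk(items):
    # compute every result for the chunk first, then persist the whole
    # batch in one call instead of one write per id
    results = [compute_new_id(item) for item in items]
    bulk_save(results)
    return results
```

Inside a real task body you would replace bulk_save with your database's actual bulk-insert call.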
You can experiment a bit with what chunking size gives you the best result. You want to find a sweet spot where you're cutting down messaging overhead while also keeping the size small enough that you don't end up with workers finishing their chunk much faster than another worker, and then just waiting around with nothing to do.
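One way to reason about that trade-off is to look at how many tasks a given chunk size produces: fewer, larger chunks mean less messaging but coarser load balancing. A small helper (assuming, as above, that the second argument to chunks is the number of items per chunk):

```python
import math

def num_tasks(total_items, chunk_size):
    # number of tasks .chunks(it, n) would enqueue: one per chunk of n items
    return math.ceil(total_items / chunk_size)

# a million ids in chunks of 30 -> tens of thousands of tasks;
# chunks of 10,000 -> only 100 tasks, but coarser load balancing
print(num_tasks(1_000_000, 30))
print(num_tasks(1_000_000, 10_000))
```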