How to utilize all cores with Python multiprocessing

2022-01-12 00:00:00 python multiprocessing

Problem description

I have been fiddling with Python's multiprocessing functionality for upwards of an hour now, trying to parallelize a rather complex graph traversal function using multiprocessing.Process and multiprocessing.Manager:

import networkx as nx
import csv
import time 
from operator import itemgetter
import os
import multiprocessing as mp

cutoff = 1

exclusionlist = ["cpd:C00024"]

DG = nx.read_gml("KeggComplete.gml", relabel=True)

for exclusion in exclusionlist:
    DG.remove_node(exclusion)

# check whether the 'memorizedPaths' output directory exists, and create it if not
fn = os.path.join(os.path.dirname(__file__),
                  'memorizedPaths' + str(cutoff+1))
if not os.path.exists(fn):
    os.makedirs(fn)

manager = mp.Manager()
memorizedPaths = manager.dict()
filepaths = manager.dict()
degreelist = sorted(DG.degree_iter(),
                    key=itemgetter(1),
                    reverse=True)

def _all_simple_paths_graph(item, DG, cutoff, memorizedPaths, filepaths):
    source = item[0]
    uniqueTreePaths = []

    if cutoff < 1:
        return

    visited = [source]
    stack = [iter(DG[source])]

    while stack:
        children = stack[-1]
        child = next(children, None)

        if child is None:
            stack.pop()
            visited.pop()
        elif child in memorizedPaths:
            for path in memorizedPaths[child]:
                newPath = (tuple(visited) + tuple(path))
                if ((len(newPath) <= cutoff) and
                    (len(set(visited) & set(path)) == 0)):
                    uniqueTreePaths.append(newPath)
            continue
        elif len(visited) < cutoff:
            if child not in visited:
                visited.append(child)
                stack.append(iter(DG[child]))

                if visited not in uniqueTreePaths:
                    uniqueTreePaths.append(tuple(visited))
        else: # len(visited) == cutoff:
            if ((visited not in uniqueTreePaths) and
                (child not in visited)):
                uniqueTreePaths.append(tuple(visited + [child]))
            stack.pop()
            visited.pop()
    # writes the absolute path of the node path file into the hash table
    filepaths[source] = str(fn) + "/" + str(source) + "path.txt"
    with open (filepaths[source], "wb") as csvfile2:
        writer = csv.writer(csvfile2, delimiter=" ", quotechar="|")
        for path in uniqueTreePaths:
            writer.writerow(path)

    memorizedPaths[source] = uniqueTreePaths

############################################################################

if __name__ == '__main__':
    start = time.clock()

    for item in degreelist:
        test = mp.Process(target=_all_simple_paths_graph,
                          args=(item, DG, cutoff, memorizedPaths, filepaths))
        test.start()
        test.join()

    end = time.clock()
    print (end-start)

Currently - through luck and magic - it works (sort of). My problem is that I'm only using 12 of my 24 cores.

Can someone explain why this might be the case? Perhaps my code isn't the best multiprocessing solution, or is it a feature of my architecture (Intel Xeon CPU E5-2640 @ 2.50GHz x18, running on Ubuntu 13.04 x64)?

I did manage to get:

p = mp.Pool()
for item in degreelist:
    p.apply_async(_all_simple_paths_graph,
                  args=(item, DG, cutoff, memorizedPaths, filepaths))
p.close()
p.join()

working. However, it's VERY SLOW! So I assume I'm using the wrong function for the job. Hopefully this helps clarify exactly what I'm trying to accomplish!

The .map attempt:

from functools import partial

partialfunc = partial(_all_simple_paths_graph,
                      DG=DG,
                      cutoff=cutoff,
                      memorizedPaths=memorizedPaths,
                      filepaths=filepaths)
p = mp.Pool()
for item in processList:
    processVar = p.map(partialfunc, xrange(len(processList)))   
p.close()
p.join()

It works, but it's slower than a single core. Time to optimize!


Solution

Too much piling up here to address in comments, so, where mp is multiprocessing:

mp.cpu_count() should return the number of processors. But test it. Some platforms are funky, and this info isn't always easy to get. Python does the best it can.

If you start 24 processes, they'll do exactly what you tell them to do ;-) Looks like mp.Pool() would be most convenient for you. You pass the number of processes you want to create to its constructor. mp.Pool(processes=None) will use mp.cpu_count() for the number of processors.
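
For instance, a quick sanity check (square here is just a throwaway stand-in for real work, and the names are illustrative only):

import multiprocessing as mp

def square(x):
    # trivial stand-in for real work; defined at module level so it pickles
    return x * x

if __name__ == "__main__":
    print("cpu_count() reports", mp.cpu_count(), "processors")
    # processes=None (the default) makes Pool start cpu_count() worker processes
    with mp.Pool(processes=None) as pool:
        print(pool.map(square, range(8)))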

Then you can use, for example, .imap_unordered(...) on your Pool instance to spread your degreelist across processes. Or maybe some other Pool method would work better for you - experiment.
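
As a rough sketch of that idea, you could bind the extra arguments with functools.partial and feed imap_unordered just the items. This assumes the names from your question (DG, cutoff, memorizedPaths, filepaths, degreelist, _all_simple_paths_graph), and note that the partial - graph and managed dicts included - gets pickled and shipped along with the tasks, which is exactly the overhead the prototype further down avoids with an initializer:

from functools import partial
import multiprocessing as mp

if __name__ == "__main__":
    # bind everything except the per-node item
    bound = partial(_all_simple_paths_graph,
                    DG=DG, cutoff=cutoff,
                    memorizedPaths=memorizedPaths, filepaths=filepaths)
    with mp.Pool() as pool:  # one worker per core by default
        # results come back as they finish, in no particular order
        for _ in pool.imap_unordered(bound, degreelist, chunksize=500):
            pass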

If you can't bash the problem into Pool's view of the world, you could instead create an mp.Queue as a work queue, .put()'ing nodes (or slices of nodes, to reduce overhead) to work on in the main program, and write the workers to .get() work items off that queue. Ask if you need examples. Note that you need to put sentinel values (one per process) on the queue, after all the "real" work items, so that worker processes can test for the sentinel to know when they're done.
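
Since examples were just offered, here's a bare-bones sketch of that pattern; process_node is a made-up stand-in for the real per-node work:

import multiprocessing as mp

SENTINEL = None  # queued once per worker, after all the real work items

def process_node(node):
    # made-up stand-in for the real per-node work
    return node * node

def worker(in_queue, out_queue):
    while True:
        node = in_queue.get()
        if node is SENTINEL:  # no more work for this process
            break
        out_queue.put(process_node(node))

if __name__ == "__main__":
    in_q, out_q = mp.Queue(), mp.Queue()
    procs = [mp.Process(target=worker, args=(in_q, out_q))
             for _ in range(mp.cpu_count())]
    for p in procs:
        p.start()

    items = list(range(1000))  # stand-in for the real nodes
    for node in items:
        in_q.put(node)
    for _ in procs:            # the sentinels go in last, one per process
        in_q.put(SENTINEL)

    results = [out_q.get() for _ in items]  # drain results before joining
    for p in procs:
        p.join()
    print(len(results), "results collected")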

FYI, I like queues because they're more explicit. Many others like Pools better because they're more magical ;-)

Here's an executable prototype for you. This shows one way to use imap_unordered with Pool and chunksize that doesn't require changing any function signatures. Of course you'll have to plug in your real code ;-) Note that the init_worker approach allows passing "most of" the arguments only once per processor, not once for every item in your degreelist. Cutting the amount of inter-process communication can be crucial for speed.

import multiprocessing as mp

def init_worker(mps, fps, cut):
    global memorizedPaths, filepaths, cutoff
    global DG

    print "process initializing", mp.current_process()
    memorizedPaths, filepaths, cutoff = mps, fps, cut
    DG = 1  # placeholder; the real code loads the graph: nx.read_gml("KeggComplete.gml", relabel=True)

def work(item):
    _all_simple_paths_graph(DG, cutoff, item, memorizedPaths, filepaths)

def _all_simple_paths_graph(DG, cutoff, item, memorizedPaths, filepaths):
    pass  # placeholder for the real path-finding work, e.g. print("doing", item)

if __name__ == "__main__":
    m = mp.Manager()
    memorizedPaths = m.dict()
    filepaths = m.dict()
    cutoff = 1  # placeholder cutoff value
    # use all available CPUs
    p = mp.Pool(initializer=init_worker, initargs=(memorizedPaths,
                                                   filepaths,
                                                   cutoff))
    degreelist = range(100000)  # stand-in for the real degreelist
    for _ in p.imap_unordered(work, degreelist, chunksize=500):
        pass
    p.close()
    p.join()

I strongly advise running this exactly as-is, so you can see that it's blazing fast. Then add things to it a bit at a time, to see how that affects the time. For example, just adding

   memorizedPaths[item] = item

to _all_simple_paths_graph() slows it down enormously. Why? Because the dict gets bigger and bigger with each addition, and this process-safe dict has to be synchronized (under the covers) among all the processes. The unit of synchronization is "the entire dict" - there's no internal structure the mp machinery can exploit to do incremental updates to the shared dict.

If you can't afford this expense, then you can't use a Manager.dict() for this. Opportunities for cleverness abound ;-)
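
For example, one way to dodge the cost entirely is to have each worker return its piece of the result and let the main process fold what comes back into an ordinary dict, so nothing needs to be synchronized while the workers run. find_paths here is a made-up stand-in for the real per-node computation:

import multiprocessing as mp

def find_paths(source):
    # made-up stand-in: compute this node's paths locally and return them,
    # instead of writing into a shared Manager.dict()
    return source, [(source,)]

if __name__ == "__main__":
    memorizedPaths = {}  # plain dict, touched only by the main process
    with mp.Pool() as pool:
        for source, paths in pool.imap_unordered(find_paths, range(100000),
                                                 chunksize=500):
            memorizedPaths[source] = paths  # no cross-process synchronization
    print(len(memorizedPaths), "entries collected")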
