How to use multiprocessing with class instances in Python?

2022-01-12 00:00:00 python multiprocessing decorator

Problem description

I am trying to create a class that can run a separate process to do some long-running work, launch a bunch of these from a main module, and then wait for them all to finish. I want to launch the processes once and then keep feeding them things to do, rather than creating and destroying processes. For example, maybe I have 10 servers running the dd command, then I want them all to scp a file, etc.

My ultimate goal is to create a class for each system that keeps track of the information for the system it is tied to, such as IP address, logs, runtime, etc. But that class must be able to launch a system command and then return execution to the caller while that command runs, so the caller can follow up on the result later.

My attempt is failing because I cannot send an instance method of a class over the pipe to the subprocess via pickle. Those are not pickleable. I therefore tried to fix it various ways but I can't figure it out. How can my code be patched to do this? What good is multiprocessing if you can't send over anything useful?

Is there any good documentation of multiprocessing being used with class instances? The only way I can get the multiprocessing module to work is on simple functions. Every attempt to use it within a class instance has failed. Maybe I should pass events instead? I don't understand how to do that yet.

import multiprocessing
import sys
import re

class ProcessWorker(multiprocessing.Process):
    """
    This class runs as a separate process to execute worker's commands in parallel
    Once launched, it remains running, monitoring the task queue, until "None" is sent
    """

    def __init__(self, task_q, result_q):
        multiprocessing.Process.__init__(self)
        self.task_q = task_q
        self.result_q = result_q
        return

    def run(self):
        """
        Overloaded function provided by multiprocessing.Process.  Called upon start() signal
        """
        proc_name = self.name
        print '%s: Launched' % (proc_name)
        while True:
            next_task_list = self.task_q.get()
            if next_task_list is None:
                # Poison pill means shutdown
                print '%s: Exiting' % (proc_name)
                self.task_q.task_done()
                break
            next_task = next_task_list[0]
            print '%s: %s' % (proc_name, next_task)
            args = next_task_list[1]
            kwargs = next_task_list[2]
            answer = next_task(*args, **kwargs)
            self.task_q.task_done()
            self.result_q.put(answer)
        return
# End of ProcessWorker class

class Worker(object):
    """
    Launches a child process to run commands from derived classes in separate processes,
    which sit and listen for something to do
    This base class is called by each derived worker
    """
    def __init__(self, config, index=None):
        self.config = config
        self.index = index

        # Launch the ProcessWorker for anything that has an index value
        if self.index is not None:
            self.task_q = multiprocessing.JoinableQueue()
            self.result_q = multiprocessing.Queue()

            self.process_worker = ProcessWorker(self.task_q, self.result_q)
            self.process_worker.start()
            print "Got here"
            # Process should be running and listening for functions to execute
        return

    def enqueue_process(target):  # No self, since it is a decorator
        """
        Used to place a command target from this class object into the task_q
        NOTE: Any function decorated with this must use fetch_results() to get the
        target task's result value
        """
        def wrapper(self, *args, **kwargs):
            self.task_q.put([target, args, kwargs]) # FAIL: target is a class instance method and can't be pickled!
        return wrapper

    def fetch_results(self):
        """
        After all processes have been spawned by multiple modules, this command
        is called on each one to retrieve the results of the call.
        This blocks until the execution of the item in the queue is complete
        """
        self.task_q.join()                          # Wait for it to finish
        return self.result_q.get()                  # Return the result

    @enqueue_process
    def run_long_command(self, command):
        print "I am running %s as process %s" % (command, multiprocessing.current_process().name)

        # In here, I will launch a subprocess to run a  long-running system command
        # p = Popen(command), etc
        # p.wait(), etc
        return 

    def close(self):
        self.task_q.put(None)
        self.task_q.join()

if __name__ == '__main__':
    config = ["some value", "something else"]
    index = 7
    workers = []
    for i in range(5):
        worker = Worker(config, index)
        worker.run_long_command("ls /")
        workers.append(worker)
    for worker in workers:
        worker.fetch_results()

    # Do more work... (this would actually be done in a distributor in another class)

    for worker in workers:
        worker.close() 

Edit: I tried to move the ProcessWorker class and the creation of the multiprocessing queues outside of the Worker class, and then tried to manually pickle the worker instance. Even that doesn't work; I get this error:

RuntimeError: Queue objects should only be shared between processes through inheritance

But aren't I only passing references to those queues into the worker instance? I am missing something fundamental. Here is the modified code from the main section:

if __name__ == '__main__':
    config = ["some value", "something else"]
    index = 7
    workers = []
    for i in range(1):
        task_q = multiprocessing.JoinableQueue()
        result_q = multiprocessing.Queue()
        process_worker = ProcessWorker(task_q, result_q)
        worker = Worker(config, index, process_worker, task_q, result_q)
        something_to_look_at = pickle.dumps(worker) # FAIL:  Doesn't like queues??
        process_worker.start()
        worker.run_long_command("ls /")
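
A note on the RuntimeError above: as far as I understand CPython 3's behaviour, pickle.dumps(worker) walks the instance's attributes, reaches the queues, and the queue's own pickling hook refuses to run unless a child process is being started at that moment. The sketch below reproduces this with a hypothetical QueueHolder class standing in for Worker, and shows one possible workaround (a `__getstate__` that leaves the queue out). The class and function names are illustrative assumptions, not part of the original code.

```python
import multiprocessing
import pickle


class QueueHolder:
    """Hypothetical stand-in for Worker: plain data plus a queue."""

    def __init__(self, config):
        self.config = config
        self.task_q = multiprocessing.Queue()


class PicklableHolder(QueueHolder):
    """Same data, but the queue is left out of the pickled state."""

    def __getstate__(self):
        state = self.__dict__.copy()
        del state["task_q"]  # queues may only travel via process creation
        return state


def try_pickle(obj):
    """Return None if obj pickles, else the RuntimeError message."""
    try:
        pickle.dumps(obj)
        return None
    except RuntimeError as exc:
        return str(exc)


if __name__ == "__main__":
    print(try_pickle(QueueHolder(["some value"])))
    print(try_pickle(PicklableHolder(["some value"])))
```

The restored copy of a PicklableHolder has no task_q attribute, so this only helps when the pickled object is used for inspection or logging, not for talking to the child process.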

Solution

Instead of attempting to send the method itself (which is impractical), try sending the name of the method to execute.

Provided that each worker runs the same code, it's a matter of a simple getattr(self, task_name).

I'd pass tuples (task_name, task_args), where task_args is a dict fed directly to the task method as keyword arguments:

next_task_name, next_task_args = self.task_q.get()
if next_task_name:
  task = getattr(self, next_task_name)
  answer = task(**next_task_args)
  ...
else:
  # poison pill, shut down
  break
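
Putting the pieces together, here is a minimal Python 3 sketch of this idea, reusing the ProcessWorker and Worker names from the question. Several details are my own assumptions rather than anything from the original code: the "task_" method prefix, the ("ok", value) / ("error", message) result tuples, the (None, None) poison pill, and the task_run_command task, which uses sys.executable so the demo does not depend on ls being available.

```python
import multiprocessing
import subprocess
import sys


class ProcessWorker(multiprocessing.Process):
    """Child process that runs tasks selected by method name."""

    def __init__(self, task_q, result_q):
        super().__init__()
        self.task_q = task_q
        self.result_q = result_q

    def dispatch(self, task_name, task_args):
        # Only methods with the (hypothetical) "task_" prefix are reachable,
        # so a message cannot invoke arbitrary attributes such as start().
        task = getattr(self, "task_" + task_name)
        return task(**task_args)

    def run(self):
        # Executed in the child process once start() is called.
        while True:
            task_name, task_args = self.task_q.get()
            if task_name is None:
                # Poison pill: acknowledge it and shut down.
                self.task_q.task_done()
                break
            try:
                answer = ("ok", self.dispatch(task_name, task_args))
            except Exception as exc:
                # Report the failure instead of letting the worker die.
                answer = ("error", repr(exc))
            self.result_q.put(answer)
            self.task_q.task_done()

    def task_run_command(self, command):
        # Hypothetical task: run a command, return (exit code, stdout).
        done = subprocess.run(command, capture_output=True, text=True)
        return done.returncode, done.stdout


class Worker:
    """Parent-side handle: owns the queues and the child process."""

    def __init__(self, config):
        self.config = config
        self.task_q = multiprocessing.JoinableQueue()
        self.result_q = multiprocessing.Queue()
        self.process_worker = ProcessWorker(self.task_q, self.result_q)
        self.process_worker.start()

    def run_long_command(self, command):
        # A method name plus a dict of arguments is plain data and pickles fine.
        self.task_q.put(("run_command", {"command": command}))

    def fetch_results(self):
        self.task_q.join()          # wait until the queued task is done
        return self.result_q.get()  # then collect its result

    def close(self):
        self.task_q.put((None, None))
        self.task_q.join()
        self.process_worker.join()


def main():
    config = ["some value", "something else"]
    command = [sys.executable, "-c", "print('hello from a subprocess')"]
    workers = [Worker(config) for _ in range(3)]
    for worker in workers:
        worker.run_long_command(command)
    for worker in workers:
        print(worker.fetch_results())
    for worker in workers:
        worker.close()


if __name__ == "__main__":
    main()
```

Keeping the task methods on the process class means nothing but strings, dicts and result tuples ever cross the queues, which sidesteps the pickling problem entirely. The decorator from the question could be kept on the parent side if it enqueues target.__name__ instead of target, though that is a design choice rather than a requirement.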
