How to synchronize a Python dict with multiprocessing

2022-01-12 00:00:00 python dictionary multiprocessing

Problem description

I am using Python 2.6 and the multiprocessing module to run work in parallel processes. Now I would like to have a synchronized dict (where the only atomic operation I really need is the += operator on a value).

Should I wrap the dict with a multiprocessing.sharedctypes.synchronized() call? Or is there a better way?


Solution

Intro

There seem to be a lot of armchair suggestions and no working examples. None of the answers listed here even suggests using multiprocessing, which is quite disappointing and disturbing. As Python lovers we should support our built-in libraries, and while parallel processing and synchronization are never trivial matters, I believe proper design can make them trivial. This is becoming extremely important on modern multi-core architectures and cannot be stressed enough! That said, I am far from satisfied with the multiprocessing library: it is still in its infancy, has quite a few pitfalls and bugs, and is geared towards functional programming (which I detest). Currently I still prefer the Pyro module (which is way ahead of its time) over multiprocessing, because of multiprocessing's severe limitation of being unable to share newly created objects while the server is running. The "register" class method of the manager objects only actually registers an object if it is called BEFORE the manager (or its server) is started. Enough chatter, more code:

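```python
# Server.py (Python 2.6)
from multiprocessing.managers import SyncManager


class MyManager(SyncManager):
    pass


# The one dict that is shared with every client
syncdict = {}


def get_dict():
    # Called by the manager's server whenever a client asks for "syncdict"
    return syncdict


if __name__ == "__main__":
    # register() must be called before start()
    MyManager.register("syncdict", get_dict)
    manager = MyManager(("127.0.0.1", 5000), authkey="password")
    manager.start()
    # raw_input() returns when Enter is pressed
    raw_input("Press Enter to kill server".center(50, "-"))
    manager.shutdown()
```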

In the code example above, Server.py uses multiprocessing's SyncManager, which can serve synchronized shared objects. This code will not work when typed into the interactive interpreter, because the multiprocessing library is quite touchy about how it finds the "callable" for each registered object. Running Server.py starts a customized SyncManager that shares the syncdict dictionary among multiple processes; clients can connect from the same machine or, if the server is bound to an IP address other than loopback, from other machines. In this case the server runs on loopback (127.0.0.1), port 5000. The authkey parameter makes every client authenticate (through an HMAC challenge) before it can manipulate syncdict; note that the traffic itself is not encrypted. When Enter is pressed, the manager is shut down.
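
Server.py is Python 2 code and blocks at a prompt. As a rough way to try the same registration pattern from a single script, here is a sketch that assumes Python 3, where authkey must be bytes. It swaps manager.start() for BaseManager.get_server() plus Server.serve_forever() running in a daemon thread, and uses port 0 so the OS picks a free port instead of 5000. ServerManager, ClientManager, start_server and connect are hypothetical names; the client half only mirrors what the client script in this answer does.

```python
# Python 3 sketch: Server.py's registration pattern plus a minimal client,
# run in one script so it can be tried without a second terminal.
import threading
from multiprocessing.managers import SyncManager

syncdict = {}  # the dict the server hands out


def get_dict():
    return syncdict


class ServerManager(SyncManager):  # hypothetical name for the server side
    pass


class ClientManager(SyncManager):  # hypothetical name for the client side
    pass


# The server registers the name together with a callable, before serving
ServerManager.register("syncdict", callable=get_dict)
# The client registers only the name; the object itself lives in the server
ClientManager.register("syncdict")

AUTHKEY = b"password"  # Python 3 requires authkey to be bytes


def start_server(address=("127.0.0.1", 0)):
    """Serve ServerManager from a daemon thread and return its bound address.

    Port 0 lets the OS choose a free port instead of hard-coding 5000.
    """
    server = ServerManager(address=address, authkey=AUTHKEY).get_server()
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server.address


def connect(address):
    """Connect a client-side manager to an already running server."""
    manager = ClientManager(address=address, authkey=AUTHKEY)
    manager.connect()
    return manager


if __name__ == "__main__":
    client = connect(start_server())
    shared = client.syncdict()          # proxy for the server's dict
    shared.update([("spam", 0)])
    shared.update([("spam", shared.get("spam") + 1.5)])
    print(shared)                       # prints {'spam': 1.5}
```

Running the server in a thread is only a convenience for trying things out; with two real scripts you would keep manager.start() (or serve_forever() in the foreground) on the server side and connect() on the client side.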

```python
# Client script (Python 2.6): connects to the manager started by Server.py
from multiprocessing.managers import SyncManager
import time


class MyManager(SyncManager):
    pass


# Register the name only; the server supplies the actual dict
MyManager.register("syncdict")

if __name__ == "__main__":
    manager = MyManager(("127.0.0.1", 5000), authkey="password")
    manager.connect()
    syncdict = manager.syncdict()

    # Show the proxy's attributes, including the exposed dict methods
    print "dict = %s" % (dir(syncdict))
    key = raw_input("Enter key to update: ")
    inc = float(raw_input("Enter increment: "))
    sleep = float(raw_input("Enter sleep time (sec): "))

    try:
        # If the key doesn't exist, create it
        if not syncdict.has_key(key):
            syncdict.update([(key, 0)])
        # Increment the key's value every `sleep` seconds, then print syncdict.
        # NOTE: get() and update() are two separate calls to the server, so
        # two clients updating the same key can interleave and lose increments.
        while True:
            syncdict.update([(key, syncdict.get(key) + inc)])
            time.sleep(sleep)
            print "%s" % (syncdict)
    except KeyboardInterrupt:
        print "Killed client"
```

The client must also create a customized SyncManager and register "syncdict", this time without passing in a callable to retrieve the shared dict. It then uses the customized SyncManager to connect to the manager started in Server.py, using the loopback IP address (127.0.0.1) on port 5000 and the same authkey. It retrieves the shared dict syncdict, as a proxy, by calling the registered method on the manager. It prompts the user for the following:

  1. The key in syncdict to manipulate
  2. The amount to increment the key's value by on each cycle
  3. The time to sleep between cycles, in seconds

The client then checks whether the key exists; if it doesn't, it creates the key in syncdict. The client then enters an "endless" loop in which it increments the key's value, sleeps for the specified time, and prints syncdict, repeating this until a KeyboardInterrupt occurs (Ctrl+C).
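
The client loop increments with syncdict.get() followed by syncdict.update(), which are two separate calls to the server, so two clients working on the same key can interleave and lose increments. If an atomic += is what you need, one option is to register a small object whose increment method does the read-modify-write under a lock inside the server, so each += is a single proxy call. This is a sketch assuming Python 3; SharedCounts, increment, snapshot, CountsManager, start_server and client are hypothetical names, not part of multiprocessing, and threads stand in for separate client processes so it runs from one script.

```python
# Python 3 sketch: make "+=" atomic by running it inside the server.
import threading
from multiprocessing.managers import SyncManager


class SharedCounts:
    """Hypothetical shared object: a dict of numbers with an atomic increment."""

    def __init__(self):
        self._data = {}
        # The manager's server handles each client connection in its own
        # thread, so the read-modify-write needs a lock.
        self._lock = threading.Lock()

    def increment(self, key, inc=1):
        """Add `inc` to data[key] (missing keys start at 0); return the new value."""
        with self._lock:
            self._data[key] = self._data.get(key, 0) + inc
            return self._data[key]

    def snapshot(self):
        """Return a plain-dict copy of the current values."""
        with self._lock:
            return dict(self._data)


counts = SharedCounts()


def get_counts():
    return counts


class CountsManager(SyncManager):
    pass


# The default AutoProxy exposes the public methods increment() and snapshot()
CountsManager.register("counts", callable=get_counts)
AUTHKEY = b"password"


def start_server():
    server = CountsManager(address=("127.0.0.1", 0), authkey=AUTHKEY).get_server()
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server.address


def client(address, key, inc, times):
    """Act as one client: connect and increment `key` `times` times."""
    manager = CountsManager(address=address, authkey=AUTHKEY)
    manager.connect()
    proxy = manager.counts()
    for _ in range(times):
        proxy.increment(key, inc)  # one round trip, applied atomically


if __name__ == "__main__":
    address = start_server()
    # Each thread opens its own connection, like a separate client would
    clients = [threading.Thread(target=client, args=(address, "hits", 1, 250))
               for _ in range(4)]
    for t in clients:
        t.start()
    for t in clients:
        t.join()
    print(counts.snapshot())  # prints {'hits': 1000}
```

The design choice is to move the += to where the data lives. Guarding the client's get()/update() pair with a lock obtained from the manager would also work, but every acquire and release is an extra round trip.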

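Annoying problems

  1. The manager's register method must be called before the manager is started; otherwise you will get an exception, even though calling dir on the manager shows that it really does have the registered method.
  2. All operations on the dict must be done through methods rather than item assignment: syncdict["blast"] = 2 fails miserably, because the default proxy multiprocessing creates for a registered object (AutoProxy) only exposes public methods, not special methods such as __setitem__.
  3. Using SyncManager's dict method alleviates annoying problem #2, but annoying problem #1 prevents the proxy returned by SyncManager.dict() from being registered and shared. (SyncManager.dict() can only be called after the manager is started, and registration only works before the manager is started, so SyncManager.dict() is only useful when doing functional programming and passing the proxy to Processes as an argument, as the doc examples do.)
  4. Both the server and the client have to register, even though intuitively it seems the client should be able to figure this out after connecting to the manager (please add this to your wish list, multiprocessing developers).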
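
For annoying problem #2, one workaround is to pass proxytype=DictProxy when registering. multiprocessing.managers.DictProxy is the proxy class that SyncManager.dict() itself uses, and it exposes the special methods that the default AutoProxy leaves out. This is a sketch assuming Python 3; start_server and the in-process server thread are illustrative choices, not part of the original answer.

```python
# Python 3 sketch: register the shared dict with DictProxy instead of the
# default AutoProxy so that item access and assignment work on the proxy.
import threading
from multiprocessing.managers import DictProxy, SyncManager

syncdict = {}


def get_dict():
    return syncdict


class MyManager(SyncManager):
    pass


# DictProxy exposes __getitem__, __setitem__, get, update, copy and other
# dict methods. The server must register it this way because the exposed
# list is enforced server-side; a separate client script would register
# "syncdict" with proxytype=DictProxy as well.
MyManager.register("syncdict", callable=get_dict, proxytype=DictProxy)
AUTHKEY = b"password"


def start_server():
    server = MyManager(address=("127.0.0.1", 0), authkey=AUTHKEY).get_server()
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server.address


if __name__ == "__main__":
    manager = MyManager(address=start_server(), authkey=AUTHKEY)
    manager.connect()
    d = manager.syncdict()
    d["blast"] = 2   # raises TypeError with the default AutoProxy
    d["blast"] += 3  # works, but is still a separate get and set: not atomic
    print(d.copy())  # prints {'blast': 5}
```

This removes the method-only restriction, but it does not make += atomic; that still needs a lock or a server-side method.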

Closing

I hope you enjoyed this quite thorough and slightly time-consuming answer as much as I have. I had a great deal of trouble getting straight in my mind why I was struggling so much with the multiprocessing module when Pyro makes it a breeze, and thanks to this answer I have now hit the nail on the head. I hope this is useful to the Python community for improving the multiprocessing module, as I believe it holds a great deal of promise but, in its infancy, falls short of what is possible. Despite the annoying problems described, I think this is still quite a viable alternative and is pretty simple. You could also use SyncManager.dict() and pass it to Processes as an argument the way the docs show; depending on your requirements that would probably be an even simpler solution, it just feels unnatural to me.
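
A minimal sketch of that documented alternative: create the proxy after the manager has started and hand it to each Process as an argument. It assumes Python 3; worker and its parameters are hypothetical, and the manager's Lock is added here so that each += is atomic, which a bare dict proxy does not guarantee.

```python
# Python 3 sketch of the approach from the docs: create the proxy after the
# manager has started and pass it to each Process as an argument.
from multiprocessing import Manager, Process


def worker(shared, lock, key, inc, times):
    """Increment shared[key] by `inc`, `times` times.

    Holding the lock around the get/set pair is what makes each "+=" atomic
    across processes; without it, concurrent workers can lose updates.
    """
    for _ in range(times):
        with lock:
            shared[key] = shared.get(key, 0) + inc


def main():
    with Manager() as manager:           # starts a SyncManager server process
        shared = manager.dict()          # DictProxy, so item assignment works
        lock = manager.Lock()            # proxy for a lock held by the manager
        procs = [Process(target=worker, args=(shared, lock, "hits", 1, 100))
                 for _ in range(4)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        print(shared.copy())             # prints {'hits': 400}


if __name__ == "__main__":
    main()
```

Because worker only relies on get(), item assignment and a context-manager lock, it can also be exercised with a plain dict and a threading.Lock.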