如何在进程之间共享一个类?

问题描述

我想要一个由所有进程共享和更新的全局对象,并且锁定最少.

I want to have global object which is shared and updated by all processes with minimum locking.

import multiprocessing

class Counter(object):
  def __init__(self):
    self.value = 0

  def update(self, value):
    self.value += value


def update(counter_proxy, thread_id):
  counter_proxy.value.update(1)
  print counter_proxy.value.value, 't%s' % thread_id, 
    multiprocessing.current_process().name
  return counter_proxy.value.value

def main():
  manager = multiprocessing.Manager()
  counter = manager.Value(Counter, Counter())
  pool = multiprocessing.Pool(multiprocessing.cpu_count())
  for i in range(10):
    pool.apply(func = update, args = (counter, i))
  pool.close()
  pool.join()

  print 'Should be 10 but is %s.' % counter.value.value

if __name__ == '__main__':
  main()

结果是这个 - 不是 10 而是零.看起来对象的共享值没有更新.如何锁定和更新这样的值?

The result is this - not 10 but zero. It looks like the object's shared value is not updated. How can I lock and update such value?

0 t0 PoolWorker-2
0 t1 PoolWorker-3
0 t2 PoolWorker-5
0 t3 PoolWorker-8
0 t4 PoolWorker-9
0 t5 PoolWorker-2
0 t6 PoolWorker-7
0 t7 PoolWorker-4
0 t8 PoolWorker-6
0 t9 PoolWorker-3
Should be 10 but is 0.

<小时>

@dano 提供的当前最佳解决方案 - 我将自定义管理器与类代理混合在一起.


Current the best solution by @dano - I mixed custom manager with class proxy.

import multiprocessing
from multiprocessing.managers import BaseManager, NamespaceProxy


class Counter(object):
  def __init__(self):
    self.value = 0

  def update(self, value):
    self.value += value


def update(counter_proxy, thread_id):
  counter_proxy.update(1)

class CounterManager(BaseManager):
  pass

class CounterProxy(NamespaceProxy):
  _exposed_ = ('__getattribute__', '__setattr__', '__delattr__', 'update')

  def update(self, value):
    callmethod = object.__getattribute__(self, '_callmethod')
    return callmethod(self.update.__name__, (value,))

CounterManager.register('Counter', Counter, CounterProxy)

def main():
  manager = CounterManager()
  manager.start()

  counter = manager.Counter()
  pool = multiprocessing.Pool(multiprocessing.cpu_count())
  for i in range(10):
    pool.apply(func = update, args = (counter, i))
  pool.close()
  pool.join()

  print 'Should be 10 but is %s.' % counter.value

if __name__ == '__main__':
  main()


解决方案

multiprocessing.Value 并非设计用于自定义类,它应该类似于 multiprocessing.sharedctypes.Value.相反,您需要创建一个 自定义管理器 并注册您的课程用它.如果您不直接访问 value ,而是通过方法修改/访问它,您的生活也会更轻松,这些方法将由为您的类创建的默认 Proxy 导出默认.常规属性(如 Counter.value)不是,因此如果没有额外的自定义,它们就无法访问.这是一个工作示例:

multiprocessing.Value isn't designed to be used with custom classes, it's supposed to be similar to a multiprocessing.sharedctypes.Value. Instead, you need to create a custom manager and register your class with it. Your life will also be easier if you don't access value directly, but modify/access it via methods, which will get exported by the default Proxy created for your class by default. Regular attributes (like Counter.value) aren't, so they aren't accessible without additional customization. Here's a working example:

import multiprocessing
from multiprocessing.managers import BaseManager

class MyManager(BaseManager): pass

def Manager():
    m = MyManager()
    m.start()
    return m 

class Counter(object):
  def __init__(self):
    self._value = 0

  def update(self, value):
    self._value += value

  def get_value(self):
      return self._value

MyManager.register('Counter', Counter)

def update(counter_proxy, thread_id):
  counter_proxy.update(1)
  print counter_proxy.get_value(), 't%s' % thread_id, 
    multiprocessing.current_process().name
  return counter_proxy

def main():
  manager = Manager()
  counter = manager.Counter()
  pool = multiprocessing.Pool(multiprocessing.cpu_count())
  for i in range(10):
    pool.apply(func=update, args=(counter, i))
  pool.close()
  pool.join()

  print 'Should be 10 but is %s.' % counter.get_value()

if __name__ == '__main__':
  main()

输出:

1 t0 PoolWorker-2
2 t1 PoolWorker-8
3 t2 PoolWorker-4
4 t3 PoolWorker-5
5 t4 PoolWorker-6
6 t5 PoolWorker-7
7 t6 PoolWorker-3
8 t7 PoolWorker-9
9 t8 PoolWorker-2
10 t9 PoolWorker-8
Should be 10 but is 10.

相关文章