Python的multiprocessi

2023-01-31 00:01:08 python multiprocessi


1.基本介绍

multiprocessing模块可以衍生出子进程。multiprocessing模块同时提供本地和远程的并发操作。multiprocessing模块不像threading模块那样会受到GIL全局解释器的限制,它使用进程代替线程。基于这样的特性,multiprocessing模块可以让程序员在一台服务器上使用多个处理器。

In [106]: from multiprocessing import Pool

In [107]: def f(x):
   .....:     return x*x;
   .....: 

In [108]: if __name__ == '__main__':
   .....:     p=Pool(5)
   .....:     print(p.map(f,[1,2,3]))
   .....:     
[1, 4, 9]



在multiprocessing模块中,子进程是通过一个Process对象生成的。然后调用start()函数

In [116]: from multiprocessing import Process

In [117]: def f(name):
   .....:     print 'hello',name
   .....:     

In [118]: if __name__ == '__main__':
   .....:     p=Process(target=f,args=('john',))
   .....:     p.start()
   .....:     p.join()
   .....:     
hello john


如果要查看各自的进程ID,可以使用以下代码

#!/usr/sbin/python

from multiprocessing import Process
import os

def info(title):
    print title
    print 'module name:',__name__
    if hasattr(os,'getppid'):
       print 'parent process:',os.getppid()
    print 'process id:', os.getpid()

def f(name):
    info('function f')
    print 'hello',name

if __name__ == '__main__':
   info('main line')
   p = Process(target=f,args=('john',))
   p.start()
   p.join()


main line
module name: __main__
parent process: 17148
process id: 18168
function f
module name: __main__
parent process: 18168
process id: 18169
hello john


2.进程间通信

multiprocessing模块支持Queues和Pipes两种方式来进行进程间通信

使用Queue

In [123]: from multiprocessing import Process,Queue

In [124]: def f(q):
   .....:     q.put([42,None,'hello'])
   .....:     

In [125]: if __name__ == '__main__':
   .....:     q=Queue()
   .....:     p=Process(target=f,args=(q,))
   .....:     p.start()
   .....:     print q.get()
   .....:     p.join()
   .....:     
[42, None, 'hello']


使用Queues,对于线程和进程来说都是安全



使用Pipe

In [136]: from multiprocessing import Process,Pipe

In [137]: def f(conn):
   .....:     conn.send([42,None,'hello'])
   .....:     conn.close()
   .....:     

In [138]: if __name__ == '__main__':
   .....:     parent_conn,child_conn=Pipe()
   .....:     p=Process(target=f,args=(child_conn,))
   .....:     p.start()
   .....:     print parent_conn.recv()
   .....:     p.join()
   .....:     
[42, None, 'hello']


Pipe()返回一对连接对象,这两个连接对象分别代表Pipe的两端。每个连接对象都有send()和recv()方法。需要注意的是如果两个不同的进程在同一时间对同一个Pipe的末端或者连接对象进行读写操作,那么Pipe中的数据可能被损坏。不同的进程在不同的末端同一时间读写数据不会造成数据损坏。


3.进程间同步

In [143]: from multiprocessing import Process,Lock

In [144]: def f(l,i):
               l.acquire()
               print 'hello world',i
               l.release()
   .....:     

In [145]: if __name__ == '__main__':
               lock=Lock()
               for num in range(10):
                   Process(target=f,args=(lock,num)).start()
   .....:         
hello world 0
hello world 1
hello world 2
hello world 3
hello world 4
hello world 5
hello world 6
hello world 7
hello world 8
hello world 9



4.进程间共享状态信息

在进行并发编程的过程中,尽量不要使用共享状态。如果一定要在进程间共享数据,multiprocessing模块提供了一些方法。

共享内存

In [11]: from multiprocessing import Process,Value,Array

In [12]: def f(n,a):
   ....:     n.value=3.1415927
   ....:     for i in range(len(a)):
   ....:         a[i] = -a[i]
   ....:         

In [13]: if __name__ == '__main__':
   ....:     num=Value('d',0.0)
   ....:     arr=Array('i',range(10))
   ....:     p=Process(target=f,args=(num,arr))
   ....:     p.start()
   ....:     p.join()
   ....:     print num.value
   ....:     print arr[:]
   ....:     
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]




共享进程

In [27]: from multiprocessing import Process,Manager

In [28]: def f(d,l):
            d[1] = '1'
            d['2']=2
            d[0.25]=None
            l.reverse()
   ....:     

In [29]: if __name__ == '__main__':
    manager=Manager()
    d=manager.dict()
    l=manager.list(range(10))
    p=Process(target=f,args=(d,l))
    p.start()
    p.join()
    print d
    print l
   ....:     
{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]



5.使用一组工作进程

使用Pool对象会创建一组worker进程

from multiprocessing import Pool, TimeoutErrorimport timeimport osdef f(x):
    return x*xif __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes

    # print "[0, 1, 4,..., 81]"
    print pool.map(f, range(10))

    # print same numbers in arbitrary order
    for i in pool.imap_unordered(f, range(10)):
        print i

    # evaluate "f(20)" asynchronously
    res = pool.apply_async(f, (20,))      # runs in *only* one process
    print res.get(timeout=1)              # prints "400"

    # evaluate "os.getpid()" asynchronously
    res = pool.apply_async(os.getpid, ()) # runs in *only* one process
    print res.get(timeout=1)              # prints the PID of that process

    # launching multiple evaluations asynchronously *may* use more processes
    multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
    print [res.get(timeout=1) for res in multiple_results]

    # make a single worker sleep for 10 secs
    res = pool.apply_async(time.sleep, (10,))
    try:
        print res.get(timeout=1)
    except TimeoutError:
        print "We lacked patience and Got a multiprocessing.TimeoutError"



[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
0
1
4
9
25
36
64
49
81
16
400
27150
[27149, 27152, 27151, 27150]
We lacked patience and got a multiprocessing.TimeoutError



Pool对象的函数只能是创建它的进程可以使用。

multiprocessing模块的提供的函数需要子进程可以导入__main__模块









参考文档:

https://docs.Python.org/2/library/multiprocessing.html


相关文章