Python multiprocessing: using a queue to write to the same file

2022-01-21 00:00:00 python queue multiprocessing file-io

Problem description

I know there are many posts on Stack Exchange about writing results from multiprocessing to a single file, and I developed my code after reading only those posts. What I am trying to achieve is to run the 'RevMapCoord' function in parallel and write its results to one single file using multiprocessing.Queue. But I am having a problem while queuing my jobs. My code:

def RevMapCoord(list):
    "Read a file, Find String and Do something"

def feed(queue, parlist):
    for par in parlist:
        print('Echo from Feeder: %s' % (par))
        queue.put(par)
    print('**Feeder finished queing**')

def calc(queueIn, queueOut):
    print('Worker function started')
    while True:
        try:
            par = queueIn.get(block=False)
            res = RevMapCoord(final_res)
            queueOut.put((par, res))
        except:
            break

def write(queue, fname):
    fhandle = open(fname, "w")
    while True:
        try:
            par, res = queue.get(block=False)
            print >>fhandle, par, res
        except:
            break
    fhandle.close()


feedProc = Process(target=feed, args=(workerQueue, final_res))
calcProc = [Process(target=calc, args=(workerQueue, writerQueue)) for i in range(nproc)]
writProc = Process(target=write, args=(writerQueue, sco_inp_extend_geno))

feedProc.start()
print('Feeder is joining')
feedProc.join()
for p in calcProc:
    p.start()
for p in calcProc:
    p.join()
writProc.start()
writProc.join()

When I run this code, the script gets stuck at the "feedProc.start()" step. The last few lines of screen output show the print statement at the end of the feeder started by "feedProc.start()":

Echo from Feeder: >AK779,AT61680,50948-50968,50959,6,0.406808,Ashley,Dayne
Echo from Feeder: >AK832,AT30210,1091-1111,1102,7,0.178616,John,Caine
**Feeder finished queing**

But it hangs before executing the next line, "feedProc.join()". The code gives no error and keeps running but does nothing (it hangs). Please tell me what mistake I am making.


Solution

I managed to write results from multiprocessing to a single file by using the 'map_async' function in Python 3. Here is the function I wrote:

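from multiprocessing import Pool

def PPResults(module, alist):  # parallel processing
    # nproc (the number of worker processes) is assumed to be defined elsewhere
    with Pool(int(nproc)) as npool:
        res = npool.map_async(module, alist)
        results = res.get()  # blocks until all tasks finish; results come back as a list
    return results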

So, I provide this function with a list of parameters in 'alist', and 'module' is a function that does the processing and returns a result. The function collects the results in a list and returns once all the parameters in 'alist' have been processed. The list returned by 'res.get()' is in the same order as 'alist', although order was not important for me anyway. The 'results' list can then be iterated and the individual results written to a file, like:

with open('./TestResults', 'w') as fh_out:  # file is closed automatically
    for i in results:  # write each result to the file (assumes each result is a string)
        fh_out.write(i)
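
Putting the two pieces together, a minimal end-to-end sketch might look like the following. The worker `process_record`, the input records, and the output path are hypothetical stand-ins for illustration; only the `Pool.map_async(...).get()` pattern comes from the function above. Only the parent process writes to the file, so no locking is needed.

```python
import os
import tempfile
from multiprocessing import Pool


def process_record(record):
    """Hypothetical stand-in for the real worker: returns one output line."""
    return record.upper() + "\n"


def pp_results(func, items, nproc=2):
    """Run func over items in a pool and collect the results in a list."""
    with Pool(nproc) as pool:
        return pool.map_async(func, items).get()


def write_results(path, results):
    """Write the collected results; called from the parent process only."""
    with open(path, "w") as fh_out:
        fh_out.writelines(results)


if __name__ == "__main__":
    records = ["ak779", "ak832", "ak901"]  # made-up inputs
    out_path = os.path.join(tempfile.gettempdir(), "TestResults")
    write_results(out_path, pp_results(process_record, records))
    with open(out_path) as fh:
        print(fh.read(), end="")
```

The `if __name__ == "__main__":` guard matters here: on platforms that start workers by re-importing the main module, unguarded pool creation would be executed again in every child.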

If results need to be written as soon as each one is ready, rather than after everything has finished, a queue-based design like the one in my question (above) may be needed. I was able to fix that code, but I don't think it needs to be shown here.
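
A minimal sketch of how the queue-based design from the question could be made to work is shown below. It is an illustration under assumptions, not the fixed code referred to above: `rev_map_coord`, the `None` sentinel, `run`, and the output path are all hypothetical. It changes two things. First, every process is started before any is joined, because a process that has put items on a `multiprocessing.Queue` does not exit until its buffered items have been flushed to the underlying pipe, so joining the feeder before anything consumes the queue can block forever. Second, the workers and the writer use blocking `get()` calls with an explicit stop marker instead of a non-blocking `get` inside a bare `except`, which would quit as soon as the queue happened to be empty.

```python
import os
import tempfile
from multiprocessing import Process, Queue

SENTINEL = None  # hypothetical stop marker meaning "no more items"


def rev_map_coord(par):
    """Hypothetical stand-in for RevMapCoord."""
    return par[::-1]


def feed(queue_in, parlist, nworkers):
    for par in parlist:
        queue_in.put(par)
    for _ in range(nworkers):  # one stop marker per worker
        queue_in.put(SENTINEL)


def calc(queue_in, queue_out):
    while True:
        par = queue_in.get()  # blocking get: waits instead of quitting early
        if par is SENTINEL:
            break
        queue_out.put((par, rev_map_coord(par)))
    queue_out.put(SENTINEL)  # tell the writer this worker is done


def write(queue_out, fname, nworkers):
    finished = 0
    with open(fname, "w") as fhandle:
        while finished < nworkers:  # stop once every worker has signed off
            item = queue_out.get()
            if item is SENTINEL:
                finished += 1
            else:
                fhandle.write("%s\t%s\n" % item)


def run(parlist, fname, nproc=2):
    worker_queue, writer_queue = Queue(), Queue()
    procs = [Process(target=feed, args=(worker_queue, parlist, nproc))]
    procs += [Process(target=calc, args=(worker_queue, writer_queue))
              for _ in range(nproc)]
    procs.append(Process(target=write, args=(writer_queue, fname, nproc)))
    for p in procs:  # start everything before joining anything
        p.start()
    for p in procs:
        p.join()


if __name__ == "__main__":
    out_path = os.path.join(tempfile.gettempdir(), "queue_results.txt")
    run(["AK779", "AK832", "AK901"], out_path)
    with open(out_path) as fh:
        print(fh.read(), end="")
```

With more than one worker, lines reach the file in completion order, which may differ from the input order; each line carries its input parameter so it can be matched back afterwards.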

Thanks

AK
