Multiprocessing does not spawn all requested processes

2022-04-10 00:00:00 python python-multiprocessing

Problem description

I am using OrcaFlex (a finite-element package for offshore analysis, though that should not matter here). I wrote a script to check whether the simulations I ran completed successfully (a simulation can fail because it does not reach convergence). Since we are talking about thousands of files, I tried to parallelise the process with multiprocessing. My code is below. I'm sorry I cannot give you a working example, but I will try to explain in detail. I created a derived class of multiprocessing.Process and overrode run() to perform the checks on the simulation files. Then, in __main__, I set the number of processors, split the files accordingly, and started the execution.

The problem is that the processes are not all spawned straight away; instead they start one after another at seemingly random intervals. Is this expected, or am I missing something? By "not all spawned" I mean that I see:

[Info/Worker-1] child process calling self.run()

and then, for example:

[Info/Worker-4] child process calling self.run()

only after the program has been running for about 10 minutes.

Thanks in advance for any help/suggestions.

import os
import subprocess
import glob
import multiprocessing
import logging
import sys
import OrcFxAPI as of

class Worker(multiprocessing.Process):

    myJobs = []

    def setJobs(self, jobList):
        self.myJobs = jobList

    @staticmethod
    def changedExtensionFileName(oldFileName, newExtension):
        return '.'.join((os.path.splitext(oldFileName)[0], newExtension))

    def run(self):
        failed = []
        model = of.Model(threadCount=1)

        for job in self.myJobs:
            try:
                print('%s starting' % job)
                sys.stdout.flush()
                model.LoadSimulation(job)
                if model.state == of.ModelState.SimulationStoppedUnstable:
                    newJob = job.replace('.sim', '.dat')
                    failed.append(newJob)

                    with open('Failed_Sim.txt', 'a') as f:
                        f.write(f'{newJob}\n')
                        f.close()

                    model.LoadData(newJob)
                    model.general.ImplicitConstantTimeStep /= 2
                    model.SaveData(newJob)
                    print(f'{job} has failed, reducing time step')

            except of.DLLError as err:
                print('%s ERROR: %s' % (job, err))
                sys.stdout.flush()
                with open(self.changedExtensionFileName(job, 'FAIL'), 'w') as f:
                    f.write('%s error: %s' % (job, err))
                    f.close()
        return



if __name__ == '__main__':
    import re
    sim_file = [f for f in os.listdir() if re.search(r'\d\d\d\d.*\.sim', f)]

    # begin multprocessing
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)

    corecount = 14 

    workers = []

    chunkSize = int(len(sim_file) / corecount)
    chunkRemainder = int(len(sim_file) % corecount)
    print('%s jobs found, dividing across %s workers - %s each remainder %s' % (str(len(sim_file)), str(corecount), chunkSize, chunkRemainder))

    start = 0
    for coreNum in range(0, corecount):
        worker = Worker()
        workers.append(worker)
        end = start + chunkSize
        if chunkRemainder>0:
            chunkRemainder -= 1
            end += 1
        if end>len(sim_file):
            end = len(sim_file)
        worker.setJobs(sim_file[start:end])
        worker.start()
        start = end
        if start>=len(sim_file):
            break

    for worker in workers:
        worker.join()
    print('Done...')
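
Since the original cannot be run without OrcFxAPI, here is a minimal, OrcaFlex-free sketch of the same Process-subclass pattern for reference; check_job() and the dummy job names are placeholders invented for illustration, not part of the original script.

import time
import multiprocessing

def check_job(job):
    # Placeholder for the real OrcFxAPI check: just pretend to do some work
    time.sleep(0.1)

class MiniWorker(multiprocessing.Process):

    def setJobs(self, jobList):
        self.myJobs = jobList

    def run(self):
        # myJobs is assigned before start(), so it is pickled across to the child process
        for job in self.myJobs:
            check_job(job)
            print(f'{job} checked')

if __name__ == '__main__':
    jobs = [f'job_{i:04d}.sim' for i in range(20)]
    chunkSize = 5
    workers = []
    for start in range(0, len(jobs), chunkSize):
        w = MiniWorker()
        w.setJobs(jobs[start:start + chunkSize])
        w.start()
        workers.append(w)
    for w in workers:
        w.join()
    print('Done...')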

Solution

OK, so nobody has put their hand up to answer this (I don't know why!), so here comes a suggestion for a bigger restructure...

import os
import OrcFxAPI as of

def worker(inpData):
    #The worker process

    model = of.Model(threadCount=1)   #each worker builds its own model, as in the original run()
    failed1 = []
    failed2 = []

    for job in inpData:   #I'm not sure of the data shape of the chunks, has your original method split them into coherent chunks capable of being processed independently? My step here could be wrong. 
        try:
            #print('%s starting' % job)  #Prints won't appear on console from worker processes from windows, so commented them all out
            model.LoadSimulation(job)
            if model.state == of.ModelState.SimulationStoppedUnstable:
                newJob = job.replace('.sim', '.dat')
                failed1.append(newJob)

                #I'd recommend we pass the list "failed" back to master and write to text from there, otherwise you could have several processes updating the text file at once, leading to possible loss of data
                #with open('Failed_Sim.txt', 'a') as f:
                #     f.write(f'{newJob}\n')
                #     f.close()

                model.LoadData(newJob)
                model.general.ImplicitConstantTimeStep /= 2
                model.SaveData(newJob)
                #print(f'{job} has failed, reducing time step')   

        except of.DLLError as err:
            #print('%s ERROR: %s' % (job, err))
            #sys.stdout.flush()
            #with open(changedExtensionFileName(job, 'FAIL'), 'w') as f:
            #    f.write('%s error: %s' % (job, err))
            #    f.close()
            failed2.append(job)

    #Note I've made two failed lists to pass back, for both failure types
    return failed1, failed2


if __name__ == "__main__":
    import re
    import multiprocessing as mp
    nCPUs = mp.cpu_count()

    sim_file = [f for f in os.listdir() if re.search(r'\d\d\d\d.*\.sim', f)]

    #Make the chunks
    chunkSize = int(len(sim_file) / nCPUs)
    chunkRemainder = int(len(sim_file) % nCPUs)
    print('%s jobs found, dividing across %s workers - %s each remainder %s' % (str(len(sim_file)), str(nCPUs), chunkSize, chunkRemainder))

    chunks = []
    start = 0
    for iChunk in range(0, nCPUs):
        end = start + chunkSize
        if chunkRemainder>0:
            chunkRemainder -= 1
            end += 1
        if end>len(sim_file):
            end = len(sim_file)
        chunks.append(sim_file[start:end])
        start = end


    #Send to workers
    pool = mp.Pool(processes=nCPUs)
    futA = []

    for iChunk in range(0, nCPUs):
        futA.append(pool.apply_async(worker, args=(chunks[iChunk],)))
    

    #Gather results
    if futA:
        failedDat = []
        failedSim = []
        for iChunk in range(0, len(futA)):
            resA, resB = futA[iChunk].get()
            failedDat.extend(resA)
            failedSim.extend(resB)
    pool.close()
            
    if failedDat:
        print("Following jobs failed, reducing timesteps:")
        print(failedDat)
    if failedSim:
        print("Following sims failed due to errors")
        print(failedSim) 
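
As the comment in the worker above suggests, the failure records can then be written from the master process only, once all results have been gathered, so several processes never append to Failed_Sim.txt at the same time. A minimal sketch of that final step, assuming the failedDat/failedSim lists built above (note the worker returns only job names, so the error text itself is not available here):

    #Write failure records from the master only, after the pool has finished
    if failedDat:
        with open('Failed_Sim.txt', 'a') as f:
            for newJob in failedDat:
                f.write(f'{newJob}\n')

    if failedSim:
        for job in failedSim:
            #One marker file per failed job, mirroring changedExtensionFileName() from the question
            failName = '.'.join((os.path.splitext(job)[0], 'FAIL'))
            with open(failName, 'w') as f:
                f.write(f'{job} failed with a DLL error\n')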
