BrokenPipeError:[Errno 32]Python多处理

问题描述

我在做一个Web抓取项目,但处理数据花费了很多时间,我想了一个替代方法来抓取被抓取的产品的源代码,然后单独处理数据。

我所做的是,将每个产品的源代码分别存储在数组中的元组中,并将该数组数据保存在文本文件中,以供以后进一步处理。我将数据保存为10,000个产品的大块。每个文本文件大小约为10 GB。

当我开始使用多处理来处理数据时,我不断地遇到BrokenPipeError:[Error 32],最初我是在Windows机器上处理数据,我研究了一下,发现Linux在管理内存方面更好,这个错误是因为处理过程中完全使用了内存。

最初,我将处理后的数据存储在数组中(不是在运行时为每个产品保存数据),我在堆栈论坛上读到需要保存处理后的数据,由于处理后的数据占用了所有内存,我相应地更改了代码,将map更改为IMAP,尽管它运行的时间更长,但仍然得到相同的错误。

以下是我的代码,我不会发布完整的处理步骤,因为这只会增加代码长度。

要注意的是,每个产品在处理时都有大量的数组数据,每个单独的数组最多有18000个元素。

我使用的是八核处理器,内存为16 GB,固态硬盘为500 GB。

如有任何帮助,我们将不胜感激。谢谢!

import xml.etree.cElementTree as ET
from lxml import html  
import openpyxl
from openpyxl import Workbook
from lxml import etree
from lxml.etree import tostring
import pathos.multiprocessing as mp
import multiprocessing
import ast

global sourceDataList
sourceDataList=[]
global trackIndex
trackIndex=1
global failList
failList=[]


def processData(data):

    vehicalData=[]
    oemData=[]
    appendIndex=0

    #geting product link form incoming data list (tupile)
    p=data[0][1]
    #geting html source code form incoming data list(tupile)
    #converting it to html element
    source_code=html.fromstring(data[0][0])

    #processing data
    try:
        firstOem=source_code.xpath("//div[@id='tab-review']//tr[2]/td[2]")
        firstOem=firstOem[0].text_content().strip()
    except:
        firstOem=''
    try:
        name=source_code.xpath("//div[@id='right_title']/h1")
        name=name[0].text_content().strip()
    except:
        name=''

    #saving data in respective arrays
    vehicalData.append([firstOem,p,name,productType,brand,mfgNumber,imgOne,imgTwo,imgThree,imgFour,imgFive])
    for q in dayQtyPrice:
        vehicalData[appendIndex].append(q)
    vehicalData[appendIndex].append(specString)
    vehicalData[appendIndex].append(subAssembltString)
    vehicalData[appendIndex].append(parentAssemblyString)
    vehicalData[appendIndex].append(otherProductString)
    vehicalData[appendIndex].append(description)
    vehicalData[appendIndex].append(placement)
    for dma in makeModelArray:
        vehicalData[appendIndex].append(dma)        
    oemData.append([firstOem,name,productType,brand,mfgNumber,p])   
    for o in oemArray:
        oemData[appendIndex].append(o)

    print('Done !',p,len(vehicalData[0]),len(oemData[0]))

    #returning both arrays
    return (vehicalData,oemData)

def main():
    productLinks=[]
    vehicalData=[]
    oemData=[]
    
    #opening text file for processing list data
    with open('test.txt', encoding='utf-8') as f:
        string=f.read()

    sourceDataList=ast.literal_eval(string)
    print('Number of products:',len(sourceDataList))

    #creating pool and initiating multiprocessing
    p = mp.Pool(4)  # Pool tells how many at a time

    #opening and saving data at run time
    vehicalOutBook=openpyxl.load_workbook('vehical_data_file.xlsx')
    vehicalOutSheet=vehicalOutBook.active
    oemOutBook=openpyxl.load_workbook('oem_data_file.xlsx')
    oemOutSheet=oemOutBook.active
    for d in p.imap(processData, sourceDataList):
        v=d[0][0][:18000]
        o=d[1][0][:18000]
        vehicalOutSheet.append(v)
        oemOutSheet.append(o)

    p.terminate()
    p.join()

    #saving data
    vehicalOutBook.save('vehical_data_file.xlsx')
    oemOutBook.save('oem_data_file.xlsx')

if __name__=='__main__':
    main()

解决方案

我不熟悉pathos.multiprocessing.Pool类,但让我们假设它的工作原理与multiprocess.pool.Pool类大致相同。问题是,test.txt中的数据格式似乎必须读取整个文件才能使用ast.liter_eval进行解析,因此使用imap不会节省存储空间。

有效地使用imap(或imap_unordered),而不是在文件test.txt中存储表示(JSON?)在list中,存储由换行符分隔的多个产品表示,这些换行符可以单独解析,以便可以逐行读取和解析文件,以生成单个产品。您应该大致了解需要提交给imap的行数和任务数。这样做的原因是,当您有大量任务时,使用默认的块大小参数值1以外的其他值会更有效。我在下面包含了一个函数,用于根据map函数将使用的行数计算块大小值。此外,您的辅助函数processData似乎使用了超过需要的一级嵌套列表。我还恢复了使用标准multiprocessing.pool.Pool类,因为我或多或少知道它是如何工作的。

注意:我看不到processData中变量makeModelArrayoemArray的定义位置。

import xml.etree.cElementTree as ET
from lxml import html
import openpyxl
from openpyxl import Workbook
from lxml import etree
from lxml.etree import tostring
#import pathos.multiprocessing as mp
import multiprocessing
import ast

global sourceDataList
sourceDataList=[]
global trackIndex
trackIndex=1
global failList
failList=[]


def processData(data):

    #geting product link form incoming data list (tupile)
    p=data[0][1]
    #geting html source code form incoming data list(tupile)
    #converting it to html element
    source_code=html.fromstring(data[0][0])

    #processing data
    try:
        firstOem=source_code.xpath("//div[@id='tab-review']//tr[2]/td[2]")
        firstOem=firstOem[0].text_content().strip()
    except:
        firstOem=''
    try:
        name=source_code.xpath("//div[@id='right_title']/h1")
        name=name[0].text_content().strip()
    except:
        name=''

    #saving data in respective arrays
    vehicalData = [firstOem,p,name,productType,brand,mfgNumber,imgOne,imgTwo,imgThree,imgFour,imgFive]
    for q in dayQtyPrice:
        vehicalData,append(q)
    vehicalData,append(specString)
    vehicalData.append(subAssembltString)
    vehicalData.append(parentAssemblyString)
    vehicalData.append(otherProductString)
    vehicalData.append(description)
    vehicalData.append(placement)
    for dma in makeModelArray:
        vehicalData.append(dma)
    oemData = [firstOem,name,productType,brand,mfgNumber,p]
    for o in oemArray:
        oemData.append(o)

    #print('Done !',p,len(vehicalData),len(oemData))

    #returning both arrays
    return (vehicalData,oemData)

def generate_source_data_list():
    #opening text file for processing list data
    with open('test.txt', encoding='utf-8') as f:
        for line in f:
            # data for just one product:
            yield ast.literal_eval(line)

def compute_chunksize(iterable_size, pool_size):
    chunksize, remainder = divmod(iterable_size, 4 * pool_size)
    if remainder:
        chunksize += 1
    return chunksize


def main():
    #creating pool and initiating multiprocessing
    # use pool size equal to number of cores you have:
    pool_size = multiprocessing.cpu_count()
    # Approximate number of elements generate_source_data_list() will yield:
    NUM_TASKS = 100_000 # replace with actual number
    p = multiprocessing.Pool(pool_size)
    chunksize = compute_chunksize(NUM_TASKS, pool_size)

    #opening and saving data at run time
    vehicalOutBook=openpyxl.load_workbook('vehical_data_file.xlsx')
    vehicalOutSheet=vehicalOutBook.active
    oemOutBook=openpyxl.load_workbook('oem_data_file.xlsx')
    oemOutSheet=oemOutBook.active
    for d in p.imap(processData, generate_source_data_list(), chunksize=chunksize):
        v = d[0][:18000]
        o = d[1][:18000]
        vehicalOutSheet.append(v)
        oemOutSheet.append(o)

    p.terminate()
    p.join()

    #saving data
    vehicalOutBook.save('vehical_data_file.xlsx')
    oemOutBook.save('oem_data_file.xlsx')

if __name__=='__main__':
    main()

最终的电子表格仍需要大量存储空间!现在,如果您输出两个csv文件,情况就不同了--您可以边写边写。

相关文章