How can I use multiprocessing in my current Python application?

Problem description

I have an application that reads thousands of files from different directories, does some processing on them, and then sends the data to a database. The problem is that it takes about 1 hour to finish all the files in a single directory, and I have 19 directories (possibly more in the future). Right now it runs one directory after another, and I would like to run everything in parallel to speed it up.

Here is my code:

import mysql.connector
import csv
import os
import time
from datetime import datetime
import ntpath
import configparser

config = configparser.ConfigParser()                         
config.read(r'C:\Desktop\Energy\file_cfg.ini')

source = config['PATHS']['source']
archive = config['PATHS']['archive']

mydb = mysql.connector.connect(
        host= config['DB']['host'],
        user = config['DB']['user'],
        passwd = config['DB']['passwd'],
        database= config['DB']['database']
    )

cursor = mydb.cursor()
select_antenna = "SELECT * FROM `antenna`"
cursor.execute(select_antenna)

mp_mysql = [i[0] for i in cursor.fetchall()]
mp_server = os.listdir(source)

# microbeats clean.
cursor.execute("TRUNCATE TABLE microbeats")

for mp in mp_mysql:
    if mp in mp_server:
        subdir_paths = os.path.join(source, mp)

        for file in os.listdir(subdir_paths):
            file_paths = os.path.join(subdir_paths, file)
                        
            cr_time_s = os.path.getctime(file_paths)
            cr_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(cr_time_s))
                        
        all_file_paths = [os.path.join(subdir_paths, f) for f in os.listdir(subdir_paths)]
        full_file_paths = [p for p in all_file_paths if os.path.getsize(p) > 0] #<----- Control empty files.
                
        if full_file_paths != []:
            newest_file_paths = max(full_file_paths, key=os.path.getctime)
                        
            for file in all_file_paths:
                if file == newest_file_paths and os.path.getctime(newest_file_paths) < time.time() - 120: 
                    with open(file, 'rt') as f:
                        reader = csv.reader(f, delimiter='\t')
                        line_data0 = list()
                        col = next(reader)
                        
                        for line in reader:
                            line.insert(0, mp)
                            line.insert(1, cr_time)
                            if line != []: #<----- Control empty directories.
                                line_data0.append(line)
                            
                                                                                                                                              
                        q1 = ("INSERT INTO microbeats"
                                     "(`antenna`,`datetime`,`system`,`item`,`event`, `status`, `accident`)"
                                     "VALUES (%s, %s, %s,%s, %s, %s, %s)")

                        for line in line_data0:
                             cursor.execute(q1, line)  

Solution

I would use multiprocessing, where each process gets its own database connection. I have made minimal changes to your code in an attempt to process the directories in parallel. That said, I am not sure whether a variable such as subdir_paths is named correctly, since the trailing "s" in its name suggests it holds multiple path names.
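For context, another common way to give each worker its own connection is a Pool initializer, which runs once in every worker process when the pool starts. Below is a minimal sketch of that pattern; init_worker, _conn, work, and the placeholder credentials are illustrative names of mine, not part of the rewritten code further down (which simply opens a connection inside each task instead):

import mysql.connector
from multiprocessing import Pool

_conn = None  # one connection per worker process

def init_worker(db_config):
    # Runs once in each worker process when the pool starts.
    global _conn
    _conn = mysql.connector.connect(**db_config)

def work(n):
    # Each task reuses its worker's own connection.
    cursor = _conn.cursor()
    cursor.execute("SELECT %s", (n,))
    return cursor.fetchone()

if __name__ == '__main__':
    db_config = {'host': 'localhost', 'user': 'user',
                 'passwd': 'secret', 'database': 'energy'}
    with Pool(4, initializer=init_worker, initargs=(db_config,)) as pool:
        print(list(pool.imap_unordered(work, range(3))))

The advantage over connecting inside each task is that a worker handling many items pays the connection cost only once; for this question, where each task handles one whole directory, opening the connection inside process_mp is just as reasonable.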

The reason I would suggest this question is better suited to Code Review is that you have a program that already works and you are only looking for a performance improvement (which, of course, applies to a large share of the questions posted here, hence the multiprocessing tag). Questions of that type should be posted at https://codereview.stackexchange.com/.

import mysql.connector
import csv
import os
import time
from datetime import datetime
import ntpath
import configparser
from multiprocessing import Pool, cpu_count

config = configparser.ConfigParser()
config.read(r'C:\Desktop\Energy\file_cfg.ini')

source = config['PATHS']['source']
archive = config['PATHS']['archive']

def get_connection():
    # One connection per process; connections cannot be shared across processes.
    mydb = mysql.connector.connect(
            host= config['DB']['host'],
            user = config['DB']['user'],
            passwd = config['DB']['passwd'],
            database= config['DB']['database']
        )
    return mydb

def get_mp_list():
    select_antenna = "SELECT * FROM `antenna`"
    mydb = get_connection()
    cursor = mydb.cursor()
    cursor.execute(select_antenna)

    mp_mysql = [i[0] for i in cursor.fetchall()]
    mp_server = os.listdir(source)

    # microbeats clean.
    cursor.execute("TRUNCATE TABLE microbeats")
    mydb.commit()
    mydb.close()

    mp_list = [mp for mp in mp_mysql if mp in mp_server]
    return mp_list

def process_mp(mp):
    subdir_paths = os.path.join(source, mp)
    for file in os.listdir(subdir_paths):
        file_paths = os.path.join(subdir_paths, file)

        cr_time_s = os.path.getctime(file_paths)
        cr_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(cr_time_s))

    all_file_paths = [os.path.join(subdir_paths, f) for f in os.listdir(subdir_paths)]
    full_file_paths = [p for p in all_file_paths if os.path.getsize(p) > 0] #<----- Control empty files.

    if full_file_paths != []:
        newest_file_paths = max(full_file_paths, key=os.path.getctime)

        mydb = get_connection()
        cursor = mydb.cursor()
        did_insert = False
        q1 = ("INSERT INTO microbeats"
                     "(`antenna`,`datetime`,`system`,`item`,`event`, `status`, `accident`)"
                     "VALUES (%s, %s, %s,%s, %s, %s, %s)")
        for file in all_file_paths:
            if file == newest_file_paths and os.path.getctime(newest_file_paths) < time.time() - 120:
                with open(file, 'rt') as f:
                    reader = csv.reader(f, delimiter='\t')
                    line_data0 = list()
                    col = next(reader)

                    for line in reader:
                        line.insert(0, mp)
                        line.insert(1, cr_time)
                        if line != []: #<----- Control empty directories.
                            line_data0.append(line)

                    if line_data0:
                        cursor.executemany(q1, line_data0)
                        did_insert = True
        if did_insert:
            mydb.commit()
        mydb.close()

def main():
    mp_list = get_mp_list()
    pool = Pool(min(cpu_count(), len(mp_list)))
    results = pool.imap_unordered(process_mp, mp_list)
    while True:
        try:
            result = next(results)
        except StopIteration:
            break
        except BaseException as e:
            print(e)
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()
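
Since the workload is dominated by I/O (reading files and inserting rows into MySQL), a thread pool is a plausible alternative. The sketch below is my own variation, assuming the get_mp_list and process_mp functions defined above are in scope; the cap of 8 workers is an arbitrary choice for illustration:

from concurrent.futures import ThreadPoolExecutor, as_completed

def main_threaded():
    mp_list = get_mp_list()
    # One thread per directory, capped at an arbitrary 8 workers.
    with ThreadPoolExecutor(max_workers=min(8, len(mp_list))) as executor:
        futures = {executor.submit(process_mp, mp): mp for mp in mp_list}
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                print(futures[future], e)

Threads sidestep the process spawn cost on Windows and still satisfy the one-connection-per-worker rule, because process_mp opens and closes its own connection; whether threads or processes win here depends on how CPU-heavy the per-file processing really is.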
