How do I use multiprocessing in my current Python application?
Problem description
I have an application that reads thousands of files from different directories, does some processing on them, and then sends the data to a database. The problem is that it takes roughly an hour, at minimum, to finish all the files in one directory, and I have 19 directories (possibly more in the future). Right now it runs one directory after another, and I would like to run everything in parallel to speed things up.
Here is my code:
import mysql.connector
import csv
import os
import time
from datetime import datetime
import ntpath
import configparser

config = configparser.ConfigParser()
config.read('C:DesktopEnergyfile_cfg.ini')

source = config['PATHS']['source']
archive = config['PATHS']['archive']

mydb = mysql.connector.connect(
    host=config['DB']['host'],
    user=config['DB']['user'],
    passwd=config['DB']['passwd'],
    database=config['DB']['database']
)

cursor = mydb.cursor()

select_antenna = "SELECT * FROM `antenna`"
cursor.execute(select_antenna)
mp_mysql = [i[0] for i in cursor.fetchall()]
mp_server = os.listdir(source)

# microbeats clean.
cursor.execute("TRUNCATE TABLE microbeats")

for mp in mp_mysql:
    if mp in mp_server:
        subdir_paths = os.path.join(source, mp)
        for file in os.listdir(subdir_paths):
            file_paths = os.path.join(subdir_paths, file)
            cr_time_s = os.path.getctime(file_paths)
            cr_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(cr_time_s))

        all_file_paths = [os.path.join(subdir_paths, f) for f in os.listdir(subdir_paths)]
        full_file_paths = [p for p in all_file_paths if os.path.getsize(p) > 0]  # <----- Control empty files.

        if full_file_paths != []:
            newest_file_paths = max(full_file_paths, key=os.path.getctime)

            for file in all_file_paths:
                if file == newest_file_paths and os.path.getctime(newest_file_paths) < time.time() - 120:
                    with open(file, 'rt') as f:
                        reader = csv.reader(f, delimiter=' ')
                        line_data0 = list()
                        col = next(reader)
                        for line in reader:
                            line.insert(0, mp)
                            line.insert(1, cr_time)
                            if line != []:  # <----- Control empty directories.
                                line_data0.append(line)

                        q1 = ("INSERT INTO microbeats"
                              "(`antenna`,`datetime`,`system`,`item`,`event`, `status`, `accident`)"
                              "VALUES (%s, %s, %s, %s, %s, %s, %s)")
                        for line in line_data0:
                            cursor.execute(q1, line)
Solution
I am using multiprocessing, where each process gets its own database connection. I have made minimal changes to your code to process the directories in parallel. However, I am not sure whether variables such as subdir_paths are named correctly, since the "s" at the end of the name suggests it holds more than one path name.
The reason it was suggested that this question is a better fit for Code Review is that you already have a working program and are only looking for performance improvements (which, of course, is true of a large share of the questions posted here and tagged multiprocessing). Questions of that type should be posted at https://codereview.stackexchange.com/.
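To make the approach concrete before the full listing, here is a minimal, stripped-down sketch of the pattern used below: a Pool maps a worker function over the directory names, and each worker opens (and closes) its own connection instead of sharing a module-level one. The function name work_on_directory, the connection credentials, and the directory names are illustrative placeholders and assume a reachable MySQL server; they are not part of the original code.

from multiprocessing import Pool, cpu_count

import mysql.connector


def work_on_directory(name):
    # Hypothetical worker: each process opens its own connection,
    # does its per-directory work, commits, and closes before returning.
    conn = mysql.connector.connect(host='localhost', user='user',
                                   passwd='secret', database='energy')
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT 1")  # placeholder for the real per-directory inserts
        conn.commit()
    finally:
        conn.close()
    return name


if __name__ == '__main__':
    directories = ['dir01', 'dir02', 'dir03']  # placeholder directory names
    with Pool(min(cpu_count(), len(directories))) as pool:
        for finished in pool.imap_unordered(work_on_directory, directories):
            print('done:', finished)

The complete, minimally changed version of your code follows.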
import mysql.connector
import csv
import os
import time
from datetime import datetime
import ntpath
import configparser
from multiprocessing import Pool, cpu_count

config = configparser.ConfigParser()
config.read('C:DesktopEnergyfile_cfg.ini')

source = config['PATHS']['source']
archive = config['PATHS']['archive']


def get_connection():
    mydb = mysql.connector.connect(
        host=config['DB']['host'],
        user=config['DB']['user'],
        passwd=config['DB']['passwd'],
        database=config['DB']['database']
    )
    return mydb


def get_mp_list():
    select_antenna = "SELECT * FROM `antenna`"
    mydb = get_connection()
    cursor = mydb.cursor()
    cursor.execute(select_antenna)
    mp_mysql = [i[0] for i in cursor.fetchall()]
    mp_server = os.listdir(source)
    # microbeats clean.
    cursor.execute("TRUNCATE TABLE microbeats")
    mydb.commit()
    mydb.close()
    mp_list = [mp for mp in mp_mysql if mp in mp_server]
    return mp_list


def process_mp(mp):
    subdir_paths = os.path.join(source, mp)
    for file in os.listdir(subdir_paths):
        file_paths = os.path.join(subdir_paths, file)
        cr_time_s = os.path.getctime(file_paths)
        cr_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(cr_time_s))

    all_file_paths = [os.path.join(subdir_paths, f) for f in os.listdir(subdir_paths)]
    full_file_paths = [p for p in all_file_paths if os.path.getsize(p) > 0]  # <----- Control empty files.

    if full_file_paths != []:
        newest_file_paths = max(full_file_paths, key=os.path.getctime)

        # Each worker process opens its own database connection.
        mydb = get_connection()
        cursor = mydb.cursor()
        did_insert = False

        q1 = ("INSERT INTO microbeats"
              "(`antenna`,`datetime`,`system`,`item`,`event`, `status`, `accident`)"
              "VALUES (%s, %s, %s, %s, %s, %s, %s)")

        for file in all_file_paths:
            if file == newest_file_paths and os.path.getctime(newest_file_paths) < time.time() - 120:
                with open(file, 'rt') as f:
                    reader = csv.reader(f, delimiter=' ')
                    line_data0 = list()
                    col = next(reader)
                    for line in reader:
                        line.insert(0, mp)
                        line.insert(1, cr_time)
                        if line != []:  # <----- Control empty directories.
                            line_data0.append(line)

                    if line_data0:
                        cursor.executemany(q1, line_data0)
                        did_insert = True

        if did_insert:
            mydb.commit()
        mydb.close()


def main():
    mp_list = get_mp_list()
    # One directory per task, at most cpu_count() worker processes.
    pool = Pool(min(cpu_count(), len(mp_list)))
    results = pool.imap_unordered(process_mp, mp_list)
    while True:
        try:
            result = next(results)
        except StopIteration:
            break
        except BaseException as e:
            print(e)


if __name__ == '__main__':
    main()
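As a side note, the same fan-out over directories can also be written with concurrent.futures, which some find easier to read than the manual next()/StopIteration loop and which reports which directory failed. This is an alternative sketch, not part of the accepted answer, and it assumes process_mp and get_mp_list are defined in the same module as above.

from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import cpu_count


def main_with_executor():
    # Alternative driver (assumes process_mp and get_mp_list from the code above).
    mp_list = get_mp_list()
    with ProcessPoolExecutor(max_workers=min(cpu_count(), len(mp_list))) as executor:
        futures = {executor.submit(process_mp, mp): mp for mp in mp_list}
        for future in as_completed(futures):
            mp = futures[future]
            try:
                future.result()  # re-raises any exception from the worker
            except Exception as e:
                print(f'{mp} failed: {e}')


if __name__ == '__main__':
    main_with_executor()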