同时运行具有多个参数的函数并聚合复杂结果

问题描述

设置

这是我发布的有关访问多个进程的结果的问题的第二部分。
有关第一部分,请单击此处:Link to Part One

我有一个复杂的数据集,需要同时与各种约束集进行比较,但我遇到了多个问题。第一个问题是从我的多个进程中获得结果,第二个问题是使任何东西都不能只是一个极其简单的函数来并发运行。

示例

我有多个约束集需要与某些数据进行比较,我希望同时执行此操作,因为我有很多约束集。在本例中,我将只使用两组约束。

Jupyter笔记本

创建一些约束和数据示例

# Create a set of constraints
constraints = pd.DataFrame([['2x2x2', 2,2,2],['5x5x5',5,5,5],['7x7x7',7,7,7]],
                     columns=['Name','First', 'Second', 'Third'])
constraints.set_index('Name', inplace=True)

# Create a second set of constraints
constraints2 = pd.DataFrame([['4x4x4', 4,4,4],['6x6x6',6,6,6],['7x7x7',7,7,7]],
                      columns=['Name','First', 'Second', 'Third'])
constraints2.set_index('Name', inplace=True)

# Create some sample data
items = pd.DataFrame([['a', 2,8,2],['b',5,3,5],['c',7,4,7]], columns=['Name','First', 'Second', 'Third'])
items.set_index('Name', inplace=True)

按顺序运行

如果我按顺序运行它,我可以得到我想要的结果,但对于我实际处理的数据,它可能需要12个小时以上。下面是按顺序运行的结果,这样您就可以知道我想要的结果是什么。

# Function
def seq_check_constraint(df_constraints_input, df_items_input):
    df_constraints = df_constraints_input.copy()
    df_items = df_items_input.copy()
    
    df_items['Product'] = df_items.product(axis=1)
    df_constraints['Product'] = df_constraints.product(axis=1)
    
    for constraint in df_constraints.index:
        df_items[constraint+'Product'] = df_constraints.loc[constraint,'Product']
        
    for constraint in df_constraints.index:
        for item in df_items.index:
                col_name = constraint+'_fits'
                df_items[col_name] = False
                df_items.loc[df_items['Product'] < df_items[constraint+'Product'], col_name] = True
    
    df_res = df_items.iloc[:: ,7:]
    return df_res
constraint_sets = [constraints, constraints2, ...]
results = {}
counter = 0

for df in constrain_sets:
    res = seq_check_constraint(df, items)
    results['constraints'+str(counter)] = res

或更丑:

df_res1 = seq_check_constraint(constraints, items)
df_res2 = seq_check_constraint(constraints2, items)

results = {'constraints0':df_res1, 'constraints1': df_res2}
作为按顺序运行这些命令的结果,我最终得到如下所示的:

我最终希望得到DataFrame的词典或列表,或者能够将DataFrame全部追加在一起。我获得结果的顺序对我来说并不重要,我只希望将它们全部放在一起,并需要能够对它们进行进一步分析。

我尝试的内容

这就引出了我对多处理的尝试,据我所知,您可以使用队列或管理器来处理共享数据和内存,但我两者都不能工作。我还在努力获取我的函数,该函数接受两个参数在Pool中执行。

以下是我的代码,目前使用的是上面的相同示例数据:

函数

def check_constraint(df_constraints_input, df_items_input):
    df_constraints = df_constraints_input.copy()
    df_items = df_items_input.copy()
    
    df_items['Product'] = df_items.product(axis=1)  # Mathematical Product
    df_constraints['Product'] = df_constraints.product(axis=1)
    
    for constraint in df_constraints.index:
        df_items[constraint+'Product'] = df_constraints.loc[constraint,'Product']
        
    for constraint in df_constraints.index:
        for item in df_items.index:
                col_name = constraint+'_fits'
                df_items[col_name] = False
                df_items.loc[df_items['Product'] < df_items[constraint+'Product'], col_name] = True
    
    df_res = df_items.iloc[:: ,7:]
    return df_res

Jupyter笔记本

df_manager = mp.Manager()
df_ns = df_manager.Namespace()
df_ns.constraint_sets = [constraints, constraints2]


print('---Starting pool---')

if __name__ == '__main__':
    with mp.Pool() as p:
        print('--In the pool--')
        res = p.map_async(mpf.check_constraint, (df_ns.constraint_sets, itertools.repeat(items)))
        print(res.get())

和我当前的错误:

TypeError: check_constraint() missing 1 required positional argument: 'df_items_input'


解决方案

最简单的方法是创建一个元组列表(其中一个元组表示函数的一组参数)并将其传递给starmap

df_manager = mp.Manager()
df_ns = df_manager.Namespace()
df_ns.constraint_sets = [constraints, constraints2]


print('---Starting pool---')

if __name__ == '__main__':
    with mp.Pool() as p:
        print('--In the pool--')
        check_constraint_args = []
        for constraint in constraint_sets:
            check_constraint_args.append((constraint, items))
        res = p.starmap(mpf.check_constraint, check_constraint_args)
        print(res.get())

相关文章