Python threading: always have x active threads when iterating over n tasks

2022-01-21 00:00:00 python list multithreading queue

Problem description

What I basically want to do is the following:

import threading
import Queue

def test_thread(elem, q):
    q.put(elem ** 2)

a = [1,2,3,4,5,6,7,8]
q = Queue.Queue()
results = []
for x in range(8):
    print x
    threading.Thread(target=test_thread, args=(a[x], q)).start()
    results.append(q.get())

But instead of running all threads at once, I want to run only, say, 2 in parallel and iterate over the list. Once one thread is done, the next value from the list should be processed. I couldn't find an example, and I am not sure how to construct the loops for this.

In addition, I do not understand the behavior of the Queue. I expected ALL the squared numbers to be in the queue, but instead there is only one value? (The code above has since been changed to store all results in "results".) Hints, comments, and keywords are highly appreciated.

Second question:

Sorry, I thought q.get() would return all the results. But it just returns the elements one at a time, in queue-like fashion.


Solution

You can use a thread pool for this:

from multiprocessing.pool import ThreadPool

def test_thread(elem):
    return elem ** 2

a = [1,2,3,4,5,6,7,8]
pool = ThreadPool(2) # 2 worker threads
results = []
for x in range(8):
    print(x)
    # Submit the task; apply_async returns an AsyncResult immediately
    results.append(pool.apply_async(test_thread, args=(a[x],)))

# Block until each task has finished and collect its return value
results = [result.get() for result in results]
# You can also replace this for loop altogether using pool.map
# and get the same result:
# results = pool.map(test_thread, a)
print(results)

Output:

0
1
2
3
4
5
6
7
[1, 4, 9, 16, 25, 36, 49, 64]

The ThreadPool class is a sparsely documented part of the multiprocessing module. It can also be accessed via multiprocessing.dummy.Pool. It allows you to create a pool of threads to handle any number of work items, while always limiting the number of work items being processed concurrently to the number you specify. You can use the documentation for the normal multiprocessing.Pool to learn about its API. It's exactly the same, except that everywhere it says "process", you replace it with "thread".
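As a rough illustration of that cap, the sketch below uses multiprocessing.dummy.Pool with pool.map and counts how many calls are in flight at once. It assumes Python 3. The names square_and_track, active, and max_active, and the short time.sleep, are hypothetical additions used only to make the overlap observable.

```python
import threading
import time
from multiprocessing.dummy import Pool  # thread-backed Pool with the same API

lock = threading.Lock()
active = 0       # number of tasks currently running
max_active = 0   # highest value `active` ever reached

def square_and_track(elem):
    global active, max_active
    with lock:
        active += 1
        max_active = max(max_active, active)
    time.sleep(0.05)  # simulate some work so that tasks overlap
    with lock:
        active -= 1
    return elem ** 2

a = [1, 2, 3, 4, 5, 6, 7, 8]
with Pool(2) as pool:  # 2 worker threads
    results = pool.map(square_and_track, a)  # results come back in input order

print(results)
print(max_active)  # never exceeds the pool size
```

Because only two worker threads exist, max_active cannot go above 2, no matter how many items are in the list.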

I'm not sure I follow the second part of your question about Queue.Queue. On each iteration of your for loop, you're putting one item into the Queue inside test_thread, and then consuming it inside the for loop using results.append(q.get()). So while there is never more than one item in the Queue at a time, it is being used to transfer all the values that end up in the results list, one for each item in the range(8) list.
