当我得到结果时,Python ThreadPoolExecutor关闭所有线程

问题描述

我正在运行webscraper class谁的方法名称是self.get_with_random_proxy_using_chain

我正在尝试将多线程调用发送到同一URL,并且希望一旦有来自任何线程的结果,该方法就会返回响应并关闭其他仍在活动的线程。

到目前为止,我的代码看起来是这样的(可能很幼稚):

from concurrent.futures import ThreadPoolExecutor, as_completed
# class initiation etc

max_workers = cpu_count() * 5
urls = [url_to_open] * 50

with ThreadPoolExecutor(max_workers=max_workers) as executor:
    future_to_url=[]
    for url in urls: # i had to do a loop to include sleep not to overload the proxy server
        future_to_url.append(executor.submit(self.get_with_random_proxy_using_chain,
                                     url,
                                     timeout,
                                     update_proxy_score,
                                     unwanted_keywords,
                                     unwanted_status_codes,
                                     random_universe_size,
                                     file_path_to_save_streamed_content))
        sleep(0.5)

    for future in as_completed(future_to_url):
            if future.result() is not None:
                return future.result()

但它运行所有线程。

有没有办法在第一个将来完成后关闭所有线程。 我使用的是Windows和Python3.7x

到目前为止,我找到了这个link,但我无法使其工作(程序仍会运行很长时间)。


解决方案

据我所知,期货不能取消运行。关于这一点已经有相当多的written。甚至还有一些变通办法。

但我建议仔细看看asyncio模块。它非常适合于这样的任务。

下面是一个简单的示例,当发出多个并发请求时,在收到第一个结果时,其余的请求将被取消。

import asyncio
from typing import Set

from aiohttp import ClientSession


async def fetch(url, session):
    async with session.get(url) as response:
        return await response.read()


async def wait_for_first_response(tasks):
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for p in pending:
        p.cancel()
    return done.pop().result()


async def request_one_of(*urls):
    tasks = set()
    async with ClientSession() as session:
        for url in urls:
            task = asyncio.create_task(fetch(url, session))
            tasks.add(task)

        return await wait_for_first_response(tasks)


async def main():
    response = await request_one_of("https://wikipedia.org", "https://apple.com")
    print(response)

asyncio.run(main())

相关文章