如何将一些变量绑定到具有conCurent.futures.ThreadPoolExecutor或MultiProcing.pool.ThreadPool的线程?
问题描述
我想做的事情是这样的:
class MyThread(threading.Thread):
def __init__(self, host, port):
threading.Thread.__init__(self)
# self._sock = self.initsocket(host, port)
self._id = random.randint(0, 100)
def run(self):
for i in range(3):
print("current id: {}".format(self._id))
def main():
ts = []
for i in range(5):
t = MyThread("localhost", 3001)
t.start()
ts.append(t)
for t in ts:
t.join()
我得到以下输出:
current id: 10
current id: 10
current id: 13
current id: 43
current id: 13
current id: 10
current id: 83
current id: 83
current id: 83
current id: 13
current id: 98
current id: 43
current id: 98
current id: 43
current id: 98
这就是我想要的输出。正如您所看到的,我的_id
在不同的线程中是不同的,但在单个线程中,我共享相同的_id
。(_id
只是这些变量之一,我还有许多其他类似的变量)。
现在,我想对multiprocessing.pool.ThreadPool
class MyProcessor():
def __init__(self, host, port):
# self._sock = self.initsocket(host, port)
self._id = random.randint(0, 100)
def __call__(self, i):
print("current id: {}".format(self._id))
return self._id * i
def main():
with ThreadPool(5) as p:
p.map(MyProcessor("localhost", 3001), range(15))
但现在_id
将由所有线程共享:
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
和concurrent.futures.ThreadPoolExecutor
,我也尝试做同样的事情:
class MyProcessor():
def __init__(self, host, port):
# self.initsocket(host, port)
self._id = random.randint(0, 100)
def __call__(self, i):
print("current id: {}".format(self._id))
return self._id * i
def main():
with ThreadPoolExecutor(max_workers=5) as executor:
func = MyProcessor("localhost", 3001)
futures = [executor.submit(func, i) for i in range(15)]
for f in as_completed(futures):
pass
输出如下:
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
当然,我得到这个结果并不奇怪,因为我只调用了__init__
一次。但我想问的是:
如何使用concurrent.futures.ThreadPoolExecutor
和multiprocessing.pool.ThreadPool
(也请不再使用全局变量)执行相同的操作。
解决方案
这里有几个问题,我将尽最大努力解决所有这些问题。
在您给出的第一个示例中,您可以完全控制您创建的所有,因此每个线程在初始化式中都有一个唯一的ID。当然,问题是您一次启动所有线程,这对于大量线程来说可能是非常低效的。 在问题中的两个线程池示例中,您只为可调用对象初始化了一次ID,因此每个线程当然不会有单独的ID。正确的方法是在__call__
方法中为每个线程初始化一个ID:
class MyProcessor(): def __init__(self, host, port): self.initsocket(host, port) def __call__(self, i): id_ = random.randint(0, 100) print("current id: {}".format(id_)) return id_ * i def main(): func = MyProcessor("localhost", 3001) with ThreadPoolExecutor(max_workers=5) as executor: collections.deque(executor.map(MyProcessor, range(15)), maxlen=0)请注意,如果您只关心最终结果而不是中间的
Future
对象,那么您也可以通过使用map
方法来缩短concurrent.futures.ThreadPoolExecutor
示例。deque(..., maxlen=0)
调用是使用迭代器的标准用法。
考虑到您在注释中链接的要点,我理解您为什么希望拥有线程本地数据。然而,您当然不需要全局变量来实现该结果。以下是几个替代方案:
只需将您的thread-local数据添加到初始化器中的
self
,所有调用都可以访问它,而不是全局的:def __init__(self, host, port): self.thread_local = threading.local() def __call__(self, i): try: id_ = self.thread_local.id_ except AttributeError: id_ = random.randint(0, 100) ...
使用函数本地数据而不是线程本地数据。您使用的是线程本地数据,以避免将连接(在要点中)传递给一些私有函数。这并不是一种真正的需要,而是一种审美选择。您可以始终使用
def _send_data(self, conn, **kwargs)
和def _recv_data(self, conn)
,因为连接实际来自的唯一位置是__call__
。
虽然可能存在选项#1的情况,但我强烈建议您不要将其用于任何类型的线程池管理器。线程池可以重复使用相同的线程,以便从任务提交到的队列中按顺序运行任务。这意味着您将在本应打开其自身的任务中获得相同的连接。在您最初的示例中,您可以独立创建所有线程,但当您在回收的池线程上有多个MyProcessor
调用时,情况可能就不一样了。
相关文章