关于使用 Queue()/deque() 和类变量进行通信和“毒丸"的进程与线程

问题描述

我想创建一个在 While True 循环中永远运行的线程或进程.

I would like to create either a Thread or a Process which runs forever in a While True loop.

我需要以队列的形式向工作人员发送和接收数据,无论是 multiprocessing.Queue() 还是 collections.deque().我更喜欢使用 collections.deque(),因为它明显更快.

I need to send and receive data to the worker in the form for queues, either a multiprocessing.Queue() or a collections.deque(). I prefer to use collections.deque() as it is significantly faster.

我还需要最终能够杀死工作人员(因为它在 while True 循环中运行.这是我整理的一些测试代码,以尝试了解线程、进程、队列和 deque 之间的区别..

I also need to be able to kill the worker eventually (as it runs in a while True loop. Here is some test code I've put together to try and understand the differences between Threads, Processes, Queues, and deque ..

import time
from multiprocessing import Process, Queue
from threading import Thread
from collections import deque

class ThreadingTest(Thread):

    def __init__(self, q):
        super(ThreadingTest, self).__init__()
        self.q = q
        self.toRun = False

    def run(self):
        print("Started Thread")
        self.toRun = True
        while self.toRun:
            if type(self.q) == type(deque()):
                if self.q:
                    i = self.q.popleft()
                    print("Thread deque: " + str(i))
            elif type(self.q) == type(Queue()):
                if not self.q.empty():
                    i = self.q.get_nowait()
                    print("Thread Queue: " + str(i))

    def stop(self):
        print("Trying to stop Thread")
        self.toRun = False
        while self.isAlive():
            time.sleep(0.1)
        print("Stopped Thread")

class ProcessTest(Process):

    def __init__(self, q):
        super(ProcessTest, self).__init__()
        self.q = q
        self.toRun = False
        self.ctr = 0

    def run(self):
        print("Started Process")
        self.toRun = True
        while self.toRun:
            if type(self.q) == type(deque()):
                if self.q:
                    i = self.q.popleft()
                    print("Process deque: " + str(i))
            elif type(self.q) == type(Queue()):
                if not self.q.empty():
                    i = self.q.get_nowait()
                    print("Process Queue: " + str(i))

    def stop(self):
        print("Trying to stop Process")
        self.toRun = False
        while self.is_alive():
            time.sleep(0.1)
        print("Stopped Process")

if __name__ == '__main__':
    q = Queue()
    t1 = ProcessTest(q)
    t1.start()

    for i in range(10):
        if type(q) == type(deque()):
            q.append(i)
        elif type(q) == type(Queue()):
            q.put_nowait(i)
        time.sleep(1)
    t1.stop()
    t1.join()

    if type(q) == type(deque()):
        print(q)
    elif type(q) == type(Queue()):
        while q.qsize() > 0:
            print(str(q.get_nowait()))

如您所见,t1 可以是 ThreadingTest 或 ProcessTest.此外,传递给它的队列可以是 multiprocessing.Queue 或 collections.deque.

As you can see, t1 can either be ThreadingTest, or ProcessTest. Also, the queue passed to it can either be a multiprocessing.Queue or a collections.deque.

ThreadingTest 与 Queue 或 deque() 一起使用.当调用 stop() 方法时,它也会正确终止 run().

ThreadingTest works with a Queue or deque(). It also kills run() properly when the stop() method is called.

Started Thread
Thread deque: 0
Thread deque: 1
Thread deque: 2
Thread deque: 3
Thread deque: 4
Thread deque: 5
Thread deque: 6
Thread deque: 7
Thread deque: 8
Thread deque: 9
Trying to stop Thread
Stopped Thread
deque([])

ProcessTest 只有在队列类型为 multiprocessing.Queue 时才能从队列中读取.它不适用于 collections.deque.此外,我无法使用 stop() 终止进程.

ProcessTest is only able to read from the queue if it is of type multiprocessing.Queue. It doesn't work with collections.deque. Furthermore, I am unable to kill the process using stop().

Process Queue: 0
Process Queue: 1
Process Queue: 2
Process Queue: 3
Process Queue: 4
Process Queue: 5
Process Queue: 6
Process Queue: 7
Process Queue: 8
Process Queue: 9
Trying to stop Process

我想知道为什么?另外,在进程中使用双端队列的最佳方法是什么?而且,我将如何使用某种 stop() 方法来终止进程.

I'm trying to figure out why? Also, what would be the best way to use deque with a process? And, how would I go about killing the process using some sort of stop() method.


解决方案

你不能使用 collections.deque 在两个 multiprocessing.Process 实例之间传递数据,因为 collections.deque 不是进程感知的.multiprocessing.Queue 在内部将其内容写入 multiprocessing.Pipe,这意味着其中的数据可以在一个进程中排队并在另一个进程中检索.collections.deque 没有那种管道,所以它不会工作.当您在一个进程中写入 deque 时,另一个进程中的 deque 实例完全不会受到影响;它们是完全独立的实例.

You can't use a collections.deque to pass data between two multiprocessing.Process instances, because collections.deque is not process-aware. multiprocessing.Queue writes its contents to a multiprocessing.Pipe internally, which means that data in it can be enqueued in once process and retrieved in another. collections.deque doesn't have that kind of plumbing, so it won't work. When you write to the deque in one process, the deque instance in the other process won't be affected at all; they're completely separate instances.

您的 stop() 方法也发生了类似的问题.您正在主进程中更改 toRun 的值,但这根本不会影响子进程.它们是完全独立的实例.结束孩子的最好方法是向 Queue 发送一些哨兵.当你在child中获得哨兵时,跳出无限循环:

A similar issue is happening to your stop() method. You're changing the value of toRun in the main process, but this won't affect the child at all. They're completely separate instances. The best way to end the child would be to send some sentinel to the Queue. When you get the sentinel in the child, break out of the infinite loop:

def run(self):
    print("Started Process")
    self.toRun = True
    while self.toRun:
        if type(self.q) == type(deque()):
            if self.q:
                i = self.q.popleft()
                print("Process deque: " + str(i))
        elif type(self.q) == type(Queue()):
            if not self.q.empty():
                i = self.q.get_nowait()
                if i is None:  
                    break  # Got sentinel, so break
                print("Process Queue: " + str(i))

def stop(self):
    print("Trying to stop Process")
    self.q.put(None)  # Send sentinel
    while self.is_alive():
        time.sleep(0.1)
    print("Stopped Process")

如果你确实需要两个进程之间的 deque 语义,你可以使用 自定义 multiprocessing.Manager() 以在 Manager 进程中创建共享 deque,以及您的每个 Process 实例都会获得一个 Proxy:

If you actually do need deque semantics between two process, you can use a custom multiprocessing.Manager() to create a shared deque in a Manager process, and each of your Process instances will get a Proxy to it:

import time
from multiprocessing import Process
from multiprocessing.managers import SyncManager
from collections import deque

SyncManager.register('deque', deque)

def Manager():
    m = SyncManager()
    m.start()
    return m

class ProcessTest(Process):
    def __init__(self, q):
        super(ProcessTest, self).__init__()
        self.q = q
        self.ctr = 0

    def run(self):
        print("Started Process")
        self.toRun = True
        while self.toRun:
            if self.q._getvalue():
                i = self.q.popleft()
                if i is None:
                    break
                print("Process deque: " + str(i))

    def stop(self):
        print("Trying to stop Process")
        self.q.append(None)
        while self.is_alive():
            time.sleep(0.1)
        print("Stopped Process")

if __name__ == '__main__':
    m = Manager()
    q = m.deque()
    t1 = ProcessTest(q)
    t1.start()

    for i in range(10):
        q.append(i)
        time.sleep(1)
    t1.stop()
    t1.join()

    print(q)

请注意,这可能不会比 multiprocessing.Queue 快,因为每次访问 deque 都会产生 IPC 成本.对于以您的方式传递消息来说,它也是一种不太自然的数据结构.

Note that this probably isn't going to be faster than a multiprocessing.Queue, though, since there's an IPC cost for every time you access the deque. It's also a much less natural data structure for passing messages the way you are.

相关文章