Communicating end of Queue

2022-01-21 00:00:00 python multithreading queue

Question

I'm learning to use the Queue module, and am a bit confused about how a queue consumer thread can be made to know that the queue is complete. Ideally I'd like to use get() from within the consumer thread and have it throw an exception if the queue has been marked "done". Is there a better way to communicate this than by appending a sentinel value to mark the last item in the queue?
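
For reference, the sentinel approach mentioned above might look roughly like the sketch below. It assumes Python 3, where the module is named `queue` rather than `Queue`; the names `_DONE`, `consumer` and `run` are made up for illustration. Note that one sentinel is needed per consumer thread, which is part of what makes the approach feel clumsy.

```python
import queue
import threading

# Hypothetical sentinel object; identity comparison avoids clashing with real items.
_DONE = object()


def consumer(q, out):
    """Pull items until the sentinel shows up, then stop."""
    while True:
        item = q.get()
        if item is _DONE:
            break
        out.append(item)


def run(items, n_consumers=2):
    """Feed items to n_consumers threads and return what they collected."""
    q = queue.Queue()
    out = []
    threads = [
        threading.Thread(target=consumer, args=(q, out))
        for _ in range(n_consumers)
    ]
    for t in threads:
        t.start()
    for item in items:
        q.put(item)
    # One sentinel per consumer, since each consumer swallows exactly one.
    for _ in threads:
        q.put(_DONE)
    for t in threads:
        t.join()
    return out
```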


Solution

Original (most of this has changed; see the updates below)

Based on some of the suggestions (thanks!) of Glenn Maynard and others, I decided to roll up a descendant of Queue.Queue that implements a close method. It's available in the form of a primitive (unpackaged) module. I'll clean this up a bit and package it properly when I have a bit more time. For now the module only contains the CloseableQueue class and the Closed exception class. I'm planning to expand it to also include subclasses of Queue.LifoQueue and Queue.PriorityQueue.

It's in a pretty preliminary state currently, which is to say that although it passes its test suite, I haven't actually used it for anything yet. Your mileage may vary. I'll keep this answer updated with exciting news.

The CloseableQueue class differs a bit from Glenn's suggestion in that closing the queue will prevent future puts, but not prevent future gets until the queue is emptied. This made the most sense to me; it seemed like functionality to clear the queue could be added as a separate mixin* that would be orthogonal to the closeability functionality. So basically with CloseableQueue, by closing the queue you indicate that the last element has been put. There's also an option to do this atomically by passing last=True to the final put call. Subsequent calls to put, and subsequent calls to get once the queue is emptied, as well as outstanding blocked calls matching those descriptions, will raise the Closed exception.
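
The semantics described above can be sketched in a few lines. This is not the CloseableQueue module's code: it is a simplified, unbounded stand-in written for Python 3, and the names `SketchCloseableQueue` and `drain` are hypothetical. Only `Closed`, `close`, `closed` and `last=True` come from the description; their exact signatures in the real module may differ.

```python
import collections
import threading


class Closed(Exception):
    """Raised when a put or get cannot proceed because the queue is closed."""


class SketchCloseableQueue:
    """Illustrative only: unbounded FIFO, no timeouts, no task_done/join."""

    def __init__(self):
        self._items = collections.deque()
        self._cond = threading.Condition()
        self._closed = False

    def put(self, item, last=False):
        with self._cond:
            if self._closed:
                raise Closed
            self._items.append(item)
            if last:
                # Append and close under one lock, so no put can sneak in between.
                self._closed = True
                self._cond.notify_all()
            else:
                self._cond.notify()

    def close(self):
        with self._cond:
            self._closed = True
            # Wake every blocked getter so it can notice the closed state.
            self._cond.notify_all()

    def closed(self):
        with self._cond:
            return self._closed

    def get(self):
        with self._cond:
            while not self._items:
                # Closed only matters to a getter once the queue is drained.
                if self._closed:
                    raise Closed
                self._cond.wait()
            return self._items.popleft()


def drain(q, out):
    """Consumer loop: collect items until the queue is closed and empty."""
    try:
        while True:
            out.append(q.get())
    except Closed:
        pass
```

A consumer thread running `drain` keeps receiving items that were queued before the close and only sees `Closed` once nothing is left.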

This is mostly useful for situations where a single producer is generating data for one or more consumers, but it could also be useful for a multi-multi arrangement where consumers are waiting for a particular item or set of items. In particular it doesn't provide a way to determine that all of a number of producers have finished production. Getting that working would entail the provision of some way to register producers (.open()?), as well as a way to indicate that producer registration is itself closed.
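
One way the producer-registration idea could work is sketched below. Nothing here exists in the module: `ProducerRegistry`, `done` and `close_registration` are hypothetical names, and `open` just mirrors the `.open()` floated above. The registry calls a callback (for example a queue's `close` method) once registration has been closed and every registered producer has finished.

```python
import threading


class ProducerRegistry:
    """Hypothetical helper: fire a callback when all producers are done."""

    def __init__(self, on_all_done):
        self._lock = threading.Lock()
        self._active = 0
        self._registration_closed = False
        self._fired = False
        self._on_all_done = on_all_done

    def open(self):
        """Register one more producer."""
        with self._lock:
            if self._registration_closed:
                raise RuntimeError("producer registration is closed")
            self._active += 1

    def done(self):
        """Mark one registered producer as finished."""
        with self._lock:
            self._active -= 1
            self._maybe_fire()

    def close_registration(self):
        """Declare that no further producers will register."""
        with self._lock:
            self._registration_closed = True
            self._maybe_fire()

    def _maybe_fire(self):
        # Called with the lock held; the callback therefore runs under the
        # lock too, which is fine for a sketch but should be kept short.
        if self._registration_closed and self._active == 0 and not self._fired:
            self._fired = True
            self._on_all_done()
```

Without the separate `close_registration` step, the count could drop to zero between two producers registering and the queue would be closed too early.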

Suggestions and/or code reviews are quite welcome. I haven't written a whole lot of concurrency code, but hopefully the test suite is thorough enough that the fact that the code passes it is an indication of the code's quality, rather than the suite's lack thereof. I was able to reuse a bunch of the code from the Queue module's test suite: the file itself is included in this module and used as a basis for various subclasses and routines, including regression testing. This probably (hopefully) helped to avoid complete ineptitude in the testing department. The code itself just overrides Queue.get and Queue.put with fairly minimal changes, and adds the close and closed methods.

I've sort of intentionally avoided using any new-fangled fanciness like context managers in both the code itself and in the test suite in an effort to keep the code as backwards-compatible as is the Queue module itself, which is considerably backwards indeed. I'll probably add __enter__ and __exit__ methods at some point; in the meantime, contextlib's closing function should be applicable to a CloseableQueue instance.
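
As a sketch of the contextlib route: `contextlib.closing` wraps any object that has a `close()` method and calls it when the `with` block exits, even on an exception. The `StubQueue` class below is a made-up stand-in so the example runs on its own; a CloseableQueue instance would take its place.

```python
from contextlib import closing


class StubQueue:
    """Hypothetical stand-in with the put/close shape described above."""

    def __init__(self):
        self.items = []
        self.is_closed = False

    def put(self, item):
        if self.is_closed:
            raise RuntimeError("queue is closed")
        self.items.append(item)

    def close(self):
        self.is_closed = True


def produce(q, items):
    """Put every item, then close the queue no matter how the loop ends."""
    with closing(q):
        for item in items:
            q.put(item)
```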

*: Here I use the term "mixin" loosely. As the Queue module's classes are old-style, mixins would need to be mixed using class factory functions; some restrictions apply; offer void where prohibited by Guido.

The CloseableQueue module now provides CloseableLifoQueue and CloseablePriorityQueue classes. I've also added some convenience functions to support iteration. Still need to rework it as a proper package. There's a class factory function to allow for convenient subclassing of other Queue.Queue-derived classes.
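
The iteration helpers are not named here, so the following is only a guess at the general shape rather than the module's API: a generator that keeps calling a `get` callable until a chosen exception signals the end. `iter_until` is a hypothetical name, and the example uses Python 3's standard `queue.Queue` with `queue.Empty` as the stop signal so that it runs without the module installed.

```python
import queue


def iter_until(get, stop_exc):
    """Yield get() results until get raises stop_exc."""
    while True:
        try:
            item = get()
        except stop_exc:
            return
        yield item


def drain_now(q):
    """Collect whatever is currently in a standard queue without blocking."""
    return list(iter_until(q.get_nowait, queue.Empty))
```

With a closeable queue, the same generator could presumably be driven by its blocking get and its Closed exception, turning the consumer into a plain `for` loop.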

CloseableQueue is now available via PyPI, e.g. with

$ easy_install CloseableQueue

Comments and criticism are welcome, especially from this answer's anonymous downvoter.
