Java BlockingQueue with batching?
I am interested in a data structure identical to the Java BlockingQueue, except that it must be able to batch the objects in the queue. In other words, I would like the producer to be able to put objects into the queue, but have the consumer block on take() until the queue reaches a certain size (the batch size).
Then, once the queue has reached the batch size, the producer must block on put() until the consumer has consumed all of the elements in the queue (at which point the producer starts producing again and the consumer blocks until the batch size is reached again).
Does a similar data structure exist? Or should I write it myself? I don't mind doing so; I just don't want to waste my time if there is already something out there.
Update
Perhaps some clarification:
The situation will always be as follows. There can be multiple producers adding items to the queue, but there will never be more than one consumer taking items from the queue.
Now, the problem is that there are several of these setups, arranged both in parallel and in series. In other words, producers produce items for multiple queues, while consumers can themselves be producers. This is most easily thought of as a directed graph of producers, consumer-producers, and finally consumers.
The reason producers should block until the queues are empty (@Peter Lawrey) is that each of them will be running in its own thread. If you let them simply produce whenever space becomes available, you end up with too many threads trying to process too many things at once.
Maybe coupling this with an execution service could solve the problem?
Recommended answer
I would suggest you use BlockingQueue.drainTo(Collection, int). You can use it with take() to ensure you get a minimum number of elements.
The advantage of this approach is that your batch size grows dynamically with the workload, and the producer doesn't have to block while the consumer is busy. That is, it self-optimises for latency and throughput.
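As a rough illustration of the take() plus drainTo(Collection, int) combination, here is a minimal sketch. The class name DrainToBatching, the method nextBatch and the parameters minBatch and maxBatch are made up for this example and assume minBatch <= maxBatch; only take() and drainTo() are actual BlockingQueue methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper, not part of java.util.concurrent.
class DrainToBatching {

    /**
     * Blocks until at least minBatch elements have been taken, then adds
     * whatever else is already queued, up to maxBatch elements in total.
     * Assumes 1 <= minBatch <= maxBatch.
     */
    static <T> List<T> nextBatch(BlockingQueue<T> queue, int minBatch, int maxBatch)
            throws InterruptedException {
        List<T> batch = new ArrayList<>(maxBatch);
        while (batch.size() < minBatch) {
            batch.add(queue.take());                    // blocks while the queue is empty
        }
        queue.drainTo(batch, maxBatch - batch.size());  // never blocks
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 5; i++) {
            queue.put(i);
        }
        System.out.println(nextBatch(queue, 1, 3)); // prints [0, 1, 2]
        System.out.println(nextBatch(queue, 1, 3)); // prints [3, 4]
    }
}
```

With minBatch set to 1 the consumer never waits for a full batch, so a lightly loaded queue yields small batches and a backlog yields batches of up to maxBatch elements.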
To implement exactly what was asked (which I think is a bad idea), you can use a SynchronousQueue with a busy consuming thread.
i.e. the consuming thread does:
list.clear();
while(list.size() < required) list.add(queue.take());
// process list.
The producer will block whenever the consumer is busy.
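For a fuller picture, the SynchronousQueue variant might look like the sketch below. The class name SynchronousBatchConsumer, the method collectBatch, and the single producer thread are assumptions made purely for illustration; the loop inside collectBatch is the same one shown above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, not part of java.util.concurrent.
class SynchronousBatchConsumer {

    /** Collects exactly `required` elements, blocking on take() for each one. */
    static <T> List<T> collectBatch(SynchronousQueue<T> queue, int required)
            throws InterruptedException {
        List<T> list = new ArrayList<>(required);
        while (list.size() < required) {
            list.add(queue.take());
        }
        return list;
    }

    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>();

        // A SynchronousQueue has no capacity, so each put() blocks until
        // the consumer calls take(), i.e. while the consumer is busy.
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 6; i++) {
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        for (int b = 0; b < 2; b++) {
            List<Integer> batch = collectBatch(queue, 3);
            System.out.println("processing " + batch); // [0, 1, 2], then [3, 4, 5]
        }
        producer.join();
    }
}
```

Note that here the producer hands over one element at a time and waits between hand-offs, rather than filling a buffer up to the batch size first.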