更改多处理中的缓冲区大小。队列

问题描述

我有一个具有生产者和消费者的系统,它们通过一个无限大的队列连接在一起,但是如果消费者重复调用GET,直到抛出空异常,它不会清除队列。

我认为这是因为一旦套接字缓冲区满了,使用者端队列中将对象序列化到套接字中的线程就会被阻塞,因此它会等待,直到缓冲区有空间,然而,消费者可能会调用Get"太快",因此它认为队列是空的,而另一端的线程实际上有更多的数据要发送,但只是不能足够快地序列化它,以防止套接字对消费者来说看起来是空的。

我相信,如果我可以更改底层套接字(我是基于Windows的)上的缓冲区大小,这个问题就会得到缓解。据我所知,我需要做的是:

import multiprocessing.connections as conns
conns.BUFSIZE = 2 ** 16 # is typically set as 2 ** 13 for windows
import multiprocessing.Queue as q
如果我执行上述操作,这是否意味着当多进程初始化队列时,它将使用我在已导入的多处理连接版本中设置的新缓冲区大小?对吗?

我也认为这只会影响Windows,因为Linux机器上没有使用BUFSIZE,因为它们的所有套接字默认设置为60KB?

以前有人试过这个吗?这会对Windows产生副作用吗?Windows上套接字缓冲区大小的基本限制是什么?

=要演示的代码示例=

# import multiprocessing.connection as conn
# conn.BUFSIZE = 2 ** 19
import sys
import multiprocessing as mp
from Queue import Empty
from time import sleep

total_length = 10**8

def supplier(q):
    print "Starting feeder"
    for i in range(total_length) :
        q.put(i)


if __name__=="__main__":

    queue = mp.Queue()

    p = mp.Process(target=supplier, args=(queue,))

    p.start()

    sleep(120)

    returned = []
    while True :
        try :
            returned.append(queue.get(block=False))
        except Empty :
            break

    print len(returned)
    print len(returned) == total_length

    p.terminate()
    sys.exit()

此示例在Windows上运行时,通常只会从队列中拉出大约160,000个项目,因为主线程清空缓冲区的速度比供应商重新填充缓冲区的速度更快,并最终在缓冲区为空时尝试从队列中拉出并报告它为空。

理论上,您可以通过增加缓冲区大小来改善此问题。我相信,在Windows系统上,顶部的两行将增加管道的默认缓冲区大小。

如果您在中注释它们,则此脚本将在退出之前提取更多数据,因为它具有更高的。我的主要问题是: 1)这真的起作用了吗? 2)有没有办法使这段代码在Windows和Linux中使用相同大小的底层缓冲区 3)为管道设置较大的缓冲区是否有意外的副作用。

我知道,一般来说,无法知道您是否已经从队列中提取了所有数据(-考虑到供应商长期运行且数据产生非常不均匀),但我正在寻找尽最大努力改进这一点的方法。


解决方案

更新:

Windows管道的有用链接,供将来需要的人使用(链接由OP,phil_20686提供): https://msdn.microsoft.com/en-us/library/windows/desktop/aa365150(v=vs.85).aspx

原始:

BUFSIZE仅在平台为Win32时工作。

多进程.Queue构建在管道的顶部,如果更改BUFSIZE,则生成的队列将使用更新后的值。见下图:

class Queue(object):

    def __init__(self, maxsize=0):
        if maxsize <= 0:
            maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
        self._maxsize = maxsize
        self._reader, self._writer = Pipe(duplex=False)

当平台为Win32时,管道代码将调用以下代码:

def Pipe(duplex=True):
    '''
    Returns pair of connection objects at either end of a pipe
    '''
    address = arbitrary_address('AF_PIPE')
    if duplex:
        openmode = win32.PIPE_ACCESS_DUPLEX
        access = win32.GENERIC_READ | win32.GENERIC_WRITE
        obsize, ibsize = BUFSIZE, BUFSIZE
    else:
        openmode = win32.PIPE_ACCESS_INBOUND
        access = win32.GENERIC_WRITE
        obsize, ibsize = 0, BUFSIZE

    h1 = win32.CreateNamedPipe(
        address, openmode,
        win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
        win32.PIPE_WAIT,
        1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
        )

您可以看到,当duplex为FALSE时,输出缓冲区大小为0,缓冲区内大小为BUFSIZE。

InBuffer是为输入缓冲区保留的字节数。2**16=65536,这是一次不阻塞操作可以写入的最大字节数,但缓冲区大小的容量因系统不同而不同,即使在同一系统上也不同,所以设置管道的最大值会有什么副作用很难说。

相关文章