ZeroMQ 是否有数据到达时的通知/回调事件/消息?

2022-01-12 00:00:00 sockets multithreading zeromq c++ mfc

我正在尝试将 ZMQ 集成到严重依赖 MFC 套接字 (CASyncSocket) 的现有 Windows 应用程序中.

I am trying to integrate ZMQ into an existing windows application that relies heavily on MFC sockets (CASyncSocket).

我有一个 CWinThread 派生的 UI 线程(没有 GUI),它使用 CAsyncSocket 与服务器异步通信.我想添加一条 ZMQ inproc 通信线路来处理从服务器接收的数据(基于 REQ/REP)与应用程序中的其他线程的通信.

I've got a CWinThread derived UI thread (without a GUI) that communicates with a server asynchronously using CAsyncSocket. I would like to add a ZMQ inproc communication line to handle communicating the data received from the server (on a REQ/REP basis) to other threads within the application.

使用 CAsyncSocket,只要在要接收的套接字上有新数据可用时,MFC 框架就会调用 OnReceive 方法(这可能是对核心 MFC 专家的过度简化).

Using CAsyncSocket, the OnReceive method is called by the MFC framework whenever new data is available on the socket to be received (that might be an over-simplification to the hardcore MFC gurus out there).

ZMQ 有这样的机制吗?或者我是否必须添加一个额外的专用 WorkerThread,由 UI 线程启动以处理我与应用程序其余部分的 ZMQ 通信?两条管道上的流量都很小,所以我真的不想创建 2 个单独的线程,如果我可以用 1 个.

Is there any such mechanism in ZMQ? Or do I have to add an additional dedicated WorkerThread that the UI thread launches to handle my ZMQ communications to the rest of the app? The traffic on both pipelines is minimal so I really don't want to have to create 2 separate threads if I can get by with 1.

请注意,我已经掌握了基础知识,只是在同步方面遇到了问题.如果我对 ZMQ 使用阻塞接收/发送,它会耗尽我的 CAsycSocket,因为 Windows 消息永远不会被线程处理,导致有时永远不会从 ZMQ 应该传递的服务器获取数据.但是如果我使用非阻塞 ZMQ 调用,那么线程经常会处于空闲状态,因为它不知道读取 ZMQ 套接字.

Note, I've got the basics working, I'm just having problems with synchronization. If I use blocking recv/send with ZMQ, it starves out my CAsycSocket because the windows messages never get processed by the thread resulting in sometimes never getting the data from the server that the ZMQ is supposed to be delivering. But if I use non-blocking ZMQ calls, then the thread frequently ends up sitting idle because it doesn't know to read off the ZMQ socket.

推荐答案

最终,答案是否.当前没有关于数据何时到达您可以链接到的 ZeroMQ 的回调/通知.我也找不到任何添加此功能的分支.

Ultimately, the answer is no. There are no current callback/notifications for when data arrives in ZeroMQ that you can link into. I was also unable to find any fork that adds this functionality.

在单个线程中使用 MFC 套接字框架提供的传统 OnReceive 调用并添加第二个线程专用于 ZMQ 时,我无法让 ZMQ 工作,这破坏了使用它的全部目的(它被用于线程同步).

I was unable to get ZMQ working while using the traditional OnReceive calls provide by the MFC socket framework within a single thread and adding a 2nd thread to dedicate to ZMQ defeated the whole purpose for using it (it is being used for thread synchronization).

我的实现最终放弃了 MFC 套接字,并为我的 inproc 服务器(用于与其他线程通信)以及我的 TCP(非 ZMQ)服务器连接使用 ZMQ,并使用阻塞轮询调用 (zmq_poll())在 OnIdle() 方法中(每次返回 1 以创建一个繁忙的循环).阻塞民意调查

My implementation that works ended up dropping MFC sockets and using ZMQ for both my inproc server (for communicating with other thread) as well as my TCP (non-ZMQ) server connection and using a blocking polling call (zmq_poll()) in the OnIdle() method (returning 1 every time to create a busy loop). The blocking poll

BOOL CMyThreaClass::OnIdle(LONG lCount)
{
    UNREFERENCED_PARAMETER(lCount);

    zmq_pollitem_t items [] = {
    { m_pZMQInprocServer, 0, ZMQ_POLLIN, 0 },
    { m_pZMQTCPSocket, 0, ZMQ_POLLIN, 0 }
    };
    const int iZMQInfiniteTimeout(-1);
    iResult = zmq_poll(&items[0], sizeof(items) / sizeof(items[0]), iZMQInfiniteTimeout);
    TRACE("zmq_poll result: %d
", iResult);

    if (items[0].revents & ZMQ_POLLIN)
    {
        sMyStruct sMessage;
        iResult = zmq_recv(m_pZMQInprocServer, &sMessage, sizeof(sMessage), ZMQ_DONTWAIT); // don't block (the zmq_poll blocks for us)
        TRACE("inproc recv result: %d
", iResult);
        //  Process inproc messages
        iResult = zmq_send(pZMQInprocServer, &sMessage, sizeof(sMessage), ZMQ_NULL); // block
        TRACE("inproc send result: %d
", iResult);
    }
    if (items[1].revents & ZMQ_POLLIN)
    {
        // there will be an ZMQ_IDENTITY identifier on the beginning of the socket buffer, read it off first
        uint8_t id [256];
        size_t id_size = 256;
        iResult = zmq_getsockopt(m_pZMQTCPSocket, ZMQ_IDENTITY, id, &id_size);
        TRACE("getsockopt poll result %d:id %d
", iResult, id);
        iResult = zmq_recv(m_pZMQTCPSocket, &id, id_Size, ZMQ_DONTWAIT); // don't block
        // now get our actual data
        char szBuffer[1024];
        int iBytesReceived = zmq_recv(m_pZMQSocket, szBuffer, sizeof(szBuffer), ZMQ_DONTWAIT);
        if (iBytesReceived > 0)
        {
            // process TCP data
        }
    }
}

注意:此答案需要使用 ZMQ 4 或更高版本,因为早期版本的 ZMQ 不会与常规 TCP 套接字连接进行通信.

Note: This answer requires using ZMQ 4 or later since earlier versions of ZMQ will not communicate with a regular TCP socket connection.

相关文章