Node.js:一次接收太多 UDP 消息,丢失它们

2022-01-22 00:00:00 sockets node.js udp express javascript

我的节点服务器在一秒钟内接收到大约 400 条 UDP 消息,这一切正常工作,我能够处理全部 400 条.

但是,当我开始在一秒钟内收到大约 700 条 UDP 消息时,我丢失了 2-20 条消息,并且它们永远不会被解析:(p>

我在这里考虑了一些选项:

  1. 创建所有socket消息的队列,然后一个一个消费,虽然我不确定如何实现这个
    • 不知道如何实现
  2. 在 Node/Express/dgram 套接字中找到一个设置,我可以在其中增加内存大小/缓冲区大小,类似这样
    • 不过,我找不到这样的设置 :(
  3. 使用不同的 UDP 接收器,停止使用节点内置的套接字 UDP 接收器
    • 未找到其他接收者

这是我的 UDP 发件人的样子:

var dgram = require("dgram");var udpserver = dgram.createSocket("udp4");var seatStateStore = require("./SeatStateStore");udpserver.on("消息",功能(味精,rinfo){seatStateStore.parseMessage(msg.toString());});

有人有什么想法吗?我无法弄清楚这 3 个选项中的任何一个:/有人可以帮我吗?

节点 v0.10.29

Express v3.14.0

=================================

更新/解决方案

这是我最终使用的代码(稍微修改了@RoyHB 的解决方案):

var dgram = require("dgram");var udpserver = dgram.createSocket("udp4");var seatStateStore = require("./SeatStateStore");var Dequeue = require('dequeue');var FIFO = new Dequeue();提取器();udpserver.on("消息",功能(味精,rinfo){FIFO.push(msg.toString());});udpserver.bind(43278);函数提取器(){而 (FIFO.length > 0){var msg = FIFO.shift();seatStateStore.parseMessage(msg);}setImmediate(fetcher);//让这个函数连续运行}

解决方案

有一个 NPM 模块叫 node-出队.我经常使用它来处理与您类似的情况.

基本上,

  1. 您的程序将收到的消息推送到队列的末尾.
  2. 间隔计时器会定期激活另一个方法或函数(队列提取器),该方法或函数会检查队列中是否有消息,如果有,则提取一条或多条消息并进行处理.
  3. 或者(也许更好)不使用计时器来安排队列提取.而是使用节点 process.nextTick 方法.

或者,也许更好的是,您可以使用节点 process.nextTick 不断检查队列中的消息.

理想情况下,seatStateStore.parseMessage 会创建一个新对象来异步处理一条消息,以便 parseMessage 不会延迟返回,而实际消息处理在后台继续.(见示例代码底部)

下面的代码我没有测试过,是为了说明,不是为了运行

var FIFO = require ('dequeue');var seatStateStore = require("./SeatStateStore");var dgram = 要求(dgram");setInterval(fetcher, 1);var udpserver = dgram.createSocket("udp4");udpserver.on("消息",功能(味精,rinfo){FIFO.push(味精);});函数提取器(){while (FIFO.length > 0) {var msg = FIFO.shift();seatStateStore.parseMessage(msg);}}

** 或者(也许更好)**

var FIFO = require ('dequeue');var seatStateStore = require("./SeatStateStore");var dgram = 要求(dgram");提取器();var udpserver = dgram.createSocket("udp4");udpserver.on("消息",功能(味精,rinfo){FIFO.push(味精);});函数提取器(){while (FIFO.length > 0) {var msg = FIFO.shift();seatStateStore.parseMessage(msg);process.nextTick(fetcher);}}

seatStateProcessor.parseMessage 概要:

seatStateProcessor.parseMessage = function (msg) {proc = new asyncProcHandler(msg, function (err) {如果(错误){//处理错误}});}

My node server receives about 400 UDP messages in one second, and it all works, and I am able to process all 400 of them.

However, when I start to receive about 700 UDP messages in one second, I lose 2-20 of the messages, and they never get parsed :(

I have thought about some options here:

  1. Create a queue of all the socket messages, then consume one-by-one, although I'm not sure how to implement this
    • Can't figure out how to implement
  2. Find a setting in Node / Express / dgram socket where i can increase the memory size / buffer size, something like that
    • I couldn't find any settings like this, though :(
  3. Use a different UDP receiver, stop using node's build in socket UDP receiver
    • Didn't find other receivers

Here's what my UDP sender looks like:

var dgram = require("dgram");
var udpserver = dgram.createSocket("udp4");
var seatStateStore = require("./SeatStateStore");

udpserver.on("message",
        function (msg, rinfo)
        {
        seatStateStore.parseMessage(msg.toString());
    });

Anyone have any ideas? I couldn't figure out any of the 3 options :/ Can someone help me out?

Node v0.10.29

Express v3.14.0

===============================

UPDATE / SOLUTION

Here's the code I ended up using (slightly modified @RoyHB 's solution):

var dgram = require("dgram");
var udpserver = dgram.createSocket("udp4");
var seatStateStore = require("./SeatStateStore");
var Dequeue = require('dequeue');
var FIFO = new Dequeue();

fetcher();

udpserver.on("message",
        function (msg, rinfo)
        {
           FIFO.push(msg.toString());
        });

udpserver.bind(43278);

function fetcher () {
    while (FIFO.length > 0) 
    {
        var msg = FIFO.shift();
        seatStateStore.parseMessage(msg);
    }
    setImmediate(fetcher); //make this function continuously run
}

解决方案

There is a NPM module called node-dequeue. I use it a lot for similar situations to yours.

basically,

  1. your program pushes received messages onto the end of the queue.
  2. an interval timer periodically activates another method or function ( a queue-fetcher) which checks to see if there are messages on the queue and if so, fetches one or more and processes it.
  3. Alternatively (maybe better) no timer is used to schedule queue fetches. Instead the node process.nextTick method is used.

Alternatively, maybe preferably, you can use node process.nextTick to continuously check the queue for messages.

Ideally, seatStateStore.parseMessage would create a new object to asynchronously process one message so that parseMessage returns without delay while the actual message processing continues in the background. (see bottom of example code )

I haven't tested the code below, it's meant to illustrate, not to run

var FIFO = require ('dequeue');
var seatStateStore = require("./SeatStateStore");
var dgram = require("dgram");

setInterval(fetcher, 1);

var udpserver = dgram.createSocket("udp4");

udpserver.on("message",
    function (msg, rinfo) {
        FIFO.push(msg);
    }
);

function fetcher () {
    while (FIFO.length > 0) {
        var msg = FIFO.shift();
        seatStateStore.parseMessage(msg);
    }
}

** OR (maybe better) **

var FIFO = require ('dequeue');
var seatStateStore = require("./SeatStateStore");
var dgram = require("dgram");

fetcher();

var udpserver = dgram.createSocket("udp4");

udpserver.on("message",
    function (msg, rinfo) {
        FIFO.push(msg);
    }
);

function fetcher () {
    while (FIFO.length > 0) {
        var msg = FIFO.shift();
        seatStateStore.parseMessage(msg);
        process.nextTick(fetcher);
    }
}

Outline of seatStateProcessor.parseMessage:

seatStateProcessor.parseMessage = function (msg) {
    proc = new asyncProcHandler(msg, function (err) {
        if (err) {
            //handle the error
        }
    });
}

相关文章