无法将获取的数据发送到我的 socket.io 流?

我正在尝试从单个 mysql-queries 切换到 mysql-pool 连接,因此用户可以共享一个 mysql-connection,但我对此完全不熟悉(也是 nodejs/socket.io 的新手).

I'm trying to switch from single mysql-queries to mysql-pool connection, so users can share one mysql-connection, but I'm not familiar with this at all (also new to nodejs/socket.io).

以下代码是我到目前为止每秒向数组中的套接字发送数据的代码:

The following code is what I've done so far to send data every second to the socket in an array:

// --- Network settings: OpenShift environment values, else local defaults ---
var port = process.env.OPENSHIFT_NODEJS_PORT || 8000;
var ip = process.env.OPENSHIFT_NODEJS_IP || '127.0.0.1';

// --- Modules ---
// `handler` is a function declaration further down, so it is already
// available here through hoisting.
var app = require('http').createServer(handler);
var fs = require('fs');             // not used in the visible code
var request = require('request');   // not used in the visible code
var mysql = require('mysql');
var moment = require('moment');
// Presumably loaded so that `moment.tz` (used below) is available.
var tz = require('moment-timezone');

// --- MySQL connection pool used by the polling loop ---
var pool = mysql.createPool({
    connectionLimit: 100,
    host: 'xxx',
    user: 'xxx',
    password: 'xxx',
    database: 'xxx',
    debug: false,
    port: 3306
});

// --- Polling state ---
// Sockets that currently receive the 'stream' event.
var socketArray = [];
// Delay passed to setTimeout between two polls (milliseconds).
var POLLING_INTERVAL = 1000;
// Only ever passed to clearTimeout in the visible code; never assigned.
var pollingTimer;

// All moment() timestamps default to this time zone.
moment.tz.setDefault("Europe/Berlin");

// Attach socket.io to the HTTP server and accept every origin.
var io = require('socket.io').listen(app);
io.set('origins', '*:*');

/**
 * Build the timestamp prefix that is put in front of log lines.
 *
 * @returns {string} The current time as formatted by moment with the
 *     pattern '(H:mm:ss.SS) ' (note the trailing space).
 */
function time()
{
    // Declared locally: the previous version assigned to an undeclared
    // `output`, which leaks a global (and throws in strict mode), and first
    // stored a `new Date()` that was immediately overwritten.
    var output = moment().format('(H:mm:ss.SS) ');
    return output;
}

/**
 * HTTP request listener: answers every request with an empty 200 response
 * carrying permissive CORS headers.
 *
 * @param {Object} req - Incoming request (not inspected).
 * @param {Object} res - Response object to write to.
 */
function handler(req, res) {
    // Header names map to the values to send; keys are applied in the order
    // they are written here.
    var corsHeaders = {
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Headers": "Origin, X-Requested-With, Content-Type, Accept"
    };
    Object.keys(corsHeaders).forEach(function (headerName) {
        res.setHeader(headerName, corsHeaders[headerName]);
    });

    res.statusCode = 200;
    // A timeout of 0 is set on the underlying connection.
    res.connection.setTimeout(0);
    res.end();
}
// Start the HTTP server on the configured port and address.
app.listen(port,ip);

/**
 * Tell whether an auction row meets the conditions for the automatic update.
 *
 * The row must be in its final window (fewer than 2 seconds left, or no more
 * seconds left than its `nr` value) and, in addition, either its price is at
 * or below `pe`, or the joined bid is dated 1-29 seconds away from now (`hb`).
 *
 * @param {Object} auction - Row of the polling SELECT (duration, nr, price, pe, hb).
 * @returns {boolean} true when the row should be updated.
 */
function shouldExtendAuction(auction) {
    var inFinalWindow = auction.duration < 2 || auction.duration <= auction.nr;
    // Number() leaves numeric values untouched and avoids a lexicographic
    // comparison in case the driver returns the price columns as strings.
    var atOrBelowEndPrice = Number(auction.price) <= Number(auction.pe);
    var hasRecentBid = auction.hb > 0 && auction.hb < 30;
    return inFinalWindow && (atOrBelowEndPrice || hasRecentBid);
}

/**
 * Raise the price of an auction by 0.01, store a new random value between 3
 * and 5 and let the UPDATE statement move its `datetime`.
 *
 * @param {Object} auction - Row of the polling SELECT (id, price).
 */
function extendAuction(auction) {
    var min = 3;
    var max = 5;
    var rand = Math.floor(Math.random() * (max - min + 1) + min);
    // Number() guards against string concatenation ("1.00" + 0.01) when the
    // price arrives as a string.
    var price = (Number(auction.price) + 0.01).toFixed(2);

    pool.query(
        'UPDATE `auctions` SET `random` = ?,`price` = ?, `datetime` = DATE_ADD(`datetime`,INTERVAL(17-TIMESTAMPDIFF(SECOND,NOW(),`datetime`))SECOND) WHERE `id` = ?',
        [rand, price, auction.id],
        function (err) {
            // Report the outcome only once the statement has actually run.
            if (err) {
                console.log(time() + 'UPDATE failed for id ' + auction.id + ': ' + err.message);
                return;
            }
            console.log(time() + 'UPDATED id ' + auction.id + ': random (' + rand + ') price (' + price + '€)');
        }
    );
}

/**
 * One iteration of the polling cycle. Re-schedules itself with setTimeout
 * after POLLING_INTERVAL on every path, so a single initial call keeps the
 * cycle running.
 *
 * While no socket is registered nothing is queried. Otherwise a pooled
 * connection is borrowed, the auctions are selected, the first row is
 * updated if it qualifies and then sent to all sockets via updateSockets.
 */
function pollingLoop () {
    if (socketArray.length === 0) {
        // no connections, wait and try again
        setTimeout(pollingLoop, POLLING_INTERVAL);
        return; // continue without sending mysql query
    }

    pool.getConnection(function (err, connection) {
        if (err) {
            console.log({"code" : 100, "status" : "connection-db error"});
            // Keep polling: without this the cycle would stop for good after
            // a single failed attempt to get a connection.
            setTimeout(pollingLoop, POLLING_INTERVAL);
            return;
        }
        console.log('connected as id ' + connection.threadId);
        console.log('socketArray length: ' + socketArray.length);

        var selection =
        "SELECT\
        a.`id`,a.`product_id` AS pid,a.`random` AS nr,a.`price`,a.`price_end` AS pe,\
        TIMESTAMPDIFF(SECOND,NOW(),a.`datetime`) AS duration,\
        ABS(TIMESTAMPDIFF(SECOND,NOW(),b.`date`)) AS hb\
        FROM `auctions` AS a\
        LEFT JOIN `auctions_bids` AS b ON b.`auction_id` = a.`id`\
        WHERE TIMESTAMPDIFF(SECOND,NOW(),a.`datetime`) > '-1'\
        GROUP BY a.`id`\
        ORDER BY `duration` DESC,`id` DESC LIMIT 15";

        connection.query(selection, function (err, results) {
            // Hand the connection back and schedule the next poll first, so
            // neither depends on the result handling below.
            connection.release();
            setTimeout(pollingLoop, POLLING_INTERVAL);

            if (err) {
                console.log(time() + 'query error: ' + err.message);
                return;
            }
            if (!results || results.length === 0) {
                // Nothing matched the WHERE clause; there is no row to send.
                return;
            }

            // NOTE(review): the query returns up to 15 rows but only the
            // first one is evaluated and streamed - confirm this is intended.
            var auction = results[0];
            var lg = 'id: ' + auction.id + ' (' + auction.duration + ') ';

            if (shouldExtendAuction(auction)) {
                extendAuction(auction);
            }

            var streamArray = [auction];
            updateSockets({ streamArray: streamArray });
            // Serialize explicitly; concatenating the array printed
            // "[object Object]".
            console.log("auctions pushed: " + JSON.stringify(streamArray));
            // Logged here because `lg` is only filled once the query returned.
            console.log(time() + lg + ' C: ' + socketArray.length);
        });
    });
}
// Kick off the polling cycle once.
pollingLoop();

// Track every socket.io client in socketArray for as long as it is connected.
io.sockets.on('connection', function onSocketConnected(client) {
    client.on('disconnect', function onSocketDisconnected() {
        // pollingTimer is never assigned in the visible code.
        clearTimeout(pollingTimer);
        var position = socketArray.indexOf(client);
        console.log(time()+'SOCKET-ID = %s DISCONNECTED', position);
        // indexOf yields -1 when the socket is not in the list.
        if (position !== -1) {
            socketArray.splice(position, 1);
        }
    });

    console.log(time()+'NEW SOCKET CONNECTED!');
    socketArray.push(client);
});

/**
 * Send `data` as a volatile 'stream' event to every socket in socketArray.
 *
 * @param {*} data - Payload passed unchanged to each socket.
 */
var updateSockets = function (data) {
    function pushTo(client) {
        client.volatile.emit('stream', data);
    }
    socketArray.forEach(pushTo);
};

// Timestamped marker logged once the top-level script has finished running.
console.log(time()+'server.js executed\n');

但这样并没有把任何数据发送到 WebSocket。这种方法(代码结构)到底对不对?以前我是用 query.on('result') 像这样获取数据的:

But this doesn't send any data to the WebSocket. Is this approach (code structure) even correct? Previously I used query.on('result') to get the data, like this:

// Earlier approach: run the query without a callback and log each row as its
// 'result' event arrives. `mysql` is presumably a connection object here.
var selection = "SELECT * FROM auctions";
var auctions = []; // declared but not filled in this snippet
var query = mysql.query(selection);
query.on('result', function logAuction(auction) {
    console.log('id: '+auction.id+' ('+auction.duration+') ');
});

使用 auction.row 显示数据时效果很好,但如何在我的 mysql 池连接中执行此操作?

This worked fine showing data with auction.row but how to do this in my mysql pool connection?

另外几秒钟后,我收到一个错误,说 release() 甚至没有定义,但它列在 mysql-module 文档中......所以我认为我的整个逻辑过程是不正确.

Also after some seconds I'm getting an error that release() isn't even defined, but it's listed in the mysql-module documentation... so I think my whole logical process is somehow incorrect.

  1. 我应该使用 connection.end() 和 .release() 吗?因为
    连接永远不应该结束.
  2. 我还应该像另一个 StackOverflow 问题的回答那样,使用 setInterval(function () { mysql.query('SELECT
    1'); }, 5000);
    来保持这里的连接存活吗?(nodejs mysql Error: Connection lost The server closed the connection)
  1. Should I use connection.end() and .release() at all? Because the
    connection should never end.
  2. Should I still use setInterval(function () { mysql.query('SELECT
    1'); }, 5000);
    as answered in another StackOverflow question to keep the connection alive here? (nodejs mysql Error: Connection lost The server closed the connection)

(感谢对我的一些问题的任何提示或答案!一些答案总比没有好,因为我体验到这个主题根本没有得到太多答案.)

(Appreciate any tips or answers to even some of my questions! Better some answers than none, because I experienced that this topic isn't answered much at all.)

更新了我的整个代码(见上文).输出现在看起来像这样:http://s21.postimg.org/avsxa87rb/output.jpg

Updated my whole code (see above). Output looks like this now: http://s21.postimg.org/avsxa87rb/output.jpg

所以流获取数据,但在console.log 中什么都没有,并且有这个javascript 错误?

So the stream receives the data, but console.log prints nothing, and there is this JavaScript error?

推荐答案

您应该创建一个池,并在该池上使用 getConnection.然后,当您完成连接时,释放它.此外,您不需要为每个连接停止或启动 pollingLoop,一个循环就足够了.

You should be creating a pool, and using getConnection on that pool. Then, when you're done with the connection, release it. Additionally, you do not need to stop the pollingLoop or start it for each connection, one loop is enough.

我不理解带条件的 if 语句,所以我省略了它.它可能需要去其他地方.

I didn't understand the if statement with the conditions, so I omitted it. It likely needs to go somewhere else.

// Sockets that are currently connected and should receive 'stream' events.
var socketArr = [];

/**
 * HTTP request listener: answers every request with an empty 200 response.
 *
 * @param {Object} req - Incoming request (not inspected).
 * @param {Object} res - Response object to write to.
 */
function handler(req, res) {
    var STATUS_OK = 200;
    var NO_TIMEOUT = 0;

    res.statusCode = STATUS_OK;
    var socket = res.connection;
    socket.setTimeout(NO_TIMEOUT);
    res.end();
}
// Start the HTTP server. `app`, `port` and `ip` are not defined in this
// snippet and are expected to come from the surrounding file.
app.listen(port, ip);
// Pool from which pollingLoop borrows a connection via getConnection().
// The host, user and password values are placeholders.
var pool = mysql.createPool({
    host     : 'example.org',
    user     : 'bob',
    password : 'secret'
});

/**
 * One iteration of the polling cycle. Re-schedules itself with setTimeout
 * (1000 ms) on every path, so a single initial call keeps the cycle running.
 *
 * While no socket is registered nothing is queried. Otherwise a pooled
 * connection is borrowed, all auctions are selected and the result set is
 * sent to every socket via updateSockets as `{ streamArray: [results] }`.
 */
function pollingLoop () {
    if (socketArr.length === 0) {
        // no connections, wait and try again
        setTimeout(pollingLoop, 1000);
        return; // continue without sending mysql query
    }
    pool.getConnection(function (err, connection) {
        if (err) {
            console.log({
                "code": 100,
                "status": "Error in connection database"
            });
            // Keep polling: without this the cycle would stop for good after
            // a single failed attempt to get a connection.
            setTimeout(pollingLoop, 1000);
            return;
        }
        console.log('connected as id ' + connection.threadId);
        var selection = "SELECT * FROM auctions";
        connection.query(selection, function (err, results) {
            // Hand the connection back and schedule the next poll first, so
            // neither depends on the result handling below.
            connection.release();
            setTimeout(pollingLoop, 1000);

            if (err) {
                console.log(time() + 'query error: ' + err.message);
                return;
            }

            // An empty table yields an empty array; only read the first row
            // when there is one.
            var lg = '';
            if (results.length > 0) {
                lg += ('id: ' + results[0].id + ' (' + results[0].duration + ') ');
            }
            /*if (conditions) {
                var query_update = connection.query('UPDATE `auctions` SET `price` = ? WHERE `id` = ?', [price, auction.id]);
                console.log(time() + 'UPDATED id ' + auction.id + ': price (' + price + '€)');
            }*/
            var streamArray = [results];
            updateSockets({
                streamArray: streamArray
            });
            // Log the row count; concatenating the array printed
            // "[object Object]".
            console.log("auctions pushed: " + results.length);
            // Logged here because `lg` is only filled once the query returned.
            console.log(time() + lg + ' C: ' + socketArr.length);
        });
    });
}
// Kick off the polling cycle once; it keeps re-scheduling itself.
pollingLoop();

// Track every socket.io client in socketArr for as long as it is connected.
io.sockets.on('connection', function onSocketConnected(client) {
    client.on('disconnect', function onSocketDisconnected() {
        var position = socketArr.indexOf(client);
        console.log(time() + 'SOCKET-ID = %s DISCONNECTED', position);
        // indexOf yields -1 when the socket is not in the list.
        if (position === -1) {
            return;
        }
        socketArr.splice(position, 1);
    });

    console.log(time() + 'NEW SOCKET CONNECTED!');
    socketArr.push(client);
});
/**
 * Send `data` as a volatile 'stream' event to every socket in socketArr.
 *
 * @param {*} data - Payload passed unchanged to each socket.
 */
var updateSockets = function (data) {
    var broadcast = function (client) {
        client.volatile.emit('stream', data);
    };
    socketArr.forEach(broadcast);
};

相关文章