为什么在将生成器传输到文件的NodeJS代码中,setInterval从不运行?
我在我的NodeJS代码中遇到了这种情况,它计算排列(来自here的代码),但无论如何,我没有从setInterval
获得任何输出。
const { Readable } = require('stream');
const { intervalToDuration, formatDuration, format } = require('date-fns');
const { subsetPerm } = require('./permutation');
function formatLogs(counter, permStart) {
const newLocal = new Date();
const streamTime = formatDuration(intervalToDuration({
end: newLocal.getTime(),
start: permStart.getTime()
}));
const formattedLogs = `wrote ${counter.toLocaleString()} patterns, after ${streamTime}`;
return formattedLogs;
}
const ONE_MINUTES_IN_MS = 1 * 60 * 1000;
let progress = 0;
let timerCallCount = 1;
let start = new Date();
const interval = setInterval(() => {
console.log(formatLogs(progress, start));
}, ONE_MINUTES_IN_MS);
const iterStream = Readable.from(subsetPerm(Object.keys(Array.from({ length: 200 })), 5));
console.log(`Stream started on: ${format(start, 'PPPPpppp')}`)
iterStream.on('data', () => {
progress++;
if (new Date().getTime() - start.getTime() >= (ONE_MINUTES_IN_MS * timerCallCount)) {
console.log(`manual timer: ${formatLogs(progress, start)}`)
timerCallCount++;
if (timerCallCount >= 3) iterStream.destroy();
}
});
iterStream.on('error', err => {
console.log(err);
clearInterval(interval);
});
iterStream.on('close', () => {
console.log(`closed: ${formatLogs(progress, start)}`);
clearInterval(interval);
})
console.log('done!');
但我发现它会打印‘Done!’然后脚本似乎结束了,即使我在on('data')
回调中放了一个console.log,我也会将数据打印到终端。但即使几个小时后,setInterval
中的console.log也不会运行,因为除了on('close',...)
的输出外,文件中没有其他内容。
输出日志如下:
> node demo.js
Stream started on: Sunday, January 30th, 2022 at 5:40:50 PM GMT+00:00
done!
manual timer: wrote 24,722,912 patterns, after 1 minute
manual timer: wrote 49,503,623 patterns, after 2 minutes
closed: wrote 49,503,624 patterns, after 2 minutes
节点指南中的计时器有一个名为‘leaving timeouts behind’的部分,它看起来很相关。但我认为使用interval.ref();
告诉脚本不要垃圾回收对象,直到对相同的超时对象调用.unref()
,在第二次读取时,这是不太正确的,也没有什么不同。
我使用NPM运行此文件,就像npm run noodle
一样,它只指向文件。
解决方案
生成器是同步的,并阻止事件循环
Readable.from一次性处理整个生成器,因此如果生成器是同步的且长时间运行,它会阻止事件循环。
下面是它运行的带注释的代码:
async function next() {
for (;;) {
try {
const { value, done } = isAsync ?
await iterator.next() : // our generator is not asynchronous
iterator.next();
if (done) {
readable.push(null); // generator not done
} else {
const res = (value &&
typeof value.then === 'function') ?
await value :
value; // not a thenable
if (res === null) {
reading = false;
throw new ERR_STREAM_NULL_VALUES();
} else if (readable.push(res)) { // readable.push returns false if it's been paused, or some other irrelevant cases.
continue; // we continue to the next item in the iterator
} else {
reading = false;
}
}
} catch (err) {
readable.destroy(err);
}
break;
}
}
这里是readable.push的接口,它解释了如何保持生成器运行:
返回:如果可以继续推送其他数据块,则返回True;否则返回False。
没有人告诉NodeJS不要继续推送数据,所以它会继续。
在每次运行事件循环之间,Node.js会检查它是否正在等待任何异步I/O或计时器,如果没有,则干脆关闭。
我将此作为NodeJs Github Issue提出,并最终选择了此解决方案:
cosnt yieldEvery = 1e5;
function setImmediatePromise() {
return new Promise(resolve => setImmediate(resolve));
}
const iterStream = Readable.from(async function* () {
let i = 0
for await (const item of baseGenerator) {
yield item;
i++;
if (i % yieldEvery === 0) await setImmediatePromise();
}
}());
这在一定程度上受到了snyk.io blog的启发,它将更详细地介绍此问题。
相关文章