为什么在将生成器传输到文件的NodeJS代码中,setInterval从不运行?

我在我的NodeJS代码中遇到了这种情况,它计算排列(来自here的代码),但无论如何,我没有从setInterval获得任何输出。

const { Readable } = require('stream');
const { intervalToDuration, formatDuration, format } = require('date-fns');
const { subsetPerm } = require('./permutation');

function formatLogs(counter, permStart) {
    const newLocal = new Date();
    const streamTime = formatDuration(intervalToDuration({
        end: newLocal.getTime(),
        start: permStart.getTime()
    }));
    const formattedLogs = `wrote ${counter.toLocaleString()} patterns, after ${streamTime}`;
    return formattedLogs;
}

const ONE_MINUTES_IN_MS = 1 * 60 * 1000;

let progress = 0;
let timerCallCount = 1;
let start = new Date();
const interval = setInterval(() => {
    console.log(formatLogs(progress, start));
}, ONE_MINUTES_IN_MS);

const iterStream = Readable.from(subsetPerm(Object.keys(Array.from({ length: 200 })), 5));

console.log(`Stream started on: ${format(start, 'PPPPpppp')}`)
iterStream.on('data', () => {
    progress++;
    if (new Date().getTime() - start.getTime() >= (ONE_MINUTES_IN_MS * timerCallCount)) {
        console.log(`manual timer: ${formatLogs(progress, start)}`)
        timerCallCount++;
        if (timerCallCount >= 3) iterStream.destroy();
    }
});

iterStream.on('error', err => {
    console.log(err);
    clearInterval(interval);
});

iterStream.on('close', () => {
    console.log(`closed: ${formatLogs(progress, start)}`);
    clearInterval(interval);
})

console.log('done!');

但我发现它会打印‘Done!’然后脚本似乎结束了,即使我在on('data')回调中放了一个console.log,我也会将数据打印到终端。但即使几个小时后,setInterval中的console.log也不会运行,因为除了on('close',...)的输出外,文件中没有其他内容。

输出日志如下:

> node demo.js

Stream started on: Sunday, January 30th, 2022 at 5:40:50 PM GMT+00:00
done!
manual timer: wrote 24,722,912 patterns, after 1 minute
manual timer: wrote 49,503,623 patterns, after 2 minutes
closed: wrote 49,503,624 patterns, after 2 minutes

节点指南中的计时器有一个名为‘leaving timeouts behind’的部分,它看起来很相关。但我认为使用interval.ref();告诉脚本不要垃圾回收对象,直到对相同的超时对象调用.unref(),在第二次读取时,这是不太正确的,也没有什么不同。

我使用NPM运行此文件,就像npm run noodle一样,它只指向文件。


解决方案

生成器是同步的,并阻止事件循环

Readable.from一次性处理整个生成器,因此如果生成器是同步的且长时间运行,它会阻止事件循环。

下面是它运行的带注释的代码:

async function next() {
    for (;;) {
      try {
        const { value, done } = isAsync ?
          await iterator.next() : // our generator is not asynchronous
          iterator.next();

        if (done) {
          readable.push(null); // generator not done
        } else {
          const res = (value &&
            typeof value.then === 'function') ?
            await value :
            value; // not a thenable
          if (res === null) {
            reading = false;
            throw new ERR_STREAM_NULL_VALUES();
          } else if (readable.push(res)) { // readable.push returns false if it's been paused, or some other irrelevant cases.
            continue; // we continue to the next item in the iterator
          } else {
            reading = false;
          }
        }
      } catch (err) {
        readable.destroy(err);
      }
      break;
    }
  }

这里是readable.push的接口,它解释了如何保持生成器运行:

返回:如果可以继续推送其他数据块,则返回True;否则返回False。

没有人告诉NodeJS不要继续推送数据,所以它会继续。

在每次运行事件循环之间,Node.js会检查它是否正在等待任何异步I/O或计时器,如果没有,则干脆关闭。

我将此作为NodeJs Github Issue提出,并最终选择了此解决方案:

cosnt yieldEvery = 1e5;

function setImmediatePromise() {
    return new Promise(resolve => setImmediate(resolve));
}

const iterStream = Readable.from(async function* () {
    let i = 0
    for await (const item of baseGenerator) {
        yield item;
        i++;
        if (i % yieldEvery === 0) await setImmediatePromise();
    }
}());

这在一定程度上受到了snyk.io blog的启发,它将更详细地介绍此问题。

相关文章