NodeJS Streams - Out of Memory

Updated: 2024-10-24 12:27:38
Problem description

I'm trying to process a stream of 300 million rows of data. Once I get to around 5 million rows, I get a FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - process out of memory. (The number varies by machine, but it happens consistently.)

You can run the code below to see this happening. I can't tell whether the issue is in my code or inherent to streams. I tried to dumb the process down further, but I couldn't.

Is there a memory limit? I removed all other code and 'dumbed' the example down to make sure it wasn't some back pressure issue.

```javascript
var Readable = require('stream').Readable;
var Writable = require('stream').Writable;
var util = require('util');

var tenMillion = 10000000;
//var tenMillion = 5000000; //THIS WORKS
var writeEvery = tenMillion / 10;

/*
 * Create a really simple stream that will run 10 million times
 */
function Streamo(max) {
  Readable.call(this, { objectMode: true });
  this._currentIndex = -1;
  this._maxIndex = max;
}
util.inherits(Streamo, Readable);

// push() is called synchronously here; no I/O happens between reads.
Streamo.prototype._read = function () {
  this._currentIndex += 1;
  if (this._currentIndex % writeEvery == 0) {
    console.log(this._currentIndex + ' of ' + this._maxIndex);
  }
  if (this._currentIndex < 0 || this._currentIndex >= this._maxIndex) {
    console.log("BOOM");
    this.push(null); // signal end of stream
    return;
  }
  this.push(true);
};

/*
 * Create a really simple Writable Stream to Count
 */
function Counta() {
  Writable.call(this, { objectMode: true, highWaterMark: (200 * 1024) });
  this._count = 0;
}
util.inherits(Counta, Writable);

Counta.prototype._write = function (chunk, enc, cb) {
  this._count++;
  if (this._count % writeEvery == 0) {
    console.log('_______________________________' + this._count);
  }
  cb();
};

Counta.prototype.Count = function () {
  return this._count;
};

/*
 * Exercise It
 */
var s = new Streamo(tenMillion);
var c = new Counta();
s.pipe(c);
c.on('finish', function () {
  console.log("BOOM BOOM BOOM BOOM BOOM BOOM BOOM BOOM BOOM ");
});
```
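On the "Is there a memory limit?" question: V8 does cap the size of the JavaScript heap, and the cap can be inspected at runtime. The following is a minimal sketch using Node's built-in `v8.getHeapStatistics()` and `process.memoryUsage()`; the helper names `heapLimitMiB` and `heapUsedMiB` are hypothetical, chosen only for illustration.

```javascript
// Sketch: report the V8 heap limit and current heap usage of this process.
// Only built-in APIs are used; the helper names are hypothetical.
const v8 = require('v8');

function toMiB(bytes) {
  return Math.round(bytes / (1024 * 1024));
}

// heap_size_limit: the maximum size V8 lets the JS heap grow to.
function heapLimitMiB() {
  return toMiB(v8.getHeapStatistics().heap_size_limit);
}

// heapUsed: how much of the JS heap is in use right now.
function heapUsedMiB() {
  return toMiB(process.memoryUsage().heapUsed);
}

console.log(`heap limit: ${heapLimitMiB()} MiB, in use: ${heapUsedMiB()} MiB`);
```

Logging `heapUsedMiB()` next to the progress counters in the code above would show whether the heap grows steadily before the crash. The limit can be raised with `node --max-old-space-size=<MiB>`, but if memory grows without bound that only postpones the failure.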

Recommended answer

This is a known issue with the current streams implementation.

In the streams documentation and code, there are multiple places where it is implied that _read() should be asynchronous.
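To make "asynchronous _read()" concrete, here is a minimal sketch (not taken from the Node.js documentation) of a stream whose _read() never pushes synchronously: each push() runs inside a setImmediate() callback, so _read() returns before the next item is produced. The class `AsyncCounterStream` and the helper `countItems` are hypothetical names.

```javascript
const { Readable, Writable } = require('stream');

// Hypothetical example stream: emits `total` items, but every push()
// is deferred with setImmediate(), so _read() never pushes synchronously.
class AsyncCounterStream extends Readable {
  constructor(total) {
    super({ objectMode: true });
    this.produced = 0;
    this.total = total;
  }

  _read() {
    setImmediate(() => {
      if (this.produced >= this.total) {
        this.push(null); // end of stream
        return;
      }
      this.produced += 1;
      this.push(true);
    });
  }
}

// Pipe the stream into a counting sink; resolve with the item count.
function countItems(total) {
  return new Promise((resolve, reject) => {
    let count = 0;
    const sink = new Writable({
      objectMode: true,
      write(chunk, encoding, callback) {
        count += 1;
        callback();
      },
    });
    sink.on('finish', () => resolve(count));
    sink.on('error', reject);
    new AsyncCounterStream(total).pipe(sink);
  });
}

countItems(1000).then((n) => console.log(`counted ${n} items`));
```

Deferring every push costs one event-loop turn per item, which adds up for hundreds of millions of rows; that trade-off is why deferring only occasionally can be the cheaper option.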

So if you're not actually performing some kind of (async) I/O in your _read() implementation, you may need to (at least occasionally) defer push() with setImmediate() to keep the call stack from getting too large. For example, this works without crashing:

```javascript
Streamo.prototype._read = function (n) {
  this._currentIndex += 1;
  if (this._currentIndex % writeEvery == 0) {
    console.log(this._currentIndex + ' of ' + this._maxIndex);
  }
  if (this._currentIndex < 0 || this._currentIndex >= this._maxIndex) {
    console.log("BOOM");
    this.push(null);
    return;
  }
  var self = this;
  if (this._currentIndex % writeEvery == 0) {
    // Every writeEvery-th item, defer push() to a later event-loop turn
    // so the synchronous _read()/push() chain is broken up.
    setImmediate(function () {
      self.push(true);
    });
  } else {
    this.push(true);
  }
};
```
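A different approach, not the one this answer uses: in Node.js releases that include `Readable.from()`, a generator can be turned into an object-mode stream and connected to the sink with `stream.pipeline()`. `Readable.from()` stops pulling from the generator while `push()` returns false, so backpressure should be respected without a hand-written `_read()`. This is a sketch under that assumption; `rows` and `countRows` are hypothetical names, and it has not been measured against the 300-million-row workload.

```javascript
const { Readable, Writable, pipeline } = require('stream');

// Hypothetical generator: yields `max` items lazily, one at a time.
function* rows(max) {
  for (let i = 0; i < max; i += 1) {
    yield true;
  }
}

// Count every item that reaches the sink; resolve when the pipeline ends.
function countRows(max) {
  let count = 0;
  const sink = new Writable({
    objectMode: true,
    write(chunk, encoding, callback) {
      count += 1;
      callback();
    },
  });
  return new Promise((resolve, reject) => {
    // pipeline() wires up backpressure and error propagation for both streams.
    pipeline(Readable.from(rows(max)), sink, (err) => {
      if (err) reject(err);
      else resolve(count);
    });
  });
}

countRows(100000).then((n) => console.log(`counted ${n} rows`));
```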

Published: 2023-11-08 15:49:40
Link: https://www.elefans.com/category/jswz/34/1569801.html
