什么是处理node.js变换流的背压的正确方法?

编程入门 行业动态 更新时间:2024-10-10 10:31:58
本文介绍了什么是处理node.js变换流的背压的正确方法?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

这是我第一次编写node.js服务器端的冒险经历。到目前为止,这只是的乐趣,但是我很难理解实现与node.js流相关的正确方法。

These are my first adventures in writing node.js server side. It's been fun so far but I'm having some difficulty understanding the proper way to implement something as it relates to node.js streams.

出于测试和学习目的,我正在处理其内容为的大型文件zlib压缩。压缩内容是二进制数据,每个 数据包的长度为38个字节。我正在尝试创建一个结果文件看起来几乎与原始文件相同,只是每1024个38字节的数据包有一个未压缩的31字节标头。

For test and learning purposes I'm working with large files whose content is zlib compressed. The compressed content is binary data, each packet being 38 bytes in length. I'm trying to create a resulting file that looks almost identical to the original file except that there is an uncompressed 31 byte header for every 1024 38 byte packets.

+----------+----------+----------+----------+ | packet 1 | packet 2 | ...... | packet N | | 38 bytes | 38 bytes | ...... | 38 bytes | +----------+----------+----------+----------+

结果文件内容

resulting file content

+----------+--------------------------------+----------+--------------------------------+ | header 1 | 1024 38 byte packets | header 2 | 1024 38 byte packets | | 31 bytes | zlib compressed | 31 bytes | zlib compressed | +----------+--------------------------------+----------+--------------------------------+

正如您所看到的,这有点像翻译问题。意思是,我是将一些源流作为输入,然后将稍微转换为某些输出流。因此,实现 转换流是很自然的。

As you can see, it's somewhat of a translation problem. Meaning, I'm taking some source stream as input and then slightly transforming it into some output stream. Therefore, it felt natural to implement a Transform stream.

该课程只是尝试完成以下任务:

The class simply attempts to accomplish the following:

  • 将流作为输入
  • zlib膨胀数据块以计算数据包数量,将其中1024个放在一起,zlib放气,预先添加标题。
  • 通过管道通过 this.push(块)传递新生成的块。
  • Takes stream as input
  • zlib inflates the chunks of data to count the number of packets, putting together 1024 of them, zlib deflating, and prepending a header.
  • Passes the new resulting chunk on through the pipeline via this.push(chunk).
  • 用例如下:

    var fs = require('fs'); var me = require('./me'); // Where my Transform stream code sits var inp = fs.createReadStream('depth_1000000'); var out = fs.createWriteStream('depth_1000000.out'); inp.pipe(me.createMyTranslate()).pipe(out);

    问题

    假设转换对于这个用例来说是一个不错的选择,我似乎是在遇到可能的背压问题。我在 _transform 中对 this.push(chunk) 的调用一直返回 false 。为什么会这样,以及如何处理这些事情?

    Question(s)

    Assuming Transform is a good choice for this use case, I seem to be running into a possible back-pressure issue. My call to this.push(chunk) within _transform keeps returning false. Why would this be and how to handle such things?

    推荐答案

    我认为转换适用于此,但我会执行膨胀作为管道中的一个单独步骤。

    I think Transform is suitable for this, but I would perform the inflate as a separate step in the pipeline.

    这是一个快速且基本未经测试的例子:

    Here's a quick and largely untested example:

    var zlib = require('zlib'); var stream = require('stream'); var transformer = new stream.Transform(); // Properties used to keep internal state of transformer. transformer._buffers = []; transformer._inputSize = 0; transformer._targetSize = 1024 * 38; // Dump one 'output packet' transformer._dump = function(done) { // concatenate buffers and convert to binary string var buffer = Buffer.concat(this._buffers).toString('binary'); // Take first 1024 packets. var packetBuffer = buffer.substring(0, this._targetSize); // Keep the rest and reset counter. this._buffers = [ new Buffer(buffer.substring(this._targetSize)) ]; this._inputSize = this._buffers[0].length; // output header this.push('HELLO WORLD'); // output compressed packet buffer zlib.deflate(packetBuffer, function(err, compressed) { // TODO: handle `err` this.push(compressed); if (done) { done(); } }.bind(this)); }; // Main transformer logic: buffer chunks and dump them once the // target size has been met. transformer._transform = function(chunk, encoding, done) { this._buffers.push(chunk); this._inputSize += chunk.length; if (this._inputSize >= this._targetSize) { this._dump(done); } else { done(); } }; // Flush any remaining buffers. transformer._flush = function() { this._dump(); }; // Example: var fs = require('fs'); fs.createReadStream('depth_1000000') .pipe(zlib.createInflate()) .pipe(transformer) .pipe(fs.createWriteStream('depth_1000000.out'));

    更多推荐

    什么是处理node.js变换流的背压的正确方法?

    本文发布于:2023-11-25 16:53:46,感谢您对本站的认可!
    本文链接:https://www.elefans.com/category/jswz/34/1630572.html
    版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
    本文标签:正确   方法   node   js

    发布评论

    评论列表 (有 0 条评论)
    草根站长

    >www.elefans.com

    编程频道|电子爱好者 - 技术资讯及电子产品介绍!