NodeJS Streams"/>
NodeJS Streams
我有以下问题:
我在管道中使用 NodeJS 流来读取大文件,进行一些转换,然后将其写入可写流。棘手的部分是我希望能够在满足特定条件时停止读取文件,但仍然完成处理已经读取的块。
在下面的示例中,我正在读取一个文本文件,我的转换流(secondStream)将文本转换为大写并发送到下一个流。但是如果它找到一个文本,那就意味着它应该停止从文本文件中读取,我认为这意味着读取流应该停止读取块。
我尝试了几种解决方案,但不会说我在这里有点困惑。到目前为止,我得到了以下代码。但是,通过使用
firstStream.destroy();
使管道抛出错误
Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
我能够通过在管道上捕获并忽略它来“避免”这个错误,但老实说,这对我来说听起来不安全或不正确。
const { Transform, Writable, Readable } = require("node:stream");
const { pipeline } = require("node:stream/promises");
const fs = require("node:fs");
let shouldStop = false;
const firstStream = fs.createReadStream("./lg.txt");
const secondStream = new Transform({
transform(chunk, encoding, callback) {
const foundText = chunk.toString().search("CHAPTER 9") !== -1;
if (foundText) {
shouldStop = true;
}
const transformed = chunk.toString().toUpperCase();
callback(null, transformed);
},
});
const lastStream = process.stdout;
firstStream.on("data", () => {
if (shouldStop) {
console.log("should pause");
firstStream.destroy();
}
});
await pipeline(firstStream, secondStream, lastStream).catch(
(err) => undefined
); // Feels wrong to me
有什么更好的方法吗?我错过了什么吗?
提前谢谢各位朋友!
回答如下:在您的转换流中,您可以“吃掉”或“跳过”找到目标文本之后的任何数据。这样,你可以保留所有其他
pipeline()
逻辑。它不会立即终止,而是只会读取到输入流的末尾,但会跳过目标文本之后的所有数据。这允许流正常完成。
const secondStream = new Transform({
transform(chunk, encoding, callback) {
if (shouldStop) {
// eat any remaining data
callback(null, "");
} else {
const text = chunk.toString();
const foundText = text.search("CHAPTER 9") !== -1;
if (foundText) {
// set flag to eat remaining data
shouldStop = true;
}
callback(null, text.toUpperCase());
}
},
});
pipeline()
函数还支持中止控制器,这是一种中止管道的支持方法,同时仍然适当地清理所有内容。当你中止时,pipeline()
将以拒绝的承诺结束,但你可以检查拒绝是否是因为你的中止,如果是,你可以得到你的中止消息。
在您的代码中,可以这样实现:
const { Transform, Writable, Readable } = require("node:stream");
const { pipeline } = require("node:stream/promises");
const fs = require("node:fs");
const firstStream = fs.createReadStream("./lg.txt");
const ac = new AbortController();
const signal = ac.signal;
const secondStream = new Transform({
transform(chunk, encoding, callback) {
const text = chunk.toString();
const foundText = text.search("CHAPTER 9") !== -1;
callback(null, text.toUpperCase());
if (foundText) {
ac.abort(new Error("reading terminated, match found"));
}
},
});
const lastStream = process.stdout;
pipeline(firstStream, secondStream, lastStream, { signal }).then(() => {
console.log("\nall done without match");
}).catch((err) => {
if (err.code === "ABORT_ERR") {
console.log(`\n${signal.reason.message}`);
} else {
console.log(err);
}
});
注意: 在另一个主题上,您的代码容易受到跨块边界的搜索字符串的攻击,因此不会被检测到。避免该问题的通常方法是在运行匹配搜索之前保留每个块的最后 N 个字符并将其添加到下一个块之前,其中 N 是搜索字符串的长度 - 1。这确保您不会错过搜索跨越块的字符串。您将不得不调整输出以不包含前置文本。由于这不是您问题的症结所在,我没有添加该逻辑并将其留给您,但这是可靠匹配所必需的。
更多推荐
NodeJS Streams
发布评论