Node.js streams with an array
This is my first time using Node streams, and I'm trying to stream an array to Algolia. The example Algolia provides uses a JSON file: /?language=javascript#example
I tried stringifying the array and then using it the way the Algolia example describes. I'm not sure what the best approach is. Should I stringify the array, or do I need to iterate over the array and push the items into a stream? Does the latter approach still use fs? This will run in a Firebase function, so there are resource constraints.
const algoliasearch = require('algoliasearch');
const fs = require('fs');
const StreamArray = require('stream-json/streamers/StreamArray');

const client = algoliasearch('999999', '999999');
//const index = client.initIndex('d_DASH');
const index = client.initIndex('t_DASH');

exports.dashStream = async function (listings) {
  let jsdoc = JSON.stringify(listings);
  const stream = fs.createReadStream(jsdoc).pipe(StreamArray.withParser());
  let chunks = [];
  stream
    .on('data', ({ value }) => {
      console.log("on data...");
      chunks.push(value);
      if (chunks.length === 10000) {
        stream.pause();
        index
          .saveObjects(chunks)
          .then(res => {
            chunks = [];
            stream.resume();
          })
          .catch(err => console.error(err));
      }
    })
    .on('end', () => {
      console.log("on end...");
      if (chunks.length) {
        console.log(`stream over?`);
        index.saveObjects(chunks, function (err, content) {
          return content.taskID.toString();
        })
          .catch(err => console.error(err));
      }
    })
    .on('error', err => console.error(err));
}
The code needs to finish writing to Algolia and return the taskID from the Algolia response.
Answer: This is too late, but maybe it will help someone else. I suggest using different streams and pipeline.
buffer.stream.ts
A transform stream that collects data into chunks and pushes them to the write stream
import { Transform } from "stream";

// Using a transform stream to collect data into chunks
export class BufferStream extends Transform {
  private readonly buffer: object[] = [];

  constructor() {
    super({ objectMode: true });
  }

  _transform(data: object, _encoding: string, callback: () => void) {
    this.buffer.push(data);
    // Your chunk size goes here; I usually use 1000
    if (this.buffer.length >= 3) {
      this.push(this.buffer.splice(0));
    }
    callback();
  }

  _final(callback: () => void) {
    // Pushing the leftovers
    this.push(this.buffer);
    callback();
  }
}
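The splice(0) call above is what rotates the chunks: it removes every element from the buffer and returns them in one step, so the same array object keeps accumulating the next chunk. A quick illustration:

```javascript
// splice(0) removes every element and returns them, so the same array
// object can keep accumulating the next chunk after each push
const buffer = [1, 2, 3];
const chunk = buffer.splice(0);
console.log(chunk);  // [ 1, 2, 3 ]
console.log(buffer); // []
```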
write.stream.ts
A stream that handles writing to the destination.
You have to be extra careful when using asynchronous callbacks with streams.
import { Writable } from 'stream';

export class WriteStream extends Writable {
  constructor() {
    super({ objectMode: true });
  }

  async _write(chunk: object[], _encoding: string, callback: () => void) {
    // You have to handle your errors yourself in an asynchronous callback
    try {
      // await save(chunk);
      console.log('Chunk:', chunk);
      callback();
    } catch (error) {
      // nextTick to escape the current stack
      process.nextTick(() => this.emit('error', error));
    }
  }
}
Gluing it all together
import stream, { PassThrough } from "stream";
import { promisify } from "util";
import { BufferStream } from "./buffer.stream";
import { WriteStream } from "./write.stream";

const pipelineAsync = promisify(stream.pipeline);

(async () => {
  // Imitating a stream with some data
  const source = new PassThrough({ objectMode: true });
  for (let i = 0; i < 10; i++) {
    source.push(i);
  }
  source.end();

  await pipelineAsync(source, new BufferStream(), new WriteStream());
})();
Output
Chunk: [ 0, 1, 2 ]
Chunk: [ 3, 4, 5 ]
Chunk: [ 6, 7, 8 ]
Chunk: [ 9 ] <-- Last chunk
In my experience, it handles backpressure on its own; you don't need to call stream.pause() and .resume(). Overall, it looks much cleaner.