数据,直到id被改变?"/>
收集流中的数据,直到id被改变?
我有管线的流。
pipeline(readStream, transformStream, writeStream);
readStream传递给transformStream(在每个 "数据 "事件中) 像这样的对象。
{
id: 1,
name: 'John',
phone: 1000000,
}
我需要将电话存储在transform流中,直到该对象的id被改变, 然后我应该将这样的对象推送到流缓冲区。
{
id: 1,
name: 'John',
phones: [1000000, 1000001, 1000002],
}
所以在转换流后的数组:
[
{
id: 1,
name: 'John',
phone: 1000000,
},
{
id: 1,
name: 'John',
phone: 1000001,
},
{
id: 2,
name: 'Ray',
phone: 1000002,
},
{
id: 3,
name: 'Santa',
phone: 1000003,
},
]
变换流后的数组将是:
[
{
id: 1,
name: 'John',
phones: [1000000, 1000001],
},
{
id: 2,
name: 'Ray',
phones: [1000002],
},
{
id: 3,
name: 'Santa',
phones: [1000003],
},
]
我如何实现这个?
回答如下:这是一个简单的例子。主要的想法是跟踪之前的条目块,并将其与当前的条目进行比较,以决定我们是否应该更新的 phones
属性或将数据推送到下一个流。
const streamify = require('stream-array'); // using this package for testing purposes only
const Stream = require('stream');
const input = [
{
id: 1,
name: 'John',
phone: 1000000,
},
{
id: 1,
name: 'John',
phone: 1000001,
},
{
id: 2,
name: 'Ray',
phone: 1000002,
},
{
id: 3,
name: 'Santa',
phone: 1000003,
},
];
function createObjectMergeStream() {
let previousEntry = null;
return new Stream.Transform({
writableObjectMode: true,
transform: transformFunc,
flush(callback) {
callback(null, JSON.stringify(previousEntry)); // stringifying for demonstration only
}
});
function transformFunc(currentEntry, encoding, callback){
if (previousEntry === null) {
// if this is the first the stream is receiving
previousEntry = {
id: currentEntry.id,
name: currentEntry.name,
phones: [currentEntry.phone]
}
callback();
}
else if (previousEntry.id === currentEntry.id) {
// if the id's match, only update the phones array
previousEntry.phones.push(currentEntry.phone);
callback();
}
else {
// if this entry does not match the id of the previous entry
// stringifying for demonstration only
const output = JSON.stringify(previousEntry) + '\n';
previousEntry = {
id: currentEntry.id,
name: currentEntry.name,
phones: [currentEntry.phone]
}
callback(null, output);
}
}
}
// turn the `input` array into a readable stream
const inputStream = streamify(input);
// create our transform stream
const transformStream = createObjectMergeStream();
// take objects from input array, process the objects in our transform stream then print them to the console
inputStream.pipe(transformStream).pipe(process.stdout);
输出
{"id":1,"name":"John","phones":[1000000,1000001]}
{"id":2,"name":"Ray","phones":[1000002]}
{"id":3,"name":"Santa","phones":[1000003]}
更多推荐
收集流中的数据,直到id被改变?
发布评论