挂起"/>
NodeJS进程在处理大量记录后挂起
我有一个带有以下代码片段的节点 JS 进程。我正在尝试在内存 (transactionObj) 中缓存大量交易(大约 45 万)。
const transactionObj = {};
const transactionCursor = target_db.collection("transaction").find();
processTxns = 0;
for await (const transactionDoc of transactionCursor) {
processTxns++;
if (transactionDoc.orderId && transactionDoc.orderId[0]) {
const orderIdString = String(transactionDoc.orderId[0].oid);
if (!orderIdString) continue;
const existingDoc = transactionObj[orderIdString];
if (!existingDoc || existingDoc._id < transactionDoc._id) {
transactionObj[orderIdString] = transactionDoc;
}
}
console.log("Process Txns: "+processTxns)
}
当我执行此代码时,进程在处理一定数量的记录后挂起,每次大约 16K 或 20k 不同。 我尝试使用 Map 但行为相同。我也试过传递论点
node --max-old-space-size=6144
但问题依然存在。此代码在具有 32GB RAM 的 Windows 服务器上运行。代码运行时我没有看到大量内存使用(低于 50%)
回答如下:如果这些操作是内存密集型操作,您应该分块处理它们,一种选择是在查询中实现分页并获取有限数量的事务并进行处理。
const CHUNK_SIZE = 10;
async function* readChunksFromDB() {
const numberOfTransactions = await target_db
.collection("transaction")
.count();
const numberOfChunks = Math.ceil(numberOfTransactions / CHUNK_SIZE);
for (let i = 0; i < numberOfChunks; i++) {
const chunk = await target_db
.collection("transaction")
.find({ take: CHUNK_SIZE, skip: i * CHUNK_SIZE });
yield chunk;
}
}
async function processChunks() {
const transactionObj = {};
processTxns = 0;
for await (const chunk of readChunksFromDB()) {
for (const transactionDoc of chunk) {
processTxns++;
if (transactionDoc.orderId && transactionDoc.orderId[0]) {
const orderIdString = String(transactionDoc.orderId[0].oid);
if (!orderIdString) continue;
const existingDoc = transactionObj[orderIdString];
if (!existingDoc || existingDoc._id < transactionDoc._id) {
transactionObj[orderIdString] = transactionDoc;
}
}
console.log("Process Txns: " + processTxns);
}
}
}
或者使用具有自动背压机制的管道的 Streams。 Streams 是 NodeJS 推荐的用于管理大量操作数据的解决方案:
const { Readable, Transform } = require("stream");
const transactionsStream = Readable.from(readChunksFromDB());
const processTransactionStream = new Transform({
transform(chunk, encoding, callback) {
const transactions = JSON.parse(chunk.toString());
console.log(transactions);
transactions
.map((transactionDoc) => {
if (transactionDoc.orderId && transactionDoc.orderId[0]) {
const orderIdString = String(transactionDoc.orderId[0].oid);
if (!orderIdString) {
return transactionDoc;
}
const existingDoc = transactionObj[orderIdString];
if (!existingDoc || existingDoc._id < transactionDoc._id) {
transactionObj[orderIdString] = transactionDoc;
}
return transactionDoc;
}
})
.forEach((transaction) => {
console.log(transaction);
this.push(JSON.stringify(transaction));
});
callback();
},
});
transactionsStream.pipe(processTransactionStream).on("data", (chunk) => {
console.log(chunk.toString())
});
在这两种情况下,您都必须管理不同的阵列以进行重复数据删除,但您将在性能方面获得很多。
https://blog.appsignal/2022/02/02/use-streams-to-build-high-performing-nodejs-applications.html
更多推荐
NodeJS进程在处理大量记录后挂起
发布评论