NodeJS进程在处理大量记录后挂起

编程入门 行业动态 更新时间:2024-10-05 21:21:28

NodeJS进程在处理大量记录后<a href=https://www.elefans.com/category/jswz/34/1771174.html style=挂起"/>

NodeJS进程在处理大量记录后挂起

我有一个带有以下代码片段的节点 JS 进程。我正在尝试在内存 (transactionObj) 中缓存大量交易(大约 45 万)。

 const transactionObj = {};
    const transactionCursor = target_db.collection("transaction").find();
     processTxns = 0; 
    for await (const transactionDoc of transactionCursor) {
      processTxns++;
      if (transactionDoc.orderId && transactionDoc.orderId[0]) {
        const orderIdString = String(transactionDoc.orderId[0].oid);
        if (!orderIdString) continue;
        const existingDoc = transactionObj[orderIdString];
        if (!existingDoc || existingDoc._id < transactionDoc._id) {
          transactionObj[orderIdString] = transactionDoc;
        }
      }
      console.log("Process Txns: "+processTxns)
    }

当我执行此代码时,进程在处理一定数量的记录后挂起,每次大约 16K 或 20k 不同。 我尝试使用 Map 但行为相同。我也试过传递论点

node --max-old-space-size=6144

但问题依然存在。此代码在具有 32GB RAM 的 Windows 服务器上运行。代码运行时我没有看到大量内存使用(低于 50%)

回答如下:

如果这些操作是内存密集型操作,您应该分块处理它们,一种选择是在查询中实现分页并获取有限数量的事务并进行处理。


const CHUNK_SIZE = 10;

async function* readChunksFromDB() {
  const numberOfTransactions = await target_db
    .collection("transaction")
    .count();
  const numberOfChunks = Math.ceil(numberOfTransactions / CHUNK_SIZE);

  for (let i = 0; i < numberOfChunks; i++) {
    const chunk = await target_db
      .collection("transaction")
      .find({ take: CHUNK_SIZE, skip: i * CHUNK_SIZE });

    yield chunk;
  }
}

async function processChunks() {
  const transactionObj = {};
  processTxns = 0;
  for await (const chunk of readChunksFromDB()) {
    for (const transactionDoc of chunk) {
      processTxns++;
      if (transactionDoc.orderId && transactionDoc.orderId[0]) {
        const orderIdString = String(transactionDoc.orderId[0].oid);
        if (!orderIdString) continue;
        const existingDoc = transactionObj[orderIdString];
        if (!existingDoc || existingDoc._id < transactionDoc._id) {
          transactionObj[orderIdString] = transactionDoc;
        }
      }
      console.log("Process Txns: " + processTxns);
    }
  }
}

或者使用具有自动背压机制的管道的 Streams。 Streams 是 NodeJS 推荐的用于管理大量操作数据的解决方案:

const { Readable, Transform } = require("stream");

const transactionsStream = Readable.from(readChunksFromDB());
const processTransactionStream = new Transform({
  transform(chunk, encoding, callback) {
    const transactions = JSON.parse(chunk.toString());
    console.log(transactions);
    transactions
      .map((transactionDoc) => {
        if (transactionDoc.orderId && transactionDoc.orderId[0]) {
          const orderIdString = String(transactionDoc.orderId[0].oid);
          if (!orderIdString) {
            return transactionDoc;
          }

          const existingDoc = transactionObj[orderIdString];
          if (!existingDoc || existingDoc._id < transactionDoc._id) {
            transactionObj[orderIdString] = transactionDoc;
          }

          return transactionDoc;
        }
      })
      .forEach((transaction) => {
        console.log(transaction);
        this.push(JSON.stringify(transaction));
      });

    callback();
  },
});

transactionsStream.pipe(processTransactionStream).on("data", (chunk) => {
    console.log(chunk.toString())
});

在这两种情况下,您都必须管理不同的阵列以进行重复数据删除,但您将在性能方面获得很多。

https://blog.appsignal/2022/02/02/use-streams-to-build-high-performing-nodejs-applications.html

更多推荐

NodeJS进程在处理大量记录后挂起

本文发布于:2024-05-30 20:05:37,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1770871.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:挂起   进程   NodeJS

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!