NodeJS Bull停止作业上的队列作业失败

编程入门 行业动态 更新时间:2024-10-23 21:28:16
本文介绍了NodeJS Bull停止作业上的队列作业失败的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我的NodeJS项目中有多个公牛队列,如果执行了先前的队列,这些队列将运行成功地. 我正在尝试在此处验证一些电子邮件地址.

I have multiple Bull Queues in my NodeJS project which will run if previous queue is executed successfully. I'm trying to verify some email addresses here.

  • 检查电子邮件格式(formatQueue)

  • Check the Email format (formatQueue)

    使用npm email-existence程序包(existenceQueue)的电子邮件存在 p>

  • Email Existence using npm email-existence package (existenceQueue)

    formatQueue花费的时间更少,可以运行RegEx并验证Email格式.但是email-existence程序包大约需要5到10秒才能完成.

    The formatQueue is less time taking process, which wil run the RegEx and validate the Email format. but The email-existence package takes around 5-10 seconds to complete.

    formatQueue和existenceQueue可以正常工作.但是当我一次添加超过1000个作业时,existenceQueue失败并显示以下错误

    formatQueue and existenceQueue works properly if there are less jobs like 20-100. but when I Add more than that around 1000 jobs at a time, existenceQueue failes with below error

    myemail@email job stalled more than allowable limit

    我检查了问题这里和这里,我认为该过程花费的时间太长了,因此在这里.但这对我没有帮助.

    I checked the issue HERE and HERE, I thought the process is taking too long to respond, so added limiter as refered HERE. But that does not help me.

    如果任何队列中的作业失败,则它不处理下一个作业.它将在那里停止,其他作业将保持在waiting状态.

    If a job in any of the queue fails, Its not processing the next job. It will stop there and the other jobs will stay in waiting state.

    我的代码类似于以下代码.请帮助我解决这个问题.

    My code is something similar to below code. please help me with the issue.

    Queue.js

    var formatQueue = new Queue('format', "redis-db-url"); var existenceQueue = new Queue('existence', "redis-db-url"); // ------------ function for adding to queue ------------ module.exports.addToQueue = (emails) => { emails.forEach(element => { formatQueue.add(element, { attempts: 3, backoff: 1000 }); }); } // ------------ Queue Process ------------- // Format Test Process formatQueue.process(function(job, done){ FormatTest.validate(job.data, (err, data) => { if(err) done(); else{ job.data = data; done(); } }); }); // Existence Test Process formatQueue.process(function(job, done){ ExistenceTest.validate(job.data, (err, data) => { if(err) done(); else{ job.data = data; done(); } }); }); // ------------ On Cmplete Handlers ------------ formatQueue.on('completed', function(job){ if(job.data.is_well_format){ existenceQueue.add(job.data, { attempts: 3, backoff: 1000 }); }else QueueModel.lastStep(job.data) }); existenceQueue.on('completed', function(job){ QueueModel.lastStep(job.data) }); // ------------ To update the emaile ------------ module.exports.lastStep = (data) => { Emails.updateEmail(data, (err, updated) => { if(!err) { formatQueue.clean('completed'); existenceQueue.clean('completed'); } }) }

    ---------更新---------

    处理器花费了太多时间来响应,因此由于我正在使用超时,因此作业变得stalled或失败.

    The processor was taking too much time to respond so the job was getting stalled or getting failed since i was using timeout.

    我试图在bull文档中的其他processor文件itsef中运行process,我已如下添加文件.

    I'm trying to run the process in different processor file itsef as its in bull documentation, I've added the file as below.

    // -------- Queue.js ---------- formatQueue.process(__dirname+"/processors/format-worker.js"); // On Cmplete Handler formatQueue.on('completed', function(job, result){ console.log(result, "Format-Complete-job"); // result is undefined if(job.data.is_well_format){ existenceQueue.add(job.data, { attempts: 3, backoff: 1000 }); }else QueueModel.lastStep(job.data) }); // -------- Queue.js ends --------- //format-worker.js Validator = require("../../validators"); module.exports = (job) => { Validator.Format.validate(job.data, (data) => { job.data = data; return Promise.resolve(data); }); }

    现在我以前使用的Job Complete上完成了,我曾经使用更新的Job参数来获取Job数据.现在,我没有更新的工作数据.文档中的第二个参数,即result是undefined. 现在,在这种情况下,如何获取更新的作业数据.

    Now On Job complete which i was using before, I used to get job data with updated job parameters. Now I'm not getting updated job data. and the second parameter which is there in the documentation i.e result is undefined. Now how can I get the updated job data in this case.

    推荐答案

    尝试可重复的工作

    var formatQueue = new Queue('format', "redis-db-url"); var existenceQueue = new Queue('existence', "redis-db-url"); // ------------ function for adding to queue ------------ module.exports.addToQueue = (emails) => { emails.forEach(element => { let jobOptions = { repeat: { every: 10 * 1000, // Run job every 10 seconds for example limit: 3 // Maximum number of times a job can repeat. }, jobId: someUniqueId, // important do not forget this removeOnComplete: true, // removes job from queue on success (if required) removeOnFail: true // removes job from queue on failure (if required) } formatQueue.add(element, jobOptions); }); } // ------------ Queue Process ------------- // Format Test Process formatQueue.process(function (job, done) { FormatTest.validate(job.data, (err, data) => { if (err) { // Done with error done(true); } else { job.data = data; // Done without any error done(false); } }); }); // Existence Test Process existenceQueue.process(function (job, done) { ExistenceTest.validate(job.data, (err, data) => { if (err) { // Done with error done(true); } else { job.data = data; // Done without any error done(false); } }); }); // ------------ On Complete Handlers ------------ formatQueue.on('completed', function (job) { if (job.data.is_well_format) { let jobOptions = { repeat: { every: 10 * 1000, // Run job every 10 seconds for example limit: 3 // Maximum number of times a job can repeat. }, jobId: someUniqueId, // important do not forget this removeOnComplete: true, // removes job from queue on success (if required) removeOnFail: true // removes job from queue on failure (if required) } existenceQueue.add(job.data, jobOptions); } else QueueModel.lastStep(job.data) }); existenceQueue.on('completed', function (job) { QueueModel.lastStep(job.data) }); // ------------ To update the email ------------ module.exports.lastStep = (data) => { Emails.updateEmail(data, (err, updated) => { if (!err) { formatQueue.clean('completed'); existenceQueue.clean('completed'); } }) }

    更多推荐

    NodeJS Bull停止作业上的队列作业失败

    本文发布于:2023-10-15 00:59:31,感谢您对本站的认可!
    本文链接:https://www.elefans.com/category/jswz/34/1492772.html
    版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
    本文标签:作业   队列   NodeJS   Bull

    发布评论

    评论列表 (有 0 条评论)
    草根站长

    >www.elefans.com

    编程频道|电子爱好者 - 技术资讯及电子产品介绍!