I have multiple Bull queues in my NodeJS project, each of which runs only if the previous queue's job completed successfully. I'm trying to verify some email addresses here:
Check the email format (formatQueue)
Check that the email exists, using the npm email-existence package (existenceQueue)
The formatQueue step is quick: it runs a RegEx and validates the email format. The email-existence package, however, takes around 5-10 seconds to complete.
formatQueue and existenceQueue work properly when there are few jobs, around 20-100. But when I add more than that, around 1000 jobs at a time, existenceQueue fails with the error below:
myemail@email job stalled more than allowable limit
I checked the issues here and here. I thought the process was taking too long to respond, so I added a limiter as described here. But that did not help.
If a job in any of the queues fails, the next job is not processed. The queue stops there and the other jobs stay in the waiting state.
My code is similar to the code below. Please help me with this issue.
Queue.js
const Queue = require('bull');

var formatQueue = new Queue('format', "redis-db-url");
var existenceQueue = new Queue('existence', "redis-db-url");

// ------------ function for adding to queue ------------
module.exports.addToQueue = (emails) => {
  emails.forEach(element => {
    formatQueue.add(element, { attempts: 3, backoff: 1000 });
  });
}

// ------------ Queue Process -------------
// Format Test Process
formatQueue.process(function(job, done){
  FormatTest.validate(job.data, (err, data) => {
    if(err) done();
    else{
      job.data = data;
      done();
    }
  });
});

// Existence Test Process
existenceQueue.process(function(job, done){
  ExistenceTest.validate(job.data, (err, data) => {
    if(err) done();
    else{
      job.data = data;
      done();
    }
  });
});

// ------------ On Complete Handlers ------------
formatQueue.on('completed', function(job){
  if(job.data.is_well_format){
    existenceQueue.add(job.data, { attempts: 3, backoff: 1000 });
  }else QueueModel.lastStep(job.data)
});

existenceQueue.on('completed', function(job){
  QueueModel.lastStep(job.data)
});

// ------------ To update the email ------------
module.exports.lastStep = (data) => {
  Emails.updateEmail(data, (err, updated) => {
    if(!err) {
      formatQueue.clean('completed');
      existenceQueue.clean('completed');
    }
  })
}

--------- Update ---------
The processor was taking too long to respond, so the job was either getting stalled or failing, since I was using a timeout.
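For reference, how long Bull waits before treating a job as stalled, and how often a stalled job is retried, is controlled by the queue's `settings` option, alongside the `limiter` mentioned earlier. A minimal sketch of such an options object follows; every number is an illustrative assumption rather than a tuned value, and `existenceQueueOptions` is a hypothetical name not present in the original code.

```javascript
// Hypothetical options object; it could be passed as the third argument:
//   new Queue('existence', "redis-db-url", existenceQueueOptions)
// All values below are assumptions chosen only for illustration.
const existenceQueueOptions = {
  limiter: {
    max: 10,        // start at most 10 jobs...
    duration: 1000, // ...per 1000 ms
  },
  settings: {
    lockDuration: 60000,    // ms a job lock lasts before it has to be renewed
    stalledInterval: 30000, // ms between checks for stalled jobs
    maxStalledCount: 2,     // times a stalled job is re-processed before it fails
  },
};
```

Whether raising these limits helps depends on why the lock is not being renewed; if the worker process is blocked, a larger `lockDuration` may only postpone the same error.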
I'm now trying to run the process in a separate processor file, as described in the Bull documentation. I've added the file as below.
// -------- Queue.js ----------
formatQueue.process(__dirname+"/processors/format-worker.js");

// On Complete Handler
formatQueue.on('completed', function(job, result){
  console.log(result, "Format-Complete-job"); // result is undefined
  if(job.data.is_well_format){
    existenceQueue.add(job.data, { attempts: 3, backoff: 1000 });
  }else QueueModel.lastStep(job.data)
});
// -------- Queue.js ends ---------

// format-worker.js
const Validator = require("../../validators");

module.exports = (job) => {
  Validator.Format.validate(job.data, (data) => {
    job.data = data;
    return Promise.resolve(data);
  });
}

Previously, in the completed handler, I used to get the job data with the updated job parameters. Now I'm not getting the updated job data, and the second parameter described in the documentation, i.e. result, is undefined. How can I get the updated job data in this case?
Recommended answer
Try repeatable jobs:
const Queue = require('bull');

var formatQueue = new Queue('format', "redis-db-url");
var existenceQueue = new Queue('existence', "redis-db-url");

// ------------ function for adding to queue ------------
module.exports.addToQueue = (emails) => {
  emails.forEach(element => {
    let jobOptions = {
      repeat: {
        every: 10 * 1000, // Run job every 10 seconds for example
        limit: 3 // Maximum number of times a job can repeat.
      },
      jobId: someUniqueId, // important do not forget this (placeholder: supply a unique id per job)
      removeOnComplete: true, // removes job from queue on success (if required)
      removeOnFail: true // removes job from queue on failure (if required)
    }
    formatQueue.add(element, jobOptions);
  });
}

// ------------ Queue Process -------------
// Format Test Process
formatQueue.process(function (job, done) {
  FormatTest.validate(job.data, (err, data) => {
    if (err) {
      // Done with error: pass an Error so the job is marked as failed
      done(err instanceof Error ? err : new Error(String(err)));
    } else {
      job.data = data;
      // Done without any error: pass the data on as the job result
      done(null, data);
    }
  });
});

// Existence Test Process
existenceQueue.process(function (job, done) {
  ExistenceTest.validate(job.data, (err, data) => {
    if (err) {
      // Done with error: pass an Error so the job is marked as failed
      done(err instanceof Error ? err : new Error(String(err)));
    } else {
      job.data = data;
      // Done without any error: pass the data on as the job result
      done(null, data);
    }
  });
});

// ------------ On Complete Handlers ------------
formatQueue.on('completed', function (job) {
  if (job.data.is_well_format) {
    let jobOptions = {
      repeat: {
        every: 10 * 1000, // Run job every 10 seconds for example
        limit: 3 // Maximum number of times a job can repeat.
      },
      jobId: someUniqueId, // important do not forget this (placeholder: supply a unique id per job)
      removeOnComplete: true, // removes job from queue on success (if required)
      removeOnFail: true // removes job from queue on failure (if required)
    }
    existenceQueue.add(job.data, jobOptions);
  } else QueueModel.lastStep(job.data)
});

existenceQueue.on('completed', function (job) {
  QueueModel.lastStep(job.data)
});

// ------------ To update the email ------------
module.exports.lastStep = (data) => {
  Emails.updateEmail(data, (err, updated) => {
    if (!err) {
      formatQueue.clean('completed');
      existenceQueue.clean('completed');
    }
  })
}
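On the update in the question: `result` is probably undefined because the processor function in format-worker.js returns nothing. The `return Promise.resolve(data)` sits inside the validator callback, so its value never reaches Bull. A minimal sketch of a worker that returns the Promise itself is below. The `Validator` object here is a hypothetical stand-in so the snippet runs on its own, and the `email` field on the job data is an assumption; only the single-argument callback shape is taken from the question.

```javascript
// Hypothetical stand-in for require("../../validators") from the question.
// It calls back with the updated data as its only argument.
const Validator = {
  Format: {
    validate(data, callback) {
      // Simplified format check, for illustration only.
      const isWellFormat = /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(data.email);
      setImmediate(() => callback({ ...data, is_well_format: isWellFormat }));
    },
  },
};

// format-worker.js: return the Promise from the processor function itself,
// and resolve it from inside the validator callback.
const formatWorker = (job) =>
  new Promise((resolve) => {
    Validator.Format.validate(job.data, (data) => resolve(data));
  });

module.exports = formatWorker;
```

With a worker shaped like this, the value the Promise resolves with should arrive as the second argument of the `completed` handler, so the handler can read `result.is_well_format`. Reading `result` is likely the safer choice with a separate processor file, since assignments to `job.data` made in the worker process are not expected to show up on the job object in the parent.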