RabbitMQ等待多个队列完成

编程入门 行业动态 更新时间:2024-10-24 04:29:58
本文介绍了RabbitMQ等待多个队列完成的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

好,这是正在发生的事情的概述:

Ok here is an overview of what's going on:

M <-- Message with unique id of 1234 | +-Start Queue | | | <-- Exchange /|\ / | \ / | \ <-- bind to multiple queues Q1 Q2 Q3 \ | / <-- start of the problem is here \ | / \ | / \|/ | Q4 <-- Queues 1,2 and 3 must finish first before Queue 4 can start | C <-- Consumer

所以我有一个交换,它推送到多个队列,每个队列都有一个任务,一旦所有任务完成,队列4才能启动.

So I have an exchange that pushes to multiple queues, each queue has a task, once all tasks are completed, only then can Queue 4 start.

因此,唯一ID为1234的消息被发送到交换机,当消息ID 1234的所有任务都已完成时,交换机将其路由到所有任务队列(Q1,Q2,Q3等)邮件ID为1234的第4季度.

So message with unique id of 1234 gets sent to the exchange, the exchange routes it to all the task queues ( Q1, Q2, Q3, etc... ), when all the tasks for message id 1234 have completed, run Q4 for message id 1234.

我该如何实现?

使用Symfony2,RabbitMQBundle和RabbitMQ 3.x

Using Symfony2, RabbitMQBundle and RabbitMQ 3.x

资源:

  • www.rabbitmq/tutorials/amqp-concepts.html
  • www.rabbitmq/tutorials/tutorial-six-python.html
  • www.rabbitmq/tutorials/amqp-concepts.html
  • www.rabbitmq/tutorials/tutorial-six-python.html

更新#1

好吧,我认为这就是我想要的:

Ok I think this is what I'm looking for:

  • github/videlalvaro/Thumper/tree/master/examples/parallel_processing

具有并行处理功能的RPC,但是如何将相关性ID"设置为我的唯一ID,以对消息进行分组并确定哪个队列?

RPC with Parallel Processing, but how do I set the Correlation Id to be my unique id to group the messages and also identify what queue?

推荐答案

在 RabbitMQ网站上的RPC 教程,提供了一种传递关联ID"的方法,该ID可以向队列中的用户标识您的消息.

In the RPC tutorial at RabbitMQ's site, there is a way to pass around a 'Correlation id' that can identify your messages to users in the queue.

我建议对您的消息使用某种ID,将其放入前3个队列中,然后再进行另一个处理,以将消息从这3个队列中取出并放入某种存储桶中.当这些存储桶收到我假设的3个任务的完成时,请将最终消息发送到第4个队列进行处理.

I'd recommend using some sort of id with your messages into the first 3 queues and then have another process to dequeue messages from the 3 into buckets of some sort. When those buckets receive what I'm assuming is the completion of there 3 tasks, send the final message off to the 4th queue for processing.

如果要向一个用户的每个队列发送一个以上的工作项目,则可能需要做一些预处理,以找出特定用户放入队列中的项目数,以便在4之前出队的进程知道有多少要处理的项目.在排队之前期待.

If you are sending more than 1 work item to each queue for one user, you might have to do a little preprocessing to find out how many items a particular user placed into the queue so the process dequeuing before 4 knows how many to expect before queuing up.

我用C#编写Rabbitmq,所以抱歉,我的伪代码不是php风格

I do my rabbitmq in C#, so sorry my pseudo code isn't in php style

// Client byte[] body = new byte[size]; body[0] = uniqueUserId; body[1] = howManyWorkItems; body[2] = command; // Setup your body here Queue(body)

// Server // Process queue 1, 2, 3 Dequeue(message) switch(message.body[2]) { // process however you see fit } processedMessages[message.body[0]]++; if(processedMessages[message.body[0]] == message.body[1]) { // Send to queue 4 Queue(newMessage) }

响应更新#1

将客户端视为服务器上的进程可能会有用,而不是将客户端视为终端.因此,如果您在此服务器,那么您要做的就是让服务器处理用户唯一ID的生成并将消息发送到适当的队列:

Instead of thinking of your client as a terminal, it might be useful to think of the client as a process on a server. So if you setup an RPC client on a server like this one, then all you need to do is have the server handle the generation of a unique id of a user and send the messages to the appropriate queues:

public function call($uniqueUserId, $workItem) { $this->response = null; $this->corr_id = uniqid(); $msg = new AMQPMessage( serialize(array($uniqueUserId, $workItem)), array('correlation_id' => $this->corr_id, 'reply_to' => $this->callback_queue) ); $this->channel->basic_publish($msg, '', 'rpc_queue'); while(!$this->response) { $this->channel->wait(); } // We assume that in the response we will get our id back return deserialize($this->response); } $rpc = new Rpc(); // Get unique user information and work items here // Pass even more information in here, like what queue to use or you could even loop over this to send all the work items to the queues they need. $response = rpc->call($uniqueUserId, $workItem); $responseBuckets[array[0]]++; // Just like above code that sees if a bucket is full or not

更多推荐

RabbitMQ等待多个队列完成

本文发布于:2023-11-05 12:35:18,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1560860.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:多个   队列   RabbitMQ

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!