php实战RabbitMQ四(消息持久化与公平调度)

编程入门 行业动态 更新时间:2024-10-09 18:23:11

php实战RabbitMQ四(消息持久<a href=https://www.elefans.com/category/jswz/34/1769842.html style=化与公平调度)"/>

php实战RabbitMQ四(消息持久化与公平调度)

php实战RabbitMQ四(数据持久化与公平调度)

  • 前言
    • 分析
  • 消息持久化
    • RabbitMQ退出或崩溃应对
      • 队列持久化
      • 消息持久化
    • 消费者异常退出应对
      • 消费者消息确认
    • 持久化注意
  • 公平调度
  • 源码
    • 生产者
    • 消费者
  • 运行

前言

在开始之前先讲个故事吧!
憧憬下未来,假如我有2个孩子,龙凤胎,俊男靓女,emmm哈哈哈停不下来了。

有天,我给家里留了纸条,纸条内容:
1. 孩子们记得吃饭
2. 要洗碗
3. 要拖地
4. 出门时记得锁门
哈哈哈[ 爱你们的爸爸 ]

分析

我:消息生产者
纸条:rabbitMQ
孩子们: 消费者

通过将故事与我们实际开发相结合,在这里有两个问题:

	1. 谁洗碗?谁拖地?都是男孩吗?那女孩子就没事干了。反映到开发中,有的消费者很忙,有的消费者很闲。是否公平调度?2. 纸条我放到地方安全显眼吗?会不会被小风吹走?会不会被无视?反映到开发中,数据能持久存在吗?

消息持久化

RabbitMQ退出或崩溃应对

RabbitMQ退出或崩溃会丢失队列与消息,所以在这里我们需要将消息与队列标记为持久。

队列持久化

源码

根据源码将第三个参数设为true


$channel->queue_declare('hello', false, true, false, false);

消息持久化

源码

源码

根据源码在消息传递的过程中设置delivery_mode的值为2或者DELIVERY_MODE_PERSISTENT

$msg = new AMQPMessage('你的消息', ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT] );

消费者异常退出应对

消费者可能接收到消息后,执行一个耗时任务,结果执行在中途,异常退出。默认情况下,rabbitMQ一旦发送消息给客户端立即删除。这就很尴尬了,反映到现实~~张三刚下的订单突然没了

我们当然希望有消费者异常退出后,赶紧来个接茬的,任务交给另一个消费者来干。

消费者消息确认

为了保证消息不丢失,我们希望消费者完成消息处理后发送ack确认,rabbitMQ收到后才能对消息删除。
源码

根据源码在消费者方法中,第四个参数设置为false,要求确认
紧接着在回调函数中处理完消息后调用ack()方法,发送确认

$callback = function($msg) {//收到消息了,打印echo " [x] Received ", $msg->body, "\n";//***消息确认***$msg->ack();
};$channel->basic_consume('mq_sms_send_q', 'consumer1', false, false, false, false, $callback);

持久化注意

不能在已存在的队列名上加持久化设置,会报错,可以重新设置队列名称

公平调度

rabbitMQ在分发给消费者任务时,不会智能去监测消费者是否空闲。所以会出现部分消费者在处理一个重任务还未完成,另一个任务已经到来。而另外的消费者之前拿到轻任务很快处理完,闲了很久。

源码
根据源码在basic_qos()方法中,第二个参数设置为1,等待消费者处理完成后再接新消息,不堆积

$channel->basic_qos(null, 1, null);

源码

生产者

<?php 
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;//获取终端提示用户输入的数据
fwrite(STDOUT, "Please enter a message:\n");
$msg_str = fgets(STDIN);//建立生产者与mq之间的连接    
//参数:地址,端口,账号,密码
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');//在已连接基础上建立生产者与mq之间的通道
$channel = $connection->channel();//声明初始化交换机   
//参数:交换机名,路由类型,是否检测同名队列,是否开启队列持久化,通道关闭后是否删除队列
$channel->exchange_declare('mq_sms_send_ex', 'direct', false, true, false); //声明初始化一条队列
//参数:队列名,是否检测同名队列,是否开启队列持久化,是否能被其他队列访问,通道关闭后是否删除队列
$channel->queue_declare('mq_sms_send_q1', false, true, false, false);//将队列与某个交换机进行绑定,并使用路由关键字
//参数:队列名,交换机名,路由键名
$channel->queue_bind('mq_sms_send_q1', 'mq_sms_send_ex', 'sms_send'); //生成消息
$msg = new AMQPMessage($msg_str, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);//推送消息到某个交换机
//参数:消息,交换机名,路由键名
$channel->basic_publish($msg, 'mq_sms_send_ex', 'sms_send');
echo " [x] Sent: $msg_str \n";$channel->close();
$connection->close();

消费者

<?php 
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();//声明初始化交换机   
//参数:交换机名,路由类型,是否检测同名队列,是否开启队列持久化,通道关闭后是否删除队列
$channel->exchange_declare('mq_sms_send_ex', AMQPExchangeType::DIRECT, false, true, false);//声明初始化一条队列
//参数:队列名,是否检测同名队列,是否开启队列持久化,是否能被其他队列访问,通道关闭后是否删除队列
$channel->queue_declare('mq_sms_send_q1', false, true, false, false);//将队列与某个交换机进行绑定,并使用路由关键字
//参数:队列名,交换机名,路由键名
$channel->queue_bind('mq_sms_send_q1', 'mq_sms_send_ex', 'sms_send');echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";$callback = function($msg) {echo " [x] Received ", $msg->body, "\n";//消息确认$msg->ack();//判断获取到quit后if (trim($msg->body) == 'quit') {$msg->getChannel()->basic_cancel($msg->getConsumerTag());}};$channel->basic_qos(null, 1, null);//
//参数:队列名,消费者标识符,不接收此使用者发布的消息,使用者是否使用自动确认模式,请求独占使用者访问,不等待,消息回调函数
$channel->basic_consume('mq_sms_send_q1', 'consumer1', false, false, false, false, $callback);function shutdown($channel, $connection)
{$channel->close();$connection->close();
}register_shutdown_function('shutdown', $channel, $connection);while(count($channel->callbacks)) {$channel->wait();
}

运行

更多推荐

php实战RabbitMQ四(消息持久化与公平调度)

本文发布于:2024-02-07 08:24:25,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1754813.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:化与   持久   实战   公平   消息

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!