记一次rabbitmq生产者和消费者高可用的封装

编程入门 行业动态 更新时间:2024-10-24 09:16:37

记一次rabbitmq<a href=https://www.elefans.com/category/jswz/34/1768323.html style=生产者和消费者高可用的封装"/>

记一次rabbitmq生产者和消费者高可用的封装

背景

公司需要引入rabbitmq做队列,因此需要对rabbitmq封装。网上搜索了很多封装都是千篇一律相互cp,因此,自己觉定封装一个简洁高效的rabbitmq单例。可以达到消费者和生产者共用的效果。效果如下实例:

生产者: mqService::getInstance()->listen($this->queue,$this->exchange,$this->routeKey)->sendMq($item);

消费者 mqService::getInstance()->listen($this->queue,$this->exchange,$this->routeKey)->consume(array($consume,'consume'));

封装代码
<?php
/*** rabbitmq封装* @author 摩尔小哥*/
namespace Common\Lib\Rabbitmq;class mqService{/*** 当前实例* @var null*/private static $_instance = null;/*** debug参数* @var bool*/private  $debug = false;/*** mq连接配置【后期可以写入配置】* @var array*/protected  $_config = ['host' => '127.0.0.1','port' => '5672','vhost' => 'sync_product','login' => 'admin','password' => 'test2233'];/**** MQ连接* @var null*/private $connection = null;/**** 管道* @var null*/private $channel = null;/**** 待推送消息体* @var null*/private $msg = null;/**** Routing key* @var string*/private $routing_key = '';/**** 队列名称* @var string*/private $queue_name = '';/**** 交换机* @var string*/private $exchange = null;/*** 队列* @var null*/private $queue = null;/*** 交换机名字* @var string*/private $exchange_name = '';/**** 交换机类型* @var string*/private $exchange_type = '';/*** //消息类型* @var int*/protected $delivery_mode = 2;          //消息类型private function __construct(){}private function __clone(){}/*** @return mqService|null*/public static function getInstance(){if (self::$_instance === null) {self::$_instance = new self();}return self::$_instance;}/*** 连接rabbitMQ* @param string $queueName* @param string $exchangeName* @param string $routeKey* @param string $exchangeType* @param array $config* @return $this* @throws \AMQPConnectionException* @throws \AMQPExchangeException* @throws \AMQPQueueException*/public function listen($queueName='', $exchangeName='', $routeKey='',$exchangeType = '', $config = array()){$this->exchange_name    = $exchangeName;$this->queue_name       = $queueName;$this->routing_key      = $routeKey;$this->exchange_type    = $exchangeType ?: AMQP_EX_TYPE_DIRECT;if(!$config) $config = $this->_config;$this->setMQConfig($config);//创建链接$this->connection = new \AMQPConnection($config);$this->connection->connect() or die("Cannot connect to the broker!" .PHP_EOL);//在链接中创建通道与交换机$this->channel = new \AMQPChannel($this->connection);//设置并发连接数$this->channel->setPrefetchCount(15);//确认机制/*$this->channel->confirmSelect();$ack = $this->channel->waitForConfirm();if(!$ack) throw new \Exception('confirm ack failure');*/$this->exchange = new \AMQPExchange($this->channel);//设置交换机$this->setExchange($this->exchange_name,$this->exchange_type);//消费者名称存在时设置队列if($this->queue_name){$this->queue = new \AMQPQueue($this->channel);$this->setQueue();}return $this;}public function consume($class,$func,$workId=-1){if (!$class || !$func || !$this->queue) return false;if(isset($workId) && $workId > -1){$this->queue->consume(function($envelope, $queue) use($class,$func,$workId){// ack 应答机制// 查看那个进程在消费usleep(5000);$getBody = $envelope->getBody();$queue->ack($envelope->getDeliveryTag());call_user_func_array(array($class,$func),array($getBody,$workId));});}else{while (true) {$this->queue->consume($func);}}$this->close();}/*** 发送json消息* mix : msg*/public function sendMq($msg){if ($msg && is_array($msg)) {$msg = json_encode($msg, true);}// wait service logic$ret = $this->exchange->publish($msg, $this->routing_key, AMQP_NOPARAM, ['deliver_mode' => $this->delivery_mode]);$this->close();if ($this->debug) {echo 'rabbitmq send message:' . $ret . PHP_EOL;}echo 'mq Send Message : ' . $ret . PHP_EOL;return $ret;}//    /**
//     * 处理消息
//     */
//    public function dealMessage(){
//
//        // wait service logic
//
//        $this->queue->consume(function($envelope, $queue){
//            $this->consume($envelope, $queue);
//        });
//    }/*** 申明消费者中的虚函数* @param $envelope* @param $queue* @return mixed*/
//    //重写虚基类中的虚拟方法、
//    public function consume($envelope, $queue){
//        $received =  $envelope->getBody();
//        usleep(5000);
//        //显式确认,队列收到消费者显式确认后,会删除该消息
//        $queue->ack($envelope->getDeliveryTag());
//        if($received){
//            $item = json_decode($received,true);
//            // TODO 业务逻辑处理
//            unset($item);
//        }
//        //自行编写业务逻辑...
//    }/*** 设置队列*/protected function setQueue(){$this->queue->setName($this->queue_name);//设置队列持久化$this->queue->setFlags(AMQP_DURABLE);//声明队列$this->queue->declareQueue();//交换机与队列通过routeKey进行绑定$this->queue->bind($this->exchange_name,$this->routing_key);}/*** 设置交换机* @param $name* @param $type*/protected function setExchange($name,$type){//AMQP_EX_TYPE_DIRECT:直连交换机//AMQP_EX_TYPE_FANOUT:扇形交换机//AMQP_EX_TYPE_HEADERS:头交换机//AMQP_EX_TYPE_TOPIC:主题交换机$this->exchange->setName($name);$this->exchange->setType($type);$this->exchange->setFlags(AMQP_DURABLE);$this->exchange->declareExchange();}/*** 重设mq配置* @param $config*/protected function setMQConfig($config){if(!is_array($config))die('config error:config not a array');foreach($config as $k => $v){$this->_config[$k] = $v;}}/*** 删除交换器* @param int $flags*/protected function deleteExchange($flags=AMQP_NOPARAM){$this->exchange->delete($this->exchange_name, $flags);}/*** 解绑交换机* @param $exchange_name* @param null $routing_key* @param array $arguments*/protected function unbindExchange(array $arguments = array()) {$this->exchange->unbind($this->exchange_name,$this->routing_key,$arguments);}/*** 删除队列* @param int $flags*/protected function deleteQueue($flags=AMQP_NOPARAM){$this->queue->delete($flags);}/*** 解绑队列* @param array $arguments*/protected function unbindQueue(array $arguments = array()) {$this->queue->unbind($this->exchange_name,$this->routing_key,$arguments);}/*** 断开连接*/protected function disconnect(){$this->connection->disconnect();}/*** 关闭channel*/protected function closeChannel(){$this->channel->close();}/*** 销毁*/public function __destruct(){$this->close();}/*** debug参数* @param $debug*/protected function setDebug($debug){$this->debug = $debug;}protected function getDebug(){return $this->debug;}public function close(){$this->closeChannel();$this->disconnect();}}
如何调用

生产者比较简单,把需要推送的数据推送进去即可。消费者这里比较灵活,可以任意使用。
demo如下:

    /*** 更新库存为0* @router /home/Test/receive*/public function receive(){echo "rabbit receive start \n";$worker_num = 2;$pool = new Pool($worker_num);$pool->set(['enable_coroutine' => true,]);$pool->on('workerStart', function ($pool, $workerId) {echo "rabbit WorkerId {$workerId} is started \n";try {$consumer = new updateZeroStock();#消费者$ret = mqService::getInstance()->listen($consumer->queue, $consumer->exchange, $consumer->routeKey)->consume($consumer, 'receive', $workerId);echo 'successful,ret='.ret. '\n';} catch (\Exception $e) {echo $e->getMessage() . '\n';}});//进程关闭$pool->on("WorkerStop", function ($pool, $workerId) {echo "rabbit WorkerId={$workerId} is stopped\n";});$pool->start();}
  • 接下来在需要用到的地方处理消费内部逻辑即可
<?php
/*** 自动库存调零*/
namespace Common\Lib\ZeroStock;class updateZeroStock
{var $queue;var $exchange;var $routeKey;var $err_msg = ' updateStock ';public function __construct(){$this->queue = 'update_platform_stock_zero_queue';$this->exchange = 'update_platform_stock_zero_exchange';$this->routeKey = 'update_platform_stock_zero_routingkey';}//receive此方法为需要处理的逻辑public function receive($receive, $workerId){if ($receive) {$flag = false;try {$item = json_decode($receive, true);if (!isset($item['appKey']) || empty($item['appKey'])) {throw new \Exception('appKey empty');}$appKey = $item['appKey'];$ret = $this->checkAppKey($appKey, 3600);if ($ret) {$res = $this->updateStock($item);}usleep(1000);echo $this->err_msg . 'rabbitmq receive received : ' . ($receive ? 1 : 0) . ' ,workerId=' . $workerId . ' ,res=' . $res . ' ' . date('Y-m-d H:i:s');$flag = true;unset($item);} catch (\Exception $e) {echo$this- >err_msg . 'rabbitmq receive error : ' . $e->getMessage();}unset($received);return $flag;}}/*** @param $appKey* @param int $timeout* @return bool* @throws \Exception*/public function checkAppKey($appKey, int $timeout=300){if(!$appKey){throw new \Exception('appKey empty');}$key = explode('||',base64_decode($appKey));$appKey = explode('|',base64_decode($key[1]));$keyArr = explode('&',$appKey[1]);if(time()-$appKey[0]>$timeout && $keyArr[2]!=date('Y-m-d') && $keyArr[1]!='V2'){throw new \Exception('appKey validation failure');}return true;}/*** @param array $item* @return bool*/public function updateStock($item){try {$platform = $item['platform'] ?? 'lcsc';if ($platform == 'lcsc') {$table = 't_goods';} elseif ($platform == 'hqchip') {$table = 'hq_goods';} elseif ($platform == 'oneyac') {$table = 't_yc_goods';} elseif ($platform == 'jbty') {$table = 'jb_goods';}$model = M($table,'','DB_LC_S2');echo $this->err_msg . ' ,item=='.json_encode($item);$flag  = $model->where(" id=" . intval($item['id']))->save(['stock'=> 0,'update_at'=>date('Y-m-d H:i:s')]);echo $this->err_msg . ',res=='.$flag;unset($model);return $flag;}catch (\Exception $e){echo $this->err_msg . ',更新库存失败,原因:' . $e->getMessage();return false;}}
}

更多推荐

记一次rabbitmq生产者和消费者高可用的封装

本文发布于:2024-03-14 20:02:39,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1737224.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:生产者   消费者   rabbitmq

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!