1, 需要一个中文文档
https://xiaoxiami.gitbook.io/rabbitmq_into_chinese_php/ying-yong-jiao-cheng/php-ban/1-hello_world
详细的说明了调用rabbitMq的各种方法。
2,开始连接的时候,第一次使用有许多的坑,最后多谢老铁给我讲解了一下关于bs cs的东西
rabbit 拓展有php写的 还有c写的拓展,分成两种。
php:
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
类似于上面这个就是php的扩展,不用考虑扩展和php的版本问题,这也算是坑的一部分吧。
c:
$conn = new \AMQPConnection($conn_args);
$message = json_encode(array('Hello World3!','php3','c++3:'));
//创建channel
$channel = new \AMQPChannel($conn);
//创建exchange
$ex = new \AMQPExchange($channel);
下面这是c写的扩展,其实C写的运行速度快,但是目前使用的是php的扩展
生产者代码:
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$localhost = ['localhost',5672,'guest','guest'];
$connection = new AMQPStreamConnection('localhost',5672,'guest','guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
$channel->basic_publish($msg, '', 'task_queue');
echo " [x] Sent ", $data, "\n";
$channel->close();
$connection->close();
消费者代码:
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$localhost = ['localhost',5672,'guest','guest'];
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg){
echo " [x] Received ", $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done", "\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
//由于项目原因,不能直接使用原生的php扩展
切换为 c扩展时报错 暂时未解决
报错 :Exception 'AMQPConnectionException' with message 'Library error: a socket error occurred - Potential login failure.'
Library error: connection closed unexpectedly - Potential login failure.
百度翻译 连接失败,登录失败。设置了管理员账号依然如此,
php - rabbit引用
https://blog.csdn/jj546630576/article/details/88533718
在切换为php扩展时,一直报错
PHP Fatal error: Class 'PhpAmqpLib\PhpAmqpLib\Connection\AMQPStreamConnection' not found in D:\php\couponapi\commands\RabbitMQController.php on line 214
PHP Fatal Error 'yii\base\ErrorException' with message 'Class 'PhpAmqpLib\PhpAmqpLib\Connection\AMQPStreamConnection' not found'
类名找不到,于是各种改类命名空间,最后发现是安转的composer的命名目录指向和我的命名目录指向有问题,大坑啊
name参数是目录指向:如果随便修改,就会找不到类
"name": "php-amqplib/php-amqplib",
"replace": {
"videlalvaro/php-amqplib": "self.version"
},
"type": "library",
"description": "Formerly videlalvaro/php-amqplib. This library is a pure PHP implementation of the AMQP protocol. It's been tested against RabbitMQ.",
"keywords": ["rabbitmq", "message", "queue"],
"homepage": "https://github/php-amqplib/php-amqplib/",
此处惨痛的经历告诉我,没事别瞎改composer安装好的目录。不然还找不到原因
3,安装好了之后,开始连接通过生产者代码发送给Mq,并且在Mq接收到之后,会推送到消费者进行消费和输出
由于目前项目中用到的是YII2的 console命令行,
如果需要在command命令中引用类库,需要再console.php进行配置数据库和加载,否则会报错。
同事的意思:是想在是接收到Mq接收的消息之后再把这些消息经过一轮处理放到redis队列再处理
我是直接在command全部处理完所有的逻辑,到时候在通过Mq的Ack应答机制返回完成
更多推荐
php 本地实现连接rabbitMq ,发送接收消息
发布评论