RabbitMQ快速入门

编程入门 行业动态 更新时间:2024-10-26 18:30:23

RabbitMQ快速<a href=https://www.elefans.com/category/jswz/34/1770026.html style=入门"/>

RabbitMQ快速入门

1、RabbitMQ管理控制台的使用

(1)添加allinpay 用户

(2)添加虚拟机 /jinqiao

2、使用简单模式完成消息传递

 

3、RabbitMQ的工作模式

3.1 Work queues 工作队列模式

在一个队列中,如果有多个消费者,都监听同一个队列,那么消费者之间对于同一个消息的关系是竞争的关系。

例如:短信服务部署多个,只需要有一个节点成功发送即可。

生产者: 

 

消费者1和消费者2:(顺序消费)

 

3.2 Pub / Sub订阅模式

引入交换机:生产者把消息发给交换机,交换机再将消息路由分发到每一个队列,消费者通过监听队列来获取消息。

功能:发送一条日志信息,两个不同的系统都可以收到这个日志信息,并且进行对日志不同的操作。

package com.dhu.producer;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** 发送消息:* @author zhou* @create 2020/7/21*/
public class Producer_PubSub {public static void main(String[] args) {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost("192.168.93.101"); //默认localhost 127.0.0.1factory.setPort(5672);factory.setVirtualHost("/jinqiao"); //虚拟机,默认值为/factory.setUsername("allinpay"); //用户名,默认值为guestfactory.setPassword("allinpay"); //密码,默认值为guest//3.创建连接 Connectiontry {Connection connection = factory.newConnection();//4.创建ChannelChannel channel = connection.createChannel();//5.创建交换机/*void exchangeDeclareNoWait(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)参数:1.exchange 交换机名称2.type 交换机类型DIRECT("direct"), 定向FANOUT("fanout"), 扇形(广播),发送消息到每一个与之绑定的队列TOPIC("topic"), 通配符的方式HEADERS("headers"); 参数匹配3.internal 内部使用,一般设false*/String exchangeName = "test_fanout";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);//6.创建队列String queue1Name = "test_fanout_queue1";String queue2Name = "test_fanout_queue2";channel.queueDeclare(queue1Name, true, false, false, null);channel.queueDeclare(queue2Name, true, false, false, null);//7.绑定队列和交换机/*com.rabbitmq.client.AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)参数:routingKey 路由键,也就是绑定规则。* 若交换机类型是 fanout,routingKey默认设置为""*/channel.queueBind(queue1Name, exchangeName, ""); //这里路由key为""channel.queueBind(queue2Name, exchangeName, "");//8.发送消息String body = "日志信息 ...,日志级别 info";channel.basicPublish(exchangeName, "", null, body.getBytes());channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}

 

3.3 Routing路由模式

发消息使用的routing key 与 绑定队列和交换机 使用同一个 routing key。

 

交换机的类型必须是:DIRECT

package com.dhu.producer;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import sun.util.locale.provider.FallbackLocaleProviderAdapter;/*** 发送消息:* @author zhou* @create 2020/7/21*/
public class Producer_Routing {public static void main(String[] args) {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost("192.168.93.101"); //默认localhost 127.0.0.1factory.setPort(5672);factory.setVirtualHost("/jinqiao"); //虚拟机,默认值为/factory.setUsername("allinpay"); //用户名,默认值为guestfactory.setPassword("allinpay"); //密码,默认值为guest//3.创建连接 Connectiontry {Connection connection = factory.newConnection();//4.创建ChannelChannel channel = connection.createChannel();//5.创建交换机/*void exchangeDeclareNoWait(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)参数:1.exchange 交换机名称2.type 交换机类型DIRECT("direct"), 定向FANOUT("fanout"), 扇形(广播),发送消息到每一个与之绑定的队列TOPIC("topic"), 通配符的方式HEADERS("headers"); 参数匹配3.internal 内部使用,一般设false*/String exchangeName = "test_direct";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);//6.创建队列String queue1Name = "test_direct_queue1";String queue2Name = "test_direct_queue2";channel.queueDeclare(queue1Name, true, false, false, null);channel.queueDeclare(queue2Name, true, false, false, null);//7.绑定队列和交换机/*com.rabbitmq.client.AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)参数:routingKey 路由键,也就是绑定规则。* 若交换机类型是 fanout,routingKey默认设置为""*///队列1绑定 errorchannel.queueBind(queue1Name, exchangeName, "error"); //这里路由key为//队列2绑定channel.queueBind(queue2Name, exchangeName, "info");channel.queueBind(queue2Name, exchangeName, "error");channel.queueBind(queue2Name, exchangeName, "warning");//8.发送消息String body = "日志信息:调用了delete方法,出错了...,日志级别 error";channel.basicPublish(exchangeName, "info", null, body.getBytes());channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}
package com.dhu.consumer;import com.rabbitmq.client.*;import java.io.IOException;/*** @author zhou* @create 2020/7/21*/
public class Consumer_Routing1 {public static void main(String[] args) {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost("192.168.93.101"); //默认localhost 127.0.0.1factory.setPort(5672);factory.setVirtualHost("/jinqiao"); //虚拟机,默认值为/factory.setUsername("allinpay"); //用户名,默认值为guestfactory.setPassword("allinpay"); //密码,默认值为guest//3.创建连接 Connectiontry {Connection connection = factory.newConnection();//4.创建ChannelChannel channel = connection.createChannel();//5.创建队列QueueString queue1Name = "test_direct_queue1";String queue2Name = "test_direct_queue2";//6.接收消息Consumer consumer = new DefaultConsumer(channel) {//回调方法。当收到消息后,会自动执行该方法/*consumerTag 标识envelope 获取信息:交换机、路由keyproperties 配置信息body 数据*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body :" + new String(body));System.out.println("将日志消息保存数据库...");}};/*String basicConsume(String queue, boolean autoAck, Consumer callback)参数:1.queeu 队列名称2.autoAck 是否自动确认3.callback 回调对象*/channel.basicConsume(queue1Name, true, consumer); //监听队列,消费//消费则不需要关闭资源,用来监听的} catch (Exception e) {e.printStackTrace();}}
}

3.4 Topics通配符模式

除了监控日志中error信息,还需求监控订单系统中的所有级别日志信息。

通配符:# 和 *;#匹配0个或多个单词,*匹配1个单词。 

package com.dhu.producer;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** 发送消息:* @author zhou* @create 2020/7/21*/
public class Producer_Topic {public static void main(String[] args) {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost("192.168.93.101"); //默认localhost 127.0.0.1factory.setPort(5672);factory.setVirtualHost("/jinqiao"); //虚拟机,默认值为/factory.setUsername("allinpay"); //用户名,默认值为guestfactory.setPassword("allinpay"); //密码,默认值为guest//3.创建连接 Connectiontry {Connection connection = factory.newConnection();//4.创建ChannelChannel channel = connection.createChannel();//5.创建交换机/*void exchangeDeclareNoWait(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)参数:1.exchange 交换机名称2.type 交换机类型DIRECT("direct"), 定向FANOUT("fanout"), 扇形(广播),发送消息到每一个与之绑定的队列TOPIC("topic"), 通配符的方式HEADERS("headers"); 参数匹配3.internal 内部使用,一般设false*/String exchangeName = "test_topic";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);//6.创建队列String queue1Name = "test_topic_queue1";String queue2Name = "test_topic_queue2";channel.queueDeclare(queue1Name, true, false, false, null);channel.queueDeclare(queue2Name, true, false, false, null);//7.绑定队列和交换机/*com.rabbitmq.client.AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)参数:routingKey 路由键,也就是绑定规则。* 若交换机类型是 fanout,routingKey默认设置为""*///routing key : 系统的名称.日志的级别(只有两部分组成)channel.queueBind(queue1Name, exchangeName, "#.error"); //这里路由key为channel.queueBind(queue1Name, exchangeName, "order.*");channel.queueBind(queue2Name, exchangeName, "*.*"); //都打印到控制台//8.发送消息String body = "日志信息:调用了delete方法,出错了...,日志级别";channel.basicPublish(exchangeName, "goods.info", null, body.getBytes());channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}

 

4、Spring整合RabbitMQ

 

 

 

 

 

 

 

 

 

 

更多推荐

RabbitMQ快速入门

本文发布于:2024-02-12 04:48:34,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1686103.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:入门   快速   RabbitMQ

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!