入门"/>
RabbitMQ快速入门
1、RabbitMQ管理控制台的使用
(1)添加allinpay 用户
(2)添加虚拟机 /jinqiao
2、使用简单模式完成消息传递
3、RabbitMQ的工作模式
3.1 Work queues 工作队列模式
在一个队列中,如果有多个消费者,都监听同一个队列,那么消费者之间对于同一个消息的关系是竞争的关系。
例如:短信服务部署多个,只需要有一个节点成功发送即可。
生产者:
消费者1和消费者2:(顺序消费)
3.2 Pub / Sub订阅模式
引入交换机:生产者把消息发给交换机,交换机再将消息路由分发到每一个队列,消费者通过监听队列来获取消息。
功能:发送一条日志信息,两个不同的系统都可以收到这个日志信息,并且进行对日志不同的操作。
package com.dhu.producer;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** 发送消息:* @author zhou* @create 2020/7/21*/
public class Producer_PubSub {public static void main(String[] args) {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost("192.168.93.101"); //默认localhost 127.0.0.1factory.setPort(5672);factory.setVirtualHost("/jinqiao"); //虚拟机,默认值为/factory.setUsername("allinpay"); //用户名,默认值为guestfactory.setPassword("allinpay"); //密码,默认值为guest//3.创建连接 Connectiontry {Connection connection = factory.newConnection();//4.创建ChannelChannel channel = connection.createChannel();//5.创建交换机/*void exchangeDeclareNoWait(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)参数:1.exchange 交换机名称2.type 交换机类型DIRECT("direct"), 定向FANOUT("fanout"), 扇形(广播),发送消息到每一个与之绑定的队列TOPIC("topic"), 通配符的方式HEADERS("headers"); 参数匹配3.internal 内部使用,一般设false*/String exchangeName = "test_fanout";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);//6.创建队列String queue1Name = "test_fanout_queue1";String queue2Name = "test_fanout_queue2";channel.queueDeclare(queue1Name, true, false, false, null);channel.queueDeclare(queue2Name, true, false, false, null);//7.绑定队列和交换机/*com.rabbitmq.client.AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)参数:routingKey 路由键,也就是绑定规则。* 若交换机类型是 fanout,routingKey默认设置为""*/channel.queueBind(queue1Name, exchangeName, ""); //这里路由key为""channel.queueBind(queue2Name, exchangeName, "");//8.发送消息String body = "日志信息 ...,日志级别 info";channel.basicPublish(exchangeName, "", null, body.getBytes());channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}
3.3 Routing路由模式
发消息使用的routing key 与 绑定队列和交换机 使用同一个 routing key。
交换机的类型必须是:DIRECT
package com.dhu.producer;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import sun.util.locale.provider.FallbackLocaleProviderAdapter;/*** 发送消息:* @author zhou* @create 2020/7/21*/
public class Producer_Routing {public static void main(String[] args) {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost("192.168.93.101"); //默认localhost 127.0.0.1factory.setPort(5672);factory.setVirtualHost("/jinqiao"); //虚拟机,默认值为/factory.setUsername("allinpay"); //用户名,默认值为guestfactory.setPassword("allinpay"); //密码,默认值为guest//3.创建连接 Connectiontry {Connection connection = factory.newConnection();//4.创建ChannelChannel channel = connection.createChannel();//5.创建交换机/*void exchangeDeclareNoWait(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)参数:1.exchange 交换机名称2.type 交换机类型DIRECT("direct"), 定向FANOUT("fanout"), 扇形(广播),发送消息到每一个与之绑定的队列TOPIC("topic"), 通配符的方式HEADERS("headers"); 参数匹配3.internal 内部使用,一般设false*/String exchangeName = "test_direct";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);//6.创建队列String queue1Name = "test_direct_queue1";String queue2Name = "test_direct_queue2";channel.queueDeclare(queue1Name, true, false, false, null);channel.queueDeclare(queue2Name, true, false, false, null);//7.绑定队列和交换机/*com.rabbitmq.client.AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)参数:routingKey 路由键,也就是绑定规则。* 若交换机类型是 fanout,routingKey默认设置为""*///队列1绑定 errorchannel.queueBind(queue1Name, exchangeName, "error"); //这里路由key为//队列2绑定channel.queueBind(queue2Name, exchangeName, "info");channel.queueBind(queue2Name, exchangeName, "error");channel.queueBind(queue2Name, exchangeName, "warning");//8.发送消息String body = "日志信息:调用了delete方法,出错了...,日志级别 error";channel.basicPublish(exchangeName, "info", null, body.getBytes());channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}
package com.dhu.consumer;import com.rabbitmq.client.*;import java.io.IOException;/*** @author zhou* @create 2020/7/21*/
public class Consumer_Routing1 {public static void main(String[] args) {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost("192.168.93.101"); //默认localhost 127.0.0.1factory.setPort(5672);factory.setVirtualHost("/jinqiao"); //虚拟机,默认值为/factory.setUsername("allinpay"); //用户名,默认值为guestfactory.setPassword("allinpay"); //密码,默认值为guest//3.创建连接 Connectiontry {Connection connection = factory.newConnection();//4.创建ChannelChannel channel = connection.createChannel();//5.创建队列QueueString queue1Name = "test_direct_queue1";String queue2Name = "test_direct_queue2";//6.接收消息Consumer consumer = new DefaultConsumer(channel) {//回调方法。当收到消息后,会自动执行该方法/*consumerTag 标识envelope 获取信息:交换机、路由keyproperties 配置信息body 数据*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body :" + new String(body));System.out.println("将日志消息保存数据库...");}};/*String basicConsume(String queue, boolean autoAck, Consumer callback)参数:1.queeu 队列名称2.autoAck 是否自动确认3.callback 回调对象*/channel.basicConsume(queue1Name, true, consumer); //监听队列,消费//消费则不需要关闭资源,用来监听的} catch (Exception e) {e.printStackTrace();}}
}
3.4 Topics通配符模式
除了监控日志中error信息,还需求监控订单系统中的所有级别日志信息。
通配符:# 和 *;#匹配0个或多个单词,*匹配1个单词。
package com.dhu.producer;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** 发送消息:* @author zhou* @create 2020/7/21*/
public class Producer_Topic {public static void main(String[] args) {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost("192.168.93.101"); //默认localhost 127.0.0.1factory.setPort(5672);factory.setVirtualHost("/jinqiao"); //虚拟机,默认值为/factory.setUsername("allinpay"); //用户名,默认值为guestfactory.setPassword("allinpay"); //密码,默认值为guest//3.创建连接 Connectiontry {Connection connection = factory.newConnection();//4.创建ChannelChannel channel = connection.createChannel();//5.创建交换机/*void exchangeDeclareNoWait(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)参数:1.exchange 交换机名称2.type 交换机类型DIRECT("direct"), 定向FANOUT("fanout"), 扇形(广播),发送消息到每一个与之绑定的队列TOPIC("topic"), 通配符的方式HEADERS("headers"); 参数匹配3.internal 内部使用,一般设false*/String exchangeName = "test_topic";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);//6.创建队列String queue1Name = "test_topic_queue1";String queue2Name = "test_topic_queue2";channel.queueDeclare(queue1Name, true, false, false, null);channel.queueDeclare(queue2Name, true, false, false, null);//7.绑定队列和交换机/*com.rabbitmq.client.AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)参数:routingKey 路由键,也就是绑定规则。* 若交换机类型是 fanout,routingKey默认设置为""*///routing key : 系统的名称.日志的级别(只有两部分组成)channel.queueBind(queue1Name, exchangeName, "#.error"); //这里路由key为channel.queueBind(queue1Name, exchangeName, "order.*");channel.queueBind(queue2Name, exchangeName, "*.*"); //都打印到控制台//8.发送消息String body = "日志信息:调用了delete方法,出错了...,日志级别";channel.basicPublish(exchangeName, "goods.info", null, body.getBytes());channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}
4、Spring整合RabbitMQ
更多推荐
RabbitMQ快速入门
发布评论