SpringBoot整合RabbitMQ学习笔记

编程入门 行业动态 更新时间:2024-10-22 13:31:20

SpringBoot整合RabbitMQ<a href=https://www.elefans.com/category/jswz/34/1770117.html style=学习笔记"/>

SpringBoot整合RabbitMQ学习笔记

SpringBoot整合RabbitMQ学习笔记

以下三种类型的消息,生产者和消费者需各自启动一个服务,模拟生产者服务发送消息,消费者服务监听消息,分布式开发。

一 Fanout类型信息


  1. . RabbitMQ创建交换机和队列
    在RabbitMQ控制台,新建交换机hmall.fanout,新建两个队列,fanout.queue1和fanout.queue2,并将连个队列和交换机进行绑定即可。
    操作如下图所示:
    一下操作可以通过代码实现,具体参考配置类
    (1)创建队列

(2)创建交换机

(3)绑定
2. 代码实现
(1)引入依赖

<dependency><groupId>org.springframework.book</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

(2)配置MQ配置信息

spring:rabbitmq:host: 192.168.150.101 #主机ipport: 5672 #端口virtual-host: /hmall #虚拟主机username: hmall #用户名password: 123 #密码exchange: hmall.fanoutproducer:queue1: fanout.queue1

(3)声明队列和交换机配置类

@Component
public class FanoutConfg{@Value("${spring.rabbitmq.exchange}")private String exchange@Value("${spring.rabbitmq.producer.queue1}")private String queueName1// 声明fanout交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(exchanage);}// 声明队列@Beanpublic Queue fanoutQueue1(){return new Queue(queueName1);}//绑定队列和交换机@Beanpublic Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder,build(fanoutQueue1).to(fanoutExchange);}
}

(4)生产者

@Component
public class RabbitMqProduce {@Autowiredprivate RabbitTemplate rabbitTemplete;@value("${spring.rabbitmq.producer.queue})private String queueName;/*** 入参说明:* 第一个参数:queueName:队列名称* 第二个参数:路由键,fanout类型不需要路由键* 第三个参数:msg 消息题内容*/public void send(String msg){rabbitTemplete.covertAndSend(queueName,null,msg);}}

(4)消费者

@Component
public class RabbitMqListener {@RabbitListener(queues="${spring.rabbitmq.producer.queue}")public void counsume(String msg){System.out.pringln("消费者收到 fanout.queue队列发的消息",msg);}
}

(5)测试类

@SpringBootTest
public class SpringBootTest{@AUtowiredprivate RabbitMqProduce producer;@Testpublic void testSendFanoutMsg(){producer.send("fanout类型发送消息!!!");}
}

二 direct类型发送消息

  1. 控制台操作
    (1)交换机和队列的创建参考fanout的操作
    (2)绑定:与fanout不同的是 给交换机绑定队列的同时需要指定路由键,如下图所示:
  2. 代码实现
    (1)依赖引入参考fanout类型的消息
    (2)mq消息配置
spring:rabbitmq:host: 192.168.150.101 #主机ipport: 5672 #端口virtual-host: /hmall #虚拟主机username: hmall #用户名password: 123 #密码exchange: hmall.directproducer:queue1: direct.queue1queue2: direct.queue2routingKey1: redroutingKey2: red2

(3)MQ配置类
以下配置可以在消费者注解上实现

@Component
public class RabbitMqConfig {@Autowiredprivate RabbitTemplate rabbitTemplete;@value("${spring.rabbitmq.producer.exchange}")private String exchange;@value("${spring.rabbitmq.producer.queue1}")private String queueName1;@value("${spring.rabbitmq.producer.queue2}")private String queueName2;@value("${spring.rabbitmq.producer.routingKey1}")private String routingKey1;@value("${spring.rabbitmq.producer.routingKey2}")private String routingKey2;// 创建交换机@Bean("directExchange")public Exchange getExchange(){return ExchangeBuilder.topicExchange(exchange) // 交换机类型,交换机名称.durable(true) //ture为持久化,存到磁盘,false存到内存.build();}// 创建队列@Bean("directQueue1")public Queue getDirectQueue1(){retuen new Queue(queueName1);}// 交换机绑定队列@beanpublic Binging bindDirectQueue1(@Qualifier("directExchange") Exchange exchange,@Qualifier("directQueue1") Queue queue){return BindingBuilder.bind(queue).to(exchange).with(routingKey1).noargs();	}// 创建队列@Bean("directQueue2")public Queue getDirectQueue2(){retuen new Queue(queueName2);}// 交换机绑定队列@beanpublic Binging bindDirectQueue2(@Qualifier("directExchange") Exchange exchange,@Qualifier("directQueue2") Queue queue){return BindingBuilder.bind(queue).to(exchange).with(routingKey2).noargs();	}}

(4)生产者发送消息

@Component
public class RabbitMqProduce {@Autowiredprivate RabbitTemplate rabbitTemplete;@value("${spring.rabbitmq.producer.queue1})private String queueName1;@value("${spring.rabbitmq.producer.queue2})private String queueName2;@value("${spring.rabbitmq.producer.routingKey2})private String routingKey1;@value("${spring.rabbitmq.producer.routingKey1})private String routingKey2;/*** 入参说明:* 第一个参数:queueName:队列名称* 第二个参数:路由键,fanout类型不需要路由键* 第三个参数:msg 消息题内容*/public void sendQueue1(String msg){rabbitTemplete.covertAndSend(queueName1,routingKey1,msg);}public void sendQueue2(String msg){rabbitTemplete.covertAndSend(queueName2,routingKey2,msg);}}

(5)消费者监听消息
第一种:已经编写了配置类

@Component
public class RabbitMqListener {@RabbitListener(queues="${spring.rabbitmq.producer.queue1}")public void counsume(String msg){System.out.pringln("消费者收到 direct.queue1队列发的消息",msg);}@RabbitListener(queues="${spring.rabbitmq.producer.queue2}")public void counsume(String msg){System.out.pringln("消费者收到 direct.queue2队列发的消息",msg);}
}

第二种:在注解上配置交换机和队列以及路由键

@Component
public class RabbitMqListener {@RabbitListener(bindings = @QueueBinding(value = Queue(name="${spring.rabbitmq.producer.queue1}",durable="true"),exchange = @Exchange(name="${spring.rabbitmq.producer.exchange)",type=ExchangeType.DIRECT),key = {"${spring.rabbitmq.producer.routingKey1}","${spring.rabbitmq.producer.routingKey2}"}	))public void counsume(String msg){System.out.pringln("消费者收到 direct.queue1队列发的消息",msg);}@RabbitListener(bindings = @QueueBinding(value = Queue(name="${spring.rabbitmq.producer.queue2}",durable="true"),exchange = @Exchange(name="${spring.rabbitmq.producer.exchange)",type=ExchangeType.DIRECT),key = {"${spring.rabbitmq.producer.routingKey1}","${spring.rabbitmq.producer.routingKey2}"}	))public void counsume(String msg){System.out.pringln("消费者收到 direct.queue2队列发的消息",msg);}
}

(6)测试类

@SpringBootTest
public class SpringBootTest{@AUtowiredprivate RabbitMqProduce producer;@Testpublic void testSendDirectMsg(){producer.send("direct类型发送消息!!!");}
}

三 Topic类型消息


  1. 控制台操作
    参考前面的创建交换机,队列,以及绑定关系操作
  2. 代码实现
    (1)依赖引入参考fanout类型的消息
    (2)mq消息配置
    路由键使用通配符进行匹配,#代表多个,*代表一个
spring:rabbitmq:host: 192.168.150.101 #主机ipport: 5672 #端口virtual-host: /hmall #虚拟主机username: hmall #用户名password: 123 #密码exchange: hmall.topicproducer:queue1: topic.queue1queue2: topic.queue2routingKey1: china.#routingKey2: #.news

(3)MQ配置类

@Component
public class RabbitMqConfig {@Autowiredprivate RabbitTemplate rabbitTemplete;@value("${spring.rabbitmq.producer.exchange}")private String exchange;@value("${spring.rabbitmq.producer.queue1}")private String queueName1;@value("${spring.rabbitmq.producer.queue2}")private String queueName2;@value("${spring.rabbitmq.producer.routingKey1}")private String routingKey1;@value("${spring.rabbitmq.producer.routingKey2}")private String routingKey2;// 创建交换机@Bean("topicExchange")public Exchange getExchange(){return ExchangeBuilder.topicExchange(exchange) // 交换机类型,交换机名称.durable(true) //ture为持久化,存到磁盘,false存到内存.build();}// 创建队列@Bean("topicQueue1")public Queue getDirectQueue1(){retuen new Queue(queueName1);}// 交换机绑定队列@beanpublic Binging bindDirectQueue1(@Qualifier("topicExchange") Exchange exchange,@Qualifier("topicQueue1") Queue queue){return BindingBuilder.bind(queue).to(exchange).with(routingKey1).noargs();	}// 创建队列@Bean("topicQueue2")public Queue getDirectQueue2(){retuen new Queue(queueName2);}// 交换机绑定队列@beanpublic Binging bindDirectQueue2(@Qualifier("topicExchange") Exchange exchange,@Qualifier("topicQueue2") Queue queue){return BindingBuilder.bind(queue).to(exchange).with(routingKey2).noargs();	}}

(4)生产者发送消息

@Component
public class RabbitMqProduce {@Autowiredprivate RabbitTemplate rabbitTemplete;@value("${spring.rabbitmq.producer.queue1})private String queueName1;@value("${spring.rabbitmq.producer.queue2})private String queueName2;@value("${spring.rabbitmq.producer.routingKey2})private String routingKey1;@value("${spring.rabbitmq.producer.routingKey1})private String routingKey2;/*** 入参说明:* 第一个参数:queueName:队列名称* 第二个参数:路由键,fanout类型不需要路由键* 第三个参数:msg 消息题内容*/public void sendQueue1(String msg){rabbitTemplete.covertAndSend(queueName1,routingKey1,msg);}public void sendQueue2(String msg){rabbitTemplete.covertAndSend(queueName2,routingKey2,msg);}}

(5)消费者监听消息

@Component
public class RabbitMqListener {@RabbitListener(queues="${spring.rabbitmq.producer.queue1}")public void counsume(String msg){System.out.pringln("消费者收到 topic.queue1队列发的消息",msg);}@RabbitListener(queues="${spring.rabbitmq.producer.queue2}")public void counsume(String msg){System.out.pringln("消费者收到 topic.queue2队列发的消息",msg);}
}

(6)测试类

@SpringBootTest
public class SpringBootTest{@AUtowiredprivate RabbitMqProduce producer;@Testpublic void testSendDirectMsg(){producer.send("direct类型发送消息!!!");}
}

四 消息转换器

MQ会把消息体变成字节码

解决办法:使用消息转换器,实现如下:

  1. 在生产者和消费者两个服务引入依赖
<dependency><groupId>com.fasterxml.jackson</groupId><artifactId>jasckson-databind</artifactId>
</dependency>
  1. 在生产者和消费者两个服务编写消息转换器配置
@Component
public class JacksonMessageConvertor{@Beanpublic MessageCoverter jacksonMessageConvertor(){return new Jackson2JsonMessageConverter();}
}
  1. 消息体
    对于生产者来说,是map类型的,则生成者接收的时候也是map类型
    例如:
@Component
public class RabbitMqListener {@RabbitListener(queues="${spring.rabbitmq.producer.queue1}")public void counsume(Map<String,Objecct> msg){System.out.pringln("消费者收到 topic.queue1队列发的消息",msg);}}

五 案例演示

支付服务支付成功后通知交易服务进行后续操作

生产者和消费者两个服务都需要进行1,2,3步骤

  1. 添加依赖
<!--mq依赖-->
<dependency><groupId>org.springframework.book</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--消息转换器依赖-->
<dependency><groupId>com.fasterxml.jackson</groupId><artifactId>jasckson-databind</artifactId>
</dependency>
  1. 添加MQ配置信息
spring:rabbitmq:host: 192.168.150.101 #主机ipport: 5672 #端口virtual-host: /hmall #虚拟主机username: hmall #用户名password: 123 #密码exchange: pay.topicqueue: mark.order.pay.queueroutKingKey: pay.success
  1. 消息转换器配置类
@Component
public class JacksonMessageConvertor{@Beanpublic MessageCoverter jacksonMessageConvertor(){return new Jackson2JsonMessageConverter();}
}
  1. 生产者
    (1)生产者的配置
@Component
public class Rabbitroducer {@Autowiredprivate RabbitTemplate rabbitTemplete;@value("${spring.rabbitmq.queue})private String queueName;@value("${spring.rabbitmq.routingKey})private String routingKey;/*** 入参说明:* 第一个参数:queueName:队列名称* 第二个参数:路由键,fanout类型不需要路由键* 第三个参数:msg 消息题内容*/public void sendMsg(String msg){// 发送消息rabbitTemplete.covertAndSend(queueName,routingKey, msg);}
}

(2)业务代码支付成功发送消息

public class payOrderServiceImpl impletement PayOrderService{@Autowridprivate RabbitProducer payProducer;@Overirid@Transactional(rollback = Exception.class)public void payOrder(PayOrderDto payOrder){// 一些列操作最终交易成功// 发送消息通知try{payProducer.send(payOrder.getId());}catch(AmqpException e){log.error("交易成功,发送消息异常:{}",e.getMessages(););}}
}
  1. 消费者
@Component
public class PaySatusListener {@Autowiredprivate OrderService orderService;@RabbitListener(bindings = @QueueBinding(value = Queue(name="${spring.rabbitmq.queue}",durable="true"),exchange = @Exchange(name="${spring.rabbitmq.exchange)",type=ExchangeType.TOPIC),key = {"${spring.rabbitmq.routingKey}"}	))public void listenOrderPay(Long orderId){//标记订单为已支付orderService.markOrderPaySuccess(orderId);}}

更多推荐

SpringBoot整合RabbitMQ学习笔记

本文发布于:2023-11-16 13:13:46,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1622834.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:学习笔记   SpringBoot   RabbitMQ

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!