【RocketMQ系列八】SpringBoot集成RocketMQ

编程入门 行业动态 更新时间:2024-10-21 18:56:07

【RocketMQ<a href=https://www.elefans.com/category/jswz/34/1770787.html style=系列八】SpringBoot集成RocketMQ"/>

【RocketMQ系列八】SpringBoot集成RocketMQ

您好,我是码农飞哥(wei158556),感谢您阅读本文,欢迎一键三连哦
💪🏻 1. Python基础专栏,基础知识一网打尽,9.9元买不了吃亏,买不了上当。 Python从入门到精通
😁 2. 毕业设计专栏,毕业季咱们不慌忙,几百款毕业设计等你选。
❤️ 3. Python爬虫专栏,系统性的学习爬虫的知识点。9.9元买不了吃亏,买不了上当 。python爬虫入门进阶
❤️ 4. Ceph实战,从原理到实战应有尽有。 Ceph实战
❤️ 5. Java高并发编程入门,打卡学习Java高并发。 Java高并发编程入门

文章目录

    • 1. 引入依赖
    • 2. 配置文件修改
    • 3. 实现生产者
        • 3.1. 编写生产者单元测试
    • 4.实现消费者
    • 5. 实现事务消息
      • 5.1. 实现事务消息的生产者
      • 5.2. 实现本地事务消息

本文将主要介绍在SpringBoot项目中如何集成RocketMQ以实现普通消息和事务消息的。

首先是分别创建生产者的springboot项目 springboot-rocketmq-producer,创建消费者的springboot项目 springboot-rocketmq-consumer。

1. 引入依赖

本例中使用的RocketMQ的版本是 5.1.3。所以引入的 rocketmq-spring-boot 版本要与之匹配。

可以通过mvnrepository进行查看。.apache.rocketmq/rocketmq-spring-boot/2.2.2

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot</artifactId><version>2.2.2</version>
</dependency>

2. 配置文件修改

在springboot-rocketmq-producer项目的application.yml文件中添加如下配置:

rocketmq:name-server: 172.31.184.89:9876producer:group: feige-producer-groupconsumer:topic: my-spring-boot-topic

在springboot-rocketmq-consumer项目的application.yml文件中添加如下配置:

server:port: 8080
rocketmq:name-server: 172.31.184.89:9876consumer:group: feige-consumer-grouptopic: my-spring-boot-topic

3. 实现生产者

定义一个生产者类MyProducer,在该类中引入RocketMQTemplate 操作类,然后定义发送消息的方法sendMessage,在此方法中调用 rocketMQTemplate.convertAndSend 方法进行消息发送。

@Component
public class MyProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** 发送普通消息** @param topic   主题* @param message 消息*/public void sendMessage(String topic, String message) {rocketMQTemplate.convertAndSend(topic, message);}
3.1. 编写生产者单元测试
@Autowiredprivate MyProducer myProducer;@Value("${rocketmq.consumer.topic:}")private String consumerTopic;@Testvoid sendMessage() {myProducer.sendMessage(consumerTopic,"飞哥SpringBoot集成RocketMQ消息测试");}

4.实现消费者

定义消费者类MyConsumer。此类实现了RocketMQListener接口并重写了onMessage方法用于接收broker推送过来的消息。

@Component
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic:}", consumerGroup = "generalConsumerGroup")
public class MyConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println("收到的消息是=" + s);}
}

5. 实现事务消息

在SpringBoot中实现RocketMQ的事务消息,整体思路与 【RocketMQ系列六】RocketMQ事务消息 文中提到的思路相同。

5.1. 实现事务消息的生产者

在前面创建的MyProducer类中添加实现事务消息的方法 sendTransactionMessage。

/*** 发送事务消息** @param topic 话题* @param msg   消息*/public void sendTransactionMessage(String topic, String msg) throws InterruptedException {String[] tags = {"tagA", "tagB", "tagC", "tagD", "tagE"};for (int i = 0; i < 10; i++) {// 2. 将topic和tag整合在一起,以:分割,String destination = topic + ":" + tags[i % tags.length];// 1.注意该message是org.springframework.messaging.MessageMessage<String> message = MessageBuilder.withPayload(msg + "_" + tags[i % tags.length] + "_" + i).setHeader("destination", destination).build();// 第一个参数是发布的目的地,第二个参数是消息,第三个参数是额外的参数rocketMQTemplate.sendMessageInTransaction(destination, message, destination);Thread.sleep(10);}}

这里需要注意的是传入的Message类是org.springframework.messaging.Message ,不是RocketMQ的Message。

5.2. 实现本地事务消息

接着在定义生产者本地事务实现类 MyTransactionListener,该类实现了RocketMQLocalTransactionListener接口,并重写了executeLocalTransaction方法和checkLocalTransaction方法。这里多了一步就是将 org.springframework.messaging.Message 转成 org.apache.rocketmqmon.message.Message

@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class MyTransactionListener implements RocketMQLocalTransactionListener {@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 将消息转成rocketmq下的messageorg.apache.rocketmqmon.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(), "utf-8", (String) arg, msg);String tags = message.getTags();if (tags.equals("tagA")) {return RocketMQLocalTransactionState.COMMIT;} else if (tags.equals("tagB")) {return RocketMQLocalTransactionState.ROLLBACK;}return RocketMQLocalTransactionState.UNKNOWN;}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {// 将消息转成rocketmq下的messageString destination = (String) msg.getHeaders().get("destination");org.apache.rocketmqmon.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(),"utf-8",destination, msg);String tags = message.getTags();if (tags.equals("tagC")) {return RocketMQLocalTransactionState.COMMIT;} else if (tags.equals("tagD")) {return RocketMQLocalTransactionState.ROLLBACK;}return RocketMQLocalTransactionState.UNKNOWN;}
}

更多推荐

【RocketMQ系列八】SpringBoot集成RocketMQ

本文发布于:2023-12-08 00:34:25,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1672109.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:系列   RocketMQ   SpringBoot

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!