入门)"/>
Springboot 集成 RocketMq(入门)
1.RocketMq安装部署
Linux 安装 RocketMq-CSDN博客
2.添加依赖包
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version>
</dependency>
3.配置
rocketmq:name-server: 127.0.0.1:9876# 生产者producer:group: group_one# 消息发送超时时间send-message-timeout: 30000# 消息最大长度4Mmax-message-size: 4096# 消息发送失败重试次数retry-times-when-send-failed: 3# 异步消息发送失败重试次数retry-times-when-send-async-failed: 2# 消费者consumer:group: group_one# 每次提取的最大消息数pull-batch-size: 5
4.生成者代码
@RestController
@Slf4j
public class ProducerController {@Resourceprivate RocketMQTemplate rocketMqTemplate;@Resourceprivate DefaultMQProducer defaultMqProducer;@GetMapping("/send/msg1/{messageBody}")public String sendMsg1(@PathVariable("messageBody") String messageBody) {// 发送消息rocketMqTemplate.convertAndSend("topic1", messageBody);return "OK";}@GetMapping("/send/msg2/{messageBody}")public String sendMsg2(@PathVariable("messageBody") String messageBody) {// 构建消息对象Message message = new Message();message.setTopic("topic2");message.setTags("boot-mq-tag");message.setKeys("boot-mq-key");message.setBody(messageBody.getBytes());// 发送消息,打印日志SendResult sendResult = null;try {sendResult = defaultMqProducer.send(message);log.info("sendMsg2 msgId:{},sendStatus:{}", sendResult.getMsgId(), sendResult.getSendStatus());} catch (Exception e) {log.info("sendMsg2 error", e);return "FAIL";}return "OK";}
}
5.消费者
package com.lhy.demo.rocketMq;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Component
@RocketMQMessageListener(topic = "topic1", consumerGroup = "topic1")
@Slf4j
public class MyTopic1Consumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {// 处理消息的逻辑log.info("Received topic1 message: {}", message);}}
package com.lhy.demo.rocketMq;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmqmon.message.Message;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Component
@RocketMQMessageListener(topic = "topic2", consumerGroup = "topic2")
@Slf4j
public class MyTopic2Consumer implements RocketMQListener<Message> {@Overridepublic void onMessage(Message message) {// 处理消息的逻辑log.info("Received topic2 message: {}", message);}}
更多推荐
Springboot 集成 RocketMq(入门)
发布评论