延时队列实现定金解冻清算

编程入门 行业动态 更新时间:2024-10-10 04:19:25

延时<a href=https://www.elefans.com/category/jswz/34/1771257.html style=队列实现定金解冻清算"/>

延时队列实现定金解冻清算

业务场景说明

用于解决用户交付定金时冻结定金到达解冻时间时清算定金

整合RabbitMQ实现延迟队列

在pom.xml中添加相关依赖

<!--消息队列相关依赖-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--lombok依赖-->
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional>
</dependency>

修改SpringBoot配置文件

修改application.yml文件,在spring节点下添加RabbitMQ相关配置。
  rabbitmq:host: rabbitmq的连接地址port: rabbitmq的连接端口号virtual-host: rabbitmq的虚拟hostusername: rabbitmq的用户名password: rabbitmq的密码publisher-confirms: true #如果对异步消息需要回调必须设置为true

添加消息队列的枚举配置类QueueEnum

用于延迟消息队列及处理取消订单消息队列的常量定义,包括交换机名称、队列名称、路由键名称。
package com.xcb.oms.enums;import lombok.Getter;/*** @className: QueueEnum* @description: TODO 类描述* @author: Jp* @date: 2022/1/21*/
@Getter
public enum QueueEnum {/** 插件实现延迟队列 */QUEUE_ORDER_PLUGIN_CANCEL("mall.order.direct.plugin", "mall.order.cancel.plugin", "mall.order.cancel.plugin");/** 交换名称 */private String exchange;/** 队列名称 */private String name;/** 路由键 */private String routeKey;QueueEnum(String exchange, String name, String routeKey) {this.exchange = exchange;this.name = name;this.routeKey = routeKey;}
}

添加RabbitMQ的配置

用于配置交换机、队列及队列与交换机的绑定关系。
package com.xcb.oms.config;import com.xcb.oms.enums.QueueEnum;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @className: RabbitConfig* @description: TODO 类描述* @author: Jp* @date: 2021/8/28*/
@Configuration
public class RabbitConfig {/** 订单延迟插件消息队列所绑定的交换机 */@BeanCustomExchange orderPluginDirect() {// 创建一个自定义交换机,可以发送延迟消息Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange(), "x-delayed-message", true, false, args);}/** 订单延迟插件队列 */@Beanpublic Queue orderPluginQueue() {return new Queue(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getName());}/** 将订单延迟插件队列绑定到交换机 */@Beanpublic Binding orderPluginBinding(CustomExchange orderPluginDirect, Queue orderPluginQueue) {return BindingBuilder.bind(orderPluginQueue).to(orderPluginDirect).with(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey()).noargs();}
}

添加延迟消息的发送者MqProducer

用于向订单延迟消息队列(mall.order.cancel.plugin)里发送消息。
@Component
@Slf4j
public class MqProducer {@Autowired private AmqpTemplate amqpTemplate;/*** 发送延时信息** @param orderNo 订单编号* @param delayTimes routingKey*/public void sendMessage(String orderNo, final long delayTimes) {// 给延迟队列发送消息amqpTemplate.convertAndSend(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange(),QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey(),orderNo,message -> {// 给消息设置延迟毫秒值message.getMessageProperties().setHeader("x-delay", delayTimes);return message;});log.info("send delay message orderId:{}", orderNo);}
}

添加取消订单消息的接收者RabbitmqConsumer

package com.xcb.oms.event.mq.listener;import com.fasterxml.jackson.databind.ObjectMapper;/*** @className: RabbitmqConsumer* @description: 定金清算 ==> 延时队列* @author: Jp* @date: 2021/8/28*/
@Slf4j
@Component
public class RabbitmqConsumer {@Autowired private ObjectMapper objectMapper;public static RabbitmqConsumer rabbitmqConsumer;@PostConstructpublic void init() {rabbitmqConsumer = this;rabbitmqConsumer.objectMapper = this.objectMapper;}/** 处理延时订单 */@RabbitListener(queues = "mall.order.cancel.plugin")public void process(Message message) {String orderNo = null;try {orderNo = objectMapper.readValue(message.getBody(), String.class);} catch (IOException e) {e.printStackTrace();}log.info("延迟队列的订单编号[{}]", orderNo);// 获取到订单编号后可在这调用对应的业务方法}
}

添加OrderRecordService接口

package com.xcb.oms.service;/*** @author Jp* @date 2021-10-28*/
public interface OrderRecordService {/** 存入延时队列 */void generateOrder(String orderNo, int delay);
}

添加OrderRecordService的实现类OrderRecordServiceImpl

package com.xcb.oms.service.impl;/*** @author Jp* @date 2021-10-28*/
@Service
@Slf4j
public class OrderRecordServiceImpl extends DefaultModelService<OrderRecordDTO, OrderRecord>implements OrderRecordService {@Inject private MqProducer mqProducer;/** 存入延时队列 */@Overridepublic void generateOrder(String orderNo, int delay) {sendDelayMessageCancelOrder(orderNo, delay);}private void sendDelayMessageCancelOrder(String orderNo, int delay) {long delayTimes = delay * 1000;// 发送延迟消息mqProducer.sendMessage(orderNo, delayTimes);}
}
然后可以根据业务调用这个方法
注意,该方法依赖RabbitMQ的延时插件,需要该插件可以查看我这边博客,链接附上

服务器上安装RabbitMQ延时插件

更多推荐

延时队列实现定金解冻清算

本文发布于:2024-03-23 15:15:50,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1739667.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:队列   定金

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!