RabbitMQ消息队列处理库存解锁及关闭订单问题

编程入门 行业动态 更新时间:2024-10-11 03:23:28

RabbitMQ消息<a href=https://www.elefans.com/category/jswz/34/1771257.html style=队列处理库存解锁及关闭订单问题"/>

RabbitMQ消息队列处理库存解锁及关闭订单问题

文章目录

  • 一、RabbitMQ延时队列
    • 消息的TTL
    • 死信
  • 二、实战
    • 延时关单
    • 规范设计
  • 三、消息队列处理库存解锁及关单
    • 1、流程分析
    • 2、库存微服务
      • 2.1 解锁库存配置
      • 2.2 解锁库存流程
      • 2.3 业务代码
      • 2.4 调试
  • 四、RMQ 延时队列处理关单及库存解锁整合
    • 1、流程分析
    • 2、订单关单
    • 3、订单释放和库存释放进行绑定
  • 五、消息丢失、重复、积压等解决方案
    • 1、消息丢失
    • 2、消息重复
    • 3、消息积压

一、RabbitMQ延时队列

RabbitMQ延时队列实现定时任务。
场景:
比如未付款订单,超过一定时间后,系统自动取消订单并释放占有的库存。
常用解决方案:
spring的schedule定时任务轮训数据库
缺点:
消耗系统内存,增加了数据库的压力、存在较大的时间误差
解决:
Rabbit的消息 TTL 和死信Exchange结合。

消息的TTL

消息的TTL(Time To Live)就是消息的存活时间,单位是毫秒。。
RabbitMQ 可以对队列消息分别设置TTL。

  • 对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就是死了,称之为死信
  • 如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration 字段或者 x-message-ttl 属性来设置时间,两者是一样的效果。

注意:延时消息放入到队列中,没有被任何消费者监听,如果监听就拿到了,也就被消费了,队列里边的消息只要一过设置的过期时间,就成了死信队列,服务器就会丢弃。
那么,如何设置这个TTL值呢?有两种方式,第一种是在创建队列的时候设置队列的“x-message-ttl”属性,如下:

Map<String, Object> args = new HashMap<String, Object>();
args.put(“x-message-ttl”, 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

这样所有被投递到该队列的消息都最多不会存活超过6s。
另一种方式便是针对每条消息设置TTL,代码如下:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration(“6000”);
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, “msg body”.getBytes());

这样这条消息的过期时间也被设置成了6s。
但这两种方式是有区别的,如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃,而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列没有严重的消息积压情况,则已过期的消息也许还能存活较长时间。

另外,还需要注意的一点是,如果不设置TTL,表示消息永远不会过期,如果将TTL设置为0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

死信

死信:Dead Letter Exchange(DLX)
一个消息在满足如下条件,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。

  • 一个消息被Consumer拒收了,并且reject方法的参数里 requeue 是false。也就是说不会被放在队列里,被其他消费者使用。(basic.reject/basic.nack) requeue=false
  • 上面的消息的TTL到了,消息过期了。
  • 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由。

Dead Letter Exchange 其实就是一种普通的 exchange,和创建其他exchange一样。只是在某一个设置Dead Letter Exchange 的队列中有信息过期了,会自动触发消息的转发,发送到 Dead Letter Exhange中去。
我们既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息被路由到某一个指定的交换机,结合二者,其实就可以实现一个延时队列。

二、实战

延时关单

场景:用户下单,过了30分钟没有支付,系统会默认关闭该订单,以前可以用定时任务做,现在使用延时队列。

规范设计

设计建议规范(基于事件模型的交换机设计):
1、交换机命名:业务+exchange;交换机为Topic
2、路由键:事件.需要感知的业务(可以不写)
3、队列命名:事件+想要监听服务名+queue
4、绑定关系:事件.感知的业务(#)
整体业务设计:

按照上边的规范设计,对关单业务进行升级设计:

上图说明:交换机 order-event-exchange 绑定了一个延时队列order.delay.queue,路由key是 order.create.order, 当创建了一个订单时,会发消息到该延时队列,等到TTL过期,变为死信,会自动触发消息的转发,发送到 Dead Letter Exhange(order-event-exchange) 中去,注意死信路由是 order.release.order,然后exchange根据路由key order.release.order转发消息到 order.release.order.queue队列,客户端监听该队列获取消息。
根据上图的业务设计分析,需要创建两个队列,一个交换机,和两个绑定。
gulimall-order/xxx/order/config/MyMQConfig.java

package com.atguigu.gulimall.order.config;import com.atguigu.gulimall.order.entity.OrderEntity;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.io.IOException;
import java.util.HashMap;/*** @author: kaiyi* @create: 2020-09-16 13:53*/
@Configuration
public class MyMQConfig {/* 容器中的Queue、Exchange、Binding 会自动创建(在RabbitMQ)不存在的情况下 *//*** 客户端监听队列(测试)* @param orderEntity* @param channel* @param message* @throws IOException*/@RabbitListener(queues = "order.release.order.queue")public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {System.out.println("收到过期的订单信息:准备关闭订单" + orderEntity.getOrderSn());channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}/*** 死信队列** @return*/@Beanpublic Queue orderDelayQueue(){/*Queue(String name,  队列名字boolean durable,  是否持久化boolean exclusive,  是否排他boolean autoDelete, 是否自动删除Map<String, Object> arguments) 属性*/HashMap<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", "order-event-exchange");arguments.put("x-dead-letter-routing-key", "order.release.order");arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟Queue queue = new Queue("order.delay.queue", true, false, false, arguments);return queue;}/*** 普通队列** @return*/@Beanpublic Queue orderReleaseQueue(){Queue queue = new Queue("order.release.order.queue", true, false, false);return queue;}/*** TopicExchange** @return*/@Beanpublic Exchange orderEventExchange(){/**   String name,*   boolean durable,*   boolean autoDelete,*   Map<String, Object> arguments* */return new TopicExchange("order-event-exchange", true, false);}@Beanpublic Binding orderCreateBinding() {/** String destination, 目的地(队列名或者交换机名字)* DestinationType destinationType, 目的地类型(Queue、Exhcange)* String exchange,* String routingKey,* Map<String, Object> arguments* */return new Binding("order.delay.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.create.order",  // 路由key一般为事件名null);}@Beanpublic Binding orderReleaseBinding() {return new Binding("order.release.order.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.release.order",null);}}

然后在控制器创建测试消息:
gulimall-order/xxx/order/web/HelloController.java

*** @author: kaiyi* @create: 2020-09-12 18:09*/
@Controller
public class HelloController {@Autowiredprivate RabbitTemplate rabbitTemplate;@ResponseBody@GetMapping(value = "/test/createOrder")public String createOrderTest() {//订单下单成功OrderEntity orderEntity = new OrderEntity();orderEntity.setOrderSn(UUID.randomUUID().toString());orderEntity.setModifyTime(new Date());//给MQ发送消息rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",orderEntity);return "ok";}
}

然后访问该路径 , 发送消息,然后去RMQ管理界面可以看到创建的消息已经成功了。
交换机:

交换机绑定的队列(路由key):

队列:

可以看到第一个队列是死信队列,第二个事普通队列
收到的消息为实体对象json:

控制器输出的监控信息:

收到过期的订单信息:准备关闭订单321c3329-d57a-4613-a4ff-331066d4105a
收到过期的订单信息:准备关闭订单44fcf65f-1e7a-40c6-8336-a6c60362920b

三、消息队列处理库存解锁及关单

1、流程分析


2、库存微服务

2.1 解锁库存配置

1、库存微服务gulimall-ware 引入高级消息队列amqp依赖:
gulimall-ware/pom.xml

<!-- 使用高级消息队列来解决分布式事务一致性 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2、添加RMQ配置
gulimall-ware/src/main/resources/application.properties

# ===== RabbitMQ配置 ======
spring.rabbitmq.host=192.168.10.10
spring.rabbitmq.port=5672
# 虚拟主机配置
spring.rabbitmq.virtual-host=/
# 开启发送端消息抵达Broker确认
spring.rabbitmq.publisher-confirms=true
# 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true
# 只要消息抵达Queue,就会异步发送优先回调returnfirm
spring.rabbitmq.template.mandatory=true
# 手动ack消息,不使用默认的消费端确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual

3、创建RMQ配置文件
gulimall-ware/xxx/ware/config/MyRabbitMQConfig.java

package com.atguigu.gulimall.ware.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;/*** RMQ配置** @author: kaiyi* @createTime: 2020-09-15 16:40**/@Configuration
public class MyRabbitMQConfig {/*** 使用JSON序列化机制,进行消息转换* @return*/@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}// @RabbitListener(queues = "stock.release.stock.queue")// public void handle(Message message) {//// }/*** 库存服务默认的交换机* @return*/@Beanpublic Exchange stockEventExchange() {//String name, boolean durable, boolean autoDelete, Map<String, Object> argumentsTopicExchange topicExchange = new TopicExchange("stock-event-exchange", true, false);return topicExchange;}/*** 普通队列* @return*/@Beanpublic Queue stockReleaseStockQueue() {//String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> argumentsQueue queue = new Queue("stock.release.stock.queue", true, false, false);return queue;}/*** 延迟队列* @return*/@Beanpublic Queue stockDelay() {HashMap<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", "stock-event-exchange");arguments.put("x-dead-letter-routing-key", "stock.release");// 消息过期时间 2分钟arguments.put("x-message-ttl", 120000);Queue queue = new Queue("stock.delay.queue", true, false, false,arguments);return queue;}/*** 交换机与普通队列绑定* @return*/@Beanpublic Binding stockLocked() {//String destination, DestinationType destinationType, String exchange, String routingKey,//          Map<String, Object> argumentsBinding binding = new Binding("stock.release.stock.queue",Binding.DestinationType.QUEUE,"stock-event-exchange","stock.release.#",null);return binding;}/*** 交换机与延迟队列绑定* @return*/@Beanpublic Binding stockLockedBinding() {return new Binding("stock.delay.queue",Binding.DestinationType.QUEUE,"stock-event-exchange","stock.locked",null);}
}

2.2 解锁库存流程

解锁库存流程:


可以看到,在锁定库存时,我们增加了库存工作单,用来记录库存锁定的明细记录,如果库存锁定异常,则会回滚,该表不会有数据记录,如果锁定成功,则会有具体的锁定记录,锁定成功后会发送消息到延时队列,过段时间会根据订单创建的状态(订单取消或订单未创建成功)来解锁库存。
解锁库存具体步骤:

2.3 业务代码

锁库存
锁库存,并发送消息到延时队列,方法 orderLockStock(WareSkuLockVo vo)
gulimall-ware/xxx/ware/service/impl/WareSkuServiceImpl.java

package com.atguigu.gulimall.ware.service.impl;import com.alibaba.fastjson.TypeReference;
import com.atguigumon.exception.NoStockException;
import com.atguigumon.to.mq.StockDetailTo;
import com.atguigumon.to.mq.StockLockedTo;
import com.atguigumon.utils.R;
import com.atguigu.gulimall.ware.entity.WareOrderTaskDetailEntity;
import com.atguigu.gulimall.ware.entity.WareOrderTaskEntity;
import com.atguigu.gulimall.ware.feign.OrderFeignService;
import com.atguigu.gulimall.ware.feign.ProductFeignService;
import com.atguigu.gulimall.ware.service.WareOrderTaskDetailService;
import com.atguigu.gulimall.ware.service.WareOrderTaskService;
import org.springframework.transaction.annotation.Transactional;@Service("wareSkuService")
public class WareSkuServiceImpl extends ServiceImpl<WareSkuDao, WareSkuEntity> implements WareSkuService {@AutowiredWareSkuDao wareSkuDao;@AutowiredProductFeignService productFeignService;@Autowiredprivate WareOrderTaskService wareOrderTaskService;@Autowiredprivate WareOrderTaskDetailService wareOrderTaskDetailService;@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate OrderFeignService orderFeignService;/*** 为某个订单锁定库存* @param vo* @return*/@Transactional(rollbackFor = Exception.class)@Overridepublic boolean orderLockStock(WareSkuLockVo vo) {/*** 保存库存工作单详情信息* 追溯*/WareOrderTaskEntity wareOrderTaskEntity = new WareOrderTaskEntity();wareOrderTaskEntity.setOrderSn(vo.getOrderSn());wareOrderTaskEntity.setCreateTime(new Date());wareOrderTaskService.save(wareOrderTaskEntity);//1、按照下单的收货地址,找到一个就近仓库,锁定库存//2、找到每个商品在哪个仓库都有库存List<OrderItemVo> locks = vo.getLocks();List<SkuWareHasStock> collect = locks.stream().map((item) -> {SkuWareHasStock stock = new SkuWareHasStock();Long skuId = item.getSkuId();stock.setSkuId(skuId);stock.setNum(item.getCount());//查询这个商品在哪个仓库有库存List<Long> wareIdList = wareSkuDao.listWareIdHasSkuStock(skuId);stock.setWareId(wareIdList);return stock;}).collect(Collectors.toList());//2、锁定库存for (SkuWareHasStock hasStock : collect) {boolean skuStocked = false;Long skuId = hasStock.getSkuId();List<Long> wareIds = hasStock.getWareId();if (org.springframework.util.StringUtils.isEmpty(wareIds)) {//没有任何仓库有这个商品的库存,抛出异常,前边已经锁定的库存也一起会回滚throw new NoStockException(skuId);}//1、如果每一个商品都锁定成功,将当前商品锁定了几件的工作单记录发给MQ//2、锁定失败。前面保存的工作单信息都回滚了。发送出去的消息,即使要解锁库存,由于在数据库查不到指定的id,所有就不用解锁for (Long wareId : wareIds) {//锁定成功就返回1,失败就返回0Long count = wareSkuDao.lockSkuStock(skuId,wareId,hasStock.getNum());if (count == 1) {skuStocked = true;WareOrderTaskDetailEntity taskDetailEntity = WareOrderTaskDetailEntity.builder().skuId(skuId).skuName("").skuNum(hasStock.getNum()).taskId(wareOrderTaskEntity.getId()).wareId(wareId).lockStatus(1).build();wareOrderTaskDetailService.save(taskDetailEntity);//TODO 告诉MQ库存锁定成功StockLockedTo lockedTo = new StockLockedTo();lockedTo.setId(wareOrderTaskEntity.getId());StockDetailTo detailTo = new StockDetailTo();BeanUtils.copyProperties(taskDetailEntity,detailTo);lockedTo.setDetailTo(detailTo);rabbitTemplate.convertAndSend("stock-event-exchange","stock.locked",lockedTo);break;} else {//当前仓库锁失败,重试下一个仓库}}if (skuStocked == false) {//当前商品所有仓库都没有锁住throw new NoStockException(skuId);}}//3、肯定全部都是锁定成功的return true;}@Overridepublic void unlockStock(StockLockedTo to) {//库存工作单的idStockDetailTo detail = to.getDetailTo();Long detailId = detail.getId();/*** 解锁* 1、查询数据库关于这个订单锁定库存信息*   有:证明库存锁定成功了*      解锁:订单状况*          1、没有这个订单,必须解锁库存*          2、有这个订单,不一定解锁库存*              订单状态:已取消:解锁库存*                      已支付:不能解锁库存*/WareOrderTaskDetailEntity taskDetailInfo = wareOrderTaskDetailService.getById(detailId);if (taskDetailInfo != null) {//查出wms_ware_order_task工作单的信息Long id = to.getId();WareOrderTaskEntity orderTaskInfo = wareOrderTaskService.getById(id);//获取订单号查询订单状态String orderSn = orderTaskInfo.getOrderSn();//远程查询订单信息R orderData = orderFeignService.getOrderStatus(orderSn);if (orderData.getCode() == 0) {//订单数据返回成功OrderVo orderInfo = orderData.getData("data", new TypeReference<OrderVo>() {});//判断订单状态是否已取消或者支付或者订单不存在if (orderInfo == null || orderInfo.getStatus() == 4) {//订单已被取消,才能解锁库存if (taskDetailInfo.getLockStatus() == 1) {//当前库存工作单详情状态1,已锁定,但是未解锁才可以解锁unLockStock(detail.getSkuId(),detail.getWareId(),detail.getSkuNum(),detailId);}}} else {//消息拒绝以后重新放在队列里面,让别人继续消费解锁//远程调用服务失败throw new RuntimeException("远程调用服务失败");}} else {//无需解锁}}/*** 解锁库存的方法* @param skuId* @param wareId* @param num* @param taskDetailId*/public void unLockStock(Long skuId,Long wareId,Integer num,Long taskDetailId) {//库存解锁wareSkuDao.unLockStock(skuId,wareId,num);//更新工作单的状态WareOrderTaskDetailEntity taskDetailEntity = new WareOrderTaskDetailEntity();taskDetailEntity.setId(taskDetailId);taskDetailEntity.setLockStatus(2); //变为已解锁wareOrderTaskDetailService.updateById(taskDetailEntity);}@Dataclass SkuWareHasStock {private Long skuId;private Integer num;private List<Long> wareId;}
}

监听队列:
gulimall-ware/xxx/ware/listener/StockReleaseListener.java

package com.atguigu.gulimall.ware.listener;import com.atguigumon.to.mq.StockLockedTo;
import com.atguigu.gulimall.ware.service.WareSkuService;
import com.rabbitmq.client.Channel;/*** 库存解锁监听** @desc* 库存锁定成功发送消息到延时队列 stock.locked(路由key),超时TTL,消息进入私信路由,然后转发到解锁库存的队列。** @author: kaiyi* @create: 2020-09-16 19:01*/
@Slf4j
@RabbitListener(queues = "stock.release.stock.queue")
@Service
public class StockReleaseListener {@Autowiredprivate WareSkuService wareSkuService;/*** 1、库存自动解锁*  下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚。之前锁定的库存就要自动解锁**  2、订单失败*      库存锁定失败**   只要解锁库存的消息失败,一定要告诉服务解锁失败*/@RabbitHandlerpublic void handleStockLockedRelease(StockLockedTo to, Message message, Channel channel) throws IOException {log.info("******收到解锁库存的信息******");try {//当前消息是否被第二次及以后(重新)派发过来了// Boolean redelivered = message.getMessageProperties().getRedelivered();//解锁库存wareSkuService.unlockStock(to);// 手动删除消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {// 解锁失败 将消息重新放回队列,让别人消费channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}}

上边就是创建订单后锁库存,发消息到延时队列,监听队列,创建的订单出现异常是否来解锁库存,手动确认消息的核心代码逻辑。

2.4 调试

订单结算页,购买了一件商品。

在提交订单时,远程锁库存成功后模拟代码异常:

提交订单,由于异常会回滚订单并且回退到结算页,连续提交三次,我们可以看到延时队列里边有三条信息。

库存表wms_ware_sku,原来锁定了 3 件库存,现在库存锁定为6,因为库存是远程锁定的,所以,主程序事务回滚对远程的不起作用,不过在锁定库存成功时发库存锁定成功的消息,后边通过消息会检查是否释放库存。

库存工作单主表:

库存工作单明细表,我们可以看到新增的3条记录,明细状态lock_status(1-已锁定 2-已解锁 3-扣减)

订单服务订单表:

然后等到消息过期进入死信路由,TTL后客户端监听消息判断是否释放库存,消息在判断的时候先根据生成的订单号远程查询gulimall-order是否存在对应的订单,如果不存在,则直接释放锁定的库存,因为在生成订单的时候抛出异常生成的订单回滚了,所以 oms_order 表不存在订单,这时监听的消息拿到延时消息后,做完判断后会触发解锁库存的动作。
过了几分钟后,我们可以看到消息已经被消费了,并且锁定的库存也释放了,变回原来的 3 件。
消息队列:

库存表:

可以看到,RMQ在解决分布式事务一致性问题上非常强大,不仅实现了解耦,而且还保证了可靠消息+最终一致性。

四、RMQ 延时队列处理关单及库存解锁整合

1、流程分析


步骤:

  • 1、订单创建成功,发送消息给MQ
  • 2、订单服务订单关单监听器(死信之后判断是否关单)
  • 3、订单关单成功后发消息给MQ(订单释放直接和库存释放进行绑定)
  • 4、库存服务是释放库存监听器监听是否解锁库存队列(stock.release.stock.queue)
  • 5、库存解锁处理逻辑

这里出现了两个交换机绑定同一个队列的情况,即订单的交换机和库存的队列绑定在一起了。

2、订单关单

1、订单释放直接和库存释放进行绑定
gulimall-order/xxx/order/config/MyMQConfig.java

package com.atguigu.gulimall.order.config;import com.atguigu.gulimall.order.entity.OrderEntity;
import com.rabbitmq.client.AMQP;/*** @author: kaiyi* @create: 2020-09-16 13:53*/
@Configuration
public class MyMQConfig {/* 容器中的Queue、Exchange、Binding 会自动创建(在RabbitMQ)不存在的情况下 *//*** 客户端监听队列(测试)* @param orderEntity* @param channel* @param message* @throws IOException*//*@RabbitListener(queues = "order.release.order.queue")public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {System.out.println("收到过期的订单信息:准备关闭订单" + orderEntity.getOrderSn());channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}*//*** 死信队列** @return*/@Beanpublic Queue orderDelayQueue(){/*Queue(String name,  队列名字boolean durable,  是否持久化boolean exclusive,  是否排他boolean autoDelete, 是否自动删除Map<String, Object> arguments) 属性*/HashMap<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", "order-event-exchange");   // 信死了交给哪个交换机arguments.put("x-dead-letter-routing-key", "order.release.order"); // 信死了交给哪个路由keyarguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟Queue queue = new Queue("order.delay.queue", true, false, false, arguments);return queue;}/*** 普通队列** @return*/@Beanpublic Queue orderReleaseQueue(){Queue queue = new Queue("order.release.order.queue", true, false, false);return queue;}/*** TopicExchange** @return*/@Beanpublic Exchange orderEventExchange(){/**   String name,*   boolean durable,*   boolean autoDelete,*   Map<String, Object> arguments* */return new TopicExchange("order-event-exchange", true, false);}@Beanpublic Binding orderCreateBinding() {/** String destination, 目的地(队列名或者交换机名字)* DestinationType destinationType, 目的地类型(Queue、Exhcange)* String exchange,* String routingKey,* Map<String, Object> arguments* */return new Binding("order.delay.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.create.order",  // 路由key一般为事件名null);}@Beanpublic Binding orderReleaseBinding() {return new Binding("order.release.order.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.release.order",null);}/*** 订单释放直接和库存释放进行绑定* @return*/@Beanpublic Binding orderReleaseOtherBinding() {return new Binding("stock.release.stock.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.release.other.#",null);}
}

2、提交订单增加订单创建成功,发送消息给MQ
gulimall-order/xxx/order/service/impl/OrderServiceImpl.java

/*** 提交订单* @param vo* @return*/// @Transactional(isolation = Isolation.READ_COMMITTED) 设置事务的隔离级别// @Transactional(propagation = Propagation.REQUIRED)   设置事务的传播级别@Transactional(rollbackFor = Exception.class)// @GlobalTransactional(rollbackFor = Exception.class)@Overridepublic SubmitOrderResponseVo submitOrder(OrderSubmitVo vo) {confirmVoThreadLocal.set(vo);SubmitOrderResponseVo responseVo = new SubmitOrderResponseVo();//去创建、下订单、验令牌、验价格、锁定库存...//获取当前用户登录的信息MemberResponseVo memberResponseVo = LoginUserInterceptor.loginUser.get();responseVo.setCode(0);//1、验证令牌是否合法【令牌的对比和删除必须保证原子性】String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";String orderToken = vo.getOrderToken();//通过lua脚本原子验证令牌和删除令牌Long result = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class),Arrays.asList(OrderConstant.USER_ORDER_TOKEN_PREFIX + memberResponseVo.getId()),orderToken);if (result == 0L) {//令牌验证失败responseVo.setCode(1);return responseVo;} else {//令牌验证成功//1、创建订单、订单项等信息OrderCreateTo order = createOrder();//2、验证价格BigDecimal payAmount = order.getOrder().getPayAmount();BigDecimal payPrice = vo.getPayPrice();if (Math.abs(payAmount.subtract(payPrice).doubleValue()) < 0.01) {//金额对比//TODO 3、保存订单saveOrder(order);//4、库存锁定,只要有异常,回滚订单数据//订单号、所有订单项信息(skuId,skuNum,skuName)WareSkuLockVo lockVo = new WareSkuLockVo();lockVo.setOrderSn(order.getOrder().getOrderSn());//获取出要锁定的商品数据信息List<OrderItemVo> orderItemVos = order.getOrderItems().stream().map((item) -> {OrderItemVo orderItemVo = new OrderItemVo();orderItemVo.setSkuId(item.getSkuId());orderItemVo.setCount(item.getSkuQuantity());orderItemVo.setTitle(item.getSkuName());return orderItemVo;}).collect(Collectors.toList());lockVo.setLocks(orderItemVos);//TODO 调用远程锁定库存的方法//出现的问题:扣减库存成功了,但是由于网络原因超时,出现异常,导致订单事务回滚,库存事务不回滚(解决方案:seata)//为了保证高并发,不推荐使用seata,因为是加锁,并行化,提升不了效率,可以发消息给库存服务R r = wmsFeignService.orderLockStock(lockVo);if (r.getCode() == 0) {//锁定成功responseVo.setOrder(order.getOrder());// int i = 10/0;   // 抛出异常,测试远程回滚//TODO 订单创建成功,发送消息给MQrabbitTemplate.convertAndSend("order-event-exchange","order.create.order",order.getOrder());//删除购物车里的数据redisTemplate.delete(CartConstant.CART_PREFIX + memberResponseVo.getId());return responseVo;} else {//锁定失败String msg = (String) r.get("msg");throw new NoStockException(msg);// responseVo.setCode(3);// return responseVo;}} else {responseVo.setCode(2);return responseVo;}}}

3、关单监听
gulimall-order/xxx/order/listener/OrderCloseListener.java

package com.atguigu.gulimall.order.listener;import com.atguigu.gulimall.order.entity.OrderEntity;
import com.atguigu.gulimall.order.service.OrderService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.io.IOException;/*** 关单监听** @author: kaiyi* @create: 2020-09-17 11:01*/
@RabbitListener(queues = "order.release.order.queue")
@Service
public class OrderCloseListener {@Autowiredprivate OrderService orderService;@RabbitHandlerpublic void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {System.out.println("收到过期的订单信息,准备关闭订单" + orderEntity.getOrderSn());try {orderService.closeOrder(orderEntity);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}}

4、关单成功给库存服务发送MQ消息
gulimall-order/xxx/order/service/impl/OrderServiceImpl.java

/*** 关闭订单* @param orderEntity*/@Overridepublic void closeOrder(OrderEntity orderEntity) {//关闭订单之前先查询一下数据库,判断此订单状态是否已支付OrderEntity orderInfo = this.getOne(new QueryWrapper<OrderEntity>().eq("order_sn",orderEntity.getOrderSn()));if (orderInfo.getStatus().equals(OrderStatusEnum.CREATE_NEW.getCode())) {//代付款状态进行关单OrderEntity orderUpdate = new OrderEntity();orderUpdate.setId(orderInfo.getId());orderUpdate.setStatus(OrderStatusEnum.CANCLED.getCode());this.updateById(orderUpdate);// 发送消息给MQOrderTo orderTo = new OrderTo();BeanUtils.copyProperties(orderInfo, orderTo);try {//TODO 确保每个消息发送成功,给每个消息做好日志记录,(给数据库保存每一个详细信息)保存每个消息的详细信息rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo);} catch (Exception e) {//TODO 定期扫描数据库,重新发送失败的消息}}}

3、订单释放和库存释放进行绑定

1、库存服务-库存释放监听器增加订单关单库存释放处理方法
gulimall-ware/xxx/ware/listener/StockReleaseListener.java

package com.atguigu.gulimall.ware.listener;import com.atguigumon.to.OrderTo;
import com.atguigumon.to.mq.StockLockedTo;
/*** 库存解锁监听** @desc* 库存锁定成功发送消息到延时队列 stock.locked(路由key),超时TTL,消息进入私信路由,然后转发到解锁库存的队列。** @author: kaiyi* @create: 2020-09-16 19:01*/
@Slf4j
@RabbitListener(queues = "stock.release.stock.queue")
@Service
public class StockReleaseListener {@Autowiredprivate WareSkuService wareSkuService;/*** 1、库存自动解锁*  下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚。之前锁定的库存就要自动解锁**  2、订单失败*      库存锁定失败**   只要解锁库存的消息失败,一定要告诉服务解锁失败*/@RabbitHandlerpublic void handleStockLockedRelease(StockLockedTo to, Message message, Channel channel) throws IOException {log.info("******收到解锁库存的信息******");try {//当前消息是否被第二次及以后(重新)派发过来了// Boolean redelivered = message.getMessageProperties().getRedelivered();//解锁库存wareSkuService.unlockStock(to);// 手动删除消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {// 解锁失败 将消息重新放回队列,让别人消费channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}/*** 订单关单库存释放** @param orderTo* @param message* @param channel* @throws IOException*/@RabbitHandlerpublic void handleOrderCloseRelease(OrderTo orderTo, Message message, Channel channel) throws IOException {log.info("******收到订单关闭,准备解锁库存的信息******");try {wareSkuService.unlockStock(orderTo);// 手动删除消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {// 解锁失败 将消息重新放回队列,让别人消费channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}
}

这个监听器既可以处理库存解锁又可以处理订单关单的处理业务,根据参数来决定具体调用哪一个,这是一个重载。

2、具体解锁库存实现
gulimall-ware/xxx/ware/service/impl/WareSkuServiceImpl.java

/*** 防止订单服务卡顿,导致订单状态消息一直改不了,库存优先到期,查订单状态新建,什么都不处理* 导致卡顿的订单,永远都不能解锁库存* @param orderTo*/@Transactional(rollbackFor = Exception.class)@Overridepublic void unlockStock(OrderTo orderTo) {String orderSn = orderTo.getOrderSn();//查一下最新的库存解锁状态,防止重复解锁库存WareOrderTaskEntity orderTaskEntity = wareOrderTaskService.getOrderTaskByOrderSn(orderSn);//按照工作单的id找到所有 没有解锁的库存,进行解锁Long id = orderTaskEntity.getId();List<WareOrderTaskDetailEntity> list = wareOrderTaskDetailService.list(new QueryWrapper<WareOrderTaskDetailEntity>().eq("task_id", id).eq("lock_status", 1));for (WareOrderTaskDetailEntity taskDetailEntity : list) {unLockStock(taskDetailEntity.getSkuId(),taskDetailEntity.getWareId(),taskDetailEntity.getSkuNum(),taskDetailEntity.getId());}}

五、消息丢失、重复、积压等解决方案

高并发场景的分布式事务,我们采用柔性事务+可靠消息+最终一致性方案(异步确保型),可靠性是最重要的,那么如何保证消息的可靠性呢?

1、消息丢失

1、消息发送出去,由于网络问题没有抵达服务器

  • 做好容错方法(try-catch),发送消息可能会网络失败,失败后要有容错机制,可记录到数据库,采用定期扫描重发的方式。
  • 做好日志记录,每个消息状态是否都被服务器收到都应该记录
  • 做好定期重发,如果消息没有发送成功,定期去数据库扫描未成功的消息进行重发

代码示例:

/*** 关闭订单* @param orderEntity*/@Overridepublic void closeOrder(OrderEntity orderEntity) {//关闭订单之前先查询一下数据库,判断此订单状态是否已支付OrderEntity orderInfo = this.getOne(new QueryWrapper<OrderEntity>().eq("order_sn",orderEntity.getOrderSn()));if (orderInfo.getStatus().equals(OrderStatusEnum.CREATE_NEW.getCode())) {//代付款状态进行关单OrderEntity orderUpdate = new OrderEntity();orderUpdate.setId(orderInfo.getId());orderUpdate.setStatus(OrderStatusEnum.CANCLED.getCode());this.updateById(orderUpdate);// 发送消息给MQOrderTo orderTo = new OrderTo();BeanUtils.copyProperties(orderInfo, orderTo);try {//TODO 确保每个消息发送成功,给每个消息做好日志记录,(给数据库保存每一个详细信息)保存每个消息的详细信息rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo);} catch (Exception e) {//TODO 定期扫描数据库,重新发送失败的消息// while() 重试次数}}}

创建消息日志记录表:

2、消息抵达Broker,Broker要将消息写入磁盘(持久化)才算成功。此时Broker尚未持久化完成,宕机。

  • publisher 也必须加入确认回调机制,确认成功的消息,修改数据库消息状态。

生产者消息确认回调应该增加日志记录,确认回调成功后修改记录日志的状态:
gulimall-order/xxx/order/config/MyRabbitConfig.java

/*** 定制RabbitTemplate* 1、服务收到消息就会回调* 1、spring.rabbitmq.publisher-confirms: true* 2、设置确认回调* 2、消息正确抵达队列就会进行回调* 1、spring.rabbitmq.publisher-returns: true* spring.rabbitmq.template.mandatory: true* 2、设置确认回调ReturnCallback* <p>* 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)*/// @PostConstruct  //MyRabbitConfig对象创建完成以后,执行这个方法public void initRabbitTemplate() {/*** 1、只要消息抵达Broker就ack=true* correlationData:当前消息的唯一关联数据(这个是消息的唯一id)* ack:消息是否成功收到* cause:失败的原因*///设置确认回调rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {/*** 1、做好消息确认机制(publisher,consumer【手动ack】】)* 2、每一个发送的消息都在数据库做好记录。定期将失败的消息再发送一遍*/// 服务器收到生产者发送的消息了// 修改消息的状态System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");});/*** 2、只要消息没有投递给指定的队列,就触发这个失败回调* message:投递失败的消息详细信息* replyCode:回复的状态码* replyText:回复的文本内容* exchange:当时这个消息发给哪个交换机* routingKey:当时这个消息用哪个路邮键*/rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +"==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");});}

3、自动ACK的状态下。消费者收到消息,但没来得及消费然后宕机。

  • 一定开启手动ACK,消费成功才移除,失败或者还没来得及处理就 noAck并重新入队。

防止消息丢失记住这两条:

1、做好消息确认机制(publisher,consumer【手动ack】】)
2、每一个发送的消息都在数据库做好记录。定期将失败的消息再发送一遍

2、消息重复

出现重复的几种情况

  • 1、消息消费成功,事务已经提交,ack时,机器宕机,导致没有ack成功。
    • Broker的消息重新由 unack 变为ready,并发送给其他消费者
  • 2、消息消费失败,由于重试机制,自动又将消息发送出去。
  • 3、成功消费,ack时宕机,消息又unack变为ready,Broker又重新发送

解决方案

  • 消费者的业务消费接口应该设计为幂等性的。比如扣库存有工作单的状态标识。
  • 使用防重表(redis/mysql),发送消息每一个都有业务的唯一标识,处理过就不用再处理。
  • rabbitMQ的每一个消息都有 redilivered字段,可以获取是否被重新投递过来的,而不是第一次被投递过来的。

3、消息积压

  • 消费者宕机
  • 消费者消费能力不足
  • 发送者发送流量太大
    • 上线更多的消费者,进行正常的消费
    • 上线专门的队列消费服务,将消息先批量取出来,记录数据库,离线慢慢处理

更多推荐

RabbitMQ消息队列处理库存解锁及关闭订单问题

本文发布于:2024-03-08 09:41:31,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1720512.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:队列   解锁   库存   订单   消息

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!