队列 项目"/>
消息队列 项目
那些实时性要求不高,且比较耗时的任务,是队列的最佳应用场景。
动账提醒
wrapperResponse.setMessage("结算成功");// 动账提醒AccountExpenseDTO accountExpenseDTO = setlResultDTO.getAccountExpenseDTO();if (accountExpenseDTO != null && accountExpenseDTO.getPaySumamt() != null && accountExpenseDTO.getPaySumamt().compareTo(BigDecimal.ZERO) != 0) {//发送消息到 MQJobBaseService jobBaseService = SpringContextUtils.getBean("trtAcctPayPushMsgJobService");MQCommParamDTO mqCommParamDTO = new MQCommParamDTO("TX","1",1,"trtAcctPayPushMsgJobService",accountExpenseDTO);jobBaseService.sendMsgToMQ(mqCommParamDTO);}
一、简介
它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。
当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。
二、组成
2.1 Broker
消息服务器,作为server提供消息核心服务
2.2 Producer
消息生产者,业务的发起方,负责生产消息传输给broker,
2.3 Consumer
消息消费者,业务的处理方,负责从broker获取消息并进行业务逻辑处理
2.4 Topic
主题,发布订阅模式下的消息统一汇集地,不同生产者向topic发送消息,由MQ服务器分发到不同的订阅者,实现消息的 广播。
2.5 Queue
队列,PTP模式下,特定生产者向特定queue发送消息,消费者订阅特定的queue完成指定消息的接收。
2.6 Message
消息体,根据不同通信协议(ampq)定义的固定格式进行编码的数据包,来封装业务数据,实现消息的传输。
ActiveMQ
官方下载地址
安装启动
- 第一步:把ActiveMQ 的压缩包上传到Linux系统
- 第二步:解压缩
- 第三步:启动
使用bin目录下的activemq命令
# 启动:
[root@localhost bin]# ./activemq start# 关闭:
[root@localhost bin]# ./activemq stop# 查看状态:
[root@localhost bin]# ./activemq status
进入管理后台
<http://IP:8161/admin>用户名:admin
密码:admin
登录用户名密码可在 conf/jetty-realm.properties
文件中修改
消息队列两种模型
1.点对点模型(基于队列 Point to Point,PTP)
每个消息只能有一个消费者。
消息的生产者和消费者之间没有时间上的相关性,可以有多个发送者,但只能被一个消费者消费。
一个消息只能被一个接受者接受一次生产者把消息发送到队列中(Queue),接受者无需订阅,当接受者未接受到消息时就会处于阻塞状态。
2.发布者/订阅者模型(基于主题的Publish/Subscribe,pub/sub)
每个消息可以有多个消费者。 生产者和消费者之间有时间上的相关性。
。订阅一个主题的消费者只能消 费自它订阅之后发布的消息. 允许多个接受者,类似于广播的方式 生产者将消息发送到主题上(Topic) 接受者必须先订阅
注:持久化订阅者:特殊的消费者,告诉主题,我一直订阅着,即使网络断开,消息服务器也记住所有持久化订阅者,如果有新消息,也会知道必定有人回来消费。
/** Copyright @ 2019 com.iflysse.trains* 01SpringBoot 下午2:18:30* All right reserved.**/package com.dcx.comm.utils;import java.util.List;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ScheduledMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.connection.ConnectionFactoryUtils;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.JmsUtils;
import org.springframework.stereotype.Component;public class ActiveMqUtils {/*** 注入JMS*/@Autowiredprivate JmsTemplate jmsTemplate;/*** 设置普通* @Title: sendNorMolMessage * @author: cxding* @createTime: 2019年4月28日 下午1:03:56* @param destination* @param text void*/public <T> void sendNorMolMessage(Destination destination, String text) {// 连接工厂ConnectionFactory connectionFactory = jmsTemplate.getConnectionFactory();Connection connection = null;Session session = null;MessageProducer producer = null;try {// 创建链接connection = connectionFactory.createConnection();connection.start();// 创建session,开启事物session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);// 创建生产者producer = session.createProducer(destination);// 设置持久化producer.setDeliveryMode(DeliveryMode.PERSISTENT);// 设置过期时间//producer.setTimeToLive(time);TextMessage message = session.createTextMessage(text);producer.send(message);// 提交session.commit();} catch (JMSException e) {throw new RuntimeException(e);} finally {// 关闭连接close(producer, session, connection, connectionFactory);}}}
spring.activemq.broker-url=tcp://127.0.0.1:61616
spring.activemq.user=root
spring.activemq.password=pass
package com.atguigu.gmall.payment.mq;import com.atguigu.gmall.bean.PaymentInfo;
import com.atguigu.gmall.service.PaymentService;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;import javax.jms.JMSException;
import javax.jms.MapMessage;
import java.util.Date;
import java.util.Map;/*** @author : 熊亚东* @description :* @date : 2019/7/24 | 9:55**/
@Component/*让springboot启动时扫描此类*/
public class PaymentServiceMqListener {@AutowiredPaymentService paymentService;/*** 获取消息队列中的交易消息信息通过支付平台java客户端对象;** @param mapMessage* @throws JMSException*/@JmsListener(destination = "PAYMENT_CHECK_QUEUE", containerFactory = "jmsListenerContainerFactory")public void consumPaymentCheckResult(MapMessage mapMessage) throws JMSException {String out_trade_no = mapMessage.getString("out_trade_no");Integer count = 0;if (mapMessage.getString("count") != null) {count = Integer.parseInt("" + mapMessage.getInt("count"));}/*** 支付平台客户端对象 贸易号* 不能 从支付宝获取贸易状态的接口 中获取到该贸易的支付信息 就发送延时消息继续检查;*/Map<String, Object> resultMap = paymentService.checkAlipayPayment(out_trade_no);if (resultMap == null || resultMap.isEmpty()) { //如果结果是空的;继续发送消息 ,指定次数没有获取到支付订单的数据,就提示失败;/*继续发送延时消息检查用户支付状态*/if (count > 0) {count--;System.out.println("检查次数剩余" + count); //队列的检查次数paymentService.sendDelayPaymentResultCheckQueue(out_trade_no, count);} else {System.out.println("支付失败");return;}} else {//支付宝接口获取订单的状态,根据订单的状态,判断进行一下次的延迟任务还是支付成功更新数据和后续任务;String trade_status = (String) resultMap.get("trade_status");System.err.println(trade_status);//如果获取到的支付信息数据中订单状态是支付成功状态,添加支付信息到用户的支付信息表中;if (StringUtils.isNotBlank(trade_status) && trade_status.equals("TRADE_SUCCESS")) {/*支付成功,更新支付发送支付成功消息队列*/PaymentInfo paymentInfo = new PaymentInfo();paymentInfo.setOrderSn(out_trade_no); /*对外交易编号*/paymentInfo.setPaymentStatus("已付款");paymentInfo.setAlipayTradeNo((String) resultMap.get("trade_no")); /*第三方交易编号*/paymentInfo.setCallbackContent((String) resultMap.get("callback_content"));paymentInfo.setCallbackTime(new Date());paymentInfo.setConfirmTime(new Date());/*======================================================================================================================================*//*进行支付更新的幂等性检查操作在updatePayment方法里面,防止与paymentController一起重复更新*//*======================================================================================================================================*/paymentService.updatePayment(paymentInfo);return;} else {/*继续发送延时消息检查用户支付状态*/if (count > 0) {count--;System.out.println("检查次数剩余" + count);paymentService.sendDelayPaymentResultCheckQueue(out_trade_no, count);} else {System.out.println("支付失败");return;}}}}}
ultimate 美 /ˈʌltəmɪt/
adj. 最终的;极限的;根本的
n. 终极;根本;基本原则
更多推荐
消息队列 项目
发布评论