消息队列 项目

编程入门 行业动态 更新时间:2024-10-27 16:31:20

消息<a href=https://www.elefans.com/category/jswz/34/1771257.html style=队列 项目"/>

消息队列 项目

那些实时性要求不高,且比较耗时的任务,是队列的最佳应用场景。

动账提醒

            wrapperResponse.setMessage("结算成功");// 动账提醒AccountExpenseDTO accountExpenseDTO = setlResultDTO.getAccountExpenseDTO();if (accountExpenseDTO != null && accountExpenseDTO.getPaySumamt() != null && accountExpenseDTO.getPaySumamt().compareTo(BigDecimal.ZERO) != 0) {//发送消息到 MQJobBaseService jobBaseService = SpringContextUtils.getBean("trtAcctPayPushMsgJobService");MQCommParamDTO mqCommParamDTO = new MQCommParamDTO("TX","1",1,"trtAcctPayPushMsgJobService",accountExpenseDTO);jobBaseService.sendMsgToMQ(mqCommParamDTO);}

一、简介

它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。

当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。

二、组成

  2.1 Broker

消息服务器,作为server提供消息核心服务

  2.2 Producer

消息生产者,业务的发起方,负责生产消息传输给broker,

  2.3 Consumer

消息消费者,业务的处理方,负责从broker获取消息并进行业务逻辑处理

  2.4 Topic

主题,发布订阅模式下的消息统一汇集地,不同生产者向topic发送消息,由MQ服务器分发到不同的订阅者,实现消息的 广播。

  2.5 Queue

队列,PTP模式下,特定生产者向特定queue发送消息,消费者订阅特定的queue完成指定消息的接收。

  2.6 Message

消息体,根据不同通信协议(ampq)定义的固定格式进行编码的数据包,来封装业务数据,实现消息的传输。

ActiveMQ

官方下载地址

安装启动

  • 第一步:把ActiveMQ 的压缩包上传到Linux系统
  • 第二步:解压缩
  • 第三步:启动

使用bin目录下的activemq命令

# 启动:
[root@localhost bin]# ./activemq start# 关闭:
[root@localhost bin]# ./activemq stop# 查看状态:
[root@localhost bin]# ./activemq status

进入管理后台

<http://IP:8161/admin>用户名:admin
密码:admin

登录用户名密码可在 conf/jetty-realm.properties 文件中修改

消息队列两种模型

1.点对点模型(基于队列 Point to Point,PTP)

每个消息只能有一个消费者。

消息的生产者和消费者之间没有时间上的相关性,可以有多个发送者,但只能被一个消费者消费

一个消息只能被一个接受者接受一次生产者把消息发送到队列中(Queue),接受者无需订阅,当接受者未接受到消息时就会处于阻塞状态。

2.发布者/订阅者模型(基于主题的Publish/Subscribe,pub/sub

每个消息可以有多个消费者。 生产者和消费者之间有时间上的相关性。

。订阅一个主题的消费者只能消 费自它订阅之后发布的消息. 允许多个接受者,类似于广播的方式 生产者将消息发送到主题上(Topic) 接受者必须先订阅

注:持久化订阅者:特殊的消费者,告诉主题,我一直订阅着,即使网络断开,消息服务器也记住所有持久化订阅者,如果有新消息,也会知道必定有人回来消费。

 
/** Copyright @ 2019 com.iflysse.trains* 01SpringBoot 下午2:18:30* All right reserved.**/package com.dcx.comm.utils;import java.util.List;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ScheduledMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.connection.ConnectionFactoryUtils;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.JmsUtils;
import org.springframework.stereotype.Component;public class ActiveMqUtils {/*** 注入JMS*/@Autowiredprivate JmsTemplate jmsTemplate;/*** 设置普通* @Title: sendNorMolMessage * @author: cxding* @createTime: 2019年4月28日 下午1:03:56* @param destination* @param text void*/public <T> void sendNorMolMessage(Destination destination, String text) {// 连接工厂ConnectionFactory connectionFactory = jmsTemplate.getConnectionFactory();Connection connection = null;Session session = null;MessageProducer producer = null;try {// 创建链接connection = connectionFactory.createConnection();connection.start();// 创建session,开启事物session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);// 创建生产者producer = session.createProducer(destination);// 设置持久化producer.setDeliveryMode(DeliveryMode.PERSISTENT);// 设置过期时间//producer.setTimeToLive(time);TextMessage message = session.createTextMessage(text);producer.send(message);// 提交session.commit();} catch (JMSException e) {throw new RuntimeException(e);} finally {// 关闭连接close(producer, session, connection, connectionFactory);}}}
spring.activemq.broker-url=tcp://127.0.0.1:61616
spring.activemq.user=root
spring.activemq.password=pass
package com.atguigu.gmall.payment.mq;import com.atguigu.gmall.bean.PaymentInfo;
import com.atguigu.gmall.service.PaymentService;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;import javax.jms.JMSException;
import javax.jms.MapMessage;
import java.util.Date;
import java.util.Map;/*** @author : 熊亚东* @description :* @date : 2019/7/24 | 9:55**/
@Component/*让springboot启动时扫描此类*/
public class PaymentServiceMqListener {@AutowiredPaymentService paymentService;/*** 获取消息队列中的交易消息信息通过支付平台java客户端对象;** @param mapMessage* @throws JMSException*/@JmsListener(destination = "PAYMENT_CHECK_QUEUE", containerFactory = "jmsListenerContainerFactory")public void consumPaymentCheckResult(MapMessage mapMessage) throws JMSException {String  out_trade_no = mapMessage.getString("out_trade_no");Integer count        = 0;if (mapMessage.getString("count") != null) {count = Integer.parseInt("" + mapMessage.getInt("count"));}/*** 支付平台客户端对象 贸易号* 不能 从支付宝获取贸易状态的接口 中获取到该贸易的支付信息 就发送延时消息继续检查;*/Map<String, Object> resultMap = paymentService.checkAlipayPayment(out_trade_no);if (resultMap == null || resultMap.isEmpty()) { //如果结果是空的;继续发送消息 ,指定次数没有获取到支付订单的数据,就提示失败;/*继续发送延时消息检查用户支付状态*/if (count > 0) {count--;System.out.println("检查次数剩余" + count); //队列的检查次数paymentService.sendDelayPaymentResultCheckQueue(out_trade_no, count);} else {System.out.println("支付失败");return;}} else {//支付宝接口获取订单的状态,根据订单的状态,判断进行一下次的延迟任务还是支付成功更新数据和后续任务;String trade_status = (String) resultMap.get("trade_status");System.err.println(trade_status);//如果获取到的支付信息数据中订单状态是支付成功状态,添加支付信息到用户的支付信息表中;if (StringUtils.isNotBlank(trade_status) && trade_status.equals("TRADE_SUCCESS")) {/*支付成功,更新支付发送支付成功消息队列*/PaymentInfo paymentInfo = new PaymentInfo();paymentInfo.setOrderSn(out_trade_no); /*对外交易编号*/paymentInfo.setPaymentStatus("已付款");paymentInfo.setAlipayTradeNo((String) resultMap.get("trade_no")); /*第三方交易编号*/paymentInfo.setCallbackContent((String) resultMap.get("callback_content"));paymentInfo.setCallbackTime(new Date());paymentInfo.setConfirmTime(new Date());/*======================================================================================================================================*//*进行支付更新的幂等性检查操作在updatePayment方法里面,防止与paymentController一起重复更新*//*======================================================================================================================================*/paymentService.updatePayment(paymentInfo);return;} else {/*继续发送延时消息检查用户支付状态*/if (count > 0) {count--;System.out.println("检查次数剩余" + count);paymentService.sendDelayPaymentResultCheckQueue(out_trade_no, count);} else {System.out.println("支付失败");return;}}}}}

ultimate 美 /ˈʌltəmɪt/

adj. 最终的;极限的;根本的
n. 终极;根本;基本原则

更多推荐

消息队列 项目

本文发布于:2023-07-28 18:02:09,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1269892.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:队列   消息   项目

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!