springboot+消息队列

编程入门 行业动态 更新时间:2024-10-10 23:26:54

springboot+消息<a href=https://www.elefans.com/category/jswz/34/1771257.html style=队列"/>

springboot+消息队列

1.创建msg文件夹,在里面添加下面几个java文件

MsgQueueConstant:
package com.panku.msg;import java.util.concurrent.Semaphore;/*** @program: hycloudoa* @description:* @author: xwf* @create: 2020-06-24 16:15**/
public class MsgQueueConstant {/*** 最大队列数量*/public final static int MAX_QUEUE = 1000;/*** 队列中最少成员数*/public final static int MIN_Count = 10;/*** 生产者信号量 锁,用于系统消息挂起和通知*/public  static Semaphore producerSemaphore = new Semaphore(1);/*** 消费者信号量 锁,用于系统消息挂起和通知*/public  static Semaphore cosumerSemaphore = new Semaphore(0);}
SysMsgQueue:
package com.panku.msg;import com.panku.core.entity.BizMsg;
import org.springframework.stereotype.Component;import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;/*** @program: hycloudoa* @description: 系统消息队列,* 采用无界队列LinkedBlockingQueue:是一个由链表结构组成的双向阻塞队列,即可以从队列的两端插入和移除元素* @author: xwf* @create: 2020-06-24 15:06**/
@Component
public class SysMsgQueue {/*** 存放系统消息, 采用无界队列LinkedBlockingQueue:是一个由链表结构组成的双向阻塞队列,即可以从队列的两端插入和移除元素*/private BlockingQueue<BizMsg> bizMsgQueue= new LinkedBlockingDeque<>(MsgQueueConstant.MAX_QUEUE);/*** 批量添加队列消息* @param bizMsgs*/public void addAll(List<BizMsg> bizMsgs) {bizMsgQueue.addAll(bizMsgs);}/*** 移除并返回列头部元素,如队列为空阻塞* @return* @throws InterruptedException*/public BizMsg take() throws InterruptedException {return bizMsgQueue.take();}/*** //获取并移除此队列的头,如果此队列为空,则返回 null。* @return*/public BizMsg poll(){return bizMsgQueue.poll();}/*** 返回当前队列成员数量* @return*/public Integer count(){return bizMsgQueue.size();}
}
SysMsgQueueCosumer:
package com.panku.msg;import com.alibaba.fastjson.JSONObject;
import com.panku.core.entity.BizMsg;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.Date;
import java.util.concurrent.TimeUnit;/*** @program: hycloudoa* @description: 系统消息消费者* @author: xwf* @create: 2020-06-24 15:17**/
@Component
@Slf4j
public class SysMsgQueueCosumer implements Runnable {@Autowiredprivate SysMsgQueue sysMsgQueue;private Thread thread;public void init() {thread = new Thread(this, "MsgCosumer");thread.start();}@Overridepublic void run() {//阻塞,等待生产者生产数据,继续执行try {MsgQueueConstant.cosumerSemaphore.acquire();} catch (InterruptedException e) {e.printStackTrace();}while (true) {try {execute();TimeUnit.MILLISECONDS.sleep(200);} catch (Exception e) {e.printStackTrace();log.info("消费者发生异常:{}", e.getMessage());}}}/*** 消费消息*/private void execute() {BizMsg msg = sysMsgQueue.poll();//获取队列消息,如消息为null,则释放lock锁,继续队列添加消息if (msg != null) {log.info("--------开始消费消息{}", JSONObject.toJSONString(msg));/*  log.info("--------开始消费消息{}", JSONObject.toJSONString(msg));//根据消息类型发送对应的消息if (msg.getMsgType() == 1) {//发送短信try {//判断是否发送消息if (SysParamInfo.getParamVal(Constants.IF_SEND_SMS).equals("1")) {//将长连接转为短连接String simpleUrl = SimpleUrlUtil.getSimpleUrl(msg.getMsgContent());
//                        String urlParam = simpleUrl.substring(25);String[] split = simpleUrl.split("/");String urlParam = split[3];log.info("短信参数:【{}】", urlParam);SmsSingleSenderResult smsSingleSenderResult = smsSingleSender.sendWithParam("86", msg.getAcpPerson(),configBean.getInvoiceTplId().intValue(), new String[]{urlParam}, null, "", "");log.info("短信发送结果:{}", JSONObject.toJSONString(smsSingleSenderResult));bizMsgMapper.update(null, new UpdateWrapper<BizMsg>().eq("id", msg.getId()).set("send_state", 2).set("send_time", new Date()));} else {log.info("参数表中发送短信的配置为0,即不发送短信");bizMsgMapper.update(null, new UpdateWrapper<BizMsg>().eq("id", msg.getId()).set("send_state", 2).set("send_time", new Date()));}} catch (Exception e) {e.printStackTrace();log.error("发送短信{}时,发生异常{}", msg.toString(), e.getMessage());bizMsgMapper.update(null, new UpdateWrapper<BizMsg>().eq("id", msg.getId()).set("send_state", 3).set("send_time", new Date()));}} else if (msg.getMsgType() == 2) {//发送邮件if (SysParamInfo.getParamVal(Constants.IF_SEND_EMAIL).equals("1")) {MailBean mailBean = new MailBean();mailBean.setSubject(configBean.getHospitalName() + "开票结果通知");mailBean.setRecipient(msg.getAcpPerson());mailBean.setContent("您的电子发票已开具,详情访问地址:" + msg.getMsgContent());try {mailService.sendSimpleMail(mailBean);bizMsgMapper.update(null, new UpdateWrapper<BizMsg>().eq("id", msg.getId()).set("send_state", 2).set("send_time", new Date()));} catch (Exception e) {e.printStackTrace();log.info("发送邮件时出错:{}", e.getMessage(), e);bizMsgMapper.update(null, new UpdateWrapper<BizMsg>().eq("id", msg.getId()).set("send_state", 3).set("send_time", new Date()));}} else {log.info("参数表中发送邮件的配置为0,即不发送邮件");bizMsgMapper.update(null, new UpdateWrapper<BizMsg>().eq("id", msg.getId()).set("send_state", 2).set("send_time", new Date()));}}*/} else {//队列中执行完毕,通知生产者生产数据log.info("消费者:队列中执行完毕,通知生产者生产数据");MsgQueueConstant.producerSemaphore.release();try {log.info("消费者:队列中执行完毕,等待生产者生产数据");//阻塞,等待生产者生产数据,继续执行MsgQueueConstant.cosumerSemaphore.acquire();} catch (InterruptedException e) {e.printStackTrace();}}}
}
SysMsgQueueProducer:
package com.panku.msg;import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.panku.core.entity.BizMsg;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;/*** @program: hycloudoa* @description: 系统消息生产者* @author: xwf* @create: 2020-06-24 15:17**/
@Component
@Slf4j
public class SysMsgQueueProducer implements Runnable {@Autowiredprivate SysMsgQueue sysMsgQueue;/*** 是否存在消息*/private boolean emptyFlag = true;private Thread thread;public void init() {log.info("消息队列生产者服务");thread = new Thread(this, "MsgProducer");thread.start();}@Overridepublic void run() {while (true) {try {pullData();TimeUnit.MILLISECONDS.sleep(200);} catch (Exception e) {e.printStackTrace();log.error("生产者生产数据时发生异常:{}", e.getMessage());}}}/*** 拉取系统未发送数据*/public void pullData() {try {//使用信号量,消费信号量如信号量为0阻塞.等待通知:队列执行完毕或者有新的消息log.info("生产者:等待队列执行完毕或新消息");MsgQueueConstant.producerSemaphore.acquire();//此步骤为异步线程,会发生消息还没插入数据库,就会查询待发送消息,此时会查询不到。所以加阻塞等待消息插入数据库中,在查询执行Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}/*  QueryWrapper<BizMsg> queryWrapper = new QueryWrapper<>();queryWrapper.eq("send_state", 1);queryWrapper.orderByAsc("add_time");IPage<BizMsg> bizMsgIPage = bizMsgMapper.selectPage(new Page<>(1, MsgQueueConstant.MAX_QUEUE), queryWrapper);List<BizMsg> unsentMsg = bizMsgIPage.getRecords();*/List<BizMsg> unsentMsg = new ArrayList<>();BizMsg bizMsg=new BizMsg();bizMsg.setId(1);unsentMsg.add(bizMsg);log.info("生产者:生产数据{}条", unsentMsg.size());if (!unsentMsg.isEmpty()) {log.info("++++++++生产消息:{}", JSONObject.toJSONString(unsentMsg));sysMsgQueue.addAll(unsentMsg);//添加信号量+1,通知消费者进行消费log.info("生产者:通知消费者");MsgQueueConstant.cosumerSemaphore.release();}}}

2.新建config文件夹,在里面添加 SysMsgConfig.java文件

package com.panku.config.msg;import com.panku.msg.SysMsgQueueCosumer;
import com.panku.msg.SysMsgQueueProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;/*** @program: hycloudoa* @description: 消息配置* @author: xwf* @create: 2020-06-28 09:04**/
@Configuration
public class SysMsgConfig {@Autowiredprivate SysMsgQueueCosumer sysMsgQueueCosumer;@Autowiredprivate SysMsgQueueProducer sysMsgQueueProducer;@PostConstructpublic void init() {sysMsgQueueProducer.init();sysMsgQueueCosumer.init();}}

3.启动测试

更多推荐

springboot+消息队列

本文发布于:2024-03-06 01:15:15,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1713966.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:队列   消息   springboot

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!