自定义简易版消息中间件"/>
自定义简易版消息中间件
1.先来看看自定义版的消息中间件架构图
2.案例介绍: 口罩的生产和消费
3.代码大致结构
4.实现方式,通过多线程配合队列模式,实现简易消息的生产和消费
5.代码
生产者代码
package cn.mq.demo;import java.util.concurrent.BlockingQueue;/*** @author sj* @date 2023/3/6 22:38*/
public class Producer implements Runnable {// 阻塞队列,可指定大小private BlockingQueue<Mask> queue;public Producer(BlockingQueue<Mask> queue) {this.queue = queue;}private int index = 0;/*** 模拟生产者生产消息*/@Overridepublic void run() {while (true) {try {Thread.sleep(100);if (queue.remainingCapacity() == 0) {System.out.println("口罩生产太多了,快来买");} else {Mask mask = new Mask();mask.setId(index++);mask.setMaskType("N95");System.out.println("正在生产第" + (index -1)+ "个" + mask.getMaskType() + "口罩");queue.put(mask);System.out.println("生产完成第" +index+ "个" + mask.getMaskType() + "口罩");}} catch (InterruptedException e) {e.printStackTrace();}}}
}
消费者代码
package cn.mq.demo;import java.util.concurrent.BlockingQueue;/*** @author sj* @date 2023/3/6 22:38*/
public class Consumer implements Runnable {private BlockingQueue<Mask> queue;public Consumer(BlockingQueue<Mask> queue) {this.queue = queue;}/*** 模拟消费者消费消息*/@Overridepublic void run() {while (true) {try {Thread.sleep(200);System.out.println("正在准备购买口罩");Mask take = queue.take();System.out.println("买到了滴" + take.getId() + "个" + take.getMaskType() + "口罩");} catch (InterruptedException e) {e.printStackTrace();}}}
}
消息实体代码
package cn.mq.demo;/*** 口罩** @author sj* @date 2023/3/6 22:56*/
public class Mask {private Integer id;private String maskType;public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String getMaskType() {return maskType;}public void setMaskType(String maskType) {this.maskType = maskType;}@Overridepublic String toString() {return "Mask{" +"id=" + id +", maskType='" + maskType + '\'' +'}';}
}
测试类代码
package cn.mq.demo;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;/*** @author sj* @date 2023/3/6 23:10*/
public class App {public static void main(String[] args) {BlockingQueue<Mask> queue = new ArrayBlockingQueue<>(20);new Thread(new Producer(queue)).start();new Thread(new Consumer(queue)).start();}
}
测试效果图
6.自定义消息中介间问题
如何保证消息的顺序消费
如何保证消息不丢失
如何保证消息发送成功
如何保证消息不重复
如何保证高可用
更多推荐
自定义简易版消息中间件
发布评论