在springboot中使用事件监听实现消息订阅发布

编程入门 行业动态 更新时间:2024-10-25 22:23:15

在springboot中使用事件监听实现<a href=https://www.elefans.com/category/jswz/34/1771421.html style=消息订阅发布"/>

在springboot中使用事件监听实现消息订阅发布

前面文章介绍过使用guava中的eventbus 实现发布订阅功能 ,对于普通项目已经能够很好的实现代码解耦,其实在spring中也提供了类似的功能,可以在spring项目中不需要引入第三方依赖的情况下实现发布订阅功能,在spring中主要通过ApplicationContext方法中的publishEvent()方法发布消息,再通过ApplicationListener的实现类接收消息。下面就介绍一下如何使用:

首先需要引入springboot相关依赖:

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.11</version><relativePath/>
</parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.12</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.7</version></dependency>
</dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins>
</build>

在主启动类上面开启异步,这样在消费者上添加异步注解时就可以实现与生产者线程的解耦,实现异步消费,否则生产者和消费者都是在同一个线程中,不能达到异步效果:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;/*** @Author xingo* @Date 2023/11/1*/
@SpringBootApplication
@EnableAsync
public class ProviderApplication {public static void main(String[] args) {SpringApplication.run(ProviderApplication.class, args);}
}

生产者生产消息可以封装到一个实体类中,这个实体类可以是一个普通的pojo类,这种消息需要通过@EventListener注解实现消息的消费;二是继承ApplicationEvent的实体类,这种消息可以通过@EventListener注解进行消费,也可以通过实现ApplicationListener接口进行消费,下面创建两个用于消息载体的实体类:

import org.springframework.context.ApplicationEvent;/*** 事件实体类1** @Author xingo* @Date 2023/11/1*/
public class MyEvent1 extends ApplicationEvent {private String message;public MyEvent1(Object source) {super(source);}public String getMessage() {return message;}public void setMessage(String message) {this.message = message;}
}
/*** 事件实体类2** @Author xingo* @Date 2023/11/1*/
public class MyEvent2 {private String message;public String getMessage() {return message;}public void setMessage(String message) {this.message = message;}
}

消息的消费者有两种:第一种方式是通过实现ApplicationListener接口,在泛型中指定消息类型,通过onApplicationEvent方法接收消息并消费:

import com.alibaba.fastjson.JSONObject;
import org.example.pojo.MyEvent1;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;/*** 事件监听方式1** @Author xingo* @Date 2023/11/1*/
@Async
@Component
public class MyApplicationListener implements ApplicationListener<MyEvent1> {@Overridepublic void onApplicationEvent(MyEvent1 event) {System.out.println("onApplicationEvent -> thread : " + Thread.currentThread().getName() + " | receive : " + JSONObject.toJSONString(event));}
}

第二种方式是通过@EventListener注解方式消费数据,它消费的消息不需要继承ApplicationEvent就可以被消费:

import com.alibaba.fastjson.JSONObject;
import org.example.pojo.MyEvent1;
import org.example.pojo.MyEvent2;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;/*** 事件监听** @Author xingo* @Date 2023/11/1*/
@Component
public class MyEventListener {@Async@EventListener(MyEvent1.class)public void listener01(MyEvent1 event) {System.out.println("listener01 -> thread : " + Thread.currentThread().getName() + " | receive : " + JSONObject.toJSONString(event));}@Async@EventListener(MyEvent2.class)public void listener02(MyEvent2 event) {System.out.println("listener02 -> thread : " + Thread.currentThread().getName() + " | receive : " + JSONObject.toJSONString(event));}@EventListener(MyEvent1.class)public void listener03(MyEvent1 event) {System.out.println("listener03 -> thread : " + Thread.currentThread().getName() + " | receive : " + JSONObject.toJSONString(event));}@EventListener(MyEvent2.class)public void listener04(MyEvent2 event) {System.out.println("listener04 -> thread : " + Thread.currentThread().getName() + " | receive : " + JSONObject.toJSONString(event));}
}

上面的方法添加@Async后就可以实现异步消费功能,如果不添加该注解,消费者与生产者会在同一个线程中执行。

生产者就比较简单了,只需要在类中注入ApplicationContext,通过publishEvent()方法发送事件消息:

import org.example.pojo.ApiResult;
import org.example.pojo.MyEvent1;
import org.example.pojo.MyEvent2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;/*** @Author xingo* @Date 2023/11/1*/
@RestController
public class MessageController {@Autowiredprivate ApplicationContext applicationContext;@GetMapping("/send/message")public ApiResult sendMessage(String message) {System.out.println("sendMessage -> thread : " + Thread.currentThread().getName() + " | message : " + message);MyEvent1 event1 = new MyEvent1(this);event1.setMessage(message);MyEvent2 event2 = new MyEvent2();event2.setMessage(message);applicationContext.publishEvent(event1);applicationContext.publishEvent(event2);return ApiResult.success(null);}
}

通过上面简单几步就实现了在同一个进程中的事件订阅发布功能,相对来说还是比较简单的,启动程序调用接口:

可以看到控制台打印如下内容:

sendMessage -> thread : http-nio-9523-exec-1 | message : hello,world
listener03 -> thread : http-nio-9523-exec-1 | receive : {"message":"hello,world","timestamp":1698824483483}
onApplicationEvent -> thread : task-1 | receive : {"message":"hello,world","timestamp":1698824483483}
listener04 -> thread : http-nio-9523-exec-1 | receive : {"message":"hello,world"}
listener01 -> thread : task-2 | receive : {"message":"hello,world","timestamp":1698824483483}
listener02 -> thread : task-3 | receive : {"message":"hello,world"}

可以看到,添加@Async注解的方法接收消息时会开启一个新的线程,没有添加该注解的方法接收消息与发布消息在同一个线程中。如果要实现真正的解耦,那么在方法上添加@Async是必需的。

这种方式实现的订阅发布,并不能替代真正的消息队列的发布订阅,首先这种方式的生产者和消费者都在一个进程中,不能实现扩容,虽然可以通过配置线程池增加线程数量,但也给系统增加了负担;第二、因为这种发布订阅方式是基于内存的,默认并没有提供持久化来保证消息的不丢失,一旦系统崩溃或重启,对于没有消费的数据就会存在丢失的风险,这种情况就需要在业务中自己实现数据的一致性

框架提供的订阅发布模式将相关功能做了封装,使用场景更通用,如果我们不使用框架,一样可以实现相关功能,只是鸡肋了一些,比如我可以基于阻塞队列达到同样的效果:

import com.alibaba.fastjson.JSONObject;
import java.util.concurrent.LinkedBlockingQueue;/*** 基于阻塞队列实现发布订阅功能** @Author wangxixin* @Date 2023/11/1*/
public class MyPubSub {private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();/*** 发布事件* @param event*/public void publish(String event) {queue.add(event);}/*** 监听事件*/public void listener() {new Thread(() -> {while (true) {try {String take = queue.take();System.out.println("rec : " + JSONObject.toJSONString(take));} catch (Exception e) {e.printStackTrace();}}}).start();}public static void main(String[] args) {MyPubSub myPubSub = new MyPubSub();myPubSub.listener();myPubSub.publish("Hello,world!");}
}

更多推荐

在springboot中使用事件监听实现消息订阅发布

本文发布于:2023-11-16 22:39:20,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1636054.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:消息   事件   springboot

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!