使用redis实现延时消息发送功能

编程入门 行业动态 更新时间:2024-10-28 05:20:51

使用redis实现延时<a href=https://www.elefans.com/category/jswz/34/1771421.html style=消息发送功能"/>

使用redis实现延时消息发送功能

前言

         redis本身也有发布订阅的模式,但是如果想要实现例如rocketmq或者rabbitmq的延时任务功能要怎么做呢,目前比较流行的做法有两种,一种是使用sortedset数据模型,把超时时间设置为score,然后系统启动一个定时任务定时去检查score超时的key然后把key取出来,再进行下一阶段的任务;第二种方法就是利用redis本身的通知机制,当key到期的时候会进行通知,通过捕获通知的信息来实现延时通知的效果。本文使用第二种方式,并且通过注解和反射使得整体功能更具有扩展性和适用性。首先看下效果

初始化监听器成功 com.loveprogrammer.redismq.listener.handler.impl.TestHandler
初始化监听器成功 com.loveprogrammer.redismq.listener.handler.impl.Test2Handler
c.l.r.l.RedisKeyExpirationListener       :获取到延时任务key mq:test:execute:key1
c.l.r.listener.handler.impl.TestHandler  : test-入参是字符串:hello
c.l.r.l.RedisKeyExpirationListener       :获取到延时任务key mq:test:execute2:key2
c.l.r.listener.handler.impl.TestHandler  : test-入参是数字:2
c.l.r.l.RedisKeyExpirationListener       :获取到延时任务key mq:test2:execute:key3
c.l.r.l.handler.impl.Test2Handler        : test2-入参是对象:张三 20
c.l.r.l.RedisKeyExpirationListener       : 获取到延时任务key mq:test2:execute2:key4
c.l.r.l.handler.impl.Test2Handler        : test2-入参是浮点数:3.1415926

目标

           1、通过注解自动注册为监听器

           2、一个topic下支持多个tag,不同的tag指向不同的方法执行。

实现

第一步:初始化项目

        创建一个空的springboot项目,pom依赖如下:

    <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- redis 缓存操作 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!--常用工具类 --><dependency><groupId>org.apachemons</groupId><artifactId>commons-lang3</artifactId></dependency><!--反射工具类 --><dependency><groupId>org.reflections</groupId><artifactId>reflections</artifactId><version>0.9.10</version></dependency><!-- 阿里JSON解析器 --><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.34</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies>

第二步:接入redis

首先把redis的配置信息写入配置文件

# Spring配置
spring:# redis 配置redis:# 地址host: 127.0.0.1# 端口,默认为6379port: 6379# 数据库索引database: 0# 密码password: # 连接超时时间timeout: 10slettuce:pool:# 连接池中的最小空闲连接min-idle: 0# 连接池中的最大空闲连接max-idle: 8# 连接池的最大数据库连接数max-active: 8# #连接池最大阻塞等待时间(使用负值表示没有限制)max-wait: -1ms

编写配置类

@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport
{@Bean@SuppressWarnings(value = { "unchecked", "rawtypes" })public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory connectionFactory){RedisTemplate<Object, Object> template = new RedisTemplate<>();template.setConnectionFactory(connectionFactory);FastJson2JsonRedisSerializer serializer = new FastJson2JsonRedisSerializer(Object.class);// 使用StringRedisSerializer来序列化和反序列化redis的key值template.setKeySerializer(new StringRedisSerializer());template.setValueSerializer(serializer);// Hash的key也采用StringRedisSerializer的序列化方式template.setHashKeySerializer(new StringRedisSerializer());template.setHashValueSerializer(serializer);template.afterPropertiesSet();return template;}
}

        redis操作工具类:RedisCache这里省略,大家可以去源码上下载,源码在文末

第三步:创建注解

@Retention(RetentionPolicy.RUNTIME)
public @interface Listeners {/*** 转换为DefaultMqPushConsumer后订阅的topic* 默认为“DEFAULT_TOPIC”*/String topic() default "DEFAULT_TOPIC";
}
@Retention(RetentionPolicy.RUNTIME)
public @interface MQListener {/*** 订阅的tag*/String tag() default "*";/*** 请求方消息类型*/Class<?> messageClass() default Object.class;}

第四步:创建redis的监听

@Configuration
public class RedisListenerConfig {@BeanRedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);return container;}
}

 第五步:设计一个接口IHanlder,其实现类为实际监听业务类

             这里接口及抽象实现类无内容,后续方便扩展

public interface IHandler {}
public abstract class BaseHandler implements IHandler{}

       创建一个监听业务类备用,这里类上的注解表示监听的topic,方法上的注解表示监听的tag以及入参的类型,方便进行类型转换。

@Component
@Listeners(topic = "test")
public class TestHandler extends BaseHandler {private static final Logger logger = LoggerFactory.getLogger(TestHandler.class);@MQListener(tag = "execute",messageClass = String.class)public Boolean execute(String key, String message) {logger.info("test-入参是字符串:{}" ,message);return Boolean.TRUE;}@MQListener(tag = "execute2",messageClass = Integer.class)public Boolean execute2(String key, Integer number) {logger.info("test-入参是数字:{}" ,number);return Boolean.TRUE;}
}

  设计一个工厂方法,自动注册这些监听业务类

 

@Component
@Slf4j
public class HandlerFactory implements CommandLineRunner {public Map<String, Class> handlerMap = new HashMap<>();@Overridepublic void run(String... args) throws Exception {// 找到所有实现类Reflections reflections = new Reflections("com.loveprogrammer.redismq.listener.handler.impl");// 获取在指定包扫描的目录所有的实现类Set<Class<? extends BaseHandler>> classes = reflections.getSubTypesOf(BaseHandler.class);for (Class<? extends IHandler > aClass : classes) {try {Listeners listeners = aClass.getAnnotation(Listeners.class);String topic = listeners.topic();handlerMap.put(topic, aClass);log.info("初始化监听器成功 {}",aClass.getName());} catch (Exception e) {log.error("初始化" + aClass.getName() + "监听器失败",e);}}}
}

第六步:监听主入口

@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {@Autowiredprivate MqUtil mqUtil;@Autowiredprivate HandlerFactory handlerFactory;@Autowiredprivate RedisCache redisCache;private static final Logger logger = LoggerFactory.getLogger(RedisKeyExpirationListener.class);public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {super(listenerContainer);}// 延时任务执行线程池private ThreadPoolExecutor executor = new ThreadPoolExecutor(3,10,0L,TimeUnit.SECONDS,new LinkedBlockingQueue<>(2048),new ThreadFactoryBuilder().setNameFormat("hyh-mq-pool-%d").build(),new ThreadPoolExecutor.AbortPolicy());@Overridepublic void onMessage(Message message, byte[] pattern) {String expiredKey = message.toString();if (!expiredKey.startsWith("mq") || StringUtils.isBlank(expiredKey)) {return;}// 分布式锁-防止多次消费的情况String lock = redisCache.tryLock(expiredKey, 60);if (lock == null) {return;}// 获得值String value = mqUtil.getMessage(expiredKey);if (value == null) {value = "";}logger.info("获取到延时任务key {}", expiredKey);// 都是按照 mq:topic:tag:keyString[] keys = expiredKey.split(":");if (keys.length < 4) {return;}String type = keys[1];String tag = keys[2];String key = keys[3];Class handler = handlerFactory.handlerMap.get(type);if (handler == null) {logger.warn("未获取到指定的任务对象,key {}", expiredKey);return;}String finalValue = value;String lockValue = lock;executor.execute(() -> {Boolean execute = false;try {Object bean = SpringContextHelper.getBean(handler);// 找到tag 遍历methodsMethod[] methods = handler.getMethods();for (Method method : methods) {MQListener mqListener = method.getAnnotation(MQListener.class);if (tag.equals(mqListener.tag())) {Class<?> aClass = mqListener.messageClass();String name = aClass.getName();// 先处理基本类型if("java.lang.String".equals(name)) {method.invoke(bean, key, finalValue);}else if("java.lang.Long".equals(name) ) {Long object = Long.parseLong(finalValue);method.invoke(bean, key, object);}else if("java.lang.Integer".equals(name) ) {Integer object = Integer.parseInt(finalValue);method.invoke(bean, key, object);}else if("java.lang.Short".equals(name) ) {Short object = Short.parseShort(finalValue);method.invoke(bean, key, object);}else if("java.lang.Byte".equals(name) ) {Byte object = Byte.parseByte(finalValue);method.invoke(bean, key, object);}else if("java.lang.Double".equals(name)) {Double object = Double.parseDouble(finalValue);method.invoke(bean, key, object);}else if("java.lang.Float".equals(name)) {Float object = Float.parseFloat(finalValue);method.invoke(bean, key, object);}else{Object object = JSON.parseObject(finalValue, aClass);method.invoke(bean, key, object);}execute = true;break;}}if (!execute) {logger.error("执行延时任务失败,60秒钟后重试");mqUtil.sendMessage(expiredKey, finalValue, 60L, TimeUnit.SECONDS);}} catch (Exception e) {logger.error("执行延时任务失败,60秒钟后重试", e);mqUtil.sendMessage(expiredKey, finalValue, 60L, TimeUnit.SECONDS);}if (execute) {mqUtil.releaseMessage(expiredKey);}redisCache.unlock(expiredKey, lockValue);});}@PreDestroypublic void destroy() {shutdownAsyncManager();}/*** 停止异步执行任务*/private void shutdownAsyncManager() {try {logger.info("====关闭延时任务线程池====");executor.shutdown();} catch (Exception e) {logger.error(e.getMessage(), e);}}
}

               对上面的代码进行简单说明,通过获取到的key,解析得到topic和tag,然后通过第五步的工厂获得topic对应的class,通过反射去调用对应的method。之所以使用了线城池是不希望流量在这里卡住,而是只作为一个任务的分发。

第七步:延时队列工具

@Configuration
public class MqUtil {@Autowiredprivate RedisTemplate<String, String> redisTemplate;private static final String SUFFIX = "_temp";public void sendMessage(String topic, String tag, String key, String value, Long timeout, TimeUnit timeUnit){String realKey = "mq"+":"+topic +":"+ tag + ":"+  key;String realKey2 = "mq"+":"+topic +":"+ tag + ":"+  key + SUFFIX;redisTemplate.opsForValue().set(realKey,value,timeout,timeUnit);redisTemplate.opsForValue().set(realKey2,value);}public void sendMessage(String messageKey,String value, Long timeout, TimeUnit timeUnit){String realKey = messageKey;String realKey2 = messageKey + SUFFIX;redisTemplate.opsForValue().set(realKey,value,timeout,timeUnit);redisTemplate.opsForValue().set(realKey2,value);}public String getMessage(String messageKey){String realKey = messageKey + SUFFIX;return redisTemplate.opsForValue().get(realKey);}public void releaseMessage(String messageKey){redisTemplate.delete(messageKey);String realKey = messageKey + SUFFIX;redisTemplate.delete(realKey);}
}

第八步:测试

        编写一个启动方法来测试。代码如下:

@Component
public class CommonRunner implements ApplicationRunner {@Autowiredprivate MqUtil mqUtil;@Overridepublic void run(ApplicationArguments args) throws Exception {mqUtil.sendMessage("test","execute","key1","hello",2000L, TimeUnit.MILLISECONDS);mqUtil.sendMessage("test","execute2","key2","2",3000L, TimeUnit.MILLISECONDS);mqUtil.sendMessage("test2","execute","key3", JSON.toJSONString(new User("张三","20")),4000L, TimeUnit.MILLISECONDS);mqUtil.sendMessage("test2","execute2","key4","3.1415926",5000L, TimeUnit.MILLISECONDS);}
}

               其余的代码大家请去仓库下载:redis-mq-demo: 使用redis实现延时消息的功能

更多推荐

使用redis实现延时消息发送功能

本文发布于:2023-11-16 23:35:24,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1634394.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:消息   功能   redis

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!