admin管理员组文章数量:1566628
原理:
1.利用spring的schedule功能实现定时任务
2.利用redis的过期策略实现集群中定时任务的分配(单机版redis,集群的redis请考虑redission)
内容概述:
1.多线程调度定时任务
2.增加定时任务管理表(数据库或者redis中持久化)
3.基于反射机制实现动态调用不同的自定义定时任务
4.自动的根据定时任务管理表对定时任务进行增删改查
5.通过定时任务管理使定时任务只执行一次。
1.多线程调度定时任务
默认的schedule使用的是单线程,即多个定时任务需要排队执行,如果某些定时任务耗时过长,会导致其他任务排队过久,且不利于使用redis的过期策略实现分布式定时任务的分配,所以在集群情况下,定时任务需要使用多线程实现,建议:线程数大于同时可执行任务数
/**
* @author Bight Chen
* @Date: 2021/9/17 10:27
* 定时任务线程池
*/
@Configuration
public class ScheduleConfig {
@Bean
public TaskScheduler taskScheduler(){
//此bean对象支持根据cron表达式创建周期性任务
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
//定时任务执行线程池核心数
//线程数一定大于当前可执行任务数
taskScheduler.setPoolSize(50);
//此方法会使得任务一旦被取消将立即被移除
taskScheduler.setRemoveOnCancelPolicy(true);
taskScheduler.setThreadNamePrefix("Schedule-");
return taskScheduler;
}
}
2.增加定时任务管理表(数据库或者redis中持久化)
1.需要的原因是利于自动增加或者删除所需定时任务
ps:
在mysql中做为数据表
@Data是基于lombok来简化编写 get() set() toString()等方法
@Data
public class ScheduleJob {
//id
private Long scheduleJobId;
//动态bean
private String beanName;
//方法
private String methodName;
//参数
private String jobParams;
//表达式
private String jobCron;
//任务名
private String jobName;
private String remark;
//0停止,1正常,2已完成
private String status;
private String createdBy;
private Date createdTime;
private String lastUpdatedBy;
}
3.基于反射机制实现动态调用不同的自定义定时任务
1.反射调用bean的工具类
/**
* @author Bight Chen
* @Date: 2021/9/17 11:54
*/
@SuppressWarnings("unchecked")
@Component
public class SpringToolsConfig implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringToolsConfig.applicationContext = applicationContext;
}
public static Object getBean(String name) {
return applicationContext.getBean(name);
}
}
2.动态调用bean
/**
* @author Bight Chen
* @Date: 2021/9/17 11:40
*/
public class TaskRunable implements Runnable {
private static final Logger logger = LogManager.getLogger(TaskRunable.class);
private ScheduleJob scheduleJob;
public TaskRunable(ScheduleJob scheduleJob) {
this.scheduleJob = scheduleJob;
}
private RedisTemplate redisTemplate = (RedisTemplate) SpringToolsConfig.getBean("redisTemplate");
@Override
public void run() {
//周期性任务:加锁的过期时间要大于不同服务器时间之差,且小于同个任务2次执行间隔(cron)
//抢到锁的执行,没抢到锁的等待下一次任务执行
try {
Object target = SpringToolsConfig.getBean(scheduleJob.getBeanName());
Method method = null;
if (!StringUtils.isEmpty(scheduleJob.getJobParams())) {
method = target.getClass().getDeclaredMethod(scheduleJob.getMethodName(), String.class);
} else {
method = target.getClass().getDeclaredMethod(scheduleJob.getMethodName());
}
//考虑到集群部署时,旧版本的查找不到新增的定时任务时处理,就需要先找到bean再存redis
//保证原子性
if(!redisTemplate.opsForValue().setIfAbsent("schdeleJob:"+scheduleJob.getScheduleJobId().toString(), "1", 15000L,TimeUnit.MILLISECONDS)){
logger.info("{}正在执行!!,不能重复执行",scheduleJob.getJobName());
return;
}
logger.info("定时任务开始执行 - 参数:{}", scheduleJob.toString());
long startTime = System.currentTimeMillis();
try {
ReflectionUtils.makeAccessible(method);
if (!StringUtils.isEmpty(scheduleJob.getJobParams())) {
JSONObject jsonObject = JSONObject.parseObject(scheduleJob.getJobParams());
jsonObject.put("scheduleJobId", scheduleJob.getScheduleJobId());
method.invoke(target, JSONObject.toJSONString(jsonObject));
} else {
method.invoke(target);
}
} catch (Exception e) {
logger.error("定时任务执行异常 -参数:{} ,异常:{}", scheduleJob.toString(), e);
} finally {
Object oDelete = redisTemplate.opsForValue().get("schdeleJob:" + scheduleJob.getScheduleJobId());
if (oDelete != null) {
//锁依旧存在则自动删除
redisTemplate.delete("schdeleJob:" + scheduleJob.getScheduleJobId().toString());
}
}
long times = System.currentTimeMillis() - startTime;
logger.info("定时任务执行结束 -参数:{},耗时:{} 毫秒", scheduleJob.toString(), times);
} catch (Exception e) {
logger.error("定时任务执行异常 -参数:{} ,异常:{}", scheduleJob.toString(), e);
}
}
}
4.自动的根据定时任务管理表对定时任务进行增删改查
/**
* @author Bight Chen
* @Date: 2021/9/17 13:59
* 分布式定时任务初始化
*/
@Component
public class ScheduleInitConfig {
private static final Logger logger = LogManager.getLogger(ScheduleInitConfig.class);
//内存中保存定时任务数据
private HashMap<Long, ScheduledFuture> map = new HashMap<>();
@Autowired
private TaskScheduler autoTaskScheduler;
@Autowired
private ScheduleJobService scheduleJobService;
/**
* 定时自动查询增加任务数据,注入定时任务
* schedule.cron:0/15 * * * * ?
*/
@Scheduled(cron = "${schedule.cron}")
public void autoAddTask() {
Long time = System.currentTimeMillis();
try {
List<ScheduleJob> list = scheduleJobService.selectByExample(new ScheduleJobExample());
for (ScheduleJob scheduleJob : list) {
if (map.get(scheduleJob.getScheduleJobId()) == null && "1".equals(scheduleJob.getStatus())) {
//存在 启动状态的 定时任务自动增加
TaskRunable taskRunable = new TaskRunable(scheduleJob);
ScheduledFuture future = autoTaskScheduler.schedule(taskRunable, new CronTrigger(scheduleJob.getJobCron()));
map.put(scheduleJob.getScheduleJobId(), future);
logger.info("autoAddTask,自动增加任务,参数:{}", scheduleJob.toString());
}
}
} catch (Exception e) {
logger.error("autoAddTask,error:{}", e);
}
//logger.info("autoAddTask,end:{}", System.currentTimeMillis() - time);
}
/**
* 定时自动查询任务数据,删除过期任务列表
* schedule.cron:0/15 * * * * ?
*/
@Scheduled(cron = "${schedule.cron}")
public void autoDeleteTask() {
Long time = System.currentTimeMillis();
try {
List<ScheduleJob> list = scheduleJobService.selectByExample(new ScheduleJobExample());
for (ScheduleJob scheduleJob : list) {
if (map.get(scheduleJob.getScheduleJobId()) != null &&!"1".equals(scheduleJob.getStatus())) {
//非启动中的都删除
ScheduledFuture future = map.get(scheduleJob.getScheduleJobId());
future.cancel(true);
map.remove(scheduleJob.getScheduleJobId());
logger.info("autoDeleteTask,自动删除任务,参数:{}", scheduleJob.toString());
}
}
} catch (Exception e) {
logger.error("autoDeleteTask,error:{}", e);
}
//logger.info("autoDeleteTask,end:{}", System.currentTimeMillis() - time);
}
}
5.通过定时任务管理使定时任务只执行一次。
/**
* @author Bight Chen
* @Date: 2021/9/17 13:53
*/
@Component("testTask")
public class TestTask implements BaseTask {
private static final Logger logger = LogManager.getLogger(TestTask.class);
@Autowired
private ScheduleJobService scheduleJobService;
@Override
public void runTask() { }
@Override
public void runTask(String params) {
JSONObject jsonObject = JSON.parseObject(params);
Long scheduleJobId = Long.parseLong( jsonObject.get("scheduleJobId")+"");
try {
Thread.sleep(16000);
} catch (Exception e) {
logger.info("runTask>>>"+e.getMessage());
}finally {
if (scheduleJobId != null) {
//最后关闭当前任务,使任务执行一次
scheduleJobService.updateStatusById(scheduleJobId, "2");
}
}
logger.info(Thread.currentThread().getName() + ":" + params);
}
}
本文标签: 集群SpringBootschedule
版权声明:本文标题:springboot的schedule集群实现 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://www.elefans.com/dianzi/1725779859a1042139.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论