java 并发执行批量异步任务(Future、 CompletableFuture 实现)

编程入门 行业动态 更新时间:2024-10-22 23:34:10

java 并发执行<a href=https://www.elefans.com/category/jswz/34/1770428.html style=批量异步任务(Future、 CompletableFuture 实现)"/>

java 并发执行批量异步任务(Future、 CompletableFuture 实现)

文章目录

  • 前言
  • 一、创建线程池
  • 二、Future 类并发实现
  • 三、CompletableFuture 类并发实现


前言

当我们需要批量执行一些比较耗时任务时,使用并发的方式减少业务处理的整体时间,防止客户端响应时间过长。


一、创建线程池

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ThreadPoolExecutor;/*** @ClassName : ThreadPoolConfig* @Description : ThreadPoolConfig* @Author : zhuguangkui* @Date: 2022-08-03*/
@Configuration
@Slf4j
public class ThreadPoolConfig {@AutowiredThreadPoolProperties threadPoolProperties;/*** 获得Java虚拟机可用的处理器个数 + 1*/private static final int THREADS = Runtime.getRuntime().availableProcessors() + 1;/***   默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,*  当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;*  当队列满了,就继续创建线程,当线程数量大于等于maxPoolSize后,开始使用拒绝策略拒绝*/@Bean(name = "varHandleThreadPool")public ThreadPoolTaskExecutor varHandleThreadPool(){int corePoolSizeConfig = threadPoolProperties.getCorePoolSizeConfig();//核心线程数int corePoolSize = corePoolSizeConfig ==0 ? THREADS : corePoolSizeConfig;ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setMaxPoolSize(2 * corePoolSize);executor.setCorePoolSize(corePoolSize);executor.setQueueCapacity(threadPoolProperties.getQueueCapacity());executor.setKeepAliveSeconds(threadPoolProperties.getKeepAliveSeconds());executor.setThreadNamePrefix(threadPoolProperties.getThreadNamePrefix());// 线程池对拒绝任务(无线程可用)的处理策略// CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 初始化executor.initialize();log.info("doc 线程池初始化配置:{},THREADS:{}", threadPoolProperties, THREADS);return executor;}
}

二、Future 类并发实现

/*** 批量并发处理业务*/
@Override
public void generateBatchFile(List<String> fileNameList) {List<Future<IdocDoc>> futureList = new ArrayList<>(); // 并发处理结果集// 批量处理业务for (String fileName : fileNameList) {Future<IdocDoc> future = generateFile(fileName);futureList.add(future);}// 依次获取异步结果while (true) {for (Future<IdocDoc> future : futureList) {if (future.isDone() && !future.isCancelled()) { // 判断任务执行是否完成IdocDoc idocDoc = future.get(); // 获取异步结果idocDocList.add(idocDoc);futureList.remove(future);}}if (CollectionUtil.isEmpty()) {break;}Thread.sleep(1); // 每次轮询休息1毫秒,避免CPU占用}
}/*** 子业务*/
@Async("varHandleThreadPool")
public Future<IdocDoc> generateFile(String fileName) {IdocDoc idoDoc = new IdoDoc();idocDoc.setName(fileName);... // 业务操作// 返回异步结果return new AsyncResult<>(idocDoc);
}

三、CompletableFuture 类并发实现

/*** 批量并发处理业务*/
@Override
public void generateBatchFile(List<String> fileNameList) {List<CompletableFuture<IdocDoc>> futureList = new ArrayList<>(); // 并发处理结果集// 批量处理业务for (String fileName : fileNameList) {CompletableFuture<IdocDoc> future = CompletableFuture.supplyAsync(() -> {return generateFile(fileName);}, threadPoolTaskExecutor);futureList.add(future);}// 依次获取异步结果while (true) {for (CompletableFuture<IdocDoc> future : futureList) {if (future.isDone() && !future.isCancelled()) { // 判断任务执行是否完成IdocDoc idocDoc = future.get(); // 获取异步结果idocDocList.add(idocDoc);futureList.remove(future);}}if (CollectionUtil.isEmpty()) {break;}Thread.sleep(1); // 每次轮询休息1毫秒,避免CPU占用}
}/*** 子业务*/
public IdocDoc generateFile(String fileName) {IdocDoc idoDoc = new IdoDoc();idocDoc.setName(fileName);... // 业务操作// 返回异步结果return idocDoc;
}

更多推荐

java 并发执行批量异步任务(Future、 CompletableFuture 实现)

本文发布于:2023-12-08 03:02:44,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1672161.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:批量   java   CompletableFuture   Future

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!