一文总结线程池ThreadPoolExecutor

编程入门 行业动态 更新时间:2024-10-09 19:13:18

一文总结<a href=https://www.elefans.com/category/jswz/34/1771240.html style=线程池ThreadPoolExecutor"/>

一文总结线程池ThreadPoolExecutor

文章目录

  • 1. 概述
  • 2. 线程池核心设计与实现

参考文献:

  1. Java线程池实现原理及其在美团业务中的实践
  2. java并发编程的艺术

1. 概述

好处
使用线程池可以带来一系列好处:

  • 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
  • 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
  • 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
  • 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

2. 线程池核心设计与实现

Java中的线程池核心实现类是ThreadPoolExecutor,我们首先来看一下ThreadPoolExecutor的UML类图:

ThreadPoolExecutor实现的顶层接口是Executor,顶层接口Executor提供了一种思想:将任务提交和任务执行进行解耦用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分
ExecutorService接口增加了一些能力:(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;(2)提供了管控线程池的方法,比如停止线程池的运行。AbstractExecutorService则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。最下层的实现类ThreadPoolExecutor实现最复杂的运行部分,ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。

线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。线程池的运行主要分成两部分:任务管理、线程管理。任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:(1)直接申请线程执行该任务;(2)缓冲到队列中等待线程执行;(3)拒绝该任务。线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。

  1. Java线程池实现原理及其在美团业务中的实践
  2. 线程池addWorker中的类似goto的retry
  3. 关于源码解析:彻底理解Java线程池原理篇
  4. 线程池的实现原理以及使用流程
  5. 三种队列和4种拒绝策略和4中线程池工厂:
    (1)有界、无界队列对ThreadPoolExcutor执行的影响
    (2)一次Java线程池误用引发的血案和总结
  6. 线程池中任务队列SynchronousQueue的理解

说明:使用SynchronousQueue队列通常配合长度为无界队列(长度为最大值),来一个线程创建一个线程。

Alibaba命名规范的解释

【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor的方式,这样 的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明: Executors 返回的线程池对象的弊端如下:
1) FixedThreadPool 和 SingleThreadPool : 允许的请求队列长度为 Integer.MAX_VALUE ,可能会堆积大量的请求,从而导致 OOM 。
2) CachedThreadPool 和 ScheduledThreadPool : 允许的创建线程数量为 Integer.MAX_VALUE ,可能会创建大量的线程,从而导致 OOM 。

附:线程池创建通用工具:

/*** Utility to create a {@link ThreadPoolExecutor}.** @param corePoolSize - min threads in the pool, even if idle* @param maxPoolSize - max threads in the pool* @param keepAliveTimeSecs - max seconds beyond which excess idle threads*        will be terminated* @param queue - the queue to use for holding tasks before they are executed.* @param threadNamePrefix - name prefix for the pool threads* @param runRejectedExec - when true, rejected tasks from*        ThreadPoolExecutor are run in the context of calling thread* @return ThreadPoolExecutor*/public static ThreadPoolExecutor getThreadPoolExecutor(int corePoolSize,int maxPoolSize, long keepAliveTimeSecs, BlockingQueue<Runnable> queue,String threadNamePrefix, boolean runRejectedExec) {Preconditions.checkArgument(corePoolSize > 0);ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,maxPoolSize, keepAliveTimeSecs, TimeUnit.SECONDS,queue, new Daemon.DaemonFactory() {private final AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {Thread t = super.newThread(r);t.setName(threadNamePrefix + threadIndex.getAndIncrement());return t;}});if (runRejectedExec) {threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy() {@Overridepublic void rejectedExecution(Runnable runnable,ThreadPoolExecutor e) {LOG.info(threadNamePrefix + " task is rejected by " +"ThreadPoolExecutor. Executing it in current thread.");// will run in the current threadsuper.rejectedExecution(runnable, e);}});}return threadPoolExecutor;}

附:线程池测试用例:

public class ThreadPoolExecutorTest implements Runnable{public static ThreadPoolExecutor getThreadPoolExecutor(int corePoolSize,int maxPoolSize, long keepAliveTimeSecs, BlockingQueue<Runnable> queue,String threadNamePrefix, boolean runRejectedExec) {Preconditions.checkArgument(corePoolSize > 0);ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,maxPoolSize, keepAliveTimeSecs, TimeUnit.SECONDS,queue, new Daemon.DaemonFactory() {private final AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {Thread t = super.newThread(r);t.setName(threadNamePrefix + threadIndex.getAndIncrement());return t;}});if (runRejectedExec) {threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy() {@Overridepublic void rejectedExecution(Runnable runnable,ThreadPoolExecutor e) {super.rejectedExecution(runnable, e);}});}return threadPoolExecutor;}@Overridepublic void run() {try {System.out.println(Thread.currentThread().getName());Thread.sleep(100); } catch (InterruptedException e) {e.printStackTrace(); } throw new RuntimeException();}public static void main(String[] args) {LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(5);
//     BlockingQueue<Runnable> queue = new SynchronousQueue();ThreadPoolExecutor threadPool = getThreadPoolExecutor(2,10, 60, queue,"Thread-", false);threadPool.allowCoreThreadTimeOut(true);for (int i = 0; i < 16 ; i++) {threadPool.submit(new Thread(new ThreadPoolExecutorTest()));System.out.println("线程池中活跃的线程数: " + threadPool.getPoolSize()); }threadPool.shutdown();try {//等待直到所有任务完成threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);} catch (InterruptedException e) {e.printStackTrace();}}}
//结果
线程池中活跃的线程数: 1
Thread-0
线程池中活跃的线程数: 2
线程池中活跃的线程数: 2
线程池中活跃的线程数: 2
线程池中活跃的线程数: 2
线程池中活跃的线程数: 2
线程池中活跃的线程数: 2
Thread-1
线程池中活跃的线程数: 3
Thread-2
线程池中活跃的线程数: 4
Thread-3
线程池中活跃的线程数: 5
Thread-4
线程池中活跃的线程数: 6
Thread-5
线程池中活跃的线程数: 7
Thread-6
线程池中活跃的线程数: 8
Thread-7
线程池中活跃的线程数: 9
Thread-8
线程池中活跃的线程数: 10
Thread-9
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@6576fe71 rejected from java.util.concurrent.ThreadPoolExecutor@76fb509a[Running, pool size = 10, active threads = 10, queued tasks = 5, completed tasks = 0]at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)at threads.ThreadPoolExecutorTest.main(ThreadPoolExecutorTest.java:67)Process finished with exit code 1

更多推荐

一文总结线程池ThreadPoolExecutor

本文发布于:2024-03-05 12:40:14,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1712322.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:线程   一文   ThreadPoolExecutor

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!