底层实现源码实战"/>
美团动态ThreadPoolExecutor底层实现源码实战
开篇:介绍springboot连接nacos实现动态线程池,同时得安装nacos,同时代码将有两个模块,dtp-spring-boot-starter 与 user 模块,前者将是独立的动态线程池,可以引入自己的项目中,后者模块主要用于测试与使用动态线程池模块。
依赖与工具 描述 springboot 2.3.9.RELEASE nacos-config-spring-boot-starter 0.2.10 nacos 2.1.1 注意springboot与nacos的适配版本!
一,搭建实现
1.创建两个模块并配置
user:测试模块 (夫模块)
Maven依赖:
<parent><artifactId>spring-boot-starter-parent</artifactId><groupId>org.springframework.boot</groupId><version>2.3.9.RELEASE</version> <!-- <version>2.0.3.RELEASE</version>--></parent><groupId>org.example</groupId><artifactId>ThreadPool-demo</artifactId><packaging>pom</packaging><version>1.0-SNAPSHOT</version><modules><module>dtp-spring-boot-starter</module></modules><properties><mavenpiler.source>8</mavenpiler.source><mavenpiler.target>8</mavenpiler.target></properties>
在配置文件中写入连接nacos的配置代码
nacos:config:server-addr: 192.168.1.116:8848username: nacospassword: nacos#命名空间不要写错namespace: 6b5d4d2a-5385-4d9f-85a1-18b748b8256c
dtp-spring-boot-starter:动态线程池 (子模块)
Maven依赖:
<parent><artifactId>user</artifactId><groupId>org.example</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>dtp-spring-boot-starter</artifactId><properties><mavenpiler.source>8</mavenpiler.source><mavenpiler.target>8</mavenpiler.target></properties><dependencies><dependency><groupId>com.alibaba.boot</groupId><artifactId>nacos-config-spring-boot-starter</artifactId><version>0.2.10</version></dependency> <!-- <dependency>--> <!-- <groupId>com.alibaba.boot</groupId>--> <!-- <artifactId>nacos-config-spring-boot-actuator</artifactId>--> <!-- <version>0.2.10</version>--> <!-- </dependency>--></dependencies>
2. 开始编写动态线程池配置 (dtp-spring-boot-starter模块)
创建动态线程池对象
package com.laoyang.dtp;import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;/*** @author:Kevin* @create: 2023-10-24 17:13* @Description: 实现动态线程池对象*/public class DtpExecutor extends ThreadPoolExecutor {public DtpExecutor(int corePoolSize, int maximumPoolSize) {super(corePoolSize, maximumPoolSize, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));} }
创建动态线程池核心配置类
1.相关Bean的注入 2. nacos监听的bean注入
package com.laoyang.dtp;import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.env.Environment;/*** @author:Kevin* @create: 2023-10-24 18:36* @Description: 动态线程池核心配置类*/ @Configuration public class DtpExecutorAutoConfiguration {@Autowiredprivate Environment environment;//最大核心数private static final String CORE_POOL_SIZE = "dtp.core-pool-size";//最大线程数private static final String MAXIMUM_POOL_SIZE = "dtp.maximum-pool-size";//创建动态线程池对象@Beanpublic DtpExecutor executor(){Integer corePoolSize = Integer.valueOf(environment.getProperty(CORE_POOL_SIZE));Integer maximumPoolSize = Integer.valueOf(environment.getProperty(MAXIMUM_POOL_SIZE));return new DtpExecutor(corePoolSize,maximumPoolSize);}@Beanpublic NacosLinsenter NacosLinsenter(){return new NacosLinsenter();}}
然后通过springboot的自动配置实现将核心配置类注入
然后写入以下代码
org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.laoyang.dtp.DtpExecutorAutoConfiguration
创建nacos的监听类并实现动态绑定
通过nacos的 Listener 接口实现相应的方法编写动态变换逻辑,同时实现spring提供的
InitializingBean接口将当前监听类通过nacos的ConfigService的addListener()方法与dataId一一绑定。(只要dataId的配置文件发生改变,当前绑定的监听类就会调用相应的方法),最终注入线程池对象Bean,将修改的配置文件值再注入进线程池对象Bean,就实现动态线程池。
getExecutor() 创建一个线程池供下面的调用 receiveConfigInfo() 每次当前的dataId只要改变,就会调用这个方法package com.laoyang.dtp;import com.alibaba.nacos.api.annotation.NacosInjected; import com.alibaba.nacos.api.config.ConfigService; import com.alibaba.nacos.api.config.listener.Listener; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.YamlPropertiesFactoryBean; import org.springframework.core.io.ByteArrayResource;import java.util.Properties; import java.util.concurrent.Executor; import java.util.concurrent.Executors;/*** @author:Kevin* @create: 2023-10-24 19:56* @Description: nacos自带监听器*/public class NacosLinsenter implements Listener, InitializingBean {@NacosInjectedprivate ConfigService configService;@Autowiredprivate DtpExecutor executor;//nacos的dataId的名称private static final String DATA_ID = "dtp.yaml";private static final String GROUP = "DEFAULT_GROUP";//最大核心数private static final String CORE_POOL_SIZE = "dtp.core-pool-size";//最大线程数private static final String MAXIMUM_POOL_SIZE = "dtp.maximum-pool-size";//创建一个线程池供下面的调用@Overridepublic Executor getExecutor() {return Executors.newFixedThreadPool(1);}//每次当前的dataId只要改变,就会调用这个方法//但是调用这个方法的线程需要上面的方法创建一个线程池@Overridepublic void receiveConfigInfo(String s) {//首先需要将字符串yml格式转化为map的格式YamlPropertiesFactoryBean factoryBean = new YamlPropertiesFactoryBean();factoryBean.setResources(new ByteArrayResource(s.getBytes()));//使用springboot内置工具类转换为Properties类似于map的格式Properties object = factoryBean.getObject();//获取更新后的数据String corePoolSize = object.getProperty(CORE_POOL_SIZE);String maximumPoolSize = object.getProperty(MAXIMUM_POOL_SIZE);//直接更新数据executor.setCorePoolSize(Integer.parseInt(corePoolSize));executor.setMaximumPoolSize(Integer.parseInt(maximumPoolSize));}@Overridepublic void afterPropertiesSet() throws Exception {//将这个NacosLinsenter与当前的dataId一一绑定configService.addListener(DATA_ID,GROUP,this);} }
3. 开始在user模块使用 dtp-spring-boot-starter模块
创建启动Springboot配置类
package com.laoyang;import com.alibaba.nacos.spring.context.annotation.config.NacosPropertySource; import com.alibaba.nacos.spring.context.annotation.discovery.EnableNacosDiscovery; import com.laoyang.dtp.DtpExecutor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.core.env.Environment;/*** @author:Kevin* @create: 2023-10-24 17:04* @Description:*/ @SpringBootApplication @NacosPropertySource(dataId = "dtp.yaml", autoRefreshed = true) public class UserApplication {public static void main(String[] args) {SpringApplication.run(UserApplication.class, args);} }
创建Controller注入动态线程池对象来使用
package com.laoyang.Controller;import com.laoyang.dtp.DtpExecutor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.GetMapping;import java.util.concurrent.ThreadPoolExecutor;/*** @author:Kevin* @create: 2023-10-24 17:05* @Description: 视图层*/ @Controller public class UserController {@Autowiredprivate DtpExecutor executor;@GetMappingpublic Integer test(){executor.execute(() -> dotest());return 1;}public void dotest(){System.out.println("dotest");}}
到此大功告成 !!
二,改进
上面的只能实现一个线程池对象,但是实际项目中并不只是这一个线程池对象,所以接下来我们需要进行优化!
更多推荐
美团动态ThreadPoolExecutor底层实现源码实战
发布评论