springBoot 多线程+线程池处理+等待获取执行结果Future
Java 线程池
Java通过Executors提供四种线程池,分别为:
newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
优点
重用存在的线程,减少对象创建、消亡的开销,性能佳。
可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
提供定时执行、定期执行、单线程、并发数控制等功能。
在springboot项目中一般使用方法二。
一、方法一(CountDownLatch)
public class StatsDemo {
final static SimpleDateFormat sdf = new SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss");
final static String startTime = sdf.format(new Date());
/**
* IO密集型任务 = 一般为2*CPU核心数(常出现于线程中:数据库数据交互、文件上传下载、网络数据传输等等)
* CPU密集型任务 = 一般为CPU核心数+1(常出现于线程中:复杂算法)
* 混合型任务 = 视机器配置和复杂度自测而定
*/
private static int corePoolSize = Runtime.getRuntime().availableProcessors();
/**
* public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
* TimeUnit unit,BlockingQueue<Runnable> workQueue)
* corePoolSize用于指定核心线程数量
* maximumPoolSize指定最大线程数
* keepAliveTime和TimeUnit指定线程空闲后的最大存活时间
* workQueue则是线程池的缓冲队列,还未执行的线程会在队列中等待
* 监控队列长度,确保队列有界
* 不当的线程池大小会使得处理速度变慢,稳定性下降,并且导致内存泄露。如果配置的线程过少,则队列会持续变大,消耗过多内存。
* 而过多的线程又会 由于频繁的上下文切换导致整个系统的速度变缓——殊途而同归。队列的长度至关重要,它必须得是有界的,这样如果线程池不堪重负了它可以暂时拒绝掉新的请求。
* ExecutorService 默认的实现是一个无界的 LinkedBlockingQueue。
*/
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, corePoolSize+1, 10l, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1000));
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(5);
//使用execute方法
executor.execute(new Stats("任务A", 1000, latch));
executor.execute(new Stats("任务B", 1000, latch));
executor.execute(new Stats("任务C", 1000, latch));
executor.execute(new Stats("任务D", 1000, latch));
executor.execute(new Stats("任务E", 1000, latch));
latch.await();// 等待所有人任务结束
System.out.println("所有的统计任务执行完成:" + sdf.format(new Date()));
}
static class Stats implements Runnable {
String statsName;
int runTime;
CountDownLatch latch;
public Stats(String statsName, int runTime, CountDownLatch latch) {
this.statsName = statsName;
this.runTime = runTime;
this.latch = latch;
}
public void run() {
try {
System.out.println(statsName+ " do stats begin at "+ startTime);
//模拟任务执行时间
Thread.sleep(runTime);
System.out.println(statsName + " do stats complete at "+ sdf.format(new Date()));
latch.countDown();//单次任务结束,计数器减一
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
二、方法二(Future)
重点是和springBoot整合,采用注解bean方式生成ThreadPoolTaskExecutor
在springBoot项目中开启异步线程需要满足一下几点
- 在启动类加入异步线程注解
@EnableAsync
- 创建线程池并创建
Bean
实例
1. 自定义线程池
@Configuration
//@EnableAsync
public class ThreadPoolConfig
{
/**
* 默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,
* 当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
* 当队列满了,就继续创建线程,当线程数量大于等于maxPoolSize后,开始使用拒绝策略拒绝
*/
/**
* 获得Java虚拟机可用的处理器个数 + 1
*/
private static final int THREADS = Runtime.getRuntime().availableProcessors() + 1;
@Value("${async.executor.thread.core_pool_size:0}")
public static int corePoolSizeConfig;
// 核心线程池大小
public static int corePoolSize = corePoolSizeConfig ==0 ? THREADS : corePoolSizeConfig;
// 最大可创建的线程数
//@Value("${async.executor.thread.max_pool_size}")
private int maxPoolSize = 2 * THREADS;;
// 队列最大长度
//@Value("${async.executor.thread.queue_capacity}")
private int queueCapacity = 1024;
// 线程池维护线程所允许的空闲时间
//@Value("${async.executor.thread.keep_alive_seconds}")
private int keepAliveSeconds = 300;
//线程池名前缀
//@Value("${async.executor.thread.threadNamePrefix}")
private static final String threadNamePrefix = "Async-Service-";
@Bean(name = "threadPoolTaskExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor()
{
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setMaxPoolSize(maxPoolSize);
executor.setCorePoolSize(corePoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(keepAliveSeconds);
executor.setThreadNamePrefix(threadNamePrefix);
// 线程池对拒绝任务(无线程可用)的处理策略
// CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化
executor.initialize();
return executor;
}
}
2. 异步执行方法
启动类添加@EnableAsync注解
@SpringBootApplication
@EnableAsync
@EnableScheduling
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
service层方法
@Service
public class AsyncInvokeService {
@Async("threadPoolTaskExecutor")
public Future<Boolean> exec1(String name) {
System.out.println("子线程 name -->" + Thread.currentThread().getName());
System.out.println(name);
Thread.sleep(10000);
return new AsyncResult<>(true);
}
@Async("threadPoolTaskExecutor")
public Future<Boolean> exec2(String phone) {
System.out.println("子线程 name -->" + Thread.currentThread().getName());
System.out.println(phone);
Thread.sleep(10000);
return new AsyncResult<>(true);
}
3. 多线程执行返回结果
@GetMapping("/gettest")
public String b() throws InterruptedException, ExecutionException {
Future<Boolean> future1 = asyncInvokeService.exec1("张三");
Future<Boolean> future2 = asyncInvokeService.exec2("15618881888");
List<Future<Boolean>> futureList = new ArrayList<>();
futureList.add(future1);
futureList.add(future2);
//查询任务执行的结果
for (Future<?> future : futureList) {
while (true) {//CPU高速轮询:每个future都并发轮循,判断完成状态然后获取结果,这一行,是本实现方案的精髓所在。即有10个future在高速轮询,完成一个future的获取结果,就关闭一个轮询
if (future.isDone() && !future.isCancelled()) { //获取future成功完成状态,如果想要限制每个任务的超时时间,取消本行的状态判断+future.get(1000*1, TimeUnit.MILLISECONDS)+catch超时异常使用即可。
Boolean result = future.get();//获取结果
System.out.println("任务i=" + i + "获取完成!" + new Date());
list.add(result);
break;//当前future获取结果完毕,跳出while
} else {
Thread.sleep(1);//每次轮询休息1毫秒(CPU纳秒级),避免CPU高速轮循耗空CPU---》新手别忘记这个
}
}
}
return "执行成功!!";
}
三、方法三(CompletableFuture)
示例:
1、线程池
@Configuration
//@EnableAsync
public class ThreadPoolConfig {
/**
* 默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,
* 当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
* 当队列满了,就继续创建线程,当线程数量大于等于maxPoolSize后,开始使用拒绝策略拒绝
*/
/**
* 获得Java虚拟机可用的处理器个数 + 1
*/
private static final int THREADS = Runtime.getRuntime().availableProcessors() + 1;
// @Value("${async.executor.thread.core_pool_size:0}") //可在配置文件配置核心线程数
public static int corePoolSizeConfig = 0;
// 核心线程池大小
public static int coreIoPoolSize = (corePoolSizeConfig == 0 ? THREADS : corePoolSizeConfig) * 4;
// 最大可创建的线程数
//@Value("${async.executor.thread.max_pool_size}")
private int maxIoPoolSize = 4 * 2 * THREADS;
// 核心线程池大小
public static int coreCpuPoolSize = corePoolSizeConfig == 0 ? THREADS : corePoolSizeConfig;
// 最大可创建的线程数
//@Value("${async.executor.thread.max_pool_size}")
private int maxCpuPoolSize = 2 * THREADS;
// 队列最大长度
//@Value("${async.executor.thread.queue_capacity}")
private int queueCapacity = 1024;
// 线程池维护线程所允许的空闲时间
//@Value("${async.executor.thread.keep_alive_seconds}")
private int keepAliveSeconds = 60;
//线程池名前缀
//@Value("${async.executor.thread.threadNamePrefix}")
private static final String threadNamePrefix = "Async-Service-";
/**
* IO 密集类型线程池 (corePoolSize 核心线程 和 maxPoolSize最大线程数比cpu核数翻4倍)
*
* @return
*/
@Bean(name = "threadIoPoolTaskExecutor")
public ThreadPoolTaskExecutor threadIoPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setMaxPoolSize(maxIoPoolSize);
executor.setCorePoolSize(coreIoPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(keepAliveSeconds);
executor.setThreadNamePrefix(threadNamePrefix);
// 线程池对拒绝任务(无线程可用)的处理策略
// CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化
executor.initialize();
return executor;
}
/**
* cpu 密集类型线程池
*
* @return
*/
@Bean(name = "threadCpuPoolTaskExecutor")
public ThreadPoolTaskExecutor threadCpuPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setMaxPoolSize(maxCpuPoolSize);
executor.setCorePoolSize(coreCpuPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(keepAliveSeconds);
executor.setThreadNamePrefix(threadNamePrefix);
// 线程池对拒绝任务(无线程可用)的处理策略
// CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化
executor.initialize();
return executor;
}
}
2、调用
@Service
@Slf4j
public class TestServiceImpl implements TestService {
@Autowired
private Executor threadIoPoolTaskExecutor;
public void testCompletableFuture(){
//并行调用 提供执行效率
CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> testAsync("a"), threadIoPoolTaskExecutor);
CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> testAsync("b"), threadIoPoolTaskExecutor);
CompletableFuture<String> c = CompletableFuture.supplyAsync(() -> testAsync("c"), threadIoPoolTaskExecutor);
try {
String resultA = a.get();
String resultB = b.get();
String resultC = c.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
public String testAsync(String s){
//耗时操作
return s;
}
}
参考:
SpringBoot线程池ThreadPoolExecutor
SpringBoot线程池ThreadPoolTaskExecutor