CompletableFuture实现异步并阻塞获取返回结果,巧用CompletableFuture返回值解决性能瓶颈,线程池,异步编排
参考: https://blog.csdn.net/LUOHUAPINGWIN/article/details/122222011
https://blog.csdn.net/sunquan291/article/details/103991184
配置:
gulimall.thread.coreSize=20
gulimall.thread.maxSize=200
gulimall.thread.keepAliveTime=10
读取配置:
package com.xunqi.gulimall.order.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* @Description:
* @Created: with IntelliJ IDEA.
* @author: 夏沫止水
* @createTime: 2020-06-23 20:28
**/
@ConfigurationProperties(prefix = "gulimall.thread")
// @Component
@Data
public class ThreadPoolConfigProperties {
private Integer coreSize;
private Integer maxSize;
private Integer keepAliveTime;
}
注入线程池:
package com.xunqi.gulimall.order.config;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @Description: 线程池配置类
* @Created: with IntelliJ IDEA.
* @author: 夏沫止水
* @createTime: 2020-06-23 20:24
**/
@EnableConfigurationProperties(ThreadPoolConfigProperties.class)
@Configuration
public class MyThreadConfig {
@Bean
public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool) {
return new ThreadPoolExecutor(
pool.getCoreSize(),
pool.getMaxSize(),
pool.getKeepAliveTime(),
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(100000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
}
}
使用:
@Autowired
private ThreadPoolExecutor threadPoolExecutor;
@Override
public List<WxUserInfo> getWxUserInfoByUid(String appid, List<Long> uidList) {
// 数据太多了.分片执行
List<List<Long>> uidListGroupList = CollectionUtil.split(uidList, 500);
List<CompletableFuture<List<WxUserInfo>>> futures = uidListGroupList.stream().map(list -> {
return CompletableFuture.supplyAsync(() -> {
RestResult<List<WxUserInfo>> wxUserInfoByAppIdUid = passportFeignService.getWxUserInfoByAppIdUid(appid, list, appName);
return wxUserInfoByAppIdUid.getData();
}, threadPoolExecutor);
}).collect(Collectors.toList());
// List<WxUserInfo> collect = futures.stream().map(p -> {
// try {
// return p.get();
// } catch (InterruptedException e) {
// e.printStackTrace();
// } catch (ExecutionException e) {
// e.printStackTrace();
// }
// return null;
// }).filter(Objects::nonNull).flatMap(List::stream).collect(Collectors.toList());
List<WxUserInfo> biddingList = futures.stream().map(CompletableFuture::join).filter(Objects::nonNull).flatMap(List::stream).collect(Collectors.toList());
return biddingList;
}