背景:
1.有个批量关单并且同步返回结果的需求
1.分析
需求
- 1.批量关单,且需要同步返回结果(成功,失败)
由于调用我方接口是一次传入很多任务,假设是N
,串行关单必然超时,因此这里需要并发执行 - 2.由于公司不允许随便定义线程池(即使比较小的池)
因此不能直接通过定义线程池容量和队列做并发控制,需要使用通用线程池+自定义线程控制规则 - 3.由于下游rpc给到我们这边的流量上限是有限的,假设是
X
,因此我们需要并发,但是又需要控制同时并发的数量
2.写代码
方案:通用线程池(实现并发)+CountDownLatch(实现阻塞主线程,并发结束返回结果)+Semaphore (实现并发数量的控制)
需要注意的一点是
等待队列容量>=2*Semaphore,不然会有线程因为拿不到线程池资源不处理直接失败的(原因参考线程池执行流程)
,当然我们可以将这部分压根没处理的也一并同步返回出去,让调用侧重试(我方不重试,防止超时问题)
public class test {
/**
* 线程池
* 等待队列容量>=2*Semaphore
*/
static ExecutorService threadPool = new ThreadPoolExecutor(
5,
5,
1,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(10),
Executors.defaultThreadFactory()
, new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) throws InterruptedException {
List<Integer> successList=new CopyOnWriteArrayList<>();
List<Integer> failList=new CopyOnWriteArrayList<>();
List<Integer> errorList=new CopyOnWriteArrayList<>();
CountDownLatch countDownLatch = new CountDownLatch(100);
Semaphore semaphore=new Semaphore(10);
for (int i = 0; i < 100; i++) {
try {
semaphore.acquire();
AtomicInteger self = new AtomicInteger(i);
threadPool.execute(() -> {
try {
if ((self.get() & 1) == 1) {
//奇数线程模拟失败
int temp = (self.get()/0);
}
Thread.sleep(5000);
successList.add(self.get());
System.out.println(self.get() + "执行成功");
}catch (Exception e){
System.out.println(self.get() + "执行失败");
failList.add(self.get());
}finally {
semaphore.release();
countDownLatch.countDown();
}
});
} catch (Exception e) {
errorList.add(i);
System.out.println("biz Exception");
semaphore.release();
countDownLatch.countDown();
}
}
countDownLatch.await();
threadPool.shutdown();
System.out.println("成功的:"+successList.size());
successList.stream().sorted().forEach(item->{
System.out.print(item+" ");
});
System.out.println();
System.out.println("失败的:"+failList.size());
failList.stream().sorted().forEach(item->{
System.out.print(item+" ");
});
System.out.println();
System.out.println("错误的:"+errorList.size());
errorList.stream().sorted().forEach(item->{
System.out.print(item+" ");
});
System.out.println("全部执行结束");
threadPool.shutdown();
}
}