1 资料集合
Fork/Join框架(一)引言
Fork/Join框架(二)创建一个Fork/Join池
Fork/Join框架(三)加入任务的结果
Fork/Join框架(四)异步运行任务
Fork/Join框架(五)在任务中抛出异常
Fork/Join框架(六)取消任务
全部摘自并发编程网,特别好的翻译和校对,可以看特别多的内容,感谢方腾飞,感谢并发编程网 希望你们越做越好。
2 为什么要用?——业务需求
有200万左右的历史用户需要赠送会员并根据用户行为给予30天或365天的期限。
因为有业务需求,所以无法直接走DB变更。
因为用户量并没有特别大,也没用zk或者其他分布式服务均分用户去做(服务器同时启动对DB压力会很大)。
最后准备用预发单节点去刷200万的用户。很适合用多线程去处理。
3 Fork/Join 框架
多线程经验并不丰富,但是知道用线程池来管理线程比较好,查询一些Java并发的类,很多都是集中在concurrent包中,Fork/Join框架也在其中(Duog Lea 大神威武)。
既然是一个框架,自然已经处理了线程池和待任务。使用过程主要依赖ForkJoinPool 和 ForkJoinTask的两个子类RecursiveTask和RecursiveAction;ForkJoinPool继承自AbstractExecutorService,ForkJoinTask实现了Future接口。
典型ForkJoinPool的使用:
// 声明pool
ForkJoinPool pool = new ForkJoinPool();
// 执行task
pool.execute(task);
// 观察执行中
do {
System.out.printf("******************************************\n");
System.out.printf("Main: Parallelism: %d\n", pool.getParallelism());
System.out.printf("Main: Active Threads: %d\n", pool.getActiveThreadCount());
System.out.printf("Main: Task Count: %d\n", pool.getQueuedTaskCount());
System.out.printf("Main: Steal Count: %d\n", pool.getStealCount());
System.out.printf("******************************************\n");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
} while (!task.isDone());
// 关闭池
pool.shutdown();
// 等待关闭
try {
pool.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
e.printStackTrace();
}
典型ForkJoinTask的设计使用:
If (problem size < default size){
tasks=divide(task);
execute(tasks);
} else {
resolve problem using another algorithm;
}
ForkJoinTask简单实例(重写compute方法):
@Override
protected Integer compute() {
int result = 0;
if (end - start < 10) {
result = processLines(document, start, end, word);
} else {
int mid = (start + end) / 2;
DocumentTask task1 = new DocumentTask(document, start, mid, word);
DocumentTask task2 = new DocumentTask(document, mid, end, word);
invokeAll(task1, task2);
try {
result = groupResults(task1.get(), task2.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
return result;
}
4 结合业务场景使用 Fork/Join
Main类使用ForkJoinPool:
RefreshAgentToMemberTask task = new RefreshAgentToMemberTask(startId, endId, applicationContext);
ForkJoinPool pool = new ForkJoinPool();
pool.execute(task);
do {
logger.info("Thread Count: {}", pool.getActiveThreadCount());
logger.info("Thread Steal: {}", pool.getStealCount());
logger.info("Parallelism: {}", pool.getParallelism());
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
} while (!task.isDone());
pool.shutdown();
if (task.isCompletedNormally()) {
logger.info("The process has completed normally.");
}
是不是很类似于之前的例子?其实几乎所有对ForkJoinPool使用方式都类似。
ForkJoinTask 类设计:
public class RefreshAgentToMemberTask extends RecursiveAction {
private static final long serialVersionUID = -7434939182574934204L;
private Logger logger = LoggerFactory.getLogger(RefreshAgentToMemberTask.class);
private long startId;
private long endId;
private ApplicationContext applicationContext; // 用于获取spring bean
private static final int THRESHOLD = 100;//10000; TODO test=100, prod=10000
private Map<String, Long> times = new HashMap<>();
RefreshAgentToMemberTask(long startId, long endId, ApplicationContext applicationContext) {
this.startId = startId;
this.endId = endId;
this.applicationContext = applicationContext;
}
@Override
protected void compute() {
if (endId - startId <= THRESHOLD) {
doAgentToMember();
} else {
long middleId = (startId + endId) / 2;
RefreshAgentToMemberTask task1 = new RefreshAgentToMemberTask(startId, middleId, applicationContext);
RefreshAgentToMemberTask task2 = new RefreshAgentToMemberTask(middleId, endId, applicationContext);
invokeAll(task1, task2);
}
}
/**
* 到达阀值以内的处理方法
*/
private void doAgentToMember() {
logger.info("task doAgentToMember range: startId={}, endId={}.", startId, endId);
long start = System.currentTimeMillis();
......
logger.info("startId={}, endId={}. total cost={}", startId, endId, (System.currentTimeMillis() - start));
}
}
5 测试结果
很遗憾,没有在线上环境使用,因为功能被大老板砍了。没想到技术没干成的事最后是老板砍了,估计产品也没想到。
不过在测试环境还是做了尝试,利用本机(Mac JVM 能拿到的CPU核数是4),数据量1.8万,两次执行遍历所有用时分别为330s,360s。全部遍历大概需要 38,888s ~ 10小时。挺好的,预发机器估计比我性能好,上班点一下,下班了就可以验证。
6 感想
业务有需要,加上一点点平时的积累,没准就能学到一些有用的新技术。为了技术点赞~。