1 背景
积分中心首页嵌入了一个 做任务赚积分 页面,后台通过 任务中心管理,包含各种签到 浏览 寄件 查件任务,是否完成需要调度各个下游服务获取数据以及查询第三方API 然后再进行聚合,具有鲜明的I/O密集型(I/O Bound)特点。随着积分中心页面日活达到1500万(高峰时3000万)情况下,业务对系统吞吐量的要求也越来越高,使用同步加载方式的弊端逐渐显现,因此我们开始考虑将同步加载改为并行加载的可行性。
先复习一下Java IO模型 Netty理论一:Java IO与NIO 以及 NIO在netty中的应用 Netty理论三:Netty线程模型
2 同步调用(BIO模型)
2.1 串行同步
任务中心API服务需要独立查询多个业务模块,然后聚合查询结果一次性返回任务结果,是典型的I/O密集型(I/O Bound)服务。在同步调用的场景下,接口耗时长、性能差,接口响应时长T > T1+T2+T3+……+Tn。
2.2 并行同步
这时为了缩短接口的同步等待时间,一般会使用线程池来同时处理多个任务,优化前查询接口就是使用ExecutorService 线程池处理,
同步模型中使用线程池确实能实现异步调用的效果,也能压缩同步等待的时间,但由于以下三个原因,导致资源利用率比较低:
线程池中的线程都是阻塞的,硬件资源无法充分利用,系统吞吐量容易达到瓶颈
CPU资源大量浪费在阻塞等待上,导致CPU资源利用率低。在Java 8之前,一般会通过回调的方式来减少阻塞,但是大量使用回调,又引发臭名昭著的回调地狱问题,导致代码可读性和可维护性大大降低。
为了增加并发度,会引入更多额外的线程池,随着CPU调度线程数的增加,会导致更严重的资源争用,宝贵的CPU资源被损耗在上下文切换上,而且线程本身也会占用系统资源,且不能无限增加。
3 异步调用(NIO模型)
Java NIO的介绍见 Netty理论一:Java IO与NIO ,那我们在业务中怎么使用呢,从 文章的demo 中可以发现,自己动手将 BIO替换成NIO不现实,所幸我们已有现成的组件CompletableFuture可以直接使用。
4 CompletableFuture 简介
CompletableFuture是由Java 8引入的,是对Future的扩展和增强。CompletableFuture实现了Future接口,并在此基础上进行了丰富的扩展,完美弥补了Future的局限性,支持流式计算、函数式编程、完成通知、自定义异常处理,同时CompletableFuture实现了对任务编排的能力。借助这项能力,可以轻松地组织不同任务的运行顺序、规则以及方式。从某种程度上说,这项能力是它的核心能力。而在以往,虽然通过CountDownLatch等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以维护。
CompletableFuture
的使用场景:创建异步任务、简单任务异步回调、多任务组合处理
4.1 CompletableFuture原理
CompletableFuture中包含两个字段:result和stack。result用于存储当前CF的结果,stack(Completion)表示当前CF完成后需要触发的依赖动作(Dependency Actions),去触发依赖它的CF的计算,依赖动作可以有多个(表示有多个依赖它的CF),以栈(Treiber stack)的形式存储,stack表示栈顶元素。
这种方式类似“观察者模式”,依赖动作(Dependency Action)都封装在一个单独Completion子类中。下面是Completion类关系结构图。CompletableFuture中的每个方法都对应了图中的一个Completion的子类,Completion本身是观察者的基类。
- UniCompletion继承了Completion,是一元依赖的基类,例如thenApply的实现类UniApply就继承自UniCompletion。
- BiCompletion继承了UniCompletion,是二元依赖的基类,同时也是多元依赖的基类。例如thenCombine的实现类BiRelay就继承自BiCompletion。
4.2 CompletableFuture的设计思想
按照类似“观察者模式”的设计思想,原理分析可以从“观察者”和“被观察者”两个方面着手。由于回调种类多,但结构差异不大,所以这里单以一元依赖中的thenApply为例,不再枚举全部回调类型。如下图所示:
4.2.1 被观察者
- 每个CompletableFuture都可以被看作一个被观察者,其内部有一个Completion类型的链表成员变量stack,用来存储注册到其中的所有观察者。当被观察者执行完成后会弹栈stack属性,依次通知注册到其中的观察者。上面例子中步骤fn2就是作为观察者被封装在UniApply中。
- 被观察者CF中的result属性,用来存储返回结果数据。这里可能是一次RPC调用的返回值,也可能是任意对象,在上面的例子中对应步骤fn1的执行结果。
4.2.2 观察者
CompletableFuture支持很多回调方法,例如thenAccept、thenApply、exceptionally等,这些方法接收一个函数类型的参数f,生成一个Completion类型的对象(即观察者),并将入参函数f赋值给Completion的成员变量fn,然后检查当前CF是否已处于完成状态(即result != null),如果已完成直接触发fn,否则将观察者Completion加入到CF的观察者链stack中,再次尝试触发,如果被观察者未执行完则其执行完毕之后通知触发。
- 观察者中的dep属性:指向其对应的CompletableFuture,在上面的例子中dep指向CF2。
- 观察者中的src属性:指向其依赖的CompletableFuture,在上面的例子中src指向CF1。
- 观察者Completion中的fn属性:用来存储具体的等待被回调的函数。这里需要注意的是不同的回调方法(thenAccept、thenApply、exceptionally等)接收的函数类型也不同,即fn的类型有很多种,在上面的例子中fn指向fn2。
4.3 整体流程
4.3.1 一元依赖
这里仍然以thenApply为例来说明一元依赖的流程:
- 将观察者Completion注册到CF1,此时CF1将Completion压栈。
- 当CF1的操作运行完成时,会将结果赋值给CF1中的result属性。
- 依次弹栈,通知观察者尝试运行。
初步流程设计如上图所示,这里有几个关于注册与通知的并发问题,大家可以思考下:
Q1:在观察者注册之前,如果CF已经执行完成,并且已经发出通知,那么这时观察者由于错过了通知是不是将永远不会被触发呢 ? A1:不会。在注册时检查依赖的CF是否已经完成。如果未完成(即result == null)则将观察者入栈,如果已完成(result != null)则直接触发观察者操作。
Q2:在”入栈“前会有”result == null“的判断,这两个操作为非原子操作,CompletableFufure的实现也没有对两个操作进行加锁,完成时间在这两个操作之间,观察者仍然得不到通知,是不是仍然无法触发?
A2:不会。入栈之后再次检查CF是否完成,如果完成则触发。
Q3:当依赖多个CF时,观察者会被压入所有依赖的CF的栈中,每个CF完成的时候都会进行,那么会不会导致一个操作被多次执行呢 ?如下图所示,即当CF1、CF2同时完成时,如何避免CF3被多次触发。
A3:CompletableFuture的实现是这样解决该问题的:观察者在执行之前会先通过CAS操作设置一个状态位,将status由0改为1。如果观察者已经执行过了,那么CAS操作将会失败,取消执行。
通过对以上3个问题的分析可以看出,CompletableFuture在处理并行问题时,全程无加锁操作,极大地提高了程序的执行效率。我们将并行问题考虑纳入之后,可以得到完善的整体流程图如下所示:
CompletableFuture支持的回调方法十分丰富,但是正如上一章节的整体流程图所述,他们的整体流程是一致的。所有回调复用同一套流程架构,不同的回调监听通过策略模式实现差异化。
4.3.2 二元依赖
我们以thenCombine为例来说明二元依赖:
thenCombine操作表示依赖两个CompletableFuture。其观察者实现类为BiApply,如上图所示,BiApply通过src和snd两个属性关联被依赖的两个CF,fn属性的类型为BiFunction。与单个依赖不同的是,在依赖的CF未完成的情况下,thenCombine会尝试将BiApply压入这两个被依赖的CF的栈中,每个被依赖的CF完成时都会尝试触发观察者BiApply,BiApply会检查两个依赖是否都完成,如果完成则开始执行。这里为了解决重复触发的问题,同样用的是上一章节提到的CAS操作,执行时会先通过CAS设置状态位,避免重复触发。