《Kotlin极简教程》正式上架:
点击这里 > 去京东商城购买阅读
点击这里 > 去天猫商城购买阅读
非常感谢您亲爱的读者,大家请多支持!!!有任何问题,欢迎随时与我交流~
9.8 挂起函数的组合执行
本节我们介绍挂起函数组合的各种方法。
9.8.1 按默认顺序执行
假设我们有两个在别处定义的挂起函数:
suspend fun doJob1(): Int {
println("Doing Job1 ...")
delay(1000L) // 此处模拟我们的工作代码
println("Job1 Done")
return 10
}
suspend fun doJob2(): Int {
println("Doing Job2 ...")
delay(1000L) // 此处模拟我们的工作代码
println("Job2 Done")
return 20
}
如果需要依次调用它们, 我们只需要使用正常的顺序调用, 因为协同中的代码 (就像在常规代码中一样) 是默认的顺序执行。下面的示例通过测量执行两个挂起函数所需的总时间来演示:
fun testSequential() = runBlocking<Unit> {
val time = measureTimeMillis {
val one = doJob1()
val two = doJob2()
println("[testSequential] 最终结果: ${one + two}")
}
println("[testSequential] Completed in $time ms")
}
执行上面的代码,我们将得到输出:
Doing Job1 ...
Job1 Done
Doing Job2 ...
Job2 Done
[testSequential] 最终结果: 30
[testSequential] Completed in 6023 ms
可以看出,我们的代码是跟普通的代码一样顺序执行下去。
9.8.2 使用async异步并发执行
上面的例子中,如果在调用 doJob1 和 doJob2 之间没有时序上的依赖关系, 并且我们希望通过同时并发地执行这两个函数来更快地得到答案, 那该怎么办呢?这个时候,我们就可以使用async来实现异步。代码示例如下:
fun testAsync() = runBlocking<Unit> {
val time = measureTimeMillis {
val one = async(CommonPool) { doJob1() }
val two = async(CommonPool) { doJob2() }
println("最终结果: ${one.await() + two.await()}")
}
println("Completed in $time ms")
}
如果跟上面同步的代码一起执行对比,我们可以看到如下输出:
Doing Job1 ...
Job1 Done
Doing Job2 ...
Job2 Done
[testSequential] 最终结果: 30
[testSequential] Completed in 6023 ms
Doing Job1 ...
Doing Job2 ...
Job1 Done
Job2 Done
[testAsync] 最终结果: 30
[testAsync] Completed in 3032 ms
我们可以看出,使用async函数,我们的两个Job并发的执行了,并发花的时间要比顺序的执行的要快将近两倍。因为,我们有两个任务在并发的执行。
从概念上讲, async跟launch类似, 它启动一个协程, 它与其他协程并发地执行。
不同之处在于, launch返回一个任务Job对象, 不带任何结果值;而async返回一个延迟任务对象Deferred,一种轻量级的非阻塞性future, 它表示后面会提供结果。
在上面的示例代码中,我们使用Deferred调用 await() 函数来获得其最终结果。另外,延迟任务Deferred也是Job类型, 它继承自Job,所以它也有isActive、isCompleted属性,也有join()、cancel()函数,因此我们也可以在需要时取消它。Deferred接口定义如下:
public interface Deferred<out T> : Job {
val isCompletedExceptionally: Boolean
val isCancelled: Boolean
public suspend fun await(): T
public fun <R> registerSelectAwait(select: SelectInstance<R>, block: suspend (T) -> R)
public fun getCompleted(): T
@Deprecated(message = "Use `isActive`", replaceWith = ReplaceWith("isActive"))
public val isComputing: Boolean get() = isActive
}
其中,常用的属性和函数说明如下:
名称 | 说明 |
---|---|
isCompletedExceptionally | 当协程在计算过程中有异常failed 或被取消,返回true。 这也意味着isActive 等于 false ,同时 isCompleted 等于 true
|
isCancelled | 如果当前延迟任务被取消,返回true |
suspend fun await() | 等待此延迟任务完成,而不阻塞线程;如果延迟任务完成, 则返回结果值或引发相应的异常。 |
延迟任务对象Deferred的状态与对应的属性值如下表所示:
状态 | isActive | isCompleted | isCompletedExceptionally | isCancelled |
---|---|---|---|---|
New (可选初始状态) | false |
false |
false |
false |
Active (默认初始状态) | true |
false |
false |
false |
Resolved (最终状态) | false |
true |
false |
false |
Failed (最终状态) | false |
true |
true |
false |
Cancelled (最终状态) | false |
true |
true |
true |
9.9 协程上下文与调度器
到这里,我们已经看到了下面这些启动协程的方式:
launch(CommonPool) {...}
async(CommonPool) {...}
run(NonCancellable) {...}
这里的CommonPool 和 NonCancellable 是协程上下文(coroutine contexts)。本小节我们简单介绍一下自定义协程上下文。
9.9.1 调度和线程
协程上下文包括一个协程调度程序, 它可以指定由哪个线程来执行协程。调度器可以将协程的执行调度到一个线程池,限制在特定的线程中;也可以不作任何限制,让它无约束地运行。请看下面的示例:
fun testDispatchersAndThreads() = runBlocking {
val jobs = arrayListOf<Job>()
jobs += launch(Unconfined) {
// 未作限制 -- 将会在 main thread 中执行
println("Unconfined: I'm working in thread ${Thread.currentThread()}")
}
jobs += launch(context) {
// 父协程的上下文 : runBlocking coroutine
println("context: I'm working in thread ${Thread.currentThread()}")
}
jobs += launch(CommonPool) {
// 调度指派给 ForkJoinPool.commonPool
println("CommonPool: I'm working in thread ${Thread.currentThread()}")
}
jobs += launch(newSingleThreadContext("MyOwnThread")) {
// 将会在这个协程自己的新线程中执行
println("newSingleThreadContext: I'm working in thread ${Thread.currentThread()}")
}
jobs.forEach { it.join() }
}
运行上面的代码,我们将得到以下输出 (可能按不同的顺序):
Unconfined: I'm working in thread Thread[main,5,main]
CommonPool: I'm working in thread Thread[ForkJoinPool.commonPool-worker-1,5,main]
newSingleThreadContext: I'm working in thread Thread[MyOwnThread,5,main]
context: I'm working in thread Thread[main,5,main]
从上面的结果,我们可以看出:
使用无限制的Unconfined上下文的协程运行在主线程中;
继承了 runBlocking {...} 的context的协程继续在主线程中执行;
而CommonPool在ForkJoinPool.commonPool中;
我们使用newSingleThreadContext函数新建的协程上下文,该协程运行在自己的新线程Thread[MyOwnThread,5,main]中。
另外,我们还可以在使用 runBlocking的时候显式指定上下文, 同时使用 run 函数来更改协程的上下文:
fun log(msg: String) = println("${Thread.currentThread()} $msg")
fun testRunBlockingWithSpecifiedContext() = runBlocking {
log("$context")
log("${context[Job]}")
log("开始")
val ctx1 = newSingleThreadContext("线程A")
val ctx2 = newSingleThreadContext("线程B")
runBlocking(ctx1) {
log("Started in Context1")
run(ctx2) {
log("Working in Context2")
}
log("Back to Context1")
}
log("结束")
}
运行输出:
Thread[main,5,main] [BlockingCoroutine{Active}@b1bc7ed, EventLoopImpl@7cd84586]
Thread[main,5,main] BlockingCoroutine{Active}@b1bc7ed
Thread[main,5,main] 开始
Thread[线程A,5,main] Started in Context1
Thread[线程B,5,main] Working in Context2
Thread[线程A,5,main] Back to Context1
Thread[main,5,main] 结束
9.9.2 父子协程
当我们使用协程A的上下文启动另一个协程B时, B将成为A的子协程。当父协程A任务被取消时, B以及它的所有子协程都会被递归地取消。代码示例如下:
fun testChildrenCoroutine()= runBlocking<Unit> {
val request = launch(CommonPool) {
log("ContextA1: ${context}")
val job1 = launch(CommonPool) {
println("job1: 独立的协程上下文!")
delay(1000)
println("job1: 不会受到request.cancel()的影响")
}
// 继承父上下文:request的context
val job2 = launch(context) {
log("ContextA2: ${context}")
println("job2: 是request coroutine的子协程")
delay(1000)
println("job2: 当request.cancel(),job2也会被取消")
}
job1.join()
job2.join()
}
delay(500)
request.cancel()
delay(1000)
println("main: Who has survived request cancellation?")
}
运行输出: