在上一篇文章中,我们学习了Kotlin协程的基本概念。如果你对Kotlin协程仍没有概念,请先阅读它。
上次我仅解释说挂起函数允许我们挂起和等待直到方法恢复。我们没有深入,因为细节很多,值得单独开一篇帖子。所以今天,我们详细介绍挂起函数的用法。
目录
- 什么是挂起函数?
- 在挂起函数中调用阻塞方法
- 回调和挂起的可取消协程(SuspendCancellableCoroutine)
(1) resume(value: T)
(2) resumeWithException(exception: Throwable)
(3) cancellableContinuation.cancel() - 在挂起函数里调用RxJava
1. 什么是挂起函数?
我们可以将挂起函数当作可以暂停并在任务结束后恢复的常规方法,这意味着我们可以开启一个耗时任务然后等待它完成。这就是为什么我们可以用串行的方式来写协程,而无须回调或者RxJava。
挂起函数只能在协程中调用。挂起函数跟普通函数的使用一样,只是它会挂起当前协程的执行。例如,delay()是一个内建的挂起函数。感谢Android Studio的贴心提醒,我们可以从左侧栏的箭头icon知道delay()是一个挂起函数。当我们在协程里调用delay(1_000)
的时候,它会中断1s执行,不会阻塞线程,然后在1s后回到协程继续执行doSomething()
方法。
挂起函数咋定义?suspend
来帮忙。只需在常规方法前加上suspend
,阻塞线程的繁重任务就能变成非阻塞方法吗?答案是大大的NO。虽然官方文档提到“通过调用其他挂起函数,它会挂起代码的执行而不会阻塞当前线程。”,但我们仍需考虑挂起函数运行的Dispatchers
(调度器)。
如果你只是在普通方法前加上supend
,IDE会警告“冗余的supend
修饰符。
// IDE warning: "Redundant 'suspend' modifier".
private suspend fun doSomething() {
// do some heavy tasks
}
最简单且正确的方式是用withContext()
包裹任务,并指定恰当的dispatchers(调度器)。例如,如果繁重任务是计算密集的,那我们应该将它包裹在withContext(Dispatchers.default)
里。请见上一篇帖子
private suspend fun doSomething() {
withContext(Dispatchers.Default) {
// do some heavy tasks
}
}
2. 在挂起函数里调用阻塞方法
将耗时任务放进挂起函数是个好主意。例如,通过网络任务获取用户数据然后更新UI是一件常事。最大的问题是网络请求这类繁重任务会阻塞主线程。为了避免ANR,我们将该任务放到后台线程,接下来一件烦人的事是不能在后台线程更新UI,于是我们使用Activity.runOnUiThread(Runnable)甚至Handler
来实现这一点。
Umm..对Android开发者而言,维护大量这样的任务并非易事。幸运的是,Kotlin协程来了。
MainScope().launch {
val user = fetchUser() // Waits for fetching user in I/O thread.
updateUser(user) // Updates UI in main thread.
}
private suspend fun fetchUser(): User = withContext(Dispatchers.IO) {
fetchUserFromServerBlocking()
}
private fun fetchUserFromServerBlocking(): User {
// Do I/O execution.
}
private fun updateUser(user: User) {
// Updates UI with [User] data.
}
class User
这些代码片段在数据拉取后更新UI。更重要的是,网络任务不会阻塞主线程,它在工作线程中执行,因为我们用withContext(Dispatchers.IO)
切了线程。
3. 回调和挂起的可取消协程(SuspendCancellableCoroutine)
假定我们有一个线上的Android项目。我们使用了大量异步任务读取数据库或者从服务器拉取数据。使用回调是在主线程处理数据的一个可能的方法。现在,怎么把回调任务转为协程呢?suspendCancellableCoroutine
来了。
SuspendCancellableCoroutine
返回一个CancellableContinuation对象供我们resume、resumeWithException,以及在协程取消时抛出CancellationException异常。(有一个类似的方法叫suspendCoroutine
,它俩的区别是suspendCoroutine不能被Job.cancel()
取消)
CancellableContinuation
我们可以在suspendCancellableCoroutine
里执行一个代码块,它具有一个CancellableContinuation
参数。CancellableContinuation
有3种用法:
(1) resume(value: T):
恢复相应协程的执行,传递 [value]作为挂起点的返回值。
MainScope().launch {
try {
val user = fetchUser()
updateUser(user)
} catch (exception: Exception) {
// Use try-catch or CoroutinesExceptionHandler to handle exceptions.
}
}
// Fetches the user data from server.
private suspend fun fetchUser(): User = suspendCancellableCoroutine {
cancellableContinuation ->
fetchUserFromNetwork(object : Callback {
override fun onSuccess(user: User) {
// Invokes this line since the callback onSuccess() is invoked.
cancellableContinuation.resume(user)
}
override fun onFailure(exception: Exception) {
cancellableContinuation.resumeWithException(exception)
}
})
}
private fun fetchUserFromNetwork(callback: Callback) {
Thread {
Thread.sleep(3_000)
// Invokes onSuccess() with user data.
callback.onSuccess(User())
}.start()
}
private fun updateUser(user: User) {
// Updates UI with [User] data.
}
interface Callback {
fun onSuccess(user: User)
fun onFailure(exception: Exception)
}
class User
(2) resumeWithException(exception: Throwable)
恢复相应协程的执行,以便[exeption]在上一个挂起点后重新抛出。
MainScope().launch {
try {
val user = fetchUser()
updateUser(user)
} catch (exception: Exception) {
// Use try-catch or CoroutinesExceptionHandler to handle exceptions.
Log.d("demo", "$exception") // Prints "java.io.IOException".
}
// If we handle exception in try-catch, we can still do something after it.
doSomething()
}
// Fetches the user data from server.
private suspend fun fetchUser(): User = suspendCancellableCoroutine {
cancellableContinuation ->
fetchUserFromNetwork(object : Callback {
override fun onSuccess(user: User) {
cancellableContinuation.resume(user)
}
override fun onFailure(exception: Exception) {
// Invokes this line since the callback onFailure() is invoked.
cancellableContinuation.resumeWithException(exception)
}
})
}
private fun fetchUserFromNetwork(callback: Callback) {
Thread {
Thread.sleep(3_000)
// Invokes onFailure() callback with "IOException()".
callback.onFailure(IOException())
}.start()
}
private fun updateUser(user: User) {
// Updates UI with [User] data.
}
interface Callback {
fun onSuccess(user: User)
fun onFailure(exception: Exception)
}
class User
在上面的示例代码中,当我们调用CancellableContinuation.resumeWithException(user)时,fetchUser()就会抛出[exception]异常。
updateUser(user)不会被调用,而try-catch将会处理该异常。try-catch后面的代码块将会继续执行。
(3) cancellableContinuation.cancel()
虽然Kotlin没有受检异常,我们仍需要在try-catch中处理所有的异常。否则,应用将会崩溃。但仍有一个特殊异常我想在这里分享,那就是CancellationException,它会在我们调用cancellableContinuation.cancel()时抛出。
MainScope().launch {
try {
val user = fetchUser()
updateUser(user)
} catch (exception: Exception) {
// Handles exceptions here.
// Prints "java.util.concurrent.CancellationException: Continuation
// CancellableContinuation(DispatchedContinuation[Main, Continuation at
// com.mutant.coroutinestest.MainActivity$onCreate$1.invokeSuspend
// (MainActivity.kt:22)@5af0f84]){Active}@65c036d was cancelled normally".
Log.d("demo", "$exception")
}
// If we handle exception in try-catch, we can still do something after it.
doSomething()
}
// Fetches the user data from server.
private suspend fun fetchUser(): User = suspendCancellableCoroutine {
cancellableContinuation ->
fetchUserFromNetwork(object : Callback {
override fun onSuccess(user: User) {
cancellableContinuation.resume(user)
}
override fun onFailure(exception: Exception) {
cancellableContinuation.resumeWithException(exception)
}
})
// We call "contiuation.cancel()" to cancel this suspend function.
cancellableContinuation.cancel()
}
private fun fetchUserFromNetwork(callback: Callback) {
Thread {
Thread.sleep(3_000)
callback.onSuccess(User())
}.start()
}
private fun updateUser(user: User) {
// Updates UI with [User] data.
}
interface Callback {
fun onSuccess(user: User)
fun onFailure(exception: Exception)
}
class User
即使我们不处理CancellationException,它也不会导致崩溃。更多信息,请参考此文。但它随后的代码不会被执行。
MainScope().launch {
val user = fetchUser()
updateUser(user)
// If we dont't handle CancellationException, this job would be cancelled.
canNOTBeExecuted()
}
// Fetches the user data from server.
private suspend fun fetchUser(): User = suspendCancellableCoroutine {
cancellableContinuation ->
fetchUserFromNetwork(object : Callback {
override fun onSuccess(user: User) {
cancellableContinuation.resume(user)
}
override fun onFailure(exception: Exception) {
cancellableContinuation.resumeWithException(exception)
}
})
// We call "contiuation.cancel()" to cancel this suspend function.
cancellableContinuation.cancel()
}
private fun fetchUserFromNetwork(callback: Callback) {
Thread {
Thread.sleep(3_000)
callback.onSuccess(User())
}.start()
}
private fun updateUser(user: User) {
// Updates UI with [User] data.
}
interface Callback {
fun onSuccess(user: User)
fun onFailure(exception: Exception)
}
class User
在挂起函数中调用RxJava
如果我们的项目中用了RxJava怎么办?有一个库叫kotlinx-coroutines-rx2,它可以将RxJava转化为协程。用下列代码将它导入:
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-rx2:1.3.2"
下列是所有的协程构建器:
例如,如果我们用了RaJava的Single,那么Single.await()帮我们将RxJava转为suspendCancellableCoroutine。
正如上面代码展示的,await()拓展函数将成功的情况传递给cancellableContinuation.resume(),而将失败的情况传递给cancellableContinuation.resumeWithException()。
让我们实现我们的示例代码:
MainScope().launch {
CoroutineScope(Dispatchers.Main).launch {
val user = fetchUserFromServer().await()
updateUser(user)
}
}
private fun fetchUserFromServer(): Single<User> =
Single.create<User> {
Log.d("demo", "(1) fetchUserFromServer start, ${Thread.currentThread()}")
Thread.sleep(3_000)
it.onSuccess(User())
Log.d("demo", "(2) fetchUserFromServer onSuccess, ${Thread.currentThread()}")
}.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
private fun updateUser(user: User) {
Log.d("demo", "(3) updateUser, ${Thread.currentThread()}")
}
class User
日志将是:
D/demo: (1) fetchUserFromServer start, Thread[RxCachedThreadScheduler-1,5,main]
D/demo: (2) fetchUserFromServer onSuccess, Thread[RxCachedThreadScheduler-1,5,main]
D/demo: (3) updateUser, Thread[main,5,main]
fetchUserFromServer().await()代码挂起协程的执行,一直等待,直到RxJava返回结果。
如果RxJava的Single失败了,并且返回了一个异常怎么办呢?
oroutineScope(Dispatchers.Main).launch {
try {
val user = fetchUserFromServer().await()
updateUser(user)
} catch (e: Exception) {
Log.d("demo", "(4) {$e}, ${Thread.currentThread()}")
}
}
private fun fetchUserFromServer(): Single<User> =
Single.create<User> {
Log.d("demo", "(1) fetchUserFromServer start, ${Thread.currentThread()}")
Thread.sleep(3_000)
it.onError(IOException())
Log.d("demo", "(2) fetchUserFromServer onError, ${Thread.currentThread()}")
}.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
private fun updateUser(user: User) {
Log.d("demo", "(3) updateUser, ${Thread.currentThread()}")
}
class User
那么异常将在try-catch中处理。日志如下:
D/demo: (1) fetchUserFromServer start, Thread[RxCachedThreadScheduler-1,5,main]
D/demo: (2) fetchUserFromServer onError, Thread[RxCachedThreadScheduler-1,5,main]
D/demo: (4) {java.io.IOException}, Thread[main,5,main]
对于RxJava的* Maybe, Observable*,都有相应的拓展函数供我们使用。尽管在你的代码中尝试它们。
这就是今天的全部。感谢阅读。我希望这些文章能帮你更加了解挂起函数,并有助于在你的项目中实现它。如果你有任何疑问或建议,欢迎留言。再见。👋👋