(译)Android中的Kotlin协程-挂起函数

原文

挂起函数原理

在上一篇文章中,我们学习了Kotlin协程的基本概念。如果你对Kotlin协程仍没有概念,请先阅读它。

上次我仅解释说挂起函数允许我们挂起和等待直到方法恢复。我们没有深入,因为细节很多,值得单独开一篇帖子。所以今天,我们详细介绍挂起函数的用法。

目录

  1. 什么是挂起函数?
  2. 在挂起函数中调用阻塞方法
  3. 回调和挂起的可取消协程(SuspendCancellableCoroutine)
    (1) resume(value: T)
    (2) resumeWithException(exception: Throwable)
    (3) cancellableContinuation.cancel()
  4. 在挂起函数里调用RxJava

1. 什么是挂起函数?

我们可以将挂起函数当作可以暂停并在任务结束后恢复的常规方法,这意味着我们可以开启一个耗时任务然后等待它完成。这就是为什么我们可以用串行的方式来写协程,而无须回调或者RxJava。


什么是挂起函数.png

挂起函数只能在协程中调用。挂起函数跟普通函数的使用一样,只是它会挂起当前协程的执行。例如,delay()是一个内建的挂起函数。感谢Android Studio的贴心提醒,我们可以从左侧栏的箭头icon知道delay()是一个挂起函数。当我们在协程里调用delay(1_000)的时候,它会中断1s执行,不会阻塞线程,然后在1s后回到协程继续执行doSomething()方法。

挂起函数咋定义?suspend来帮忙。只需在常规方法前加上suspend,阻塞线程的繁重任务就能变成非阻塞方法吗?答案是大大的NO。虽然官方文档提到“通过调用其他挂起函数,它会挂起代码的执行而不会阻塞当前线程。”,但我们仍需考虑挂起函数运行的Dispatchers(调度器)。

如果你只是在普通方法前加上supend,IDE会警告“冗余的supend修饰符。

// IDE warning: "Redundant 'suspend' modifier".
private suspend fun doSomething() {
    // do some heavy tasks
}

最简单且正确的方式是用withContext()包裹任务,并指定恰当的dispatchers(调度器)。例如,如果繁重任务是计算密集的,那我们应该将它包裹在withContext(Dispatchers.default)里。请见上一篇帖子

private suspend fun doSomething() {
    withContext(Dispatchers.Default) {
        // do some heavy tasks
    }
}

2. 在挂起函数里调用阻塞方法

将耗时任务放进挂起函数是个好主意。例如,通过网络任务获取用户数据然后更新UI是一件常事。最大的问题是网络请求这类繁重任务会阻塞主线程。为了避免ANR,我们将该任务放到后台线程,接下来一件烦人的事是不能在后台线程更新UI,于是我们使用Activity.runOnUiThread(Runnable)甚至Handler来实现这一点。
Umm..对Android开发者而言,维护大量这样的任务并非易事。幸运的是,Kotlin协程来了。

MainScope().launch {
  val user = fetchUser() // Waits for fetching user in I/O thread.
  updateUser(user) // Updates UI in main thread.
}

private suspend fun fetchUser(): User = withContext(Dispatchers.IO) {
  fetchUserFromServerBlocking()
}

private fun fetchUserFromServerBlocking(): User {
  // Do I/O execution.
}

private fun updateUser(user: User) {
  // Updates UI with [User] data.
}

class User

这些代码片段在数据拉取后更新UI。更重要的是,网络任务不会阻塞主线程,它在工作线程中执行,因为我们用withContext(Dispatchers.IO)切了线程。

3. 回调和挂起的可取消协程(SuspendCancellableCoroutine)

假定我们有一个线上的Android项目。我们使用了大量异步任务读取数据库或者从服务器拉取数据。使用回调是在主线程处理数据的一个可能的方法。现在,怎么把回调任务转为协程呢?suspendCancellableCoroutine来了。

SuspendCancellableCoroutine返回一个CancellableContinuation对象供我们resumeresumeWithException,以及在协程取消时抛出CancellationException异常。(有一个类似的方法叫suspendCoroutine,它俩的区别是suspendCoroutine不能被Job.cancel()取消)

CancellableContinuation

我们可以在suspendCancellableCoroutine里执行一个代码块,它具有一个CancellableContinuation参数。CancellableContinuation有3种用法:

(1) resume(value: T):

恢复相应协程的执行,传递 [value]作为挂起点的返回值。

MainScope().launch {
  try {
    val user = fetchUser()
    updateUser(user)
  } catch (exception: Exception) {
    // Use try-catch or CoroutinesExceptionHandler to handle exceptions.
  }
}

// Fetches the user data from server.
private suspend fun fetchUser(): User = suspendCancellableCoroutine { 
cancellableContinuation ->
  fetchUserFromNetwork(object : Callback {
    override fun onSuccess(user: User) {
      // Invokes this line since the callback onSuccess() is invoked.
      cancellableContinuation.resume(user)
    }

    override fun onFailure(exception: Exception) {
      cancellableContinuation.resumeWithException(exception)
    }
  })
}

private fun fetchUserFromNetwork(callback: Callback) {
  Thread {
    Thread.sleep(3_000)
    
    // Invokes onSuccess() with user data.
    callback.onSuccess(User())
  }.start()
}

private fun updateUser(user: User) {
  // Updates UI with [User] data.
}

interface Callback {
  fun onSuccess(user: User)
  fun onFailure(exception: Exception)
}

class User

(2) resumeWithException(exception: Throwable)
恢复相应协程的执行,以便[exeption]在上一个挂起点后重新抛出。

MainScope().launch {
  try {
    val user = fetchUser()
    updateUser(user)
  } catch (exception: Exception) {
    // Use try-catch or CoroutinesExceptionHandler to handle exceptions.
    Log.d("demo", "$exception") // Prints "java.io.IOException".
  }
  
  // If we handle exception in try-catch, we can still do something after it.
  doSomething()
}

// Fetches the user data from server.
private suspend fun fetchUser(): User = suspendCancellableCoroutine { 
cancellableContinuation ->
  fetchUserFromNetwork(object : Callback {
    override fun onSuccess(user: User) {
      cancellableContinuation.resume(user)
    }

    override fun onFailure(exception: Exception) {
      // Invokes this line since the callback onFailure() is invoked.
      cancellableContinuation.resumeWithException(exception)
    }
  })
}

private fun fetchUserFromNetwork(callback: Callback) {
  Thread {
    Thread.sleep(3_000)
    
    // Invokes onFailure() callback with "IOException()".
    callback.onFailure(IOException())
  }.start()
}

private fun updateUser(user: User) {
  // Updates UI with [User] data.
}

interface Callback {
  fun onSuccess(user: User)
  fun onFailure(exception: Exception)
}

class User

在上面的示例代码中,当我们调用CancellableContinuation.resumeWithException(user)时,fetchUser()就会抛出[exception]异常。
updateUser(user)不会被调用,而try-catch将会处理该异常。try-catch后面的代码块将会继续执行。
(3) cancellableContinuation.cancel()
虽然Kotlin没有受检异常,我们仍需要在try-catch中处理所有的异常。否则,应用将会崩溃。但仍有一个特殊异常我想在这里分享,那就是CancellationException,它会在我们调用cancellableContinuation.cancel()时抛出。

MainScope().launch {
  try {
    val user = fetchUser()
    updateUser(user)
  } catch (exception: Exception) {
    // Handles exceptions here.
    // Prints "java.util.concurrent.CancellationException: Continuation 
    // CancellableContinuation(DispatchedContinuation[Main, Continuation at 
    // com.mutant.coroutinestest.MainActivity$onCreate$1.invokeSuspend
    // (MainActivity.kt:22)@5af0f84]){Active}@65c036d was cancelled normally".
    Log.d("demo", "$exception")
  }
  
  // If we handle exception in try-catch, we can still do something after it.
  doSomething()
}

// Fetches the user data from server.
private suspend fun fetchUser(): User = suspendCancellableCoroutine { 
cancellableContinuation ->
  fetchUserFromNetwork(object : Callback {
    override fun onSuccess(user: User) {
      cancellableContinuation.resume(user)
    }

    override fun onFailure(exception: Exception) {
      cancellableContinuation.resumeWithException(exception)
    }
  })
  
  // We call "contiuation.cancel()" to cancel this suspend function.
  cancellableContinuation.cancel()
}

private fun fetchUserFromNetwork(callback: Callback) {
  Thread {
    Thread.sleep(3_000)
    callback.onSuccess(User())
  }.start()
}

private fun updateUser(user: User) {
  // Updates UI with [User] data.
}

interface Callback {
  fun onSuccess(user: User)
  fun onFailure(exception: Exception)
}

class User

即使我们不处理CancellationException,它也不会导致崩溃。更多信息,请参考此文。但它随后的代码不会被执行。

MainScope().launch {
  val user = fetchUser()
  updateUser(user)
  
  // If we dont't handle CancellationException, this job would be cancelled.
  canNOTBeExecuted()
}

// Fetches the user data from server.
private suspend fun fetchUser(): User = suspendCancellableCoroutine { 
cancellableContinuation ->
  fetchUserFromNetwork(object : Callback {
    override fun onSuccess(user: User) {
      cancellableContinuation.resume(user)
    }

    override fun onFailure(exception: Exception) {
      cancellableContinuation.resumeWithException(exception)
    }
  })
  
  // We call "contiuation.cancel()" to cancel this suspend function.
  cancellableContinuation.cancel()
}

private fun fetchUserFromNetwork(callback: Callback) {
  Thread {
    Thread.sleep(3_000)
    callback.onSuccess(User())
  }.start()
}

private fun updateUser(user: User) {
  // Updates UI with [User] data.
}

interface Callback {
  fun onSuccess(user: User)
  fun onFailure(exception: Exception)
}

class User

在挂起函数中调用RxJava

如果我们的项目中用了RxJava怎么办?有一个库叫kotlinx-coroutines-rx2,它可以将RxJava转化为协程。用下列代码将它导入:

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-rx2:1.3.2"

下列是所有的协程构建器:

coroutine builders

例如,如果我们用了RaJava的Single,那么Single.await()帮我们将RxJava转为suspendCancellableCoroutine
RxJava transform.png

正如上面代码展示的,await()拓展函数将成功的情况传递给cancellableContinuation.resume(),而将失败的情况传递给cancellableContinuation.resumeWithException()
让我们实现我们的示例代码:

MainScope().launch {
  CoroutineScope(Dispatchers.Main).launch {
    val user = fetchUserFromServer().await()
    updateUser(user)
  }
}

private fun fetchUserFromServer(): Single<User> =
  Single.create<User> {
    Log.d("demo", "(1) fetchUserFromServer start, ${Thread.currentThread()}")
    Thread.sleep(3_000)
    it.onSuccess(User())
    Log.d("demo", "(2) fetchUserFromServer onSuccess, ${Thread.currentThread()}")
  }.subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())

private fun updateUser(user: User) {
  Log.d("demo", "(3) updateUser, ${Thread.currentThread()}")
}

class User

日志将是:

D/demo: (1) fetchUserFromServer start, Thread[RxCachedThreadScheduler-1,5,main]
D/demo: (2) fetchUserFromServer onSuccess, Thread[RxCachedThreadScheduler-1,5,main]
D/demo: (3) updateUser, Thread[main,5,main]

fetchUserFromServer().await()代码挂起协程的执行,一直等待,直到RxJava返回结果。
如果RxJava的Single失败了,并且返回了一个异常怎么办呢?

oroutineScope(Dispatchers.Main).launch {
  try {
    val user = fetchUserFromServer().await()
    updateUser(user)
  } catch (e: Exception) {
    Log.d("demo", "(4) {$e}, ${Thread.currentThread()}")
  }
}

private fun fetchUserFromServer(): Single<User> =
  Single.create<User> {
    Log.d("demo", "(1) fetchUserFromServer start, ${Thread.currentThread()}")
    Thread.sleep(3_000)
    it.onError(IOException())
    Log.d("demo", "(2) fetchUserFromServer onError, ${Thread.currentThread()}")
  }.subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())

private fun updateUser(user: User) {
  Log.d("demo", "(3) updateUser, ${Thread.currentThread()}")
}

class User

那么异常将在try-catch中处理。日志如下:

D/demo: (1) fetchUserFromServer start, Thread[RxCachedThreadScheduler-1,5,main]
D/demo: (2) fetchUserFromServer onError, Thread[RxCachedThreadScheduler-1,5,main]
D/demo: (4) {java.io.IOException}, Thread[main,5,main]

对于RxJava的* Maybe Observable*,都有相应的拓展函数供我们使用。尽管在你的代码中尝试它们。

RxJava suspending extensions.png

这就是今天的全部。感谢阅读。我希望这些文章能帮你更加了解挂起函数,并有助于在你的项目中实现它。如果你有任何疑问或建议,欢迎留言。再见。👋👋

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容