本文为 Jose Alcérreca 发布于 Medium 的文章译文
原文链接为 Migrating from LiveData to Kotlin’s Flow
本文仅作为个人学习记录所用。如有涉及侵权,请相关人士尽快联系译文作者。
LiveData 是在 2017 年被大家所开始使用,观察者模式有效简化了开发,但 RxJava 等选项在当时对于初学者来说太复杂了。 Android 架构组件团队创建了 LiveData:一个非常固执的可观察数据持有者类,专为 Android 设计。 它保持简单以使其易于上手,并且建议将 RxJava 用于更复杂的反应式流案例,利用两者之间的集成。
DeadData?
LiveData 仍然是我们为 Java 开发人员、初学者和简单情况提供的解决方案。 对于其余的,一个不错的选择是转向 Kotlin Flows。 Flows 仍然有一个陡峭的学习曲线,但它们是 Kotlin 语言的一部分,由 Jetbrains 提供支持; Compose 即将到来,它非常适合反应式模型。
我们一直在谈论使用 Flows 来连接应用程序的不同部分,除了视图和 ViewModel。 现在我们有了一种从 Android UI 收集流的更安全的方法,我们可以创建一个完整的迁移指南。
在这篇文章中,您将学习如何将 Flows 暴露给一个视图,如何收集它们,以及如何对其进行微调以满足特定需求。我们一直在谈论使用 Flows 来连接应用程序的不同部分,除了视图和 ViewModel。 现在我们有了一种从 Android UI 收集流的更安全的方法,我们可以创建一个完整的迁移指南。
在这篇文章中,您将学习如何将 Flows 暴露给一个视图,如何收集它们,以及如何对其进行微调以满足特定需求。我们一直在谈论使用 Flows 来连接应用程序的不同部分,除了视图和 ViewModel。 现在我们有了一种从 Android UI 收集流的更安全的方法,我们可以创建一个完整的迁移指南。
在这篇文章中,您将学习如何将 Flows 暴露给一个视图,如何收集它们,以及如何对其进行微调以满足特定需求。
Flow:简单的事情更难,复杂的事情更容易
LiveData 做了一件事并且做得很好:它在缓存最新值和了解 Android 的生命周期的同时公开数据。 后来我们了解到它也可以启动协程并创建复杂的转换,但这有点复杂。
让我们看看一些 LiveData 模式和它们的 Flow 等价物:
#1:使用可变数据持有者公开一次性操作的结果
这是经典模式,您可以使用协程的结果来改变状态持有者:
<!-- Copyright 2020 Google LLC.
SPDX-License-Identifier: Apache-2.0 -->
class MyViewModel {
private val _myUiState = MutableLiveData<Result<UiState>>(Result.Loading)
val myUiState: LiveData<Result<UiState>> = _myUiState
// Load data from a suspend fun and mutate state
init {
viewModelScope.launch {
val result = ...
_myUiState.value = result
}
}
}
为了对 Flows 做同样的事情,我们使用 (Mutable)StateFlow:
class MyViewModel {
private val _myUiState = MutableStateFlow<Result<UiState>>(Result.Loading)
val myUiState: StateFlow<Result<UiState>> = _myUiState
// Load data from a suspend fun and mutate state
init {
viewModelScope.launch {
val result = ...
_myUiState.value = result
}
}
}
StateFlow 是一种特殊的 SharedFlow(它是一种特殊类型的 Flow),最接近 LiveData:
它总是有价值的。
它只有一个值。
它支持多个观察者(因此流程是共享的)。
它总是 replays 订阅的最新值,与活跃观察者的数量无关。
向视图公开 UI 状态时,请使用 StateFlow。 它是一个安全高效的观察者,旨在保持 UI 状态。
#2:公开一次性操作的结果
这与前面的代码片段等效,公开了没有可变支持属性的协程调用的结果。
对于 LiveData,我们为此使用了 liveData 协程构建器:
class MyViewModel(...) : ViewModel() {
val result: LiveData<Result<UiState>> = liveData {
emit(Result.Loading)
emit(repository.fetchItem())
}
}
由于状态持有者总是有一个值,因此最好将我们的 UI 状态包装在某种支持 Loading、Success 和 Error 等状态的 Result 类中。
Flow 等效项涉及更多,因为您必须进行一些配置:
class MyViewModel(...) : ViewModel() {
val result: StateFlow<Result<UiState>> = flow {
emit(repository.fetchItem())
}.stateIn(
scope = viewModelScope,
started = WhileSubscribed(5000), // Or Lazily because it's a one-shot
initialValue = Result.Loading
)
}
stateIn 是将 Flow 转换为 StateFlow 的 Flow 运算符。 现在让我们相信这些参数,因为我们稍后需要更多的复杂性来正确解释它。
3:带参数的一次性数据加载
假设您想加载一些取决于用户 ID 的数据,并且您从暴露流的 AuthManager 获取此信息:
使用 LiveData,您将执行类似以下操作:
class MyViewModel(authManager..., repository...) : ViewModel() {
private val userId: LiveData<String?> =
authManager.observeUser().map { user -> user.id }.asLiveData()
val result: LiveData<Result<Item>> = userId.switchMap { newUserId ->
liveData { emit(repository.fetchItem(newUserId)) }
}
}
switchMap 是一个转换,它的主体被执行,并且当 userId 改变时,订阅的结果也随之改变。
如果 userId 没有理由成为 LiveData,那么更好的替代方法是将流与 Flow 结合起来,最后将公开的结果转换为 LiveData。
class MyViewModel(authManager..., repository...) : ViewModel() {
private val userId: Flow<UserId> = authManager.observeUser().map { user -> user.id }
val result: LiveData<Result<Item>> = userId.mapLatest { newUserId ->
repository.fetchItem(newUserId)
}.asLiveData()
}
使用 Flows 执行此操作看起来非常相似:
class MyViewModel(authManager..., repository...) : ViewModel() {
private val userId: Flow<UserId> = authManager.observeUser().map { user -> user.id }
val result: StateFlow<Result<Item>> = userId.mapLatest { newUserId ->
repository.fetchItem(newUserId)
}.stateIn(
scope = viewModelScope,
started = WhileSubscribed(5000),
initialValue = Result.Loading
)
}
请注意,如果你需要更大的灵活性,您还可以使用 transformLatest 并显式 emit 项目:
val result = userId.transformLatest { newUserId ->
emit(Result.LoadingData)
emit(repository.fetchItem(newUserId))
}.stateIn(
scope = viewModelScope,
started = WhileSubscribed(5000),
initialValue = Result.LoadingUser // Note the different Loading states
)
4:观察带参数的数据流
现在让我们让这个例子更具反应性。 数据不是获取的,而是观察到的,因此我们将数据源中的更改自动传播到 UI。
继续我们的例子:我们没有在数据源上调用 fetchItem,而是使用一个假设的 observeItem 操作符,它返回一个 Flow。
使用 LiveData,您可以将 Flow 转换为 LiveData 并 emitSource 所有更新:
class MyViewModel(authManager..., repository...) : ViewModel() {
private val userId: LiveData<String?> =
authManager.observeUser().map { user -> user.id }.asLiveData()
val result = userId.switchMap { newUserId ->
repository.observeItem(newUserId).asLiveData()
}
}
或者,最好使用 flatMapLatest 组合两个流,并仅将输出转换为 LiveData:
class MyViewModel(authManager..., repository...) : ViewModel() {
private val userId: Flow<String?> =
authManager.observeUser().map { user -> user?.id }
val result: LiveData<Result<Item>> = userId.flatMapLatest { newUserId ->
repository.observeItem(newUserId)
}.asLiveData()
}
Flow 的实现类似,但没有 LiveData 转换:
class MyViewModel(authManager..., repository...) : ViewModel() {
private val userId: Flow<String?> =
authManager.observeUser().map { user -> user?.id }
val result: StateFlow<Result<Item>> = userId.flatMapLatest { newUserId ->
repository.observeItem(newUserId)
}.stateIn(
scope = viewModelScope,
started = WhileSubscribed(5000),
initialValue = Result.LoadingUser
)
}
每当用户更改或存储库中的用户数据更改时,公开的 StateFlow 都会收到更新。
#5 组合多个来源:MediatorLiveData -> Flow.combine
MediatorLiveData 可让您观察一个或多个更新源(LiveData 可观察对象)并在它们获得新数据时执行某些操作。 通常你更新 MediatorLiveData 的值:
val liveData1: LiveData<Int> = ...
val liveData2: LiveData<Int> = ...
val result = MediatorLiveData<Int>()
result.addSource(liveData1) { value ->
result.setValue(liveData1.value ?: 0 + (liveData2.value ?: 0))
}
result.addSource(liveData2) { value ->
result.setValue(liveData1.value ?: 0 + (liveData2.value ?: 0))
}
Flow 等价物更直接:
val flow1: Flow<Int> = ...
val flow2: Flow<Int> = ...
val result = combine(flow1, flow2) { a, b -> a + b }
您还可以使用 combineTransform 函数或 zip。
配置暴露的 StateFlow(stateIn 操作符)
我们之前使用 stateIn 将常规流转换为 StateFlow,但它需要一些配置。 如果你现在不想详细介绍,只需要复制粘贴,我推荐这种组合:
val result: StateFlow<Result<UiState>> = someFlow
.stateIn(
scope = viewModelScope,
started = WhileSubscribed(5000),
initialValue = Result.Loading
)
但是,如果您不确定这个看似随机的 5 秒 started 参数,请继续阅读。
stateIn 有 3 个参数(来自文档):
@param scope the coroutine scope in which sharing is started.
@param started the strategy that controls when sharing is started and stopped.
@param initialValue the initial value of the state flow.
This value is also used when the state flow is reset using the [SharingStarted.WhileSubscribed] strategy with thereplayExpirationMillis
parameter.
started可以采用 3 个值:
- Lazily:在第一个订阅者出现时开始,在范围取消时停止。
- Eagerly:立即开始并在范围取消时停止
- WhileSubscribed:这很复杂。
对于一次性操作,您可以使用 Lazily 或 Eagerly。 但是,如果您正在观察其他流程,则应该使用 WhileSubscribed 来执行小而重要的优化,如下所述。
WhileSubscribed 策略
WhileSubscribed 在没有收集器时取消 upstream flow。 使用 stateIn 创建的 StateFlow 向 View 公开数据,但它也在观察来自其他层或应用程序(上游)的流。 保持这些流处于活动状态可能会导致资源浪费,例如,如果它们继续从其他来源(例如数据库连接、硬件传感器等)读取数据。**When your app goes to the background, you should be a good citizen and stop these coroutines.
WhileSubscribed 有两个参数:
public fun WhileSubscribed(
stopTimeoutMillis: Long = 0,
replayExpirationMillis: Long = Long.MAX_VALUE
)
停止超时
来至于它的文档:
stopTimeoutMillis 配置最后一个订阅者消失和上游流停止之间的延迟(以毫秒为单位)。 它默认为零(立即停止)。
这很有用,因为如果视图停止侦听几分之一秒,您不想取消上游流。 这一直发生。例如,当用户旋转设备并且视图被快速连续地破坏和重新创建时。
liveData 协程构建器中的解决方案是添加 5 秒的延迟,如果没有订阅者,协程将在此后停止。 WhileSubscribed(5000) 正是这样做的:
class MyViewModel(...) : ViewModel() {
val result = userId.mapLatest { newUserId ->
repository.observeItem(newUserId)
}.stateIn(
scope = viewModelScope,
started = WhileSubscribed(5000),
initialValue = Result.Loading
)
}
这种方法检查所有框:
- 当用户将您的应用程序发送到后台时,来自其他层的更新将在 5 秒后停止,从而节省电量。
- 最新的值仍会被缓存,这样当用户回到它时,视图会立即有一些数据。
- 订阅重新启动,新值将出现,可用时刷新屏幕。
Replay expiration
如果您不希望用户在他们离开太久后看到陈旧数据并且你更喜欢显示加载屏幕,请查看 WhileSubscribed 中的 replayExpirationMillis 参数。 在这种情况下它非常方便,并且还节省了一些内存,因为缓存的值恢复到 stateIn 中定义的初始值。 返回应用程序不会那么快,但您不会显示旧数据。
replayExpirationMillis— configures a delay (in milliseconds) between the stopping of the sharing coroutine and the resetting of the replay cache (which makes the cache empty for the shareIn operator and resets the cached value to the original initialValue for the stateIn operator). It defaults to Long.MAX_VALUE (keep replay cache forever, never reset buffer). Use zero value to expire the cache immediately.
从视图中观察 StateFlow
到目前为止,我们已经看到,让视图让 ViewModel 中的 StateFlows 知道它们不再监听是非常重要的。 然而,与生命周期相关的所有事情一样,事情并没有那么简单。
为了收集流,你需要一个协程。 活动和片段提供了一堆协程构建器:
- Activity.lifecycleScope.launch:立即启动协程,活动销毁时取消。
- Fragment.lifecycleScope.launch:立即启动协程,并在片段销毁时取消协程。
- Fragment.viewLifecycleOwner.lifecycleScope.launch:立即启动协程,并在片段的视图生命周期被销毁时取消协程。 如果您正在修改 UI,您应该使用视图生命周期。
LaunchWhenStarted、launchWhenResumed…
称为 launchWhenX 的特殊版本的 launch 将等到 lifecycleOwner 处于X 状态并在lifecycleOwner 低于X 状态时暂停协程。 重要的是要注意,在其生命周期所有者被销毁之前,它们不会取消协程。
在应用程序处于后台时接收更新可能会导致崩溃,这可以通过暂停视图中的集合来解决。 但是,当应用程序在后台时,上游流会保持活动状态,这可能会浪费资源。
这意味着到目前为止我们为配置 StateFlow 所做的一切都将毫无用处; 然而,这是一个新的 API。
Lifecycle.repeatOnLifecycle 来救援
这个新的协程构建器(可从生命周期运行时-ktx 2.4.0-alpha01 获得)正是我们所需要的:它在特定状态下启动协程,并在生命周期所有者低于它时停止它们。
例如,在一个 Fragment 中:
onCreateView(...) {
viewLifecycleOwner.lifecycleScope.launch {
viewLifecycleOwner.lifecycle.repeatOnLifecycle(STARTED) {
myViewModel.myUiState.collect { ... }
}
}
}
这将在 Fragment 的视图 STARTED 开始收集,将继续通过 RESUMED,并在返回到 STOPPED 时停止。可以读下这篇文章: A safer way to collect flows from Android UIs
将 repeatOnLifecycle API 与上面的 StateFlow 指南混合在一起,可以在充分利用设备资源的同时获得最佳性能。
Warning: The StateFlow support recently added to Data Binding uses
launchWhenCreated
to collect updates, and it will start using `repeatOnLifecycle``instead when it reaches stable.
For Data Binding, you should use Flows everywhere and simply add
asLiveData()
to expose them to the view. Data Binding will be updated whenlifecycle-runtime-ktx 2.4.0
goes stable.
总结:
从 ViewModel 公开数据并从视图收集数据的最佳方法是:
✔️ 使用 WhileSubscribed
策略公开 StateFlow
,并带有超时。
class MyViewModel(...) : ViewModel() {
val result = userId.mapLatest { newUserId ->
repository.observeItem(newUserId)
}.stateIn(
scope = viewModelScope,
started = WhileSubscribed(5000),
initialValue = Result.Loading
)
}
✔️ 使用 repeatOnLifecycle
收集。
onCreateView(...) {
viewLifecycleOwner.lifecycleScope.launch {
viewLifecycleOwner.lifecycle.repeatOnLifecycle(STARTED) {
myViewModel.myUiState.collect { ... }
}
}
}
任何其他组合都会使上游 Flows 保持活动状态,从而浪费资源:
❌ 使用 WhileSubscribed
公开并在生命周期范围内收集。launch
/launchWhenX
❌ 使用 Lazily
/Eagerly
公开并使用 repeatOnLifecycle
收集
当然,如果你不需要 Flow 的全部功能……只需使用 LiveData。 :)
以下附带 Android 开发者官我那个对 Kolin 的 Flow 的介绍:
https://developer.android.com/kotlin/flow
在协程中,Flow 是一种可以顺序发出多个值的类型,而不是只返回一个值的挂起函数。例如,您可以使用流从数据库接收实时更新。
Flows 建立在协程之上,可以提供多个值。Flow 在概念上是可以异步计算的数据流。发出的值必须是相同的类型。例如, Flow<Int> 是一个发出整数值的流。
流与生成值序列的迭代器非常相似,但它使用挂起函数异步生成和消费值。这意味着,例如,Flow 可以安全地发出网络请求以生成下一个值,而不会阻塞主线程。
数据流涉及三个实体:
生产者产生添加到流中的数据。多亏了协程,流也可以异步产生数据。
(可选)中介可以修改发送到流中的每个值或流本身。
消费者使用流中的值。
在 Android 中,存储库通常是 UI 数据的生产者,其用户界面 (UI) 作为最终显示数据的使用者。 其他时候,UI 层是用户输入事件的生产者,而层次结构的其他层则使用它们。 生产者和消费者之间的层通常充当中间人,修改数据流以使其适应下一层的要求。
创建一个 Flow
要创建 flows,请使用 flow builder APIs。 Flow 构建器函数创建一个新 Flow,您可以在其中使用发射函数手动将新值 emit 到数据流中。
在以下示例中,数据源以固定时间间隔自动获取最新消息。 由于挂起函数不能返回多个连续值,因此数据源创建并返回一个 Flow 来满足此要求。 在这种情况下,数据源充当生产者。
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
flow builder 在协程中执行。 因此,它受益于相同的异步 API,但存在一些限制:
Flows 是连续的。 由于生产者在协程中,当调用挂起函数时,生产者挂起直到挂起函数返回。 在这个例子中,生产者挂起直到 fetchLatestNews 网络请求完成。 只有这样,结果才会发送到流中。
使用流构建器,生产者不能从不同的 CoroutineContext 发出值。 因此,不要通过创建新的协程或使用 withContext 代码块在不同的 CoroutineContext 中调用发射。 在这些情况下,您可以使用其他流构建器,例如 callbackFlow。
修改流
中介可以使用中间操作符来修改数据流而不消耗值。 这些运算符是函数,当应用于数据流时,会设置一系列操作,直到将来使用这些值时才会执行这些操作。 在 Flow reference documentation 中了解有关中间运算符的更多信息。
在下面的示例中,存储库层使用中间运算符 map 来转换要在视图上显示的数据:
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* Returns the favorite latest news applying transformations on the flow.
* These operations are lazy and don't trigger the flow. They just transform
* the current value emitted by the flow at that point in time.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
}
中间运算符可以一个接一个地应用,形成一个操作链,当一个项目被发送到 Flow 中时,这些操作链会延迟执行。 请注意,简单地将中间运算符应用于流并不会启动 Flow 集合。
从 Flow 中收集
使用终端运算符触发 Flow 以开始侦听值。 要获取流中发出的所有值,请使用 collect。
由于 collect 是一个挂起函数,它需要在协程中执行。 它接受一个 lambda 作为参数,在每个新值上调用该参数。 由于它是一个挂起函数,调用 collect 的协程可能会挂起,直到 Flow 关闭。
继续前面的示例,这里是一个使用存储库层数据的 ViewModel 的简单实现:
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
收集 Flow 触发更新最新消息的生产者,并以固定的时间间隔发出网络请求的结果。由于生产者在 while(true)
循环中始终保持活动状态,因此当 ViewModel 被清除并取消 viewModelScope
时,数据流将关闭。
由于以下原因,Flow 收集可能会停止:
收集的协程被取消,如上例所示。这也阻止了底层生产者。
生产者完成发射项目。在这种情况下,数据流关闭,调用
collect
的协程恢复执行。
除非与其他中间操作符指定,否则 Flow 是冷的和惰性的。这意味着每次在流上调用终端操作符时都会执行生产者代码。在前面的示例中,拥有多个流收集器会导致数据源以不同的固定时间间隔多次获取最新消息。要在多个消费者同时收集时优化和共享流,请使用 shareIn 运算符。
捕获意外异常
生产者的实现可以来自第三方库。 这意味着它可以抛出意外的异常。 要处理这些异常,请使用 catch 中间运算符。
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Intermediate catch operator. If an exception is thrown,
// catch and update the UI
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
在前面的示例中,当发生异常时,不会调用 collect
lambda,因为尚未收到新项目。
catch
还可以向流 emit
项目。 示例存储库层可以改为 emit
缓存值:
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// If an error happens, emit the last cached values
.catch { exception -> emit(lastCachedNews()) }
}
在这个例子中,当一个异常发生时,collect
lambda 被调用,因为一个新的项目因为异常被发送到流中。
在不同的 CoroutineContext 中执行
默认情况下,Flow
构建器的生产者在从它收集的协程的 CoroutineContext
中执行,并且如前所述,它不能从不同的 CoroutineContext
发出值。 在某些情况下,这种行为可能是不可取的。 例如,在本主题中使用的示例中,存储库层不应在 viewModelScope
使用的 Dispatchers.Main
上执行操作。
要更改流的 CoroutineContext,请使用中间运算符 flowOn。 flowOn 改变了上游流的 CoroutineContext,这意味着生产者和任何在 flowOn 之前(或之上)应用的中间操作符。 下游流(flowOn 之后的中间运算符以及消费者)不受影响,并在用于从流中收集的 CoroutineContext 上执行。 如果有多个 flowOn 操作符,每个操作符都会改变其当前位置的上游。
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // Executes on the default dispatcher
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // Executes on the default dispatcher
saveInCache(news)
}
// flowOn affects the upstream flow ↑
.flowOn(defaultDispatcher)
// the downstream flow ↓ is not affected
.catch { exception -> // Executes in the consumer's context
emit(lastCachedNews())
}
}
使用此代码,onEach
和 map
操作符使用 defaultDispatcher
,而 catch
操作符和使用者在 viewModelScope
使用的 Dispatchers.Main
上执行。
由于数据源层正在进行 I/O 工作,因此您应该使用针对 I/O 操作进行优化的调度程序:
class NewsRemoteDataSource(
...,
private val ioDispatcher: CoroutineDispatcher
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
// Executes on the IO dispatcher
...
}
.flowOn(ioDispatcher)
}
Jetpack 库中的流程
Flow 被集成到许多 Jetpack 库中,它在 Android 第三方库中很受欢迎。 Flow 非常适合实时数据更新和无休止的数据流。
您可以使用 Flow with Room 来通知数据库中的更改。 使用数据访问对象 data access objects (DAO) 时,返回 Flow
类型以获取实时更新。
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
每次示例表中发生更改时,都会发出一个包含数据库中新项目的新列表。
将基于回调的 API 转换为流
callbackFlow 是一个流构建器,可让您将基于回调的 API 转换为流。 例如, Firebase Firestore Android API 使用回调。 要将这些 API 转换为流并侦听 Firestore 数据库更新,您可以使用以下代码:
class FirestoreUserEventsDataSource(
private val firestore: FirebaseFirestore
) {
// Method to get user events from the Firestore database
fun getUserEvents(): Flow<UserEvents> = callbackFlow {
// Reference to use in Firestore
var eventsCollection: CollectionReference? = null
try {
eventsCollection = FirebaseFirestore.getInstance()
.collection("collection")
.document("app")
} catch (e: Throwable) {
// If Firebase cannot be initialized, close the stream of data
// flow consumers will stop collecting and the coroutine will resume
close(e)
}
// Registers callback to firestore, which will be called on new events
val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
if (snapshot == null) { return@addSnapshotListener }
// Sends events to the flow! Consumers will get the new events
try {
offer(snapshot.getEvents())
} catch (e: Throwable) {
// Event couldn't be sent to the flow
}
}
// The callback inside awaitClose will be executed when the flow is
// either closed or cancelled.
// In this case, remove the callback from Firestore
awaitClose { subscription?.remove() }
}
}
与 Flow
构建器不同,callbackFlow
允许使用 send 函数从不同的 CoroutineContext
发出值,或者使用 offer 函数从协程外部发出值。
在内部,callbackFlow
使用一个 channel,它在概念上与阻塞 queue 非常相似。 一个通道配置了一个容量,即可以缓冲的最大元素数。 在 callbackFlow
中创建的通道默认容量为 64 个元素。 当您尝试将新元素添加到完整频道时,发送会暂停生产者,直到有新元素的空间,而 offer
不会将元素添加到频道并立即返回 false
。
额外 Flow 资料链接: