Android官方响应式框架Agera详解:二、Repository的创建和操作符

Android

前言

在上一篇文章中给小伙伴们介绍了 Agera 的相关概念以及基本使用。如果你对 Agera 还不了解,建议先看一下
Android官方响应式框架Agera详解:一、相关概念和基本使用

后一篇文章已经更新啦~
Android官方响应式框架Agera详解:三、Repository的更新规则及Agera+Retrofit+Okhttp实战

这里对把上一篇中几个重要的类/接口再列出来,加深一下印象:

类/接口 描述 作用
Updatable 观察者 更新事件
Observable 被观察者 添加/删除Updatable
Supplier 数据提供者 提供一个数据
Receiver 数据接收者 接收一个数据
Repository 拥有提供数据能力的被观察者 添加/删除Updatable、提供一个数据
MutableRepository 拥有 提供/更新 数据能力的被观察者 添加/删除Updatable、提供/更新一个数据

在这篇文章中,我将给小伙伴们介绍 Rpository 的用法。这是 Agera 中最精髓最重要的部分,我尽量把它讲清楚,如果有什么疑问的话可以留言交流

注:在上一篇文章中我们说过,Repository 分为 简单的 Repository 和复杂的 Repository 。简单的Repository 创建和使用在上一篇已经讲过,所以本篇的重点就是复杂的 Repository 的创建和使用。所以本文中以下所说的 Repository 默认指的是复杂的Repository

目录

一、如何创建Repository
二、数据处理流和操作符
三、编译表达式中的相关配置
四、Attempts && Result
五、总结
六、相关代码
七、预告

一、如何创建Repository

最开始介绍 Agera 的时候,有这么一句话:

通过加入函数式响应式编程,Agera 可以在 什么时机, 什么线程什么数据 层面上更清晰的分离数据处理流程,并且使用一个接近自然语言的单个表达式就能编写一个复杂的异步流

这段话就是对 Repository 的精确描述。一个 Repository 可以用单个Java表达式编译出来。Java表达式由下面几部分构成,顺序如下:

// 声明RepositoryCompiler,并初始化,返回REventSource实例;
1. `Repositories.repositoryWithInitialValue(...)`;
// 指定事件源(Observable),可以多个,返回RFrequency实例;
2. Event sources - `.observe(...)`;
// 设置通知频率(比如click频率限制),返回RFlow实例;
3. Frequency of reaction - `.onUpdatesPer(...)` or `.onUpdatesPerLoop()`;
// 设置数据源(Supplier),返回RFlow或RTermination实例;
4. Data processing flow - `.getFrom(...)`, `.mergeIn(...)`, `.transform(...)`, etc.;
// 其他配置,返回RConfig;
5. Miscellaneous configurations - `.notifyIf(...)`, `.onDeactivation(...)`, etc.;
// 编译成Repository实例。
6. `.compile()`.

上面就是创建一个 Repository 的伪代码。
接下来,我们看一下具体到代码中 Repository 是如何创建的:

    Repository<String> repository =
            Repositories.repositoryWithInitialValue("init")  //1.声明RepositoryCompiler,初始值为"init",返回REventSource实例
                    .observe()          //2. 指定事件源(Observable),可以多个或者不指定,返回RFrequency实例
                    .onUpdatesPerLoop()  //3. 设置通知频率,返回RFlow实例
                    .getFrom(new Supplier<Double>() {  //4. 设置数据源(Supplier),返回RFlow或RTermination实例
                        @NonNull
                        @Override
                        public Double get() {
                            //通过getFrom方法,获得一个数据值。当前值可以与声明时的初始值不同(数值和类型都可以不同)
                            //当前数据流的值为 5000000.00
                            return 5000000.00d;
                        }
                    })
                    .mergeIn(new Supplier<String>() { //5. 将当前数据流中的值 和 一个通过 Supplier 提供的新值进行合并,并返回一个值
                        @NonNull
                        @Override
                        public String get() {
                            //这里是新提供的一个值
                            return "祝大家的银行卡里余额为:";
                        }
                    }, new Merger<Double, String, String>() {
                        @NonNull
                        @Override
                        public String merge(@NonNull Double integer, @NonNull String tAdd) {
                            //这里将前数据流中的值和新值合并,并返回
                            //新值为 "祝大家的银行卡里余额为:"
                            //前数据流中的值 为 5000000.00d
                            //合并后的值为 "祝大家的银行卡里余额为:5000000.00"
                            return tAdd + integer;
                        }
                    })
                    .sendTo(new Receiver<String>() { //6. 将当前数据流中的值发送到 Receiver 对象中
                        @Override
                        public void accept(@NonNull String value) {
                            Log.d("tag", value);
                        }
                    })
                    .thenTransform(new Function<String, String>() {//7. 将数据流中的值进行最后转换
                        @NonNull
                        @Override
                        public String apply(@NonNull String input) {
                            //注意,转换后的数据类型必须与初始化时的数据类型相同
                            return input + " 吼吼吼~";
                        }
                    })
                    .compile(); //8. 最后一步,编译此数据流,返回Repiository对象

    Updatable updatable = new Updatable() {
        @Override
        public void update() {
            Log.d("tag", repository.get());
        }
    };

上面的这段代码比较长,我详细讲解一下,里面涉及到一到方法和操作符,在接下来会具体讲到。这里我们先关心的是创建 Repository 的流程及整个数据流中的数据是如何变化的

首先需要明确的是,创建 Repository 表达式的不同阶段都返回 RepositoryCompilerStates 中内嵌的接口(compiler state interfaces)对象,这样可以每个阶段只暴露合适的方法, 引导开发者完成正确的表达式

这三个阶段分别是:

  1. 事件源和响应频率: RFrequency 和 REventSource
  2. 数据处理流程: RFlow 和 RSyncFlow
  3. 其它配置: RConfig

好,我们现在开始分析上面的代码。大家看我标注的注释,我们根据序号进行分析:

1. 初始化声明

首先,我们使用Repositories.repositoryWithInitialValue("init") 声明 RepositoryCompiler ,并指定了一个 String 类型的初始值:"init"。这一条语句返回的是一个 REventSource 实例,对应上面提到的第一阶段 事件源和响应频率: RFrequency 和 REventSource

注意,由于这一步返回的是 REventSource 实例,所以接下来我们能调用的方法只有 observe() 方法。这就体现了上面所说的每个阶段只暴露合适的方法, 引导开发者完成正确的表达式
后面的步骤也是同样的道理

2. 设置观察源

第2步调用了方法 observe() ,这个方法可以接收0个到多个参数(参数类型是 Observable),返回 RFrequency 实例

没有接收参数的时候,说明该 Repository 不需要监听其他额外的事件源,只有 Repository 第一次变为激活状态时(即之前没有注册过 Updatable,第一次注册 Updatable 后变为激活状态)整个数据流开始执行

有接收参数的时候,除了第一次变为激活状态时会执行数据流,当接收参数中的被观察者状态改变的时候,整个数据流也会重新开始执行

该方法返回 RFrequency 实例,对应第一阶段 事件源和响应频率: RFrequency 和 REventSource

3. 设置通知频率

RFrequency 对象对应的方法有两个,onUpdatesPer(int millis) 和 onUpdatesPerLoop()。这两个方法的作用都是设置通知频率,返回 RFlow 实例,对应第二阶段数据处理流程: RFlow 和 RSyncFlow

onUpdatesPerLoop 代表线程中的 looper 循环一次就更新一次数据流
onUpdatesPer(int millis) 则是根据我们传入的时间间隔来更新数据流

Agera 底层的消息通知是通过 Handler 实现的,onUpdatesPerLoop 意思就是和 Looper 循环器保持同样的更新频率

4. 获取一个新的数据

上面一步操作返回的是返回 RFlow 实例,对应第二阶段数据处理流程: RFlow 和 RSyncFlow。也就是说在从这一步,我们开始对数据流中的数据进行操作了

这一步使用 getFrom 方法从一个 Supplier 对象中拿到了一个 Double 类型的数据:5000000.00d

注意,我们需要重点掌握的就是数据流中的数据是如何变化和转换的。在最开始的时候我们声明的是一个 String 类型的数据 "init",到这一步变成了 Double 类型的数据 5000000.00d

getFrom 返回 RFlow 实例,对应第二阶段数据处理流程: RFlow 和 RSyncFlow。也就是说,接下来,我们还是处于第二阶段,仍然可以对数据流进行操作

在整个数据流中,不仅仅可以改变数据的值,连类型也是可以改变的。只需要保证最后的结果和最开始声明的初始值类型一致即可,中间的数据可以根据需要任意转换和处理

5. 合并一个数据

mergeIn 方法作用是将两个数据合并。它有两个参数,第一个参数是 Supplier 对象,它提供的值就是要合并的值。第二个参数是一个 Merger 对象,它有三个泛型,第一个泛型代表 当前数据流中的数据类型,也就是 Double,第二个泛型代表 要合并数据类型,即 Supplier 对象中提供的值的类型,也就是 String。第三个泛型代表 前面两个值合并后返回的值的数据类型,这里我们是把两个值拼接到一起了,所以就是 String

我们来看一下数据流中的值的变化,当前数据流中的值是 Double 类型的 5000000.00d,mergeIn 方法中第一个参数提供了一个 String 类型的值 "祝大家的银行卡里余额为:",后面的 Merger 中的操作是将这两个值拼接起来,所以经过这一步,数据流中的值就变成了 String 类型的 "祝大家的银行卡里余额为:5000000.00"

mergeIn 方法返回 RFlow 实例,对应第二阶段数据处理流程: RFlow 和 RSyncFlow。接下来,我们还是处于第二阶段,仍然可以对数据流进行操作

6. 发送数据

sendTo 方法的作用是将数据流中的数据传递给一个 Receiver 对象。我们在 Receiver 对象中接收到后,打印出了当前数据流中的值,即"祝大家的银行卡里余额为:5000000.00"

sendTo 方法返回的是当前对象this,也就是说整个数据流还处于第二阶段,仍然可以对数据进行操作

7. 转换数据,结束数据流处理

thenTransform 方法作用是对一个数据进行转换,并返回 RConfig 实例,对应第三阶段其它配置: RConfig

.then** 代表了数据流的结束,也就是说,经过这个方法,我们对数据流的操作就结束了。
它的返回值是一个 RConfig 对象,对应第三阶段

thenTransform 方法参数是一个 Function 对象。
Function 对象的作用是根据提供的数据,返回另一个数据,也就是对数据进行了一次转换。它有两个泛型参数,第一个代表输入值的类型,第二个代表输出值的类型

输入值,也就是当前数据流中的值,即 String 类型的"祝大家的银行卡里余额为:5000000.00"
输出值,因为这是对数据处理的最后一步,输出值的类型必须与最开始提供的初始值类型相同,即 String 类型

在拿到当前数据流中的值后,我们在其后面追加了" 吼吼吼~",所以数据流的最终结果为 "祝大家的银行卡里余额为:5000000.00 吼吼吼~"

8. 编译此数据流

最后一步,编译此数据流,返回 Repiository 对象

注意,在 .then** 操作后返回 RConfig 对象,这里还可以进行一些特殊配置,下面我们会讲。在这个例子中没有用到,直接编译成 Repiository 对象了

我们运行一下程序,看一下输出值:

05-21 16:27:58.463 11791-11791/com.cmos.agerademo D/tag: 祝大家的银行卡里余额为:5000000.0
05-21 16:27:58.566 11791-11791/com.cmos.agerademo D/tag: 祝大家的银行卡里余额为:5000000.0 吼吼吼~

可以看到,第一条log是我们在第6步的时候输出的,
第二条log是在 Updatable 中输出的,也就是最终的结果

到这里,关于创建一个 Repiository 对象的流程就已经分析完了。我并没有直接把所有的方法和操作符列举出来,而是想先通过这个例子,让大家了解一下整个数据流中的数据是如何变化的。这才是最最重要的,只有理解了这个,然后再去配合操作符,才能正确地流畅的去使用 Repository

我当时在学习的时候,把重点放在了各种操作符还有方法上,用起来迷迷糊糊的,用了好久我都不清楚数据到底是怎么一步一步变化的!所以,一定要先把数据是如何变化的搞清楚,然后再去学习各种操作符

二、数据处理流和操作符

在上一部分,给大家介绍了创建 Repository 的表达式分为3个部分:

  1. 事件源和响应频率: RFrequency 和 REventSource
  2. 数据处理流程: RFlow 和 RSyncFlow
  3. 其它配置: RConfig

这三个部分所对应的分别是 RepositoryCompilerStates 中内嵌的接口(compiler state interfaces)对象
每一个接口对象拥有不同的方法,这样可以每个阶段只暴露合适的方法, 引导开发者完成正确的表达式

我把所有的接口和相关方法都列了出来:

接下来给大家介绍常用的操作符和方法

1. Supplier && getFrom

public interface Supplier<T> {
  @NonNull
  T get();
}
 <TCur> RFlow<TVal, TCur, ?> getFrom(@NonNull Supplier<TCur> supplier)

Supplier 是一个数据提供者,它是一个没有输入值,有一个输出值的操作符

getFrom(Supplier<TCur> supplier) 方法作用是从Supplier中拿到数据值作为当前数据流中的值

getFrom.png

除了 getFrom(Supplier) 方法,其变种方法也是同样的道理。比如:attemptGetFrom(Supplier)、thenGetFrom(Suppiler) 等

2. Function && transform

public interface Function<TFrom, TTo> {
  @NonNull
  TTo apply(@NonNull TFrom input);
}
 /** 
 * Transform the input value using the given function into the output value.
 */
@NonNull
<TCur> RSyncFlow<TVal, TCur, ?> transform(@NonNull Function<? super TPre, TCur> function);

Function 的作用是 基于一个输入值,返回一个输出值。它有两个泛型参数,第一个代表输入值的数据类型,第二个代表输出值的数据类型

transform(Function<? super TPre, TCur> function) 方法的作用是 将输入值根据传入的Function对象转换成输出值

transform.png

除了 transform(Supplier) 方法,其变种方法也是同样的道理。比如:attempTransform(Function)、thenTransform(Function) 等

3. Merger && mergeIn

public interface Merger<TFirst, TSecond, TTo> {

  /**
   * Computes the return value merged from the two given input values.
   */
  @NonNull
  TTo merge(@NonNull TFirst first, @NonNull TSecond second);
}
  @NonNull
  @Override
  <TAdd, TCur> RFlow<TVal, TCur, ?> mergeIn(@NonNull Supplier<TAdd> supplier,
  @NonNull Merger<? super TPre, ? super TAdd, TCur> merger);

Merger 是作用是根据两个输入值,返回一个输出值。它有3个泛型参数,第一个代表第一个输入值的数据类型,第二个代表第二个输入值的数据类型,第三个代表输出值的数据类型

mergeIn(Supplier<TAdd> supplier,
@NonNull Merger<? super TPre, ? super TAdd, TCur> merger)的作用就是以当前数据流中的值作为merger对象中的第一个输入值,以supplier对象中的值作为merger对象中的第二个输入值,并根据这两个值返回一个数据

merge.png

除了 mergeIn 方法,其变种方法也是同样的道理。比如:attemptMergeIn、thenAttemptMergeIn 等

4. Receiver && sendTo

public interface Receiver<T> {
  /**
   * Accepts the given {@code value}.
   */
  void accept(@NonNull T value);
}
TSelf sendTo(@NonNull Receiver<? super TPre> receiver);

Receiver 的作用是接收一个数据。它有一个泛型参数,该泛型的类型就是接收值的数据类型

sendTo(@NonNull Receiver<? super TPre> receiver)方法的作用是将当前数据流中的数据发送给Receiver对象

sendTo.png

关于 sendTo 这里需要说明一下,它是把当前的数据流中的值发送给 Receiver 对象。并不是复制一份给 Receiver 对象

这说明了什么?说明了 sendTo 这个操作是同步的!。当在执行 Receiver 对象中的方法的时候,整个数据流会阻塞,等到 Receiver 对象方法执行完毕后数据流才继续往下执行

5. Binder 和 bindWith

public interface Binder<TFirst, TSecond> {

  /**
   * Accepts the given values {@code first} and {@code second}.
   */
  void bind(@NonNull TFirst first, @NonNull TSecond second);
}
<TAdd> TSelf bindWith(@NonNull Supplier<TAdd> secondValueSupplier,
@NonNull Binder<? super TPre, ? super TAdd> binder);

Binder 的作用是 接收两个值。它的泛型参数值分别代表第一个输入值的数据类型、第二个输入值数据类型。

它与上面提到的 Suppiler 是类似的,不同的是多接收了一个参数

bindWith 方法有两个参数,第一个参数是一个 Suppiler 对象,代表传递给 Binder 对象的第二个输入值,Binder 对象中的第一个输入值是当前数据流中的值

binder.png

注意,这个方法和 sendTo 方法类似,也是同步的

6. Predicate && check

public interface Predicate<T> {
  /**
   * Returns whether the predicate applies to the input {@code value}.
   */
  boolean apply(@NonNull T value);
}
RTermination<TVal, TPre, TSelf> check(@NonNull Predicate<? super TPre> predicate);

Predicate 的作用是根据一个输入值,返回ture或者false

check(@NonNull Predicate<? super TPre> predicate) 的作用就是检查 Predicate 对象中的返回值,如果为 true ,则继续进行后续数据流操作。如果为 false ,则跳过后续数据流或走失败逻辑

check.png

7. .then**

这个.then是什么意思呢?它代表的意思就是对当前数据流中数据处理部分(即数据处理流程: RFlow 和 RSyncFlow)已经处理完毕,接下来要进行的是第三部分的操作

我们看一下最开始的代码

   ...
   ... //这里还有很多对数据流的操作
  .sendTo(new Receiver<String>() {
          ...
          ...
     })
    //注意,这里的thenTransform就代表了这是对数据流处理的最后一步,执行完这一步就进入到下一部分了
 .thenTransform(new Function<String, String>() 
    ...  
    ...
  )
 .compile();

.then** 有很多方法,比如 thenGetFrom、 thenTransform、thenAttemptTransform等

记住,当我们对数据流处理的最后一步,一定是 .then** 的方法,这样才能顺利的进入到下一步,否则的话会永远停留在数据流处理的阶段

一般来说,最后一步使用最多的就是 thenTransform 这个方法。我们在最开始就讲过,数据流结束后的数据类型必须和初始类型相同,而 .then** 表示的是数据处理的最后一步,所以一般情况下我们需要用 thenTransform 在数据流的最后一步把数据转换成初始值的数据类型

8. goTo(Executor executor)

我们在上一篇就讲过,Agera 切换线程非常的方便。没错,goTo 这个方法就是用来切换线程的

      ...
     .goTo(Executors.newSingleThreadExecutor())
      ...

当调用 goTo 方法后,此方法下面的数据流将会切换到指定的线程中去执行

注,Agera 是支持多次线程切换的,所以可以多处调用 goTo 方法,将不同部分的处理切换到不同的线程中

好了,以上就是对 Repository 中数据流部分的方法和操作符的讲解。如果你对这些还不是特别熟悉的话,建议你按照本篇最开始的代码那样,多写几遍,方法和操作符任意组合,数据任意转换

注意,上面介绍的一些方法会有一些变种,比如 getFrom 方法就会有 attemptGetFrom 等。他们的作用是类似的,但又有不同的地方。下面我们会讲解到

三、编译表达式中的相关配置

在上面的部分,主要给大家介绍了关于数据流操作的方法和操作符,所有的操作都对应于编译表达式的第二阶段:数据处理流程: RFlow 和 RSyncFlow

在这一部分,将会给大家介绍编译表达式中的第一部分和第三部分

这里特别说明一下,下面介绍的有些内容到目前为止可能并没有接触到,而且会有很少使用的一些方法,所以大家在看的时候可以先有个大体印象就好,不必深究

事件源和响应频率: RFrequency 和 REventSource

1. REventSource

 interface REventSource<TVal, TStart> {
    /**
     * Specifies the event source of the compiled repository.
     */
    @NonNull
    RFrequency<TVal, TStart> observe(@NonNull Observable... observables);
  }

REventSource 是一个接口,包含一个 observe 方法,它的作用就是设置监听的事件源

REventSource 实例是如何产生的呢?

 Repositories.repositoryWithInitialValue("init")           

就是我们编译表达式的第一步,当我们声明一个 Repository 的初始值后,它返回的就是一个 REventSource 实例

Agera 这样做的目的就是告诉我们,声明完初始值下一步应该设置事件源了,用于引导我们正确的完成表达式的编译

也就是说,上面的代码继续写下去,就只能是

Repositories.repositoryWithInitialValue("init") 
            .observe()   

2. RFrequency

  interface RFrequency<TVal, TStart> extends REventSource<TVal, TStart> {
    @NonNull
    RFlow<TVal, TStart, ?> onUpdatesPer(int millis);

    @NonNull
    RFlow<TVal, TStart, ?> onUpdatesPerLoop();
  }

RFrequency 也是一个接口,用于设置更新频率

onUpdatesPerLoop 代表线程中的 looper 循环一次就更新一次数据流
onUpdatesPer(int millis) 则是根据我们传入的时间间隔来更新数据流

Agera 底层的消息通知是通过 Handler 实现的,onUpdatesPerLoop 意思就是和 Looper 循环器保持同样的更新频率

RFrequency 实例是如何来的呢?它是通过 REventSource 的 observe 方法返回的

Repositories.repositoryWithInitialValue("init")
                    .observe() //返回RFrequency 实例

当我们得到 RFrequency 实例后,就只能去设置更新频率

Repositories.repositoryWithInitialValue("init")
            .observe()
            .onUpdatesPerLoop() //设置更新频率

其它配置: RConfig

interface RConfig<TVal> {

     RConfig<TVal> notifyIf(@NonNull Merger<? super TVal, ? super TVal, Boolean> checker);

     RConfig<TVal> onDeactivation(@RepositoryConfig int deactivationConfig);

     RConfig<TVal> onConcurrentUpdate(@RepositoryConfig int concurrentUpdateConfig);

     Repository<TVal> compile();
}

RConfig 接口代表的是整个编译表达式的最后一部分。从最开始的初始化,然后进行一系列的数据流操作之后,这个时候,就可以对 Repository 进行最后的一些设置

其实还有两个方法,由于不常使用,我没有写出来。如果感兴趣的去可以自行去查看源码

1. notifyIf(Merger<? super TVal, ? super TVal, Boolean> checker)

这个方法的作用是根据 Merger 对象的返回值决定是否要通知 Updatable 更新

上面我们讲过 Merger 是一个 两个输入值,一个输出值的操作符,在 notifyIf 方法中,第一个输入值代表的是旧的数据(即上一次数据处理流执行完后得到的数据),第二个输入值代表的是新的数据(即本次数据处理流执行完毕后得到的新数据)

我们可以看到,Merger 对象的第三个泛型参数,也就是输出值的类型,已经被指定为 Boolean 型了,说明需要我们根据旧的数据和新的数据,决定是否要通知Updatable 更新(返回值为true代表更新,false为不更新)

2. onDeactivation(@RepositoryConfig int deactivationConfig)

这个方法的作用是当数据仓库变为不活跃状态时数据处理流的行为

它的参数是 RepositoryConfig 接口中的4个变量

//继续数据流的执行
1. RepositoryConfig.CONTINUE_FLOW  

//取消数据流的执行
2. RepositoryConfig.CANCEL_FLOW 

//将仓库中的数据值重新设置成初始值(针对onDeactivation方法)
//不重置数据流中的值,只是取消数据流的执行(针对并发配置,即 onConcurrentUpdate 方法。下面会讲到)
3. RepositoryConfig.RESET_TO_INITIAL_VALUE = 2 | CANCEL_FLOW  

//当数据流处于正在执行中并且是异步状态的时候(从第一次 goTo 指令后到 goLazy 指令前),中断当前运行流程的线程
4. RepositoryConfig.SEND_INTERRUPT = 4 | CANCEL_FLOW;

上面对4个参数值都做了对应的说明,我们使用onDeactivation 方法的时候,就可以控制数据仓库变为不活跃状态时数据处理流该如何执行

举个列子,我们设置成 .onDeactivation(RepositoryConfig.CANCEL_FLOW)。当数据流正在执行的时候,比如说正在加载一张图片,这时候我们把页面退出了,也就是调用了 repository.removeUpdatable(updatable); 这个时候仓库就由活动状态变成了非活动状态,那么根据我们的设置,正在加载图片的数据流就会被取消,不再继续往下执行了

这个方法默认的值是RepositoryConfig#CONTINUE_FLOW,即当仓库变为不活跃状态时,仍然执行数据流直到数据流执行完毕

3. onConcurrentUpdate(@RepositoryConfig int concurrentUpdateConfig)

这个方法的作用是声明当数据流正在执行的同时又收到了一个事件源更新的情形下的数据流的行为

说简单直白一点,就是数据流正在执行呢,收到了一个被观察者的更新,这时候数据流应该是继续执行还是取消执行

这个方法的参数也是上面那4个值,上面已经讲解过

注,这个方法的默认值是 RepositoryConfig#CONTINUE_FLOW,继续数据流的执行

4. compile()

这个方法的作用是编译上面的数据流,返回一个 Repository 对象,这是整个编译表达式的最后一步

小结

到这里,我们就对创建 Repository 的整个编译表达式做了讲解,其实还是很有规律的,总共分为3个部分:

  1. 先初始化,然后设置事件源和更新频率
  2. 对数据进行各种操作
  3. 设置一些需要的配置,最后调用compile()
    Repository<String> repository =
            //1. 先初始化,然后设置事件源和更新频率
            Repositories.repositoryWithInitialValue("init")
                    .observe()
                    .onUpdatesPerLoop()
            //2. 对数据进行各种操作
                    .getFrom(new Supplier<Float>() {
                        @NonNull
                        @Override
                        public Float get() {
                            return 50000000.00f;
                        }
                    })
                    ...
                    ...
                    .thenTransform(new Function<String, String>() {
                        @NonNull
                        @Override
                        public String apply(@NonNull String input) {
                            return input + " 吼吼吼~";
                        }
                    })

                 //3.设置一些需要的配置,最后调用compile()
  
                    .onDeactivation(RepositoryConfig.CANCEL_FLOW)
                    .onConcurrentUpdate(RepositoryConfig.CANCEL_FLOW)
                    .compile();

四、Attempts && Result

这部分会给大家讲解一个非常非常重要的知识点:Agera 中的异常处理

在前面所有的讲解中,我们都默认了这一步的操作是可以正确执行的。实际上并不是这样,异常无处不在。比如进行一个网络请求,我们就需要处理网络请求失败的情况

我们先看一下没有使用异常处理是什么样的:

    Repository repository = Repositories.repositoryWithInitialValue(1)
            .observe()
            .onUpdatesPerLoop()
            .thenGetFrom(new Supplier<Integer>() {
                @NonNull
                @Override
                public Integer get() {
                    return 1 / 0;
                }
            })
            .compile();

    Updatable updatable = new Updatable() {
        @Override
        public void update() {
            Log.d("tag", repository.get() + "");
        }
    };

我们想要去拿到 1/0 的结果,而且没有做异常处理。运行程序,崩溃了

 java.lang.ArithmeticException: divide by zero

为了捕获异常,Agera 提供了一个封装类 Result,它可以封装易失败操作的结果(成功或者失败)或者attempt的结果值

1. Result

Result 类中对可能失败的操作做了异常封装,保证程序能够正常运行下去而不是像上面那样崩溃

Result 能够做什么?

  1. 表示成功并提供一个数据 -> Result.success(num)
  2. 表示失败并提供异常信息 -> Result.failure(new Throwable("网络请求失败")
  3. 表示默认缺省 -> Result.absent()

在 Result 类中,有一个 value 字段。当 Result 表示成功时候,可以通过 get() 方法获取到该值。如果 Result 是失败或者默认缺省时,该值为空

Result 类中的常用方法:

//根据给定值返回一个成功的Result对象
 public static <T> Result<T> success(@NonNull final T value) {
    return new Result<>(checkNotNull(value), null);
  }
//根据给定值返回一个成功的Result对象,它上面方法的别名
  public static <T> Result<T> present(@NonNull final T value) {
    return success(value);
  }

//根据传入的Throwable 返回一个失败的Result对象
 public static <T> Result<T> failure(@NonNull final Throwable failure) {
    return failure == ABSENT_THROWABLE
        ? Result.<T>absent() : new Result<T>(null, checkNotNull(failure));
  }

//返回一个失败的Result对象
  public static <T> Result<T> failure() {
    return (Result<T>) FAILURE;
  }

  //返回该对象中封装的值
  public T get() throws FailedResultException {
    if (value != null) {
      return value;
    }
    throw new FailedResultException(failure);
  }

//如果 Result 对象是成功的(即下面的value != null),将结果值发送给一个 Receiver 对象
 public Result<T> ifSucceededSendTo(@NonNull final Receiver<? super T> receiver) {
    if (value != null) {
      receiver.accept(value);
    }
    return this;
  }

//如果 Result 对象是失败的(即下面的failure != null),将失败信息发送给一个 Receiver 对象
 public Result<T> ifFailedSendTo(@NonNull final Receiver<? super Throwable> receiver) {
    if (failure != null) {
      receiver.accept(failure);
    }
    return this;
  }

注:关于 Result 这个类,是在是不知道该怎么把它讲解好。如果你看得不太理解的话,建议多在代码中去使用,并且去看一下这个类的源码(这个类其实挺简单的)

2. .attempt*

Agera 提供了能感知错误的指令,这样就可以在错误的情况下终止流程

.attemptGetFrom(Supplier).or…;
.attemptTransform(Function).or…;
.attemptMergeIn(Supplier, Merger).or…,

我们把感知错误的指令与之前讲到的普通指令对比一下:

普通指令不能处理异常,并且指令后面返回的是 RFlow 或 RSyncFlow ,能够直接继续进行数据流操作

感知错误的指令能够感知到异常,并且指令后面返回的是一个 RTerminationOrContinue 或者 RTermination 对象。这两个对象提供的方法决定了当发生异常后数据流该如何处理

我们来看一下发生异常后的处理方法:

//当前一个指令发生异常时,跳过剩下的数据流(即不在继续执行),并且不通知 Updatable
1. orSkip()

//当前一个指令发生异常时,在 Function 对象中,根据传入的错误异常,返回一个值(这个值可以是成功也可以是失败,按具体需要处理)
2. orEnd(@NonNull Function<? super TTerm, ? extends TVal> valueFunction)

//当前面的指令发生异常时,继续剩下的数据处理流。并使用 Result.getFailure() 中的值作为下一个指令的输入值
3.orContinue()

好,接下来我们把上面的例子改造一下:

    Repository repository2 = Repositories.repositoryWithInitialValue(0)
            .observe()
            .onUpdatesPerLoop()
            //1.我们使用thenAttemptGetFrom方法,去处理一个可能发生异常的操作
            .thenAttemptGetFrom(new Supplier<Result<? extends Integer>>() {
                @NonNull
                @Override
                public Result<? extends Integer> get() {
                    try {
                        int num = 1 / 0;
                        //2. 如果不发生异常,返回一个表示成功的Result
                        return Result.success(num);
                    } catch (Exception e) {
                        e.printStackTrace();
                        //3.如果发生了异常,返回一个表示失败的Result
                        return Result.failure(e);
                    }
                }
            })
            //4.对异常情况进行处理
            .orEnd(new Function<Throwable, Integer>() {
                @NonNull
                @Override
                public Integer apply(@NonNull Throwable input) {
                    //5. 当发生异常的时候,返回一个-1
                    return -1;
                }
            })
            .compile();

我们对于异常进行处理后,运行看一下效果,发现程序正常运行,并且最后得到的值是-1

当我们使用了.attempt* 指令和 Result 封装后,整个数据流中的泛型就变得比之前复杂很多。这一块的的知识讲解起来比较抽象,不好描述。所以我还是建议大家多去写写,才能好更好的理解

五、总结

这一篇文章主要给大家介绍了如何创建一个 Repository 、以及 Repository 数据流中的方法和操作符还有对异常的处理

  1. 创建一个 Repository 分为3个部分,每一部分都有其特定的指令
 Repository<String> repository =
            //1. 先初始化,然后设置事件源和更新频率
            Repositories.repositoryWithInitialValue("init")
                    .observe()
                    .onUpdatesPerLoop()
            //2. 对数据进行各种操作
                    .getFrom(new Supplier<Float>() {
                        @NonNull
                        @Override
                        public Float get() {
                            return 50000000.00f;
                        }
                    })
                    ...
                    ...
                    .thenTransform(new Function<String, String>() {
                        @NonNull
                        @Override
                        public String apply(@NonNull String input) {
                            return input + " 吼吼吼~";
                        }
                    })

                 //3.设置一些需要的配置,最后调用compile()
  
                    .onDeactivation(RepositoryConfig.CANCEL_FLOW)
                    .onConcurrentUpdate(RepositoryConfig.CANCEL_FLOW)
                    .compile();
  1. Agera 提供了能感知错误的指令
    普通指令不能处理异常,并且指令后面返回的是 RFlow 或 RSyncFlow ,能够直接继续进行数据流操作。
    感知错误的指令能够感知到异常,并且指令后面返回的是一个 RTerminationOrContinue 或者 RTermination 对象。这两个对象提供的方法决定了当发生异常后数据流该如何处理

  2. 关于异常封装的 Result 类 结合.attempt* 指令后,整个数据流中的泛型就变得比之前复杂很多。数据流中整体的数据是如何变化的,大家亲自动手去写去分析,这样才能理解透彻

六、相关代码

https://github.com/smashinggit/Study

注:此工程包含多个module,本文所用代码均在AgeraDemo下

七、预告

本来计划是这篇文章讲解 Repository 并且配套讲解一个实例,然后写着写着发现内容实在是太多了。所以这一篇整体就比较偏理论

在下一篇文章中,我将会用实例去带大家在实战中体验一下 Agera 的强大,敬请期待~

后一篇文章已经更新啦~
Android官方响应式框架Agera详解:三、Repository的更新规则及Agera+Retrofit+Okhttp实战

注:由于本人水平有限,所以难免会有理解偏差或者使用不正确的问题。如果小伙伴们有更好的理解或者发现有什么问题,欢迎留言批评指正~

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 201,552评论 5 474
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,666评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,519评论 0 334
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,180评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,205评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,344评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,781评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,449评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,635评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,467评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,515评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,217评论 3 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,775评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,851评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,084评论 1 258
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,637评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,204评论 2 341