响应式编程与RxJava

1. 响应式编程

1.1 响应式编程概念

  • 响应式编程是一种通过异步和数据流来构建事物关系的编程模型。
  • 事物的关系 也可以说成是 业务逻辑 ,是响应式编程的核心理念。
  • 数据流异步 是实现这个核心理念的关键。异步和数据流都是为了正确的构建事物的关系而存在的。

1.2 响应式编程demo

int a=1;
int b=a+1;
System.out.print(“b=”+b)    //  b=2
a=10;
System.out.print(“b=”+b)    //  b=2

上面是一段很常见的代码,简单的赋值打印语句,但是这种代码有一个缺陷,那就是如果我们想表达的并不是一个赋值动作,而是b和a之间的关系,即无论a如何变化,b永远比a大1。那么可以想见,我们就需要花额外的精力去构建和维护一个b和a的关系。
而响应式编程的想法正是企图用某种操作符帮助你构建这种关系。
它的思想完全可以用下面的代码片段来表达:

int a=1;
int b <= a+1;   // <= 符号只是表示a和b之间关系的操作符
System.out.print(“b=”+b)    //  b=2
a=10;
System.out.print(“b=”+b)    //  b=11

响应式编程的思想,它希望有某种方式能够构建关系,而不是执行某种赋值命令。

应用初始化.png

比如在收单应用初始化逻辑中,先完成SDK初始化,数据库初始化,签到,才会跳转到交易菜单界面。

在响应式编程中,这一流程可以这样解读


应用初始化2.png

在初始化过程中,SDK初始化,数据库初始化,签到这些业务完成之后才会去安排页面跳转的操作,那么这些上游的业务在自己工作完成之后,就需要通知下游,通知下游的方式有很多种,响应式编程的方式就是通过数据(事件)流。

每一个业务完成后,都会有一条数据(一个事件)流向下游,下游的业务收到这条数据(这个事件),才会开始自己的工作。

我们能发现SDK初始化,数据库初始化,签到这三个业务本身相互独立,应当在不同的线程环境中执行,以保证他们不会相互阻塞。而假如没有异步编程,我们可能只能在一个线程中顺序调用这三个相对耗时较多的业务,最终再去做页面跳转,这样做不仅没有忠实反映业务本来的关系,而且会让你的程序“反应”更慢。

总的来说,异步和数据流都是为了正确的构建事务的关系而存在的。只不过,异步是为了区分出无关的事务,而数据流(事件流)是为了联系起有关的事务。

2. RxJava

Rx是响应式拓展,即支持响应式编程的一种拓展,用来处理事件和异步任务。

2.1 RxJava的优点

简洁。而且当业务越繁琐越复杂时这一点就越显出优势——它能够保持简洁。

2.2 RxJava的基本概念

我们都知道监听者模式,订阅模式这些概念。而Observable和Subscribers的英文意思就是如此。我们大概也知道差不多和监听者模式差不多。

  • Observable事件源,被观察者。
  • Observer / Subcriblers 观察者,事件订阅者
  • subscribe() 方法,绑定Observable与Subcribler或者Observabler
  • 事件 (包括 onNext,onComplete,onError 等事件)

以第一章的初始化应用为例:

Observable obserInitSDK=Observable.create((context)->{initSDK(context)}).subscribeOn(Schedulers.newThread())

Observable obserInitDB=Observable.create((context)->{initDatabase(context)}).subscribeOn(Schedulers.newThread())

Observable obserLogin=Observable.create((context)->{Login(context)})
                            .subscribeOn(Schedulers.newThread())
// 合并多个Observables的发射物                           
Observable observable = Observable.merge(obserInitSDK,obserInitDB,obserLogin)
// 订阅被观察者
observable.subscribe(()->{startActivity()})

当initSDK,initDB,Login都是耗时较长的操作时,遵照业务关系编写响应式代码可以极大的提高程序的执行效率,降低阻塞。
从上面代码中,可以看出,响应式编程有如下优点

  • 在业务层面实现代码逻辑分离,方便后期维护和拓展
  • 极大提高程序响应速度,充分发掘CPU的能力
  • 帮助开发者提高代码的抽象能力和充分理解业务逻辑
  • Rx丰富的操作符会帮助我们极大的简化代码逻辑

2.3 操作符决策树

RxJava的几种主要操作符:

  • 创建操作:直接创建一个Observable
  • 组合操作:组合多个Observable
  • 变换操作:对Observable发射的数据执行变换操作
  • 过滤操作:从Observable发射的数据中取特定的值
  • 条件/布尔/过滤操作:转发Observable的部分值
  • 算术/聚合操作:对Observable发射的数据序列求值

创建操作

用于创建Observable的操作符

  • Create — 通过调用观察者的方法从头创建一个Observable
  • Defer — 在观察者订阅之前不创建这个Observable,为每一个观察者创建一个新的Observable
  • Empty/Never/Throw — 创建行为受限的特殊Observable
  • From — 将其它的对象或数据结构转换为Observable
  • Interval — 创建一个定时发射整数序列的Observable
  • Just — 将对象或者对象集合转换为一个会发射这些对象的Observable
  • Range — 创建发射指定范围的整数序列的Observable
  • Repeat — 创建重复发射特定的数据或数据序列的Observable
  • Start — 创建发射一个函数的返回值的Observable
  • Timer — 创建在一个指定的延迟之后发射单个数据的Observable

变换操作

用于对Observable发射的数据进行变换

  • Buffer — 缓存,可以简单的理解为缓存,它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个
  • FlatMap — 扁平映射,将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,可以认为是一个将嵌套的数据结构展开的过程。
  • GroupBy — 分组,将原来的Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据
  • Map — 映射,通过对序列的每一项都应用一个函数变换Observable发射的数据,实质是对序列中的每一项执行一个函数,函数的参数就是这个数据项
  • Scan — 扫描,对Observable发射的每一项数据应用一个函数,然后按顺序依次发射这些值
  • Window — 窗口,定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项。类似于Buffer,但Buffer发射的是数据,Window发射的是Observable,每一个Observable发射原始Observable的数据的一个子集

过滤操作

用于从Observable发射的数据中进行选择

  • Debounce — 只有在空闲了一段时间后才发射数据,通俗的说,就是如果一段时间没有操作,就执行一次操作
  • Distinct — 去重,过滤掉重复数据项
  • ElementAt — 取值,取特定位置的数据项
  • Filter — 过滤,过滤掉没有通过谓词测试的数据项,只发射通过测试的
  • First — 首项,只发射满足条件的第一条数据
  • IgnoreElements — 忽略所有的数据,只保留终止通知(onError或onCompleted)
  • Last — 末项,只发射最后一条数据
  • Sample — 取样,定期发射最新的数据,等于是数据抽样,有的实现里叫ThrottleFirst
  • Skip — 跳过前面的若干项数据
  • SkipLast — 跳过后面的若干项数据
  • Take — 只保留前面的若干项数据
  • TakeLast — 只保留后面的若干项数据

组合操作

用于将多个Observable组合成一个单一的Observable

  • And/Then/When — 通过模式(And条件)和计划(Then次序)组合两个或多个Observable发射的数据集
  • CombineLatest — 当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果
  • Join — 无论何时,如果一个Observable发射了一个数据项,只要在另一个Observable发射的数据项定义的时间窗口内,就将两个Observable发射的数据合并发射
  • Merge — 将两个Observable发射的数据组合并成一个
  • StartWith — 在发射原来的Observable的数据序列之前,先发射一个指定的数据序列或数据项
  • Switch — 将一个发射Observable序列的Observable转换为这样一个Observable:它逐个发射那些Observable最近发射的数据
  • Zip — 打包,使用一个指定的函数将多个Observable发射的数据组合在一起,然后将这个函数的结果作为单项数据发射

错误处理

用于从错误通知中恢复

  • Catch — 捕获,继续序列操作,将错误替换为正常的数据,从onError通知中恢复
  • Retry — 重试,如果Observable发射了一个错误通知,重新订阅它,期待它正常终止

辅助操作

用于处理Observable的操作符

  • Delay — 延迟一段时间发射结果数据
  • Do — 注册一个动作占用一些Observable的生命周期事件,相当于Mock某个操作
  • Materialize/Dematerialize — 将发射的数据和通知都当做数据发射,或者反过来
  • ObserveOn — 指定观察者观察Observable的调度程序(工作线程)
  • Serialize — 强制Observable按次序发射数据并且功能是有效的
  • Subscribe — 收到Observable发射的数据和通知后执行的操作
  • SubscribeOn — 指定Observable应该在哪个调度程序上执行
  • TimeInterval — 将一个Observable转换为发射两个数据之间所耗费时间的Observable
  • Timeout — 添加超时机制,如果过了指定的一段时间没有发射数据,就发射一个错误通知
  • Timestamp — 给Observable发射的每个数据项添加一个时间戳
  • Using — 创建一个只在Observable的生命周期内存在的一次性资源

条件和布尔操作

用于单个或多个数据项,也可用于Observable

  • All — 判断Observable发射的所有的数据项是否都满足某个条件
  • Amb — 给定多个Observable,只让第一个发射数据的Observable发射全部数据
  • Contains — 判断Observable是否会发射一个指定的数据项
  • DefaultIfEmpty — 发射来自原始Observable的数据,如果原始Observable没有发射数据,就发射一个默认数据
  • SequenceEqual — 判断两个Observable是否按相同的数据序列
  • SkipUntil — 丢弃原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的剩余数据
  • SkipWhile — 丢弃原始Observable发射的数据,直到一个特定的条件为假,然后发射原始Observable剩余的数据
  • TakeUntil — 发射来自原始Observable的数据,直到第二个Observable发射了一个数据或一个通知
  • TakeWhile — 发射原始Observable的数据,直到一个特定的条件为真,然后跳过剩余的数据

算术和聚合操作

用于整个数据序列

  • Average — 计算Observable发射的数据序列的平均值,然后发射这个结果
  • Concat — 不交错的连接多个Observable的数据
  • Count — 计算Observable发射的数据个数,然后发射这个结果
  • Max — 计算并发射数据序列的最大值
  • Min — 计算并发射数据序列的最小值
  • Reduce — 按顺序对数据序列的每一个应用某个函数,然后返回这个值
  • Sum — 计算并发射数据序列的和

另外还有连接操作转换操作,可以通过文档查看使用方法。

2.4 RxJava 基础框架解析

  • 先从比较常用的create方法看
public static Completable create(CompletableOnSubscribe source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new CompletableCreate(source));
    }

在 create 方法中,其实很简单,只是对 source 进行判空处理,并将 source 用 ObservableCreate 包装起来,并返回回去。下面让我们一起来看一下 ObservableCreate方法

public final class CompletableCreate extends Completable {

    final CompletableOnSubscribe source;

    public CompletableCreate(CompletableOnSubscribe source) {
        this.source = source;
    }

    // daizy -- 持有了上游 source 的引用,并重写 subscribeActual 方法
    @Override
    protected void subscribeActual(CompletableObserver observer) {
        Emitter parent = new Emitter(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

ObservableCreate 也很简单,它是 Observable 的子类,持有了上游 source 的引用,并重写 subscribeActual 方法,这个方法要结合订阅Subscribe源码看。

@SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(CompletableObserver observer) {
        // 检查 observer 是否为 null,为 null 抛出异常
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            // RxJavaPlugins 插件的,暂时不管
            observer = RxJavaPlugins.onSubscribe(this, observer);
            // 检查 observer 是否为 null,为 null 抛出异常
            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null CompletableObserver. Please check the handler provided to RxJavaPlugins.setOnCompletableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            subscribeActual(observer);
        } catch (NullPointerException ex) { // NOPMD
            throw ex;
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            RxJavaPlugins.onError(ex);
            throw toNpe(ex);
        }
    }

subscribe 方法也比较简单,大概可以分为以下两步:

  • 第一步,对observer 进行判空,为空则抛出异常
  • 第二步,调用 subscribeActual 方法,在Observable类 中,subscribeActual 是一个抽象方法,要关注的是其实现类的subscribeActual方法。从上面的分析,我们知道,当我们调用 Observable create(ObservableOnSubscribe source) 方法的时候,最终会返回 ObservableCreate 实例。因此,我们只需要关注 ObservableCreate 的 subscribeActual 方法。
protected void subscribeActual(Observer<? super T> observer) {
        // CreateEmitter 是 ObservableCreate 的一个静态内部类
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            // source 是上游 ObservableOnSubscribe 的引用
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

继续看ObservableCreate的subscribeActual方法,在执行observer.onSubscribe 方法的时候,会将parent对象作为方法参数暴露出去,parent即是CreateEmitter,可以通过它的dispose方法取消订阅关系。
接着在调用source.subscribe(parent)的时候,会先调用ObservableOnSubscribe 的 subscribe 方法。
因此,我们可以得出,调用的顺序是:
Observable.subscrible -> Observable.subscribleActual -> Observable.subscribleActual -> observer.onSubscribe -> ObservableOnSubscribe.subscribe(emitter)
emitter是CreateEmitter的实例,包装了observe,调用emitter的方法,就会调用observe的 onNext 、onComolete/onError方法。

以上是RxJava基本原理,Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer,并且回调 Observer 的相应的方法。

2.5 RxJava 线程切换

Observable通过subscribeOn方法来指定线程

public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

通过代码可以看出,先对scheduler进行判空,然后用ObservableSubscribeOn 将scheduler 包装起来,接下来研究看看ObservableSubscribeOn这个类的源码。

final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

首先先来看他的构造函数 ,有两个参数 source ,scheduler。

  • source 代表上游的引用,是 Observable 的一个实例
  • scheduler 调度器可以通过 Schedulers.newThread() 或者 Schedulers.io() 创建相应的实例。
    RxJava 可用的调度器大概有下面几种,根据需求选择:
图片.png

我们主要看下这个方法

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

SubscribeTask 这个类,他是 ObservableSubscribeOn 的一个非静态内部类,可以看到 其实也比较简单,他实现了 Runnable 接口,并且持有 parent 引用。

 final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }

在 run 方法中,通过 source.subscribe(parent) 建立联系。因而,当我们的 SubscribeTask 的 run 方法运行在哪个线程,相应的 observer 的 subscribe 方法就运行在哪个线程。

接下来再看看scheduleDirect的实现

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }

这个方法主要是将task包装成DisposeTask,然后通过Worker进行调度。再看看Worker 是在做啥。
Scheduler我们以NewThreadScheduler为例子

public final class NewThreadScheduler extends Scheduler {

    final ThreadFactory threadFactory;

    private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
    /**
     * daizy -- 线程池
     */
    private static final RxThreadFactory THREAD_FACTORY;

    /** The name of the system property for setting the thread priority for this Scheduler. */
    private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";

    static {
        int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
                Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));

        THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
    }

    public NewThreadScheduler() {
        this(THREAD_FACTORY);
    }

    public NewThreadScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }


    @NonNull
    @Override
    public Worker createWorker() {
        // 通过线程池来调度
        return new NewThreadWorker(threadFactory);
    }
}

通过代码可以看出来,Worker里头封装了线程池,所以RxJava的线程切换,也是基于线程池来处理。

回过来看DisposeTask

static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {

        @NonNull
        final Runnable decoratedRun;

        @NonNull
        final Worker w;

        @Nullable
        Thread runner;

        DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
            this.decoratedRun = decoratedRun;
            this.w = w;
        }

        @Override
        public void run() {
            runner = Thread.currentThread();
            try {
                decoratedRun.run();
            } finally {
                dispose();
                runner = null;
            }
        }

        @Override
        public void dispose() {
            if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
                ((NewThreadWorker)w).shutdown();
            } else {
                w.dispose();
            }
        }

        @Override
        public boolean isDisposed() {
            return w.isDisposed();
        }

        @Override
        public Runnable getWrappedRunnable() {
            return this.decoratedRun;
        }
    }

DisposeTask 实现了 Disposable,Runnable ,SchedulerRunnableIntrospection 接口,Disposable 接口主要是用来取消订阅关系的 Disposable。

从上面的分析,可以得出Observable.subscribeOn方法,控制Observable的执行线程是通过将 Observable.subscribe(Observer) 的操作放在了指定线程中,当我们调用 subcribe 的时候,它的过程是从下往上的,即下面的 Observable 调用上面的 Observable。

用流程图描述如下:


图片.png
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,761评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,953评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,998评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,248评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,130评论 4 356
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,145评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,550评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,236评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,510评论 1 291
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,601评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,376评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,247评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,613评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,911评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,191评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,532评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,739评论 2 335

推荐阅读更多精彩内容

  • 响应式编程简介 响应式编程是一种基于异步数据流概念的编程模式。数据流就像一条河:它可以被观测,被过滤,被操作,或者...
    长夜西风阅读 3,039评论 0 5
  • 转一篇文章 原地址:http://gank.io/post/560e15be2dca930e00da1083 前言...
    jack_hong阅读 899评论 0 2
  • 一、RxJava操作符概述 RxJava中的操作符就是为了提供函数式的特性,函数式最大的好处就是使得数据处理简洁易...
    无求_95dd阅读 2,947评论 0 21
  • 注:只包含标准包中的操作符,用于个人学习及备忘参考博客:http://blog.csdn.net/maplejaw...
    小白要超神阅读 2,178评论 2 8
  • 一、Retrofit详解 ·Retrofit的官网地址为 : http://square.github.io/re...
    余生_d630阅读 1,775评论 0 5