RxJava技术探索及其Android中的应用

1、RxJava简介

RxJava 的本质可以压缩为异步这一个词。说到根上,它就是一个实现异步操作的库,Rx的全称是Reactive Extensions,直译过来就是响应式扩展。Rx基于观察者模式,他是一种编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流。ReactiveX.io给的定义是,Rx是一个使用可观察数据流进行异步编程的编程接口,ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华。RxJava说白了就是用Java语言边写的使用各种操作符形成链式结构来处理异步数据流的工具。

2、RxJava基础及常用操作符

RxJava 有四个基本概念:
Observable(可观察者,即被观察者);
Observer(观察者);
subscribe(订阅)事件;
Observable和Observer通过 subscribe() 方法实现订阅关系,从而 Observable可以在需要的时候发出事件来通知 Observer。

(1) 创建 Observable被观察者

Observable<String> myObservable = Observable.create(new Observable.OnSubscribe<String>() {    
@Override    
public void call(Subscriber<? super String> subscriber) {  
    subscriber.onNext("Hello, RxJava!");        
    subscriber.onCompleted();    
}});

(2) 创建 Observer观察者

Subscriber<String> mySubscriber = new Subscriber<String>() {   
    @Override    
    public void onCompleted() { }   
    @Override    
    public void onError(Throwable e) { }    
    @Override    
    public void onNext(String s) {       
    Log.d("haijiang", "--->" + s); }
};

Subscriber是Observer的扩展,在使用过程中,我们直接用Subscriber作为观察者对象就OK了!

(3) 建立订阅关系

myObservable.subscribe(mySubscriber);

一旦建立订阅关系,OnSubscribe中的call方法就会被调用,在call方法中主动触发了观察者的onNext,onCompleted方法,可看到输出“Hello, RxJava!” 。
以上是一个最基本的流程,我们可以写成链式调用:

Observable.create(new Observable.OnSubscribe<String>() {    
@Override    
public void call(Subscriber<? super String> subscriber) {  
    subscriber.onNext("Hello, RxJava!");        
    subscriber.onCompleted();    
}}).subscribe(new Subscriber<String>() {   
    @Override    
    public void onCompleted() { }   
    @Override    
    public void onError(Throwable e) { }    
    @Override    
    public void onNext(String s) {       
    Log.d("haijiang", "--->" + s); }
});

更多操作符欣赏:

just

subscribe方法有一个重载版本,接受三个Action1类型的参数,分别对应OnNext,OnComplete, OnError函数 
Observable.just("hello","rxjava","rxandroid").subscribe(new Action1<String>() {    
   @Override    
   public void call(String s) {       
        Log.d("haijiang", "--->" + s);   
 }});

ActionX没有返回,还有一种FuncX有返回的操作,后面会说。
just操作符是创建一个Observable,一次发送传入的参数。

from

/** * Observable.from()方法,它接收一个集合作为输入 */
String[] strArrsy = {"hello","rxjava","rxandroid"};
Observable.from(strArrsy).subscribe(new Action1<String>() {    
   @Override    
   public void call(String s) {        
        Log.d("haijiang", "--->" + s);    
}});

下面介绍两个重量级操作符map和flatMap,核心变换,灵活操作。

map

map是一对一的变化,将一个Observable<T>变换成Observable<R>

Observable.just("I LOVE YOU!").map(new Func1<String, Integer>() {   
    @Override    
    public Integer call(String s) {        
         return 520;    
   }}).subscribe(new Action1<Integer>() {    
    @Override    
    public void call(Integer s) {       
         Log.d("haijiang", "--->" + s);   
 }});

通过map,Func1将String转换成了Int;其中FuncX是RxJava的一个包装接口,跟ActionX类似,只不过FuncX是有返回对象的。

flatMap

flatMap()中返回的是个 Observable对象,并且这个 Observable对象并不是被直接发送到了 Subscriber
的回调方法中。flatMap()的原理是这样的:

  1. 使用传入的事件对象创建一个 Observable对象;
  2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;
  3. 每一个创建出来的 Observable发送的事件,都被汇入同一个 Observable,而这个 Observable负责将这些事件统一交给Subscriber 的回调方法。
    这三个步骤,把事件拆成了两级,通过一组新创建的 Observable将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是 flatMap() 所谓的 flat。
private ArrayList<Data> mData;
Observable.from(mData).flatMap(new Func1<Data, Observable<ChildData>>() {
      @Override 
      public Observable<ChildData> call(Data data) { 
             return Observable.from(data.getChildData); 
        } 
}).subscribe(new Action1<ChildData>() { 
      @Override 
      public void call(ChildData cd) {
             Log.d("haijiang", "--->" + cd.getName); 
  }});

concatMap

flatMap()操作符使用你提供的原本会被原始Observable发送的事件,来创建一个新的Observable。而且这个操作符,返回的是一个自身发送事件并合并结果的Observable。可以用于任何由原始Observable发送出的事件,发送合并后的结果。记住,flatMap()可能交错的发送事件,最终结果的顺序可能并是不原始Observable发送时的顺序。为了防止交错的发生,可以使用与之类似的concatMap()操作符。综上所述,就是利用concatMap替换flatMap操作符,输入顺序就防止了交错,跟原始Obervable顺序一致。

timer()

timer操作符:用于创建Observabl,延迟发送一次。
下面延时两秒,输出log

Observable.timer(2, TimeUnit.SECONDS)
              .subscribe(new Observer<Long>() {
                  @Override
                  public void onCompleted() {
                      log.d ("completed");
                  }

                  @Override
                  public void onError(Throwable e) {
                      log.e("error");
                  }

                  @Override
                  public void onNext(Long number) {
                      log.d ("hello world");
                  }
              });

interval()

interval:用于创建Observable,用于每个XX秒循环进行某个操作

Observable.timer(2, TimeUnit.SECONDS).subscribe(new Action1<Long>() { 
     @Override 
     public void call(Long aLong) { /
    /TODO WHAT YOU WANT 
   } 
});

delay()

delay:用于事件流中,可以延迟发送事件流中的某一次发送。

retryWhen

retryWhen()是RxJava的一种错误处理机制,当遇到错误时,将错误传递给另一个Observable来决定是否要重新给订阅这个Observable。下面封装一个处理网络错误的类

public class RetryWhenProcess implements Func1<Observable<? extends Throwable>, Observable<?>> {

private long mInterval;

    public RetryWhenProcess(long interval) {

        mInterval = interval;
    }

    @Override
    public Observable<?> call(Observable<? extends Throwable> observable) {
        return observable.flatMap(new Func1<Throwable, Observable<?>>() {
            @Override
            public Observable<?> call(Throwable throwable) {
                return observable.flatMap(new Func1<Throwable, Observable<?>>() {
                        @Override
                        public Observable<?> call(Throwable throwable) {
                            if (throwable instanceof UnknownHostException) {
                                return Observable.error(throwable);
                            }
                            return Observable.just(throwable).zipWith(Observable.range(1, 5), new Func2<Throwable, Integer, Integer>() {
                                @Override
                                public Integer call(Throwable throwable, Integer i) {

                                    return i;
                                }
                            }).flatMap(new Func1<Integer, Observable<? extends Long>>() {
                                @Override
                                public Observable<? extends Long> call(Integer retryCount) {

                                    return Observable.timer((long) Math.pow(mInterval, retryCount), TimeUnit.SECONDS);
                                }
                            });
                        }
                    });
            }
        });
    }
}

使用方法:

.retryWhen(new RetryWhenProcess(5))

compose

compose()是针对 Observable自身进行变换。假设在程序中有多个 Observable,并且他们都需要应用一组相同的变换可以使用。
场景是这样的:work thread 中处理数据,然后 UI thread 中处理结果。当然,我们知道是要使用 subscribeOn() 和 observeOn() 进行处理。最常见的场景是,调server 的 API 接口取数据的时候,那么,那么多接口,反复写这两个操作符是蛋疼的,为了避免这种情况,我们可以通过 compse() 操作符来实现复用,下面面这段代码就实现了这样的功能。

/**
 * 这个类是 小鄧子 提供的!
 */
public class SchedulersCompat {
    private static final Observable.Transformer computationTransformer =
            new Observable.Transformer() {
                @Override public Object call(Object observable) {
                    return ((Observable) observable).subscribeOn(Schedulers.computation())
                            .observeOn(AndroidSchedulers.mainThread());
                }
            };
    private static final Observable.Transformer ioTransformer = new Observable.Transformer() {
        @Override public Object call(Object observable) {
            return ((Observable) observable).subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };
    private static final Observable.Transformer newTransformer = new Observable.Transformer() {
        @Override public Object call(Object observable) {
            return ((Observable) observable).subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };
    private static final Observable.Transformer trampolineTransformer = new Observable.Transformer() {
        @Override public Object call(Object observable) {
            return ((Observable) observable).subscribeOn(Schedulers.trampoline())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };
    private static final Observable.Transformer executorTransformer = new Observable.Transformer() {
        @Override public Object call(Object observable) {
            return ((Observable) observable).subscribeOn(Schedulers.from(ExecutorManager.eventExecutor))
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };
    /**
     * Don't break the chain: use RxJava's compose() operator
     */
    public static <T> Observable.Transformer<T, T> applyComputationSchedulers() {
        return (Observable.Transformer<T, T>) computationTransformer;
    }
    public static <T> Observable.Transformer<T, T> applyIoSchedulers() {
        return (Observable.Transformer<T, T>) ioTransformer;
    }
    public static <T> Observable.Transformer<T, T> applyNewSchedulers() {
        return (Observable.Transformer<T, T>) newTransformer;
    }
    public static <T> Observable.Transformer<T, T> applyTrampolineSchedulers() {
        return (Observable.Transformer<T, T>) trampolineTransformer;
    }
    public static <T> Observable.Transformer<T, T> applyExecutorSchedulers() {
        return (Observable.Transformer<T, T>) executorTransformer;
    }
}

使用方式:

.compose(SchedulersCompat.ioTransformer );

3、线程控制 — Scheduler

在RxJava 中,Scheduler—调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Schedule,它们已经适合大多数的使用场景:
Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io()的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io()比 newThread()更有效率。不要把计算工作放在 io()中,可以避免创建不必要的线程。
Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

有了这几个 Scheduler,就可以使用 subscribeOn() 和 observeOn()两个方法来对线程进行控制。subscribeOn() 指定subscribe()所发生的线程,即 Observable.OnSubscribe被激活时所处的线程,或者叫做事件产生的线程。 observeOn()指定Subscriber所运行在的线程,或者叫做事件消费的线程。

4RxJava在android开发中的一些应用

参考:可能是东半球最全的RxJava使用场景小结 (http://blog.csdn.net/theone10211024/article/details/50435325)

RxBinding
节流(防止按钮的重复点击)
轮询,
定时操作
RxPermissions
RxBus
RxJava与Retrofit

(1)RxBinding

RxBindingJakeWharton大牛用RxJava为Android控件编写的一个控件绑定库。
例子:

Button button = (Button) findViewById(R.id.button); 
RxView.clicks(button).subscribe(new Action1<Void>() { 
@Override 
public void call(Void aVoid) { 
Log.i("test", "clicked"); 
}
 });

(2)防止重复点击

RxView.clicks(button).debounce(300, TimeUnit.MILLISECONDS).subscribe(new Action1<Void>() {
            @Override
            public void call(Void aVoid) {
                Log.i("test", "clicked");
            }
        });

(3)EditText输入请求。避免每次输入产生频繁的请求

RxTextView.textChangeEvents(inputEditText)
      .debounce(400, TimeUnit.MILLISECONDS) 
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(new Observer<TextViewTextChangeEvent>() {
    @Override
    public void onCompleted() {
        log.d("onComplete");
    }

    @Override
    public void onError(Throwable e) {
        log.d("Error");
    }

    @Override
    public void onNext(TextViewTextChangeEvent onTextChangeEvent) {
        log.d(format("Searching for %s", onTextChangeEvent.text().toString()));
    }
});

(4)RxPermissions

RxPermissions也是国外的大牛开发的基于RxJava的Android权限管理库,他让6.0以上的权限管理更加的简单,如果有适配6.0以上的手机的需求,这个库是个不错的选择。下面我们来看看基本的用法。

 // 请求相机权限
    RxPermissions.getInstance(this)
    .request(Manifest.permission.CAMERA)
    .subscribe(granted -> {
        if (granted) { // 用户同意了(在6.0之前的手机始终都为true)
          //可以拍照了
        } else {
           //可以在这里提示用户,或者再次请求
        }
    });

更多功能研究github吧

(5)RxBus

参考:http://www.jianshu.com/p/ca090f6e2fe2
不多说,上代码

/**
* RxBus
* Created by YoKeyword on 2015/6/17.
*/
public class RxBus {
    private static volatile RxBus defaultInstance;

    private final Subject<Object, Object> bus;
    // PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者
    public RxBus() {
      bus = new SerializedSubject<>(PublishSubject.create());
    }
    // 单例RxBus
    public static RxBus getDefault() {
        if (defaultInstance == null) {
            synchronized (RxBus.class) {
                if (defaultInstance == null) {
                    defaultInstance = new RxBus();
                }
            }
        }
        return rxBus;
    }
    // 发送一个新的事件
    public void post (Object o) {
        bus.onNext(o);
    }
    // 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
    public <T> Observable<T> toObservable (Class<T> eventType) {
        return bus.ofType(eventType);
//        这里感谢小鄧子的提醒: ofType = filter + cast
//        return bus.filter(new Func1<Object, Boolean>() {
//            @Override
//            public Boolean call(Object o) {
//                return eventType.isInstance(o);
//            }
//        }) .cast(eventType);
    }
}

(6)RxJava 与 Retrofit 结合的最佳实践

参考扔物线文章:http://gank.io/post/56e80c2c677659311bed9841

参考:
1、http://www.jianshu.com/users/df40282480b4/latest_articles
2、http://gank.io/post/560e15be2dca930e00da1083#toc
3、http://www.jianshu.com/p/8cf84f719188

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,667评论 5 472
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,361评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,700评论 0 333
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,027评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,988评论 5 361
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,230评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,705评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,366评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,496评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,405评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,453评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,126评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,725评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,803评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,015评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,514评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,111评论 2 341

推荐阅读更多精彩内容