响应式编程RxJava(四)

3.组合

(1) Merge
merge(Observable, Observable)将两个Observable发射的事件序列组合并成一个事件序列,就像是一个Observable发射的一样。你可以简单的将它理解为两个Obsrvable合并成了一个Observable,合并后的数据是无序的。

Paste_Image.png
        String[] array1 = {"kpioneer","Tiger","Cook","Zhang","Haocai"};
        String[] array2 = {"WangWu","Zhangsan","Lisi","Luo"};
        Observable<String> merge = Observable.merge(Observable.from(array1),Observable.from(array2));
        merge.subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {
                Log.e("main","输出结果:"+s);

            }
        });
结果输出:
08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: 输出结果:kpioneer
08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: 输出结果:Tiger
08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: 输出结果:Cook
08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: 输出结果:Zhang
08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: 输出结果:Haocai
08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: 输出结果:WangWu
08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: 输出结果:Zhangsan
08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: 输出结果:Lisi
08-08 08:50:12.518 3958-3958/com.haocai.architect.rxjava E/main: 输出结果:Luo

在Observable中,一旦某一个事件抛异常,后面的序列将会终止
注意:mergeDelayError如果你希望在序列中出错的时候,不影响后面的序列,那么可以使用mergeDelayError方法

Paste_Image.png

(2) Zip
zip(Observable, Observable, Func2)用来合并两个Observable发射的数据项,根据Func2函数生成一个新的值并发射出去。当其中一个Observable发送数据结束或者出现异常后,另一个Observable也将停在发射数据。

简单来说zip操作符就是合并多个数据流,
然后发送(Emit)最终合并的数据。

流程图:

Paste_Image.png
        String[] array1 = {"kpioneer","Tiger","Cook","Zhang","Haocai"};
        String[] array2 = {"WangWu","Zhangsan","Lisi","Luo"};
        //1.合并算法自己决定
        //2.序列长度有原始的最小数组的长度决定
        Observable<String> zip = Observable.zip(Observable.from(array1),Observable.from(array2), new Func2<String, String, String>() {
            @Override
            public String call(String s, String s2) {
                //这个Func是我们合并数组算法
                return s+"--"+s2;
            }
        });
        zip.subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {
                Log.e("main","输出结果:"+s);

            }
        });

结果输出:
08-08 09:32:46.856 10134-10134/com.haocai.architect.rxjava E/main: 输出结果:kpioneer--WangWu
08-08 09:32:46.856 10134-10134/com.haocai.architect.rxjava E/main: 输出结果:Tiger--Zhangsan
08-08 09:32:46.856 10134-10134/com.haocai.architect.rxjava E/main: 输出结果:Cook--Lisi
08-08 09:32:46.856 10134-10134/com.haocai.architect.rxjava E/main: 输出结果:Zhang--Luo

同时发现Haocai 那项没有输出 以最小数组为单位
        String[]  array1 = {"kpioneer","Tiger","Cook","Zhang","Haocai"};
        String[]  array2 = {"WangWu","Zhangsan","Lisi","Luo"};
        Integer[] array3 = { 100, 1000, 10000 };

        Observable<String> zip = Observable.zip(Observable.from(array1),Observable.from(array2),Observable.from(array3), new Func3<String, String,Integer, String>() {


            @Override
            public String call(String s, String s2, Integer integer) {
                return s+"---"+s2+"---"+integer;
            }
        });
        zip.subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {
                Log.e("main","输出结果:"+s);

            }
        });

结果输出:
08-08 10:02:06.519 4330-4330/com.haocai.architect.rxjava E/main: 输出结果:kpioneer---WangWu---100
08-08 10:02:06.519 4330-4330/com.haocai.architect.rxjava E/main: 输出结果:Tiger---Zhangsan---1000
08-08 10:02:06.520 4330-4330/com.haocai.architect.rxjava E/main: 输出结果:Cook---Lisi---10000

(3) Join
前面两个方法,zip()和merge()方法作用在发射数据的范畴内,在决定如何操作值之前有些场景我们需要考虑时间的。RxJava的join()函数基于时间窗口将两个Observables发射的数据结合在一起。
join(Observable, Func1, Func1, Func2)我们先介绍下join操作符的4个参数:

Observable:源Observable需要组合的Observable,这里我们姑且称之为目标Observable;
Func1:接收从源Observable发射来的数据,并返回一个Observable,这个Observable的声明周期决定了源Obsrvable发射出来的数据的有效期;
Func1:接收目标Observable发射来的数据,并返回一个Observable,这个Observable的声明周期决定了目标Obsrvable发射出来的数据的有效期;
Func2:接收从源Observable和目标Observable发射出来的数据,并将这两个数据组合后返回。
所以Join操作符的语法结构大致是这样的:onservableA.join(observableB, 控制observableA发射数据有效期的函数, 控制observableB发射数据有效期的函数,两个observable发射数据的合并规则)

Paste_Image.png
        String[]  array1 = {"A","B","C"};
        String[]  array2 = {"1","2"};

        Observable<String> observable1 = Observable.from(array1);
        Observable<String> observable2 = Observable.from(array2);

        Observable<String> join = observable1.join(observable2, new Func1<String, Observable<Long>>() {
            @Override
            public Observable<Long> call(String s) {
                return Observable.timer(2, TimeUnit.SECONDS);
            }
        }, new Func1<String, Observable<Long>>() {
            @Override
            public Observable<Long> call(String s) {
                return Observable.timer(2,TimeUnit.SECONDS);
            }
        }, new Func2<String,String,String>() {
            @Override
            public String call(String o, String o2) {
                return o +"---" +o2;
            }
        });
        join.subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {
                Log.e("main","输出结果:"+s);

            }
        });

结果输出:
08-08 12:21:13.407 32015-32015/com.haocai.architect.rxjava E/main: 输出结果:A---1
08-08 12:21:13.407 32015-32015/com.haocai.architect.rxjava E/main: 输出结果:B---1
08-08 12:21:13.408 32015-32015/com.haocai.architect.rxjava E/main: 输出结果:C---1
08-08 12:21:13.408 32015-32015/com.haocai.architect.rxjava E/main: 输出结果:A---2
08-08 12:21:13.408 32015-32015/com.haocai.architect.rxjava E/main: 输出结果:B---2
08-08 12:21:13.408 32015-32015/com.haocai.architect.rxjava E/main: 输出结果:C---2

4.线程

(1)Scheduler
默认情况下,RxJava遵循线程不变原则。即:在哪个线程调用subscribe()方法,就在哪个线程生产事件,在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到Scheduler(调度器)。
  在RxJava中,Scheduler相当于线程控制器,RxJava通过它来指定每一段代码运行在什么线程中。RxJava内置了几个Scheduler,适合大多数使用场景:

Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的Scheduler。
Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler。行为模式和newThread()差不多,区别在于io()的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下io()比newThread()更有效率。不要把计算工作放在io()中,可以避免创建不必要的线程。
Schedulers.computation(): 计算所使用的Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个Scheduler使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在computation()中,否则 I/O 操作的等待时间会浪费 CPU。
Android 专用的AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

有了这几个 Scheduler ,就可以使用 subscribeOn()和 observeOn()两个方法来对线程进行控制了。

subscribeOn(): 指定subscribe()(订阅/注册)所发生的线程,即 Observable.OnSubscribe被激活时所处的线程。或者叫做事件产生的线程。

observeOn(): 指定Subscriber(观察者)所运行在的线程。或者叫做事件消费的线程。

文字叙述总归难理解,上代码:

Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程(即 Action1对象)
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer number) {
            Log.d(tag, "number:" + number);
        }
    });

上面这段代码中,由于 subscribeOn(Schedulers.io())的指定,被创建的事件的内容 1、2、3、4将会在 IO 线程发出;而由于observeOn(AndroidScheculers.mainThread()) 的指定,因此 subscriber数字的打印将发生在主线程 。事实上,这种在subscribe()之前写上两句 subscribeOn(Scheduler.io())和 observeOn(AndroidSchedulers.mainThread())的使用方式非常常见,它适用于多数的 『后台线程取数据,主线程显示』的程序策略。

已加载图片为例:

加载图片传统方式写法:

private ImageView iv_image;
    private static String URL_STR ="http://pic36.nipic.com/20131203/3822951_101052690000_2.jpg";

    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_scheduler);
         iv_image = (ImageView)findViewById(R.id.iv_image);
        loadImage();
    }
    //传统方式:下载图片
    //方案一: Thread+Handler
    //方案二: AsyncTask

    private Bitmap download(String urlString) {
        try {
            URL url = new URL(urlString);
            HttpURLConnection connection = (HttpURLConnection) url
                    .openConnection();
            InputStream inputStream = connection.getInputStream();
            return BitmapFactory.decodeStream(inputStream);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    private void loadImage(){
        new DownloadTask().execute();
    }
    class DownloadTask extends AsyncTask<Void,Void,Bitmap>{

        @Override
        protected Bitmap doInBackground(Void... params) {
            return download(URL_STR);
        }

        @Override
        protected void onPostExecute(Bitmap bitmap) {
            super.onPostExecute(bitmap);
            iv_image.setImageBitmap(bitmap);
        }
    }

RxJava方式写法:

   private static String URL_STR = "http://pic36.nipic.com/20131203/3822951_101052690000_2.jpg";
   private ImageView iv_image;

    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_scheduler);
        iv_image = (ImageView) findViewById(R.id.iv_image);
        rxJavaLoadImage();
    }
    //RxJava实现下载
    private void rxJavaLoadImage() {
        //Url地址变换成Bitmap
        //我们需要指定这些事件的执行所在的线程
        Observable.just(URL_STR).map(new Func1<String, Bitmap>() {
            @Override
            public Bitmap call(String s) {
                return download(s);
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())//需要引入RxAndroid库
                .subscribe(new Action1<Bitmap>() {
                    @Override
                    public void call(Bitmap bitmap) {
                        //更新UI
                        iv_image.setImageBitmap(bitmap);
                    }
                });
    }

Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler。

注意:
1.在RxJava中整个流程分为事件的生产和消费
2.RxJava整体架构分为4个角色:Observable、Observer、Subscriber、Subjects
其中Observables和Subjects是两个“生产”实体,Observers和Subscribers是两个“消费”实体

subscribeOn和observeOn线程控制的区别?--线程切换
1.subscribeOn:指定生产事件所在的线程
observeOn :指定消费事件所在的线程

2.subscribeOn:按照顺序执行序列,作用于他前后的序列,直到遇到observeOn才切换新的线程
onserveOn :按照顺序执行序列,只能作用于他之后的序列

注意:subscribeOn可以在序列中调用(执行)多次,但是前提条件(在生产序列之前调用,在访问网络之前,我们需要初始化一些UI)

例子证明:

   //RxJava实现下载
    private void rxJavaLoadImage() {
        //Url地址变换成Bitmap
        //我们需要指定这些事件的执行所在的线程
        Observable.just(URL_STR).doOnSubscribe(new Action0() {

            @Override
            public void call() {
                // 在执行生产事件之前,回调方法,我们需要做一些初始化工作,而这个工作可以在子线程,也可以在主线程
                // 更新UI必须在主线程
                iv_image.setVisibility(View.VISIBLE);
                // 第一步:初始化
                Log.e("main", "当前线程状态 : " + Thread.currentThread().getName());
            }
        }).subscribeOn(AndroidSchedulers.mainThread()).observeOn(Schedulers.io()).map(new Func1<String, Bitmap>() {
                    @Override
                    public Bitmap call(String s) {
                        Log.e("main", "当前线程状态 : " + Thread.currentThread().getName());
                        return download(s);
                    }
                }).map(new Func1<Bitmap, Bitmap>() {
            @Override
            public Bitmap call(Bitmap t) {
                Log.e("main", "当前线程状态 : " + Thread.currentThread().getName());
                // 加水印、裁剪、灰度处理等等(图像处理相关)......
                // 第三步:图像处理
                return t;
            }
        }).observeOn(AndroidSchedulers.mainThread())//需要引入RxAndroid库
                .subscribe(new Action1<Bitmap>() {
                    @Override
                    public void call(Bitmap bitmap) {
                        Log.e("main", "当前线程状态 : " + Thread.currentThread().getName());
                        //更新UI
                        iv_image.setImageBitmap(bitmap);
                    }
                });
    }

结果输出:
08-09 06:53:29.852 17616-17616/com.haocai.architect.rxjava E/main: 当前线程状态 : main
08-09 06:53:29.854 17616-17647/com.haocai.architect.rxjava E/main: 当前线程状态 : RxIoScheduler-2
08-09 06:53:29.983 17616-17647/com.haocai.architect.rxjava E/main: 当前线程状态 : RxIoScheduler-2
08-09 06:53:30.072 17616-17616/com.haocai.architect.rxjava E/main: 当前线程状态 : main
#其中main表示主线程, RxIoScheduler表示子线程
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容

  • 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy阅读 5,451评论 7 62
  • 前言我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard...
    占导zqq阅读 9,158评论 6 151
  • 作者寄语 很久之前就想写一个专题,专写Android开发框架,专题的名字叫 XXX 从入门到放弃 ,沉淀了这么久,...
    戴定康阅读 7,613评论 13 85
  • 响应式编程简介 响应式编程是一种基于异步数据流概念的编程模式。数据流就像一条河:它可以被观测,被过滤,被操作,或者...
    说码解字阅读 3,050评论 0 5
  • 隔了一周时间才来写(三),听说舍友胖子都等急了,期末考试备战太辛苦了,今晚想换换脑子调整一下。 11.求友不专,则...
    春风凉意阅读 300评论 0 4