使用ReactiveX进行移动开发

前言


随着移动互联网的发展, 程序复杂度越来越高, 很多的PC端和Web端的开发技术被引入到移动端, ReactiveX并不是什么新技术, 但在移动开发上的使用却是近来兴起的, 就像热补丁技术一样.
本文旨在分析ReactiveX的长处, 并分析ReactiveX可以解决哪些移动开发中遇到的问题.

什么是ReactiveX


简单的说, ReactiveX (Rx) 是一个简化异步调用的库. Rx是从微软的函数响应式编程库(Reactive Extensions)发展而来的, 提供了一种新的组织和协调异步事件的方式. 例如协调多个从网络上返回的多个异步的数据流, Rx能够是的我们用一个简单的方式来处理这些数据流, 极大的简化了代码的编写.
Rx作为一个通用库, 现在已经多种语言的实现版本(都是开源的), 包含RxJava, RxCpp, RxSwift, RxKotlin, RxGroovy, RxJavaScript等, 具体可以参考所有支持语言.

Reactive Extensions 原始描述: The Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators. Using Rx, developers represent asynchronous data streams with Observables, query asynchronous data streams using LINQ operators, and parameterize the concurrency in the asynchronous data streams using Schedulers. Simply put, Rx = Observables + LINQ + Schedulers.

ReactiveX基础知识


1. Observable和Observer

在Rx里, 两个最主要的角色就是Observable和Observer, 跟Java的观察者模式非常类似, 虽然会有细微的区别, 但作为初学的话, 将他们当成一个东西更方便理解.
Observable代表一个事件源(也可以叫被观察者), 可以被订阅.
Observer代表一个订阅者(也可以叫观察者), 订阅Observable, 获取数据.
Observable是一个类, 有个subscribe方法, 接收一个Observer类型的参数, 用于订阅事件.
Observer是一个接口(在iOS里是protocol, 一个意思), 有个onEvent接口, 当Observable发出事件时被调用, 使用者实现此接口来处理事件.

示例: �事件源和订阅者
Java

//从数组创建一个Observable
Integer[] arr = {1, 2, 3};
Observable<Integer> observable = Observable.from(arr);

//订阅前面创建在Observable
observable.subscribe(new Subscriber<Integer>() {
//Subscriber是抽象类, 继承了Observer接口, 但是未实现其中的方法
    @Override
    public void onNext(Integer item) {
        //处理接收到的事件
        System.out.println(item);
    }

    @Override public void onCompleted() { }
    @Override public void onError(Throwable e) { }
});

Swift:

//从数组创建一个Observable
let observable = [1, 2, 3].toObservable()

//订阅前面创建在Observable
observable.subscribe(onNext: { item in
        print(item)
    }, onError: { error in
        print(error)
    }, onCompleted: { 
        //Observable is complete
    })

在Rx里, 几乎所有的方法都返回一个Observable, 在RxJava和RxSwift里, Observable都是泛型. Observable发出有三种事件: 一种是Next事件, 可能有一个或多个; 事件的数据类型就是绑定的泛型, 一种是Error事件, 最多1个该事件, 表示发生了错误, 在RxJava里是Throwable, 在RxSwift里是ErrorType; 还有一种是Complete事件, 最多1个该事件, 表示该Observable完成了. Error事件和Complete事件是二选一, 二者有且只有一个.
如果只关心Next事件, 有重载方法可以单独订阅Next事件.

示例: 只订阅Next事件
Java

Integer[] arr = {1, 2, 3};
Observable<Integer> observable = Observable.from(arr);

observable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer item) {
        System.out.println(item);
    }
});

Swift:

let observable = [1, 2, 3].toObservable()

observable.subscribe(onNext: { item in
        print(item)
    })

2. 几个常用的操作符

在Rx里, 事件流经常需要转换和综合, 比如转换类型(将整形转成字符串型), 又比如合并两个事件源成一个. Rx里有很多方法和函数用操作事件流, 这些方法叫做操作符.
所有的操作符都是将原Observable转换成另一个新的Observable, 每个操作符方法的返回值都是Observable类型, 但是新的Observable绑定的泛型可能不同于源Observable的泛型, 例如源Observable的具体类型可能是Observab<Integer>, 转换后的具体类型可能是Observable<String>.
为了方便描述, Observable发生Next事件时, 我们就说Observable发射(emit)了一个元素(item).

map

map操作符可以将Observable里元素转换成另一种元素. 例如, 我们可以将刚刚创建数字一一映射成字符串.

示例: 将数字一一映射成字符串
Java:

Integer[] arr = {1, 2, 3};
Observable.from(arr);
.map(new Func1<Integer, String>() {
    @Override
    public String call(Integer item) {
        return "number is: " + item;  //将每个元素映射成一句话
    }
})
.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer item) {
        System.out.println(item);  // will print: the number is x
    }
});

Swift:

[1, 2, 3].toObservable()
    map { item in 
        "number is \(item)"  //将每个元素映射车一句话
    }
    .subscribe(onNext: { item in
        print(item)  // will print: the number is x
    })

flatMap

flatMap操作符稍微复杂一点, 为了便于理解, 我们将其逻辑拆成两步:
第一步相当于map, 但是每个元素都被映射成了一个子Observable, 如果没有第二步, 那么每个Next事件都会是一个子Observable, 订阅者将会收到很多个子Observable.
第二步就是让订阅者收到的不是子Observable, 而是子Observable里的元素, 是由于flatMap创建了一个外层Observable代替其内部的子Observable发射其元素, 这个操作我们称为拉平, 这个外层Observable最终将会返回给调用者.

示例: 将每个数字item转换成能发射对应个数元素的Observable
Java:

Integer[] arr = {1, 2, 3};
Observable.from(arr);
.flatMap(new Func1<Integer, Observable<Integer>>() {
    @Override
    public Observable<Integer> call(Integer item) {
        //调用另外方法创建
        return createRangeObservable(item);
    }
});
.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer item) {
        System.out.println(item+",");
    }
});

...
// 创建一个Observable, 会发射出number个元素, 从0开始
private Observable<Integer> createRangeObservable(Integer number) {
    List<Integer> list = new ArrayList<>();
    for (int i = 0; i < number; ++i) {
        list.add(i);
    }
    return Observable.from(list);
}

Swift:

[1, 2, 3].toObservable()
    .flatMap { item in
        从Range创建Observable
        (0..<item).toObservable()
    }
    .subscribe(onNext: { item in
        print(item, terminator: ",")
    })

运行结果:
0,0,1,0,1,2,

merge

merge很简单, 就是将多个Observable合成一个, 要求这些Observable的泛型相同

示例: 合并多个Observable
Java:

Integer[] arr1 = {1, 2, 3};
Observable<Integer> observable1 = Observable.from(arr1);
Integer[] arr2 = {4, 5, 6};
Observable<Integer> observable2 = Observable.from(arr2);
Integer[] arr3 = {7, 8, 9};
Observable<Integer> observable3 = Observable.from(arr3);

Observable.merge(observable1, observable2, observable3)
.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer item) {
        System.out.println(item+",");
    }
});

Swift:

let observable1 = [1, 2, 3].toObservable()
let observable2 = [4, 5, 6].toObservable()
let observable3 = [7, 8, 9].toObservable()
Observable.of(observable1, observable2, observable3).merge()
    .subscribe(onNext: { item in
        print(item)
    })

运行结果:
1,2,3,4,5,6,7,8,9,

使用ReactiveX解决几个实际问题


1. 优雅的解决 "回调地狱"

在实际项目开发中, 我们应该经常遇到互相依赖的异步操作, 即A操作完成后要执行B操作, B操作完成后又要执行C操作, 但是ABC操作都是异步的, 这个时候就会出现内嵌很多层的回调, 就是著名的 "回调地狱" 问题, 最典型的就是我们经常遇到一个列表的数据要多条协议才能拉回来的问题, 使用Rx可以很优雅的解决这类问题.

示例: 简化多个数据合并, 避免回调地狱.
不用Rx是这样写的:
Java:

private void updateDataList() {
    DataAccess.requestData1(myUserId, new CallBack1(){
        @Override
        public void onData(final Struct1[] list1) {
            DataAccess.requestData2(list1, new CallBack2(){
                @Override
                public void onData(final Struct2[] list2) {
                    DataAccess.requestData3(list2, new CallBack3(){
                        @Override
                        public void onData(final Struct3[] list3) {
                            updateUI(list3)
                        }
                    })
                }
            })
        }
    })
}

Swift:

private func updateDataList() {
    DataAccess.requestData1(myUserId) { list1 in
        DataAccess.requestData2(list1) { list2 in
            DataAccess.requestData3(list2) { [weak self] list3 in
                self?.updateUI(list3)
            }
        }
    }
}

用了Rx之后是这样写的:
Java:

private void updateDataList() {
    RxDataAccess.requestData1(myUserId)
        .flatMap(new Func1<List<Struct1>, Observable<List<Struct2>>>() {
            @Override
            public Observable<List<Struct2>> call(final List<Struct1> list) {
                return RxDataAccess.requestData2(list)
            }
        })
        .flatMap(new Func1<List<Struct2>, Observable<List<Struct3>>>() {
            @Override
            public Observable<List<Struct3]> call(final List<Struct2> list) {
                return RxDataAccess.requestData3(list)
            }
        })
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer item) {
                updateUI(list)
            }
        })
}

Swift:

private func updateDataList() {
    RxDataAcess.requestData1(myUserId)
        .flatMap { list in 
            RxDataAcess.requestData2(list)
        }
        .flatMap { list in
            RxDataAcess.requestData3(list)
        }
        .subscribe(onNext: { [weak self] list in
            self?.updateUI(list)
        })
}

2. 快速实现简单需求

示例: 即时搜索天气


WeatherExampeImage

功能说明: 在文本框输入城市名字, 一边输入一边搜索, 输完立马显示该城市的天气, (为了简单这里先不做关键字联想的逻辑, 要加也是很简单的事). 这里使用了openweathermap作为数据来源.
Java:

searchText  //searchText是输入框里变化的文本, 类型为Observable<String>
    //debounce这里作用是用户停手0.3秒后才启动搜索
    .debounce(300, TimeUnit.MILLISECONDS)
    //distinctUntilChanged这里的作用是当文本内容发生变化时才启动搜索
    .distinctUntilChanged()
    .flatMap(new Func1<String, Observable<Weather>>() {
        @Override
        public Observable<Weather> call(String searchString) {
            //weatherService是一个封装了拉取天气数据的类, 返回Observable<Weather>是搜索结果, Weather是一个数据类
            return weatherService.search(searchString);
        }
    })
    .subscribe(new Action1<Weather>() {
        @Override
        public void call(Weather weather) {
            //从Weather数据类里提取字段
            String cityName = weather.cityName  //城市名称
            float temp = weather.currentWeather.temp  //温度
            String description = weather.currentWeather.description //天气描述
            String weatherImageUrl = "http://api.openweathermap.org/img/w/" + weather.currentWeather.imageID + ".png"  //天气图标
            String backgroundImageUrl = "http://api.openweathermap.org/data/2.5/forecast/" + weather.currentWeather.imageID + ".png"  //背景图片

            //更新界面
            updateUI(cityName, temp, description, weatherImageUrl, backgroundImageUrl)
        }
    });

Swift:

searchText  //searchText是输入框里变化的文本, 类型为Observable<String>
    //debounce这里作用是用户停手0.3秒后才启动搜索
    .debounce(0.3, scheduler: MainScheduler.instance)
    //distinctUntilChanged这里的作用是当文本内容发生变化时才启动搜索
    .distinctUntilChanged()
    .flatMap { searchString -> Observable<Weather> in
        //weatherService是一个封装了拉取天气数据的类, 返回Observable<Weather>是搜索结果, Weather是一个数据类
        return weatherService.search(withCity: searchString)
    }
    //订阅搜索结果
    .subscribe(onNext: { [weak self] weather in
        //从Weather数据类里提取字段
        let cityName = weather.cityName  //城市名称
        let temp = weather.currentWeather.temp  //温度
        let description = weather.currentWeather.description //天气描述
        let weatherImageUrl = "http://api.openweathermap.org/img/w/" + weather.currentWeather.imageID + ".png"  //天气图标
        let backgroundImageUrl = "http://api.openweathermap.org/data/2.5/forecast/" + weather.currentWeather.imageID + ".png"  //背景图片
        
        //更新界面
        self?.updateUI(cityName, temp, description, weatherImageUrl, backgroundImageUrl)
    })

3. 轻松拆分综合异步事件流

Rx不仅可以合并多路异步事件流, 还可以拆分一路异步事件流为多路, 多路并行发展之后又可以重新综合起来, 这里举一个更接近实际项目的稍复杂一点的例子.

示例: 拆分过后再综合异步事件流
功能说明: 这个需求首先要从服务器拉去一个符号特定条件的商品的id列表, 比如所有打折的商品, 之后再用这个id列表去服务器拉这些商品的详情, 不过有一个限制, 这些商品详情服务器可能不支持一次性拉这么多, 有一个最大条数限制(比如6个), 可能是为了防止包过大, 但是由于产品需要, 客户端要求全部拉回这些商品详情才能展示.
思路: 我们的做法是将拉回来的列表先拆成多组, 最多6个一组, 然后按组去服务器拉详情, 所有的组拉回来之后, 我们在对这些结果综合重组.
Java:

GoodsManager.requestGoodsId
    .buffer(6)
    .flatMap(new Func1<List<Integer>, Observable<List<GoodsInfo>>>() {
        @Override
        public Observable<List<GoodsInfo>> call(List<Integer> splitIds) {
            return GoodsManager.requestGoodsDetail();
        }
    })
    .reduce(new ArrayList<GoodsInfo>(), new Func2<List<GoodsInfo>, List<GoodsInfo>, List<GoodsInfo>>() {
        @Override
        public List<GoodsInfo> call(List<GoodsInfo> result, List<GoodsInfo> item) {
            result.addAll(item);
            return result;
        }
    })
    .subscribe(new Action1<List<GoodsInfo>>() {
        @Override
        public void call(List<GoodsInfo> list) {
            updateUI(list)
        }
    });

Swift:

let pageSize = 6
GoodsManager.requestGoodsId(type: Discount)
    .buffer(timeSpan: 0, count: pageSize, scheduler: MainScheduler.instance)
    .flatMap { splitIds in
        GoodsManager.requestGoodsDetail(ids: splitIds)
    }
    .reduce([]) { (result, item) -> [GoodsInfo] in
        result + item
    }
    .subscribe(onNext: { [weak self] goodsInfo in
        self?.updateUI(goodsInfo)
    })

用Rx这么简短就解决了, 假如不用Rx你会怎么做? 想想就知道很麻烦.

总结:


以上只是随便列举了几个用Rx解决的实际, Rx能解决的实际问题远不止于此, 希望大家多多尝试, 如果有更好的案例, 欢迎分享给我, 谢谢大家 !

下面我们回顾总结一下Rx的特点.
Rx的长处是处理异步事件流, 虽然其接口简单, 但较之于普通编程, 在思维上却是一次极大的跳跃, 在Rx里, 原有的程序逻辑有了一套全新的编程方式.

  1. 所有变化都抽象成事件, 例如从回来的数据网络, 一次按钮点击, 输入框文字发生变化, 或一个系统广播.
  2. 数据由拉变为推, 普遍编程的数据一般是从函数返回的, 相当于调用者主动拉回来的, 而在ReactiveX里, 调用函数时不会直接得到数据, 得到的是一个信号源, 调用者订阅这个信号源, 被动的等待数据推过来, 在ReactiveX里, 所有的程序逻辑都是这样的.
  3. 没有回调, 统一使用Observable, 在Rx里, 所有的方法和函数都返回一个Observable, 调用者订阅这个Observable就可以得到操作结果.
  4. 统一的错误处理方法, 不在需要定义自己的错误处理回调, 有一个统一的onError方法, 所有错误通过onError传回, 包括抛出的异常.

后语:


学Rx有一段时间了, 此文一方面作为学习的总结整理, 另一方面作为知识分享, 希望对大家的日常开发有所帮助. 由于作者水平有限, 文章的可能存在瑕疵和纰漏, 欢迎大家批评指正.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容