最近,我抽出了几个晚上的时间,把咖啡和啤酒变成了代码与文字。
引子
三个月以来,我翻译了一些关于RxJava的文章,说实话这些翻译,真的搞得我很头疼,那么现在是时候回来写点什么了。
最近,我在看两本书,《Learning Reactive Programming with Java 8》,《RxJava Essentials》,不过,没关系,我已经买到了电子版,我会在文章结尾附上网盘链接和密码,但我还是希望你将文章继续读下去,因为那是文章结尾的事。
其实关于RxJava的文章和消息远不止我们能了解到的,但又拜英语所赐,所以它看起来又没那么多。好在,国内有许多优秀的开发专家hi大头鬼hi
,BlackSwift,程序亦非猿,Drakeet,扔物线,流火枫林等等在为之做着贡献,以及简直不能更优秀的文章《给 Android 开发者的 RxJava 详解》。
但是,现在,我不得不再次做啰嗦一下,RxJava究竟会改变我们什么。
响应式编程Reactive Programming
什么是响应式编程呢?在Java程序中:
int a = 4;
int b = 5;
int c = a + b;
System.out.println(c); // 9
a = 6;
System.out.println(c);
// 9 again, but if 'c' was tracking the changes of 'a' and 'b',
// it would've been 6 + 5 = 11
当我们改变“a”和“b”的值时,“c”并没有改变。换句话说,“a”和“b”的改变并没有响应到“c”。这就是响应式:程序以流的形式,传递数据的改变。
那我,我们又为什么需要响应式呢?
以下翻译自《Learning Reactive Programming with Java 8》
10-15年前,对于网站开发来说,最平常的日常工作就是进行维护和缩短响应时间,那么今天,一切程序都应该保证七天二十四小时不间断运行,并且能够极快的做出响应;如果你的网站响应慢或者宕机,那么用户将会对你们真爱一秒变备胎,转而选择其他网站服务。当今的慢意味着不可用甚至是有故障的。如今的互联网是在和大数据打交道,所以我们需要快速的处理数据。
过去的几年中HTTP错误已经不是什么新鲜事了,但是现在,我们不得不进行容错机制,还要提供用户易读以及合理的消息更新。
在过去,我们写简单的桌面应用,但如今我们写能够做出快速响应的Web应用。多数情况下,这些应用要与大量的远程服务器进行数据传递。
如果我们想让自己的软件保持竞争性,就不得不实现这些新需求,所以,换言之就是我们应该这样做:
- 模块的/动态的:用这种方式,我们就能够拥有一个七天二十四小时的系统了,因为这些模块能够在不停止整个系统的情况下进行脱机和联机。另外,随着系统的不断庞大,还能帮助我们更好地组织应用结构,同时还能管理底层代码。
- 可扩展的:用这种方式,我们就能够处理大量的数据和用户请求了。
- 容错性:用这种方式,能够为用户提供稳定的系统。
- 响应式:这不仅意味着快速,还意味着可用性强。
让我们思考如何实现它:
- 如果我们的系统是事件驱动型的,那就把它模块化。我们可以将系统分成多个彼此之间通过通知进行交互的微服务/组件/模块。这样,我们就能够以通知为代表,响应系统的数据流了。
- 可扩展意味着能够应对日益增长的数据,在负载的情况下不会崩溃。
- 对故障/错误做出及时的响应,能够提高系统的容错性。
- 响应意味着对能够对用户操作及时的做出反应。
如果应用是事件驱动型的,那么,它就能够解耦成多个自包含组件。这能够帮我们更好的实现扩展性,因为我们总是可以在不停掉或者打断系统的情况下添加新组建或者移除旧组件。如果错误和故障传递给正确的组件,把它们当做通知来处理并作出响应,那么应用能变得更具有容错性和弹性。所以,如果把系统构建成事件驱动型的。我们可以更容易的实现扩展性和容错性,而且一个具有扩展性,低耦合和防错的应用能够快速的响应用户操作。
Reactive Manifesto文档定义了我们刚刚提到的四点响应式准则。每一个响应式系统都应该是消息驱动型(事件驱动型)的。这样它不仅能变得低耦合,而且扩展性和容错性将更高,这就意味着它可靠和具有响应式。
要注意的是,Reactive Manifesto只是描述了一个响应式系统,并不是对响应式编程的定义。当然,你也可以不使用任何响应式类库或者语言,打造一款弹性可扩展,具有消息驱动的响应式应用。
应用程序中数据的变化,以通知的方式传递给正确的Handler。所以,使用响应式构造应用是符遵循Manifesto最简单的方式。
回调地狱
如果你是一个能够时刻保持头脑清醒,逻辑清晰和思维缜密的人,是个Callback
高手,善用并且能够用好FutureTask
。
那么在Android中你的代码可能会频繁的使用async
+callbacks
,或者service composition
+ error handing
。
那么关于异步回调的逻辑,你会写成这样getData(Callback<T>)
、这样Future<T> getData()
,还是这样Future<List<T>> getData()
,甚至这样Future<List<Future<T>>> getData()
,嗷!拜托,我简直不能再举例下去了,这简直就是Callback Hell,这样的程序或许写起来很舒服,但是如何测试和维护呢。
如果哪天你的程序出了问题而必须马上修复,但你却不能马上赶来或者需要别人协助(这在很多公司是很常见的),或者当他人在review你的代码时,那么,是时候拿出这张图了。
然而使用RxJava的操作符,我们可以避免这些烦人甚至糟糕的回调,让结构和思路看起来更清晰,通过组合API,只需要约定最终的结果Observable<T>
就行了。
并且scheduler
的出现,不仅解放了线程的切换,让UI线程与工作线程间的跳转变得简单,而且,它的API很丰,也提供了很多使用场景的建议,比如,适用计算任务的Schedulers.computation( );处理密集IO任务的Schedulers.io( );以及Schedulers.trampoline( )能够有效避免StackOverflowError,所以非常适合函数的递归调用。好了,我不再举例了,因为官方文档已经给出了很详细的解释了,但是值得一提的是,如果使用Schedulers
的工厂方法创建的Worker
,一旦任务执行完毕,都应该调用worker.unsubscribe( )
方法,然后转向之前定义的Scheduler
实例上来。
当然RxJava的出现并不仅仅是为了解决回调地狱的。
这是我通过学习和不断地练习,一路走来很辛苦,总结的一些经验,分享给大家:
1 . error handling
3 . caching (roation)
@Override public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setRetainInstance(true);
/*.cache()操作符:
当第一个subscribe订阅的时候,才会连接原始Observable,缓存事件,重发给后续订阅的subscribe
值得注意的事,它和使用了.replay()操作符的ConnectableObservable的不同。
另外,为了避免内存开销,不建议缓存大量事件*/
cacheObservable = weatherManager.getWeather().cache();
}
@Override public void onViewCreated(View view, Bundle savedInstanceState) {
super.onViewCreated(view, savedInstanceState);
cacheObservable.subscribe(/*your subscribe*/);
}
4 . composing multiple calls
5 . more robust interface than asyncTask
6 . easy to do complex threading
7 . functional nature is more expressive
/*一个数组,每个元素乘以2,然后筛选小于10的元素,放入集合中*/
Integer[] integers = { 0, 1, 2, 3, 4, 5 };
/*一般写法,看上去并不是那么的“函数”*/
Integer[] doubles = new Integer[integers.length];
for (int i = 0; i < integers.length; i++) {
doubles[i] = integers[i] * 2;
}
List<Integer> integerList = new ArrayList<>(doubles.length);
for (Integer integer : doubles) {
if (integer < 10) integerList.add(integer);
}
/*Observable写法,一切都好多了*/
List<Integer> funactionalList = Observable.from(integers).map(new Func1<Integer, Integer>() {
@Override public Integer call(Integer integer) {
return integer * 2;
}
}).filter(new Func1<Integer, Boolean>() {
@Override public Boolean call(Integer integer) {
return integer < 10;
}
}).toList().toBlocking().first();
9 . fluent API
10 . easy debugging
//值得一提的是,关于@RxLogSubscriber要放在继承自Subscriber的类上
@RxLogSubscriber class MySubscriber extends Subscriber<Void> {
@Override public void onCompleted() {
}
@Override public void onError(Throwable e) {
}
@Override public void onNext(Void aVoid) {
}
}
//而不是实现Observer接口的类上
@RxLogSubscriber class MySubscriber implements Observer<Void> {
@Override public void onCompleted() {
}
@Override public void onError(Throwable e) {
}
@Override public void onNext(Void aVoid) {
}
}
当然,随着学习的深入,你会发现,收益不止如此。
在响应式编程中,应该牢记以下两点:
-
everything is a stream(一切皆流)
-
don't break the chain(不要打断链式结构)
谈谈Backpressure
Android这种嵌入式系统,尤其是生产者-消费者(producer-consumer)模式中,一定要小心Backpressure(背压,反压)的出现。一个宽泛的解释就是:事件产生的速度比消费快。一旦发生overproducing,当你的链式结构不能承受数据压力的时候,就会抛出MissingBackpressureException异常。
在Android中最容易出现的Backpressure就是连续快速点击跳转界面、数据库查询、文件扫面、键盘输入,甚至联网等操作都有可能造成Backpressure,可能有些情况并不会导致程序崩溃,但是会造成一些我们不想见到的小麻烦。那么一起来看看如何用RxJava解决Backpressure,OK,让我们的程序变得健壮起来吧。
groupBy操作符
在写这篇文章的时候,刚好看到一段代码,看来有必要说一说这个操作符了。
.groupBy( ),分组操作符,虽然目前这个项目中没有用到,但是我还是蛮喜欢它的,而且我看到很多人在使用,将原始Observable根据不同的key分组成多个GroupedObservable
,由原始Observable
发射(原始Observable
的泛型将变成这样Observable<GroupedObservable<K, T>>
),每一个GroupedObservable
既是事件本身也是一个独立的Observable
,每一个GroupedObservable
发射一组原始Observable
的事件子集。
引用自:GroupBy中文翻译
注意:groupBy将原始Observable分解为一个发射多个GroupedObservable的Observable,一旦有订阅,每个GroupedObservable就开始缓存数据。因此,如果你忽略这些GroupedObservable中的任何一个,这个缓存可能形成一个潜在的内存泄露。因此,如果你不想观察,也不要忽略GroupedObservable。你应该使用像take(0)这样会丢弃自己的缓存的操作符。
如果你取消订阅一个GroupedObservable,那个Observable将会终止。如果之后原始的Observable又发射了一个与这个Observable的Key匹配的数据,groupBy将会为这个Key创建一个新的GroupedObservable。
那么问题恰恰出在.take(n)操作符上。
只返回前面指定的n项数据,然后发送完成通知,忽略后面的事件。
那么看一下这个例子:
Observable.just(0, 1, 2, 3, 4, 5).groupBy(new Func1<Integer, Boolean>() {
@Override public Boolean call(Integer integer) {
return integer % 2 == 0;
}
}).flatMap(new Func1<GroupedObservable<Boolean, Integer>, Observable<Integer>>() {
@Override
public Observable<Integer> call(GroupedObservable<Boolean, Integer> groupedObservable) {
return groupedObservable.getKey() ? groupedObservable.take(1) : groupedObservable;
}
}).subscribe(new Action1<Integer>() {
@Override public void call(Integer i) {
System.out.println(i);
}
});
输出结果:
0
1
2
3
4
5
然而在1.0.0-RC5之前的版本中,在GroupedObservable上使用.take(n)操作符将会在发送完n个事件后,对GroupedObservable进行unsubscribe。并且GroupedObservable内部将会记录这个unsubscribed状态,然后忽略后面的事件。所以输出结果将是这样的:
0
1
3
5
而在这之后的版本,使用.take(n)
操作符,虽然也会发生unsubscribe,但是当原始Observable
再次发送一个满足key的事件后,将会重新创建一个GroupedObservable
,然后发送这个GroupedObservable
,不会发生之前那样的,忽略后续事件的现象。
当然,不要忘记,对不感兴趣的GroupedObservable
使用.take(0)
,来避免泄露。
所以,我的建议是,在使用RxJava之前看看官方文档或者change log。
关于RxWeather
我尽量减少对这个工程的文字描述。因为代码才是最好的老师。
通过对Android技术栈,1#架构(译文)和Android架构演化之路(译文)的解读和学习,按照架构和思路进行了实现,并且加入了RxBus。
关于REST API,我选择了和风天气,而放弃了Openweathermap的理由如下:
Openweathermap免费用户所在的服务器不稳定。
付费方面,和风天气更经济实惠。
但是和风天气目前并不支持同时查询多个地区的天气预报,也不支持根据经纬度查询天气预报。但是以后的事情谁又能说的准呢?
由于应用并不支持动态的上拉加载。所以,所有的列表展示结果,取决于city.txt文件。
我从Openweathermap给出的资源(下载city.list.json)中,整理需要的城市Json字符串,整合了经纬度,以备不时之需。
找到了一个通过Location查询所在地的API。
就这样基本实现了列表展示页ListActivity的功能:
根据Loaction查询所在地城市名称,然后查询当地天气。
读取
domain
->assets
->city.txt
,然后依次查询每个城市的天气,所以,这个文件不建议放入太多json。整合1和2的并发请求结果,显示界面。
详情页DetailActivity通过RxBus发送黏性事件接收列表页传递过来的数据,然后进行展示。这里会有七天内的天气以及穿衣建议。由于我么并没有找到一个正确的算法,所以当进入详情页后,旋转屏幕之后的退出动画会有所不同。这个类涉及的代码大部分都是动画(注意Hardware Layer的使用)以及对屏幕旋转的处理,所以代码看起有点多。ForkView使用了一个简单的自定义Behavior
。
搜索界面SearchActivity,输入的关键字请不要以市、区结尾,例如,北京而不是北京市,因为API不支持,我也没办法 :( 。
启动页
我认为,出彩的引导页是对细节的重视,但是我实在不能忍受,在启动页等太久。注意:不要混淆这两种场景。
所以,我在看了正确使用启动页之后,决定采取这种方式实现SplashActivity。而且不建议使用大图,一个icon足以。
Code
所有代码都可以从Github上获得。
片尾Tips:
文章开头提到的资料,需要pdf或者kindle版本的请自行选择下载,不得用于商业用途。
Learning Reactive Programming with Java 8 - pdf版,提取密码:2d88。
Learning Reactive Programming with Java 8 - kindle版,提取密码:5nec。
RxJava Essentials - pdf版,提取密码:z3r8。
RxJava Essentials - kindle版,提取密码:l67e。