有几种模式是你真的想要弄明白的。
Single~SingleObserver 单一发送
CompositeDisposable~DisposableObserver 防止内存泄露
Completable~CompletableObserver
PublishSubject~ConnectableObservable 热观察者
ReplaySubject !Observer重复发送
Flowable 背压
BehaviorSubject~Observer 在订阅的时候接收上一次发送的最后一个数据
~1 single
如果你发现需要用到一个 Observable 的操作符而 Single 并不支持,你可以用 toObservable 操作符把 Single<T> 转换为 Observable<T> 。
<pre>
Single.just("Amit")
.subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onSuccess(String value) {
textView.append(" onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
};;
</pre>
执行到
SingleJust的subscribeActual的方法
<pre>
@Override
protected void subscribeActual(SingleObserver<? super T> s) {
s.onSubscribe(Disposables.disposed());
s.onSuccess(value);
}
</pre>
我们可以看到这个只是调用一个,single单一的发送一个数据
single是一个observable ,但不是发射一系列的值,无论是从无处到无限数,它总是发出一个值或一个错误通知。
2~Completable
使用完备时我们忽略OnNext事件,只处理完全和OnError事件
Completable 本质上来说和 Observable 与 Single 不一样,因为它不发射数据。因此 Completable 的操作符也有所区别,最常用的是 andThen 。在这个操作符中你可以传任何 Observable 、 Single 、 Flowable 、 Maybe 或者其他 Completable ,它们会在原来的 Completable 结束后执行。例如。你想执行一些其他操作(Single):
<pre>
apiClient.updateMyData(myUpdatedData)
.andThen(performOtherOperation()) // a Single<OtherResult>
.subscribe(new Consumer<OtherResult>() {
@Override
public void accept(OtherResult result) throws Exception {
// handle otherResult
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception{
// handle error
}
});
</pre>
跟 Single 不同的是 RxJava 不允许直接把 Observable 转换为 Completable,因为没办法知道一个 Observable 什么时候 complete。但是你可以把 Single 转换为 Completable,因为 Single 保证 onComplete 会被调用,这个操作符是 toCompletable 。
3~Flowable 专门处理背压
<pre>
Flowable<Integer> observable = Flowable.just(1, 2, 3, 4);
observable.reduce(50, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer t1, Integer t2) {
return t1 + t2;
}
}).subscribe(getObserver());
</pre>
3~CompositeDisposable
这个作用其实主要是节省内存用
它提供了了add,clear等方法。
在
protected void onDestroy() {
super.onDestroy();
disposables.clear(); // do not send event after activity has been destroyed
}
防止内存溢出
4PublishSubject
先说一点介绍
一旦订阅了,将所有随后观察到的项发送给订阅方。 如同字面解释一样。 下面是官方的解释
<pre>
PublishSubject<Object> subject = PublishSubject.create();
// observer1 will receive all onNext and onComplete events
subject.subscribe(observer1);
subject.onNext("one");
subject.onNext("two");
// observer2 will only receive "three" and onComplete
subject.subscribe(observer2);
subject.onNext("three");
subject.onComplete();
这个PublishSubject就是我们说的热订阅者。他在订阅了之后就会一直订阅数据。第二次订阅了之后不会从头开始接受数据。而是从后面开始。
</pre>
然后说说这个ConnectableObservable
就是一种特殊的Observable对象,并不是Subscrib的时候就发射数据,而是只有对其应用connect操作符的时候才开始发射数据,所以可以用来更灵活的控制数据发射的时机
需要注意的是如果发射数据已经开始了再进行订阅只能接收以后发射的数据。
<pre>
PublishSubject<Integer> source = PublishSubject.create();
ConnectableObservable<Integer> connectableObservable = source.replay(2); // bufferSize = 3 to retain 3 values to replay
connectableObservable.connect(); // connecting the connectableObservable
connectableObservable.subscribe(getFirstObserver());
source.onNext(1);
source.onNext(2);
source.onNext(3);
source.onNext(4);
source.onComplete();
/*
* it will emit 2, 3, 4 as (count = 3), retains the 3 values for replay
*/
connectableObservable.subscribe(getSecondObserver());
</pre>
Replay操作符返回一个Connectable Observable 对象并且可以缓存其发射过的数据,这样即使有订阅者在其发射数据之后进行订阅也能收到其之前发射过的数据。不过使用Replay操作符我们最好还是限定其缓存的大小,否则缓存的数据太多了可会占用很大的一块内存。对缓存的控制可以从空间和时间两个方面来实现。
这个就是还存了两个数据
5ReplaySubject
/* ReplaySubject emits to any observer all of the items that were emitted
* by the source Observable, regardless of when the observer subscribes.
*/
从这个图中我们应该很好理解,当你订阅的时候就会发送数据
<pre>
ReplaySubject<Integer> source = ReplaySubject.create();
source.subscribe(getFirstObserver()); // it will get 1, 2, 3, 4
source.onNext(1);
source.onNext(2);
source.onNext(3);
source.onNext(4);
source.onComplete();
/*
* it will emit 1, 2, 3, 4 for second observer also as we have used replay
*/
source.subscribe(getSecondObserver());
</pre>
因为这个是重复发送数据。当你订阅的时候。就会重复发送
7BehaviorSubject
<pre>
BehaviorSubject<Integer> source = BehaviorSubject.create();
source.subscribe(getFirstObserver()); // it will get 1, 2, 3, 4 and onComplete
source.onNext(1);
source.onNext(2);
source.onNext(3);
/*
* it will emit 3(last emitted), 4 and onComplete for second observer also.
*/
source.subscribe(getSecondObserver());
source.onNext(4);
source.onComplete()
</pre>
<pre>
// observer will receive all 4 events (including "default").
BehaviorSubject<Object> subject = BehaviorSubject.createDefault("default");
subject.subscribe(observer);
subject.onNext("one");
subject.onNext("two");
subject.onNext("three");
// observer will receive the "one", "two" and "three" events, but not "zero"
BehaviorSubject<Object> subject = BehaviorSubject.create();
subject.onNext("zero");
subject.onNext("one");
subject.subscribe(observer);
subject.onNext("two");
subject.onNext("three");
// observer will receive only onComplete
BehaviorSubject<Object> subject = BehaviorSubject.create();
subject.onNext("zero");
subject.onNext("one");
subject.onComplete();
subject.subscribe(observer);
// observer will receive only onError
BehaviorSubject<Object> subject = BehaviorSubject.create();
subject.onNext("zero");
subject.onNext("one");
subject.onError(new RuntimeException("error"));
subject.subscribe(observer);
上面试官网的一些例子。 从这里可以看到是BehaviorSubject的一些特性。他接受最后一个
</pre>
重点
completes方法之后,才开始接受最后一个发送的数据
<pre>
AsyncSubject<Integer> source = AsyncSubject.create();
source.subscribe(getFirstObserver()); // it will emit only 4 and onComplete
source.onNext(1);
source.onNext(2);
source.onNext(3);
/*
* it will emit 4 and onComplete for second observer also.
*/
source.subscribe(getSecondObserver());
source.onNext(4);
source.onComplete();
</pre>
4~reduce可以相互转化