Retrofit2+Rxjava-Rxjava2.x-篇一-用起来

从Retrofit基本用法MonkeyLei:Retrofit+Rxjava-以自己的方式重头开始-入门篇 -> 到Convert MonkeyLei:Retrofit+Rxjava-ConverterFactory-篇一-先了解一下 -> 到CallAdapter MonkeyLei:Retrofit+Rxjava-CallAdapterFactory-篇一-先简单自定义看看 -> 到今天的RxJava ReactiveX/RxJava(看了一眼,东西还蛮多。头一回正儿八经的看,有些见都没见过。只怪平时搬砖太多了~~~~) - 这篇官方也看,砖也搬,主要是官方东西太多了,段时间搞不定!想快速搞搞...

RxJava Javadoc 2.2.9 - 小萌新会先了解下基本的介绍呀,用法呀。然后重点是Retrofit+RxJava的结合用法,然后基本上串联起来!之后补一些遗漏的基本知识,再之后就是深入和封装。

开始前,我们复习一下Retrofit的基本用法流程,以及过程中部分的Okttp的知识

直接上代码:都是之前的案例,新建了一个页面来实践 FanChael/RxNet

Repo.kt

package com.hl.rxnettest

class Repo{
    var id: Long = -1
    public var name: String? = null
    var full_name: String? = null
    // class owner - not need
    var html_url: String? = null
    var description: String? = null
} 

GitHubService.kt

package com.hl.rxnettest

import okhttp3.ResponseBody
import retrofit2.Call
import retrofit2.http.Body
import retrofit2.http.GET
import retrofit2.http.POST
import retrofit2.http.Path

interface GitHubService {
    // 添加GsonConverterFactory解析器 - 返回Json解析的对象列表
    @GET("users/{user}/repos")
    fun listRepos(@Path("user") user: String): Call<List<Repo>>
}

Main2Activity.kt

package com.hl.rxnettest

import android.support.v7.app.AppCompatActivity
import android.os.Bundle
import android.util.Log
import okhttp3.OkHttpClient
import retrofit2.Call
import retrofit2.Callback
import retrofit2.Response
import retrofit2.Retrofit
import retrofit2.converter.gson.GsonConverterFactory
import java.io.IOException

class Main2Activity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        // 布局都不依赖了
        // setContentView(R.layout.activity_main2)
        // setContentView(View(this))

        // 1\. 创建Retroft实例对象
        var retrofit = Retrofit.Builder()
                .addConverterFactory(GsonConverterFactory.create())
                .baseUrl("https://api.github.com/")
                .build();

        // 2\. 创建GitHub请求服务
        var gitHubService = retrofit.create(GitHubService::class.java);

        // 3\. 获取服务对应的某个请求实例
        var gitCall  = gitHubService.listRepos("FanChael");

        // 4.1 UI线程不能进行同步请求,ANR
        // var executeRst = gitCall.execute();
        // 4.2\. 进行异步请求
        gitCall.enqueue(object : Callback<List<Repo>>{  // object的作用是调用内部匿名类
            override fun onResponse(call: Call<List<Repo>>?, response: Response<List<Repo>>?) {
                // {protocol=http/1.1, code=200, message=OK, url=https://api.github.com/users/FanChael/repos}
                Log.e("Main2Activity", "" + response)
                Log.e("Main2Activity", "" + (response?.body() ?: ""))
                var repoList = response?.body();
                for (item in repoList!!) {
                    // SuperStartElectronic subtitle display - 电子字幕展示-接机、演唱会、见面、展示专用.https://github.com/FanChael/SuperStart
                    Log.e("Main2Activity", item.name + item.description + item.html_url)
                }
            }
            override fun onFailure(call: Call<List<Repo>>?, t: Throwable?) {
            }
        })
        // 4.3 获取okhttp3.Request原始请求对象实例 - 用原始请求对象进行请求
        var request = gitCall.request()
        // --然后创建一个Call对象,用于网络请求
        var okHttpCall = OkHttpClient().newCall(request)
        // ---然后进行异步请求
        okHttpCall.enqueue(object: okhttp3.Callback{
            override fun onFailure(call: okhttp3.Call?, e: IOException?) {
            }

            override fun onResponse(call: okhttp3.Call?, response: okhttp3.Response?) {
                // [{"id":140240588,"node_id":"MDEwOlJlcG9zaXRvcnkxNDAyNDA1ODg=",....]
                Log.e("Main2Activity", "okhttp3: " + (response?.body()?.string() ?: ""))
            }

        }) // 我们采用Okhttp的方式进行请求调用,与Retrofit如出一辙,Retrofit进行了包装,简化了请求定义!

    }
}

其中Retrofit也返回了原始的Okhttp请求实例,那样某些用户就可以进行个性化定制,想到还是周到嘛。虽然我们不用,但是其他人可能需要妮!

image

OkHttp的知识可以了解下,好早以前,小萌新也有用过,当时主流的是XUtils请求框架,不过该框架已经停止维护了!都是大佬,点个赞!https://square.github.io/okhttp/

image

如果要想自己,就看官网的案例,简单过过,然后看API就可以花时间研究了.... 再懒点就看网友的分析: OKhttp3 的同步和异步请求基本用法

另外小萌新之前记录了一个关于https认证的问题 MonkeyLei:Android-Okhttp3之https证书未认证的问题(转载完善,修复过时方法)

Now,Let's Start....

  1. 添加依赖呀...最新2.2.9
image

2. Hello World走起, 此时Flowable不知道是啥,不过案例可以跑起来,一调用just,哇,好多参数提示哟...厉害..

image
        // lambdas的搞法..Flowable为何物?
        Flowable.just("hello world", "hello world2", "hello world3")
                .subscribe(System.out::println)
        // 不支持java8的lambdas的用法
        Flowable.just("hello world all")
                .subscribe(object: Consumer<String> {
                    override fun accept(t: String) {
                        System.out.println(t);
                    }
                })
image

3. 此时看看RxJava都提供了哪些类了

Base classes
RxJava 2 features several base classes you can discover operators on:

io.reactivex.Flowable: 0..N flows, supporting Reactive Streams and backpressure
io.reactivex.Observable: 0..N flows, no backpressure,
io.reactivex.Single: a flow of exactly 1 item or an error,
io.reactivex.Completable: a flow without items but only a completion or error signal,
io.reactivex.Maybe: a flow with no items, exactly one item or an error.

小萌新会先了解下Flowable、然后Single、Completable、Maybe简单看看先。完事了重点看下Observable,因为之前项目都是它.

3.1 [Flowable](https://link.zhihu.com/?target=http%3A//reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html) - 0..N flows, supporting Reactive-Streams and backpressure

Reactive-Streams operates with Publishers which Flowable extends. Many operators therefore accept general Publishers directly and allow direct interoperation with other Reactive-Streams implementations.

The Flowable hosts the default buffer size of 128 elements for operators, accessible via [bufferSize()](https://link.zhihu.com/?target=http%3A//reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html%23bufferSize--), that can be overridden globally via the system parameter rx2.buffer-size. Most operators, however, have overloads that allow setting their internal buffer size explicitly.

The documentation for this class makes use of marble diagrams. The following legend explains these diagrams:

image

For more information see the ReactiveX documentation.

一脸懵逼:大概说什么响应式,缓存buffer-size啥的 - 支持响应式和背压 (Look it. https://blog.csdn.net/weixin_33691598/article/details/86861027 )。看网友的分析吧。。先了解各大概吧..

image

Rxjava2入门教程五:Flowable背压支持——对Flowable最全面而详细的讲解

人家顺便把其他几个简化版的单一处理的方式的Observable也说了。厉害了。。我忒么一点不懂呀!希望后面自己有所感悟...链接先放这里。自己回头感悟感悟再来膜拜一下可好...

Rxjava2入门教程六:Single、Completable、Maybe——简化版的Observable

https://blog.csdn.net/fengluoye2012/article/details/79297186

3.2 [Observable](https://link.zhihu.com/?target=http%3A//reactivex.io/RxJava/2.x/javadoc/io/reactivex/Observable.html) - 0..N flows, no backpressure

**[Flowable](https://link.zhihu.com/?target=http%3A//reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html)**比较完善,是RxJava2.0相对1.0的改进,支持了多个响应,同时支持背压处理,保证发射的很多数据不会因为积压问题导致内存溢出等问题,但是同时也带来了性能和处理速度的问题,这种一般适合相对大的项目大的处理请求的情况。而Observable则不支持背压处理,像我们Android的话,每个页面数据相对都是比较单一,不会很大的数据发射以及处理,所以一般看到的都是Observable的方式。Flowable就相对陌生多了....

当一段时间内项目相对较少(<1000)时使用Observable和/或没有生产者过度消费消费者的风险,从而导致OOM. 当您拥有相对大量的项目时使用Flowable,您需要仔细控制Producer的行为方式,以避免资源耗尽和/或拥塞. 背压
如果你有一个可观察到的物品如此快速地发射物品,消费者无法跟上流量,导致已发射但未消耗的物品存在.

背压策略处理的是由观察者发出但未被订户消费的未消耗物品如何被管理和控制. 

ReactiveX/RxJava 上面也有如似说明

In RxJava, the dedicated Flowable class is designated to support backpressure and Observable is dedicated for the non-backpressured 
operations (short sequences, GUI interactions, etc.). The other types, Single, Maybe and Completable don't support backpressure nor should they;
 there is always room to store one item temporarily.

3.1/3.2 跟着一起练习下怎么用吧,官方的看着好难呀,去搬砖一下别人写的好的吧... https://mp.weixin.qq.com/s?__biz=MzIwMzYwMTk1NA%3D%3D&mid=2247484711&idx=1&sn=c3837b7cad21f0a69d7dccd1aaaf7721&chksm=96cda46aa1ba2d7ce145472449e5a832cd3ac0bc1f766fe09f1ce68ca46c16bab645e507f0b5

然后来个练习熟悉下Flowable(说的不当的注释后面补救):

       // 上游Flowable
        var upstream = Flowable.create(object: FlowableOnSubscribe<Integer> {
            override fun subscribe(emitter: FlowableEmitter<Integer>) {
                for (index in 1..10000){
                    emitter.onNext(Integer(index))
                }
                emitter.onComplete()
            }
        }, BackpressureStrategy.BUFFER)
        // 为上游指定一个线程 - 又比如 -> .subscribeOn(Schedulers.io())
        upstream.subscribeOn(Schedulers.newThread()) // Now, 不懂
        // 为下游指定一个线程 - 比如Android指定AndroidUI线程 -> .observeOn(AndroidSchedulers.mainThread())
        upstream .observeOn(Schedulers.newThread())
        upstream.subscribe(object: Subscriber<Integer>{
            override fun onComplete() {
                Log.e("rxjava", "onComplete");
                Toast.makeText(this@Main2Activity, "OK", Toast.LENGTH_SHORT).show()
            }

            override fun onSubscribe(s: Subscription?) {
                // 发起请求尼 - 跳过去看源码不懂,反正就是:只有调用了这个方法上游才会发送事件
                // n the strictly positive number of elements to requests to the upstream - 需要一个严格的正数去请求上游发送数据?
                s?.request(java.lang.Long.MAX_VALUE)
            }

            override fun onNext(t: Integer?) {
                // 一秒钟处理一条
                Thread.sleep(1000)
                Log.e("rxjava", "upstream's value=" + t);
            }

            override fun onError(t: Throwable?) {
            }

        })

其中作者关于request有解释,小萌新看了下,不错,还是比较好理解一点的:

  这是因为Flowable在设计的时候采用了一种新的思路也就是响应式拉取的方式来更好的解决上下游流速不均衡的问题, 与我们之前所讲的控制数量和控制速度不太一样, 
这种方式用通俗易懂的话来说就好比是叶问打鬼子, 我们把上游看成小日本, 把下游当作叶问, 当调用Subscription.request(1)时, 叶问就说我要打一个! 
然后小日本就拿出一个鬼子给叶问, 让他打, 等叶问打死这个鬼子之后, 再次调用request(10),  叶问就又说我要打十个! 然后小日本又派出十个鬼子给叶问, 
然后就在边上看热闹, 看叶问能不能打死十个鬼子, 等叶问打死十个鬼子后再继续要鬼子接着打...
  所以我们把request当做是一种能力, 当成下游处理事件的能力, 下游能处理几个就告诉上游我要几个, 这样只要上游根据下游的处理能力来决定发送多少事件, 
就不会造成一窝蜂的发出一堆事件来, 从而导致OOM. 这也就完美的解决之前我们所学到的两种方式的缺陷, 过滤事件会导致事件丢失, 减速又可能导致性能损失. 
而这种方式既解决了事件丢失的问题, 又解决了速度的问题, 完美 !
   但是太完美的东西也就意味着陷阱也会很多, 你可能只是被它的外表所迷惑, 失去了理智, 如果你滥用或者不遵守规则, 一样会吃到苦头.
比如这里需要注意的是, 只有当上游正确的实现了如何根据下游的处理能力来发送事件的时候, 才能达到这种效果, 如果上游根本不管下游的处理能力, 
一股脑的瞎他妈发事件, 仍然会产生上下游流速不均衡的问题, 这就好比小日本管他叶问要打几个, 老子直接拿出1万个鬼子, 这尼玛有种打死给我看看? 
那么如何正确的去实现上游呢, 这里先卖个关子, 之后我们再来讲解.

东西还是多呀,我好菜,(。・_・。)ノI’m sorry~ 有点看不懂了,今天累累的...

快速接触下**[Observable](https://link.zhihu.com/?target=http%3A//reactivex.io/RxJava/2.x/javadoc/io/reactivex/Observable.html)**

一个官方的案例,先跑起来看一眼:

        /*
        由于在RxJava在2.x以上版本,api改动还是比较大的.
        其中订阅时有两个Api : subscribe和subscribeWith,很多人可能不太明白应该使用哪个
        我的理解就是subscribeWith中会把方法参数返回回去接收的是ResourceSubscriber,
        而ResourceSubscriber实现了Disposable接口所以,一般subscribeWith用到使用Rx请求接口的这种情况,订阅后把请求参数返回回去,可以添加到CompositeDisposable中方便绑定Activity生命周期取消
        其实subscribe中除了重载参数是Observer的其他也都返回了Dispose对象,至于为什么这个方法没有返回暂时也不知道作者怎么想的.
        因为它返回值是void所以在请求接口时最好还是使用subscribeWith,添加订阅关系更方便了
         */
        // 别引错包了,是io.reactivex,不是rx.Observable
        var d = Observable.just("Hello world!")
                .delay(1, TimeUnit.SECONDS)
                .subscribeWith(object : DisposableObserver < String >() {
                    override fun onStart() {
                        Log.e("rxjava", "Start");
                    }
                    override fun onNext(t: String) {
                        Log.e("rxjava", "" + t);
                    }
                    override fun onError(t: Throwable) {
                        t.printStackTrace();
                    }
                    override fun onComplete() {
                        Log.e("rxjava", "Done");
                    }
                });
         Thread.sleep(500);
        // the sequence can now be disposed via dispose() - 隔了一会里面取消,则收不到结果
        // d.dispose();
image

那我们接着看看别的写法:

       // 1\. 创建一个被观察者
        val observable = Observable.create(ObservableOnSubscribe<String> { emitter ->
            Log.d("observable", "observable")
            emitter.onNext("1")
            emitter.onNext("2")
            emitter.onNext("3")
            emitter.onComplete()
        })

        // 2\. 创建一个订阅者(观察者)
        val observer = object : Observer<String> {
            override fun onSubscribe(d: Disposable) {
                Log.e("observable", "onSubscribe")
            }

            override fun onNext(s: String) {
                Log.e("observable", s)
            }

            override fun onError(e: Throwable) {
                Log.e("observable", "onError")
            }

            override fun onComplete() {
                Log.e("observable", "onComplete")
            }
        }

        // 订阅收消息(关联观察者与被观察者)
        observable.subscribe(observer);

这个看着好像还行的样子...这篇文章写的和比喻的比较可以

image

RxJava2系列第一篇---基本使用 作为我们这些小白,还是适合看看的。。

目前来看,也只是知道简单使用。。还不行。小萌新还想就是看子线程,UI线程那种方式怎么搞?所以Observer还需要实践一下才行,就像这样的解释 RxJava2系列第二篇---异步

再来段解释加深下印象:

image

**再接着-接着看看别的写法: **Consumer也可以作为简单的观察者,接收上游数据(就是很多状态回调都没得了) - 有时候或许我们就只需要接收数据,什么错误,完成可能都不需要(比如推送信息来了,如果正常有数据就提示用户,类似这样的操作可能就不需要做任何错误完成处理...)

        // 简单点也可以Consumer作为观察者 - 很多状态可能就没有了
        observable.subscribe(object: Consumer<String>{
            override fun accept(t: String?) {
                // "a" "b" "c"
                Log.e("observable accept", t)
            }
        })

接着我们看看线程相关的,就是说我们目前的上游,下游是在哪个线程,为什么之前的实践可以Toast,如下:

        // 1\. 默认是在当前的主线程(UI线程中) - 结果都是: thread'name=main
        val observable2 = Observable.create(ObservableOnSubscribe<String> { emitter ->
            Log.e("observable", "thread'name=" + Thread.currentThread().name)
            emitter.onNext("Hello I'm comming.")
            emitter.onComplete()
        })
        observable2.subscribe(object : Consumer<String> {
            override fun accept(t: String?) {
                // main线程中,可以进行ui操作
                Toast.makeText(this@Main2Activity, "OK", Toast.LENGTH_SHORT).show()
                Log.e("observable", "thread'name=" + Thread.currentThread().name)
            }
        })

        // 将处理放到一个子线程中去进行处理...我们可能要进行网络请求
        Thread(Runnable {
            // 2\. 默认是在当前的子线程中 - 结果都是: thread'name=Thread-2
            val observable3 = Observable.create(ObservableOnSubscribe<String> { emitter ->
                Log.e("observable", "thread'name=" + Thread.currentThread().name)
                emitter.onNext("Hello I'm comming.")
                emitter.onComplete()
            })
            observable3.subscribe(object : Consumer<String> {
                override fun accept(t: String?) {
                    // 不能操作UI - 非main(UI)线程
                    // Toast.makeText(this@Main2Activity, "OK", Toast.LENGTH_SHORT).show()
                    Log.e("observable", "thread'name=" + Thread.currentThread().name)
                }
            })
        }).start()

我们分别在主线程(UI线程)和新开的线程中进行了当前线程信息的打印,如果默认没有指定上下游线程的情况下,外面的都是在main线程中;里面的都是出于新的子线程中。从打印结果也可以看到:

image

如果要结合Android实际情况,上游做网络请求处理,然后给到下游,下游可能涉及到提示,界面刷新等操作。很明显程序就炸了!So,此前我们经常用的subscribeOn、observeOn就发挥了至关作用! 一开始和我一样懵懵的我们,只是搬来用的我们,总算可以多理解一些了:

subscribeOn - 指定上游所在线程

observeOn - 指定下游所在线程

上线处于子线程,负责发送网络请求,下游处于主线程,负责更新UI,RxJava线程调度器就干了这个事情。所以为什么,我们要引入Retrofit+RxJava,还得配合AndroidSchedulers来实现更优雅的网络请求!实现小萌新的MVP架构尼...

改造如下:

        // 将处理放到一个子线程中去进行处理...我们可能要进行网络请求
        Thread(Runnable {
            // 2\. 默认是在当前的子线程中 - 结果都是: thread'name=Thread-2
            val observable3 = Observable.create(ObservableOnSubscribe<String> { emitter ->
                Log.e("observable", "thread'name=" + Thread.currentThread().name)
                emitter.onNext("Hello I'm comming.")
                emitter.onComplete()
            })
            observable3.subscribe(object : Consumer<String> {
                override fun accept(t: String?) {
                    // 不能操作UI - 非main(UI)线程
                    // Toast.makeText(this@Main2Activity, "OK", Toast.LENGTH_SHORT).show()
                    Log.e("observable", "thread'name=" + Thread.currentThread().name)
                }
            })

            // 3\. 默认是在当前的子线程中 - 结果都是: thread'name=Thread-2
            val observable4 = Observable.create(ObservableOnSubscribe<String> { emitter ->
                Log.e("observable", "thread'name=" + Thread.currentThread().name)
                emitter.onNext("Hello I'm comming.")
                emitter.onComplete()
            })
            observable4.subscribeOn(Schedulers.newThread()) // 指定上游是一个新的线程
                    .observeOn(AndroidSchedulers.mainThread())  // 指定下游是一个Android 主线程/UI线程
                    .subscribe(object : Consumer<String> {
                        override fun accept(t: String?) {
                            // 切换到main(UI)线程
                            Toast.makeText(this@Main2Activity, "OJBK", Toast.LENGTH_SHORT).show()
                            Log.e("observable", "thread'name=" + Thread.currentThread().name)
                        }
                    })
        }).start()

RxJava线程调度器可以呀!

image

注意1,有时候我们用的是http://Schedulers.io()来指定上游线程?如下

image

解释 from 网友:

您应该考虑使用线程池的主要原因是它们保留了一些空闲并等待工作的预创建线​​程。
这意味着当你有工作要做时,你不需要花费创建线程的开销。一旦你的工作完成,该线程也可以重新用于未来的工作,而不是不断创建和销毁线程。

创建线程可能会很昂贵,因此最大限度地减少正在创建的线程数量通常很好。

有关线程池的更多信息,我建议:

Java中的线程池有什么用?
什么是线程池?
线程池模式(Wikipedia)

注意2 多次指定上下游所在线程的情况 - 给老哥打个广告

image

另外还需要关注一下RxJava1.x和RxJava2.x的一些区别,比如Observable的不同,创建方式的区别,小萌新之前用的是1.x的版本,现在发现如果导入以前的包,官方的一些案例里面的一些方法都没法调用!这个需要关注下,核心的逻辑思想一致,不过用法需要特别注意!

**注意4: **Rxjava 1.x 版本、Rxjava 2.x版本关于CallAdapterFactory的区别,由于旧版本的RxJavaCallAdapterFactory不支持RxJava2.x,但是jakewharton大神怎么能不解决了,随后就有了新版的rxjava2版本的适配器 square/retrofit 完善下如下依赖!

image

继续在Rxjava2.x上用旧版本适配器适配器问题:addCallAdapterFactory(RxJavaCallAdapterFactory.create()),报错如下: Unable to create call adapter for io.reactivex.Observable

  06-19 10:53:38.840 11284-11291/? E/zygote64: Failed sending reply to debugger: Broken pipe
06-19 10:54:04.780 11284-11363/com.hl.rxnettest E/AndroidRuntime: FATAL EXCEPTION: Thread-2
    Process: com.hl.rxnettest, PID: 11284
    java.lang.IllegalArgumentException: Unable to create call adapter for io.reactivex.Observable<java.util.List<com.hl.rxnettest.Repo>>
        for method GitHubService.listReposStringRxJavaObservable
        at retrofit2.Utils.methodError(Utils.java:52)
        at retrofit2.HttpServiceMethod.createCallAdapter(HttpServiceMethod.java:105)
        at retrofit2.HttpServiceMethod.parseAnnotations(HttpServiceMethod.java:66)
        at retrofit2.ServiceMethod.parseAnnotations(ServiceMethod.java:37)
        at retrofit2.Retrofit.loadServiceMethod(Retrofit.java:170)
        at retrofit2.Retrofit$1.invoke(Retrofit.java:149)
        at java.lang.reflect.Proxy.invoke(Proxy.java:913)
        at $Proxy0.listReposStringRxJavaObservable(Unknown Source)
        at com.hl.rxnettest.Main2Activity$onCreate$1.run(Main2Activity.kt:271)
        at java.lang.Thread.run(Thread.java:764)
     Caused by: java.lang.IllegalArgumentException: Could not locate call adapter for io.reactivex.Observable<java.util.List<com.hl.rxnettest.Repo>>.
      Tried:
       * retrofit2.adapter.rxjava.RxJavaCallAdapterFactory
       * retrofit2.CompletableFutureCallAdapterFactory
       * retrofit2.DefaultCallAdapterFactory
        at retrofit2.Retrofit.nextCallAdapter(Retrofit.java:241)
        at retrofit2.Retrofit.callAdapter(Retrofit.java:205)
        at retrofit2.HttpServiceMethod.createCallAdapter(HttpServiceMethod.java:103)
            ... 8 more

RxJava2出现:Unable to create call adapter for io.reactivex.Flowable 网友也有说明尼!

HL. 我们就可以来个github仓库的请求看看,走起...

        // 来个网络请求案例压压惊
        // 1\. 创建Retroft实例对象
        var retrofit = Retrofit.Builder()
                .addConverterFactory(GsonConverterFactory.create())
                // 别忘记了加支持RxJava2的适配器:旧版的别了呀 https://github.com/square/retrofit/tree/master/retrofit-adapters/rxjava2
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .baseUrl("https://api.github.com/")
                .build();

        // 2\. 创建GitHub请求服务
        var gitHubService = retrofit.create(GitHubService::class.java);

        // 3\. 获取服务对应的某个请求实例
        var gitCall = gitHubService.listReposStringRxJavaObservable("FanChael");

        // 4\. RxJava请求走起
        var ds: Disposable ? = null
        gitCall.subscribeOn(Schedulers.io()) // 指定上游一个子线程
                .observeOn(AndroidSchedulers.mainThread())  // 指定下游UI/主main线程
                .subscribe(object : Observer<List<Repo>> {
                    override fun onComplete() {
                    }

                    override fun onSubscribe(d: Disposable) {
                        // 保存起来,后面可以做一些取消处理
                        ds = d;
                    }

                    override fun onNext(t: List<Repo>) {
                        Toast.makeText(this@Main2Activity, "OJBK", Toast.LENGTH_SHORT).show()
                        Log.e("observable", "thread'name=" + Thread.currentThread().name)
                        for (item in t) {
                            // SuperStartElectronic subtitle display - 电子字幕展示-接机、演唱会、见面、展示专用.https://github.com/FanChael/SuperStart
                            Log.e("observable", item.name + item.description + item.html_url)
                        }
                    }

                    override fun onError(e: Throwable) {
                    }
                })
        // 可以做取消请求处理
        //        if (null != ds && !ds!!.isDisposed) {
        //            ds!!.dispose()
        //        }

一看,OK啦...

  用过了RxJava1.x,再来用用RxJava2.x,还是有好处的哈! 有些问题能追溯一下...
image

对了,还有个.map的数据中间处理的过程(之前rxjava1.x的时候用的),小萌新还没添加,添加下:

image

所以,改造下,**增加一个map - 操作符的相关 **https://blog.csdn.net/oneblue123/article/details/79784620 - 有必要了解一下,改天可以针对你的项目做优化和完善滴!

         // 来个网络请求案例压压惊
        // 1\. 创建Retroft实例对象
        var retrofit = Retrofit.Builder()
                .addConverterFactory(GsonConverterFactory.create())
                // 别忘记了加支持RxJava2的适配器:旧版的别了呀 https://github.com/square/retrofit/tree/master/retrofit-adapters/rxjava2
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .baseUrl("https://api.github.com/")
                .build();

        // 2\. 创建GitHub请求服务
        var gitHubService = retrofit.create(GitHubService::class.java);

        // 3\. 获取服务对应的某个请求实例
        var gitCall = gitHubService.listReposStringRxJavaObservable("FanChael");

        // 4\. RxJava请求走起
        var ds: Disposable ? = null
        gitCall.subscribeOn(Schedulers.io()) // 指定上游一个子线程
                .observeOn(AndroidSchedulers.mainThread()) // 指定下游UI/主main线程
                // 创建一个动态键的 observable 映射。 如果你不但想对一个特定项的更改做出反应,而且对添加或删除该项也做出反应的话,那么 observable 映射会非常有用
                .map { t ->  // lambda表达式样式呀
                    Log.e("observable", "map thread'name=" + Thread.currentThread().name)
                    var userList: MutableList<Repo> = ArrayList()
                    for (item in t){
                        if (item.name?.contains("banner")!!){
                            userList.add(item)
                        }
                    }
                    userList
                }
                .subscribe(object : Observer<List<Repo>> {
                    override fun onComplete() {
                    }

                    override fun onSubscribe(d: Disposable) {
                        // 保存起来,后面可以做一些取消处理
                        ds = d;
                    }

                    override fun onNext(t: List<Repo>) {
                        Toast.makeText(this@Main2Activity, "OJBK", Toast.LENGTH_SHORT).show()
                        Log.e("observable", "thread'name=" + Thread.currentThread().name)
                        for (item in t) {
                            // SuperStartElectronic subtitle display - 电子字幕展示-接机、演唱会、见面、展示专用.https://github.com/FanChael/SuperStart
                            Log.e("observable", item.name + item.description + item.html_url)
                        }
                    }

                    override fun onError(e: Throwable) {
                    }
                })
        // 可以做取消请求处理
        //        if (null != ds && !ds!!.isDisposed) {
        //            ds!!.dispose()
        //        }
image

附上: 这有个网友的升级笔记,很nice RxJava1.X升级到RxJava2.X笔记

小萌新之前的RxJava1.x方式(没有做封装的一个项目的写法,封装后利用泛型做了通用处理,所以AuthorBean这些都是泛型T替代了的。。。)如下:

image

这篇基础认识先这样吧,搞了两天了。。官方看,别人的分析看,自己实践试试看,都一起整整了看了下。。算是更熟悉一步了。。下一篇的话,打算把RxJava2.x + Retrofit2.x的网络请求中异常处理,数据请求预处理,请求信息预包装等来搞搞。为进一步认知以及后续的封装做准备!

补上一篇感觉还不错的方法的全面说明: RxJava2 只看这一篇文章就够了 - 每个说明都有案例,帮助你理解入门

FanChael/RxNet 仓库地址

补补提醒:onNext经过指定AndroidSchedulers.mainThread()后,切换到了UI线程,但是如果你Thread.sleep(10000)发现并不会ANR,But,如下:

   ANR是指系统没有响应,但是耗时操作不一定会造成ANR
   但ANR通常是因为在UI线程中做了耗时操作引起的
   就是说耗时操作有可能会造成ANR,也有可能不回

如果你进行网络同步请求,指定会炸的:

                       // UI线程规定不能进行耗时的操作,但是不一定耗时都会引起ANR;只要不影响UI渲染的卡顿,不一定会引起ANR
                        Thread.sleep(100000)
                        /* UI线程中不能进行 okhttp同步请求
                        val client = OkHttpClient.Builder()
                                .build();
                        val request = Request.Builder().url("http://www.baidu.com").get().build()
                        val requestCall = client.newCall(request)
                        val response = requestCall.execute()
                        if (response.isSuccessful()) {
                            val json = response.body()?.string();
                            Log.e("observable", "json=" + json)
                        } */

操作符除了map,还有flatmap这些,能解决for循环处理数据嵌套的问题,帮你简化预处理数据的操作流程,熟悉了应该事半功倍!

https://blog.csdn.net/baidu_31093133/article/details/78744911

如果你想针对Retrofit请求数据进行预处理,然后给到onNext,你可以再包装一层Observable进行耗时处理,然后给到onNext的UI线程进行渲染...暂时先入门记录到这吧。只能说又熟悉了一天了。。。和网友请教学习了一番...思路清晰多了!目前来看是有了相对清晰的脉络了...加油,我是菜鸟,我很丑但是很温柔!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,319评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,801评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,567评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,156评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,019评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,090评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,500评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,192评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,474评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,566评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,338评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,212评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,572评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,890评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,169评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,478评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,661评论 2 335

推荐阅读更多精彩内容