Advanced RxJava and Retrofit

译自: May 2017 Meetup: Advanced RxJava and Conductor
Slides

前言

我猜在你学习 RxJava + Retrofit 的历程中,肯定见过这种代码:

interface SeatGeekApi {
  @GET("events") Observable<EventResponse> getUpcomingEvents();
}
  
Retrofit mRetrofit = new Retrofit.Builder()
    .baseUrl("https://api.seatgeek.com/")
    .build();
  
SeatGeekApi mApi = mRetrofit.create(SeatGeekApi.class);
  
mApi.getUpComingEvents()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(events -> {
      // handle events
    }, error -> {
      // handle errors
    });

上面的代码缺少了哪些东西?

  • 请求失败时需要 retry
  • 处理特定的 响应码
  • UI 中显示 loading indicators
  • 对系统 configuration changes 的响应
  • 其他特殊事件

I. 重试请求 Retrying Request

网络请求在移动设备上会因各种原因发生错误,通常我们需要添加一些重试请求的机制。

一种 naive 的实现:

private void makeRequest() {
  getRequestObservable()
      .subscribe(getObserver());
}
  
private Observer<Response> getObserver() {
  return new Observer<Response>() {
    ...
    @Override public void onError(Throwable e) {
      // 发生错误时发送新的请求
      if (someCondition) {
        makeRequest();
      }
    }
  };
}

使用 retryWhen() 实现:

RxJava 的 retryWhen() 方法可以便捷地实现这个需求:

  • retryWhen() 只会在每个 subscription 中调用一次
  • retryWhen() 返回的是 parent 数据流的 errors
  • 只要从 retryWhen() 返回的 Observable 对象不是 complete 或者 error 的,parent Observable 将会被重新订阅。
// 简单的演示重试 3 次请求,分别延迟 5s,10s,15s.
getRequestObservable()
  .retryWhen(attempt -> {
    attempt
        .zipWith(Observable.range(1, 3), (n, i) -> i)
        .flatMap(i -> {
          return Observable.timer(5 * i, TimeUnit.SECONDS);
        })
  })
  .subscribe(viewModel -> {
    // handle updated request state
  });

一些优化:

  • 频繁的自动 retry 可能会导致 app DDoS 服务器。
  • 比较好的处理方式是让用户选择 retry,我们可以使用 RxRelay 中的Relay/Subject
PublishRelay<Long> retryRequest = PublishRelay.create();
    
getRequestObservable()
    .retryWhen(attempt ->retryRequest)
    .subscribe(viewModel -> {
      // handle updated request state
    });

@OnClick(R.id.retry_view)
public void onRetryClicked() {
  retryRequest.call(System.currentTimeMillis);
}

II. 处理特殊响应码 Response Codes

使用 Response<T>

  • 利用 Observable<Response<T>> 可以使用响应的元数据 (metadata)。
  • 这种方法的优点是服务器响应码在 400-500 之间不会产生异常。
interface SeatGeekApi {
  @GET("events") Observable<Response<EventResponse>> getUpComingEvents();
}
  
Retrofit mRetrofit = new Retrofit.Builder()
    .baseUrl("https://api.seatgeek.com/")
    .build();

mApi.getUpComingEvents()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(eventsResponse -> {
      int responseCode = eventsResponse.code();
      switch (responseCode) {
        HTTP_301:
          ...
        HTTP_403:
          ...
      }
    }, error -> {
      // ONLY handle i/o errors
    });

一些优化

  • 仅仅使用 Response<T> 处理所有的 response code
    可能会显得比较累赘,幸运的是 Retrofit 提供了 HttpException 处理 errors
  • 可以在 onError 回调里检查 exception 是否为 HttpException
mApi.getUpComingEvents()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(eventsResponse -> {
      // handle something with events
    }, error -> {
      if (error instanceof HttpException) {
        Response response = ((HttpException)error).response();
        switch (response.code()) {...}
      } else {
        // handle other errors
      }
    });
  • 为了增强代码可读性,我们还可以使用 share().
interface SeatGeekApi {
  @GET("events") Observable<Response<EventResponse>> getUpComingEvents();
}

Observable<Response<EventResponse>> eventsResponse = mApi.getUpcomingEvents()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .share();

eventsResponse
    .filter(Response::isSuccessful)
    .subscribe(this::handleSuccessfulResponse);

eventsResponse
    .filter(response -> response.code() == HTTP_403)
    .subscribe(this::handle403Response);

eventsResponse
    .filter(response -> response.code() == HTTP_304)
    .subscribe(this::handle304Response);

III. 显示进度条 Loading Indicator

一种 naive 的实现

mApi.getUpcomingEvents()
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .doOnSubscribe(() -> loadingIndicator.show())
  .doOnUnsubscribe(() -> loadingIndicator.hide())
  .subscribe(events -> {
    // do something with events
  }, error -> {
    // handle errors
  });

上例中存在的问题:

  • UI 跟数据流深度耦合
  • 无法响应数据流的变化,仅在简单的请求时有效

使用分割数据流 Splitting Stream

  • 尝试新的方法,UI 更新位于另一条 stream
  • 我们可以灵活的根据 loading 状态更新 UI.
enum RequestState {
  IDLE, LOADING, COMPLETE, ERROR
}

BehaviorRelay<RequestState> state = BehaviorRelay.create(RequestState.IDLE);

void publishRequestState(RequestState requestState) {
  Observable.just(requestState)
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(state);
}

mApi.getUpcomingEvents()
  .doOnSubscribe(() -> publishRequestState(RequestState.LOADING))
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .doOnError(t -> publishRequestState(RequestState.ERROR))
  .doOnComplete(() -> publishRequestState(RequestState.COMPLETE))
  .subscribe(events -> {
    // do something with events
  }, error -> {
    // handle errors
  });
  • 请求状态发生变化时,在另一条 stream 修改 UI 的变化:
state.subscribe(requestState -> {
    switch (requestState) {
      IDLE:
        break;
      LOADING:
        loadingIndicator.show();
        errorView.hide();
        break;
      COMPLETE:
        loadingIndicator.hide();
        break;
      ERROR:
        loadingIndicator.hide();
        errorView.show();
        break;
    }
  });

一些优化

  • 我们可以将这种模式扩展到各种 request result
BehaviorRelay<RequestState> state = BehaviorRelay.create(RequestState.IDLE);
BehaviorRelay<EventsResponse> response = BehaviorRelay.create();
BehaviorRelay<Throwable> errors = BehaviorRelay.create();
...
void executeRequest() {
  mApi.getUpcomingEvents()
      .doOnSubscribe(() -> publishRequestState(RequestState.LOADING))
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .doOnError(t -> publishRequestState(RequestState.ERROR))
      .doOnComplete(() -> publishRequestState(RequestState.COMPLETE))
      .subscribe(response, errors);
  }

  • 我们同样可以将所有分割的 streams 合并成一条:
BehaviorRelay<RequestState> state = BehaviorRelay.create(RequestState.IDLE);
BehaviorRelay<Optional<EventsResponse>> response = BehaviorRelay.create(Optional.empty());
BehaviorRelay<Optional<Throwable>> errors = BehaviorRelay.create(Optional.empty());
  
class RequestViewModel {
  public final RequestState mState;
  public final Optional<EventsResponse> mResponse;
  public final Optional<Throwable> mErrors;
  
  RequestViewModel(...) {...}
}

Observable.combineLatest(state, response, errors, RequestViewModel::new)
  .subscribe(viewModel -> {
    // handle updated request state
  });

IV. 管理 Configuration Changes

RxJava,以及像 AsyncTask 等其他异步模型默认情况下不会响应 Android 的生命周期。RxJava 提供了数据流 unsubscribe 的方法,但是问题是该如何正确的使用它。

Disposable mDisposable = Observable.combineLatest(state, response, errors, RequestViewModel::new)
  .subscribe(viewModel -> {
    // handle updated request state
  });

...

mDisposable.dispose(); // when?

一种基本的实现

创建 Disposable 时保存 Disposable 的引用,在生命周期发生回调时调用 dispose() 方法。

class MyActivity extends Activity {
  Disposable mDisposable;

  @Override protected void onCreate(@Nullable Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    ...
    mDisposable = Observable
        .combineLatest(state, response, errors, ViewModel::new)
        .subscribe(getSubscriber());
  }

  @Override protected void onStop() {
    super.onStop();
    mDisposable.dispose();
  }
}

CompositeDisposable

  • CompositeDisposable 非常适合 UI 订阅了多条 stream 的场景。
  • 使用 add() 方法添加每条 stream 的 Disposable 对象。
  • CompositeDisposable 的清理方法非常适合在 MVP 框架下的 unbind() 方法使用。
class MyActivity extends Activity {
  CompositeDisposable mDisposables = new CompositeDisposable();

  @Override protected void onCreate(@Nullable Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    ...    
    mDisposables.add(state.subscribe(getStateSubscriber()));
    mDisposables.add(state.subscribe(getStateSubscriber()));
    mDisposables.add(state.subscribe(getStateSubscriber()));
  }

  @Override protected void onStop() {
    super.onStop();
    mDisposables.clear();
  }
}

推荐的方式—— RxLifecycle

  • RxLifecycle 提供了便捷的方式将我们的 Disposable 与 Activity/Fragment 的生命周期绑定。
  • 当相应的生命周期回调发生时,Disposable 将会调用 dispose() 方法。
class MyActivity extends RxAppCompatActivity {
  @Override protected void onCreate(@Nullable Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    ...
    Observable.combineLatest(state, response, errors, ViewModel::new)
        .compose(bindToLifecycle())
        .subscribe(viewModel -> {
          // handle updated request state
        });

RxLifecycle + MVP

public class RxPresenter<T extends MvpView> extends Presenter<T> {

  BehaviorRelay<PresenterLifecycle> mPresenterLifecycle =
      BehaviorRelay.create(STOPPED);

  @Override public void attach(T mvpView) {
    mPresenterLifecycle.call(STARTED);
  }

  @Override public void detach() {
    mPresenterLifecycle.call(STOPPED);
  }

  protected <T>LifecycleTransformer<T> bindToLifecycle() {
    return RxLifecycle.bind(presenterLifecycleObservable, lifecycle -> {
      switch (lifecycle) {
        case STARTED:
          return STOPPED;
        case STOPPED:
        default:
          return STARTED;
      }
    });
  }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,490评论 18 139
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,043评论 25 707
  • 帝都中元节,午后骤雨,至天际皓月,转为细雨绵绵,一时落河岸边,人人持伞,手捧莲花灯,为已故亲人祈福。 盼儿侍奉父亲...
    陌上花开mshk阅读 733评论 0 9
  • 知女范微信公众平台:zhinvfan 自从《太阳的后裔》播出爆红后, “撩妹”一夜成为网络热词。 悲哀的是,真不是...
    爱美文78阅读 307评论 0 0
  • 亘古至今 人人都渴望拥有知己 懂得自己欣赏自己 理解乃至包容自己…… 古有管鲍君子之交 亦有伯牙为子期病故而...
    娑婆如斯阅读 894评论 0 104