利用 RxJava2 和 OkHttp 搭建 WebSocket 订阅流

背景

对于金融交易软件,行情信息的实时更新是一个重要需求。后端团队已经提供了 WebSocket api,现在需求在安卓客户端使用 WebSocket 来接收后端实时推送的行情更新。

平台

Android

语言环境

Kolin

目标

  • 在异步线程里建立 WebSocket 连接
  • WebSocket 所有事件回调到主线程处理
  • 支持消息背压(行情可能在短时间内有剧烈波动,消息量会激增)

实现

建立连接

WebSocket 采用了 okHttp 的内置实现 - RealWebSocket,其中 WebSocketListener 提供了相应事件回调:

public abstract class WebSocketListener {
  /**
   * Invoked when a web socket has been accepted by the remote peer and may begin transmitting
   * messages.
   */
  public void onOpen(WebSocket webSocket, Response response) {
  }

  /** Invoked when a text (type {@code 0x1}) message has been received. */
  public void onMessage(WebSocket webSocket, String text) {
  }

  /** Invoked when a binary (type {@code 0x2}) message has been received. */
  public void onMessage(WebSocket webSocket, ByteString bytes) {
  }

  /**
   * Invoked when the remote peer has indicated that no more incoming messages will be
   * transmitted.
   */
  public void onClosing(WebSocket webSocket, int code, String reason) {
  }

  /**
   * Invoked when both peers have indicated that no more messages will be transmitted and the
   * connection has been successfully released. No further calls to this listener will be made.
   */
  public void onClosed(WebSocket webSocket, int code, String reason) {
  }

  /**
   * Invoked when a web socket has been closed due to an error reading from or writing to the
   * network. Both outgoing and incoming messages may have been lost. No further calls to this
   * listener will be made.
   */
  public void onFailure(WebSocket webSocket, Throwable t, @Nullable Response response) {
  }
}

这里有两个地方需要注意,onClosedonFailure,当两个对等方都指示不再传输消息并且连接已成功释放时,onClosed 会被调用,之后不会再有任何回调,而由于从网络读取或写入错误而关闭 WebSocket 时,onFailure 会被调用,同样之后不会再有任何回调,注意此时 WebSocket 会被关闭而且 onClosed 不会再被调用,这是 RealWebSocket 的实现:

public final class RealWebSocket implements WebSocket, WebSocketReader.FrameCallback {
  ...
  public void failWebSocket(Exception e, @Nullable Response response) {
    Streams streamsToClose;
    synchronized (this) {
      if (failed) return; // Already failed.
      failed = true;
      streamsToClose = this.streams;
      this.streams = null;
      if (cancelFuture != null) cancelFuture.cancel(false);
      if (executor != null) executor.shutdown(); //停止 executor
    }

    try {
      listener.onFailure(this, e, response);
    } finally {
      //关闭流
      closeQuietly(streamsToClose);
    }
  }
  ...
}

另外双方还需要一个心跳机制来检测连接状态,RealWebSocket 内心跳是一个空字节,即 0 byte,心跳逻辑与后端 api 一致,因此心跳不做修改,如果实际业务中心跳机制不同于此,就需要做修改了,这里暂且跳过。

下面我们来建立一个 WebSocket 连接(部分代码省略):

val okHttpClient = OkHttpClient.Builder()..........build()
val request = Request.Builder().url("这里输入连接地址").build()
val listener = object: WebSocketListener () {
        ...
}
val pingIntervalMillis = 1000 * 60 * 5 //心跳间隔 5 分钟
val realWss = RealWebSocket(request, listener, SecureRandom(), pingIntervalMillis)
realWss.connect(client) //建立连接

这样我们就能建立起一个 WebSocket 连接了,所有消息都会回调到我们定义的 listener 里。

异步回调

大家都知道 Android 网络操作一定不能放在主线程中,这一点就不做赘述了。这里我们使用了 RxJava2 的 Flowable 操作符以及 Scheduler 来进行线程调度,我们可以手动调用 onNext 来向订阅者发送 WebSocket 消息事件,同时 Flowable 也支持背压:

public enum BackpressureStrategy {
    /**
     * OnNext events are written without any buffering or dropping.
     * Downstream has to deal with any overflow.
     * <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
     */
    MISSING,
    /**
     * Signals a MissingBackpressureException in case the downstream can't keep up.
     */
    ERROR,
    /**
     * Buffers <em>all</em> onNext values until the downstream consumes it.
     */
    BUFFER,
    /**
     * Drops the most recent onNext value if the downstream can't keep up.
     */
    DROP,
    /**
     * Keeps only the latest onNext value, overwriting any previous value if the
     * downstream can't keep up.
     */
    LATEST
}

以上是 RxJava2 提供的几种背压策略,根据业务这里选择了 BUFFER 缓存所有收到的消息直到被消费。同时 BUFFER 也是默认的背压策略:

public final class FlowableCreate<T> extends Flowable<T> {
    ...
    @Override
    public void subscribeActual(Subscriber<? super T> t) {
        BaseEmitter<T> emitter;

        switch (backpressure) {
        ...
          default: {
            //默认 BufferAsyncEmitter
            emitter = new BufferAsyncEmitter<T>(t, bufferSize());
            break;
          }
        }
        ...
    }
    ...
}

这里我们注意到,BufferAsyncEmitter 实例化的过程中传入了 bufferSize() ,通过查看 Flowable 我们看到默认的 buffer size 是 128:

public abstract class Flowable<T> implements Publisher<T> {
        /** The default buffer size. */
    static final int BUFFER_SIZE;
    static {
        BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
    }
    ...
}

这里先使用默认容量,不做修改。

接下来将 WebSocket 所有事件密封在一个类里,用作 Flowable 的消息类型:

sealed class WebSocketEvent {

    class Open(webSocket: WebSocket?, val response: Response?) : WebSocketEvent()

    class BinaryMessage(webSocket: WebSocket?, val bytes: ByteString?) : WebSocketEvent()

    class StringMessage(webSocket: WebSocket?, val text: String?) : WebSocketEvent()

    class Closing(webSocket: WebSocket?, val code: Int, val reason: String?) : WebSocketEvent()

    class Closed(webSocket: WebSocket?, val code: Int, val reason: String?) : WebSocketEvent()


}

WebSocketListener 中向订阅者发送对应的事件:

class FlowableWebSocketListener(private val emitter: FlowableEmitter<WebSocketEvent>) : WebSocketListener() {

    override fun onOpen(webSocket: WebSocket?, response: Response?) {
        emitter.onNext(WebSocketEvent.Open(webSocket, response))
    }

    override fun onMessage(webSocket: WebSocket?, text: String?) {
        emitter.onNext(WebSocketEvent.StringMessage(webSocket, text))
    }

    override fun onMessage(webSocket: WebSocket?, bytes: ByteString?) {
        emitter.onNext(WebSocketEvent.BinaryMessage(webSocket, bytes))
    }

    override fun onClosing(webSocket: WebSocket?, code: Int, reason: String?) {
        emitter.onNext(WebSocketEvent.Closing(webSocket, code, reason))
    }

    override fun onClosed(webSocket: WebSocket?, code: Int, reason: String?) {
        emitter.onComplete()
    }

    override fun onFailure(webSocket: WebSocket?, t: Throwable?, response: Response?) {
        if (!emitter.isCancelled) {
            emitter.onError(t ?: IOException("WebSocket unknown error"))
            emitter.onComplete()
        }
    }
}

接下来我们创建一个 Flowable,在异步线程建立连接,消息发送到主线程做处理:

val flowable: Flowable<WebSocketEvent> = Flowable.create({ emitter ->
      ...
      val listener = RxWebSocketListener(emitter)
      val realWss = RealWebSocket(request, listener, SecureRandom(), pingIntervalMillis)
      realWss.connect(client) //建立连接
}, BackpressureStrategy.BUFFER)

flowable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object: ResourceSubscriber<WebSocketEvent> {
      ...
      override fun onStart() {
         //ResourceSubscriber 会一次拉取 Long.MAX_VALUE 个消息,这里我们覆写先拉取一条
         request(1)
      }

      override fun onNext(t: WebSocketEvent?) {
          when(t){
              WebSocketEvent.Open -> {...}
              WebSocketEvent.BinaryMessage -> {...}
              WebSocketEvent.StringMessage -> {...}
              WebSocketEvent.Closing -> {...}
          }

          if (判断是否需要继续拉取消息) {
              request(1)
          }
      }

      override fun onComplete() {
           onClosed()
           dispose()
      }
      ...
})

这里要注意 Schedulers.io() 创建工作线程的数量是没有上限的,因此在 WebSocket 关闭之后应该立即 dispose() 来释放,否则可能引发 OutOfMemory。

...
static final class CachedWorkerPool implements Runnable {
    ThreadWorker get() {
          if (allWorkers.isDisposed()) {
              return SHUTDOWN_THREAD_WORKER;
          }
          while (!expiringWorkerQueue.isEmpty()) {
              ThreadWorker threadWorker = expiringWorkerQueue.poll();
              if (threadWorker != null) {
                  return threadWorker;
              }
          }

          // No cached worker found, so create a new one.
          // 缓存中没有可复用的 ThreadWorker 时,创建一个新的 ThreadWorker
          ThreadWorker w = new ThreadWorker(threadFactory);
          allWorkers.add(w);
          return w;
    }
    ...
}
...

到此一个异步 WebSocket 连接就搭建完成了。

(转载请注明出处)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,524评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,869评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,813评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,210评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,085评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,117评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,533评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,219评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,487评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,582评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,362评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,218评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,589评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,899评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,176评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,503评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,707评论 2 335

推荐阅读更多精彩内容

  • 原文地址:http://www.ibm.com/developerworks/cn/java/j-lo-WebSo...
    敢梦敢当阅读 8,863评论 0 50
  • 2. NODE模块端实现 2.2 node模块的实现 引入模块: 路径分析 文件定位 编译执行 2.2.1 优先从...
    yozosann阅读 2,106评论 0 0
  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,067评论 1 32
  • 一开始的时候我对这个性格很弱的女生并没有什么感觉,在五美中真的是最不起眼的一个。 但我渐渐发现,这...
    杜木兮阅读 262评论 0 0
  • 1. 下面的代码输出多少?修改代码让 fnArr[i]() 输出 i。使用 两种以上的方法 2. 封装一个汽车对象...
    饥人谷_Jack阅读 195评论 0 0