通过Websocket与gRPC交互 | gRPC双向数据流的交互控制系列(2)

在本系列第一篇文章《gRPC双向数据流的交互控制系列(1).初步实现》(https://www.jianshu.com/p/5158d6686769)中,我们完成了通过控制台进行gRPC双向数据流交互控制的实验。但是只是用控制台交互大大限制了客户端的使用范围,如果我们要在网页或者移动端与gRPC进行双向数据流的交互怎么办?熟悉前端开发的朋友可能马上就会想到:用Websocket啦!

Websocket简介

WebSocket协议诞生于2008年,2011年成为国际标准,现代浏览器都已经支持Websocket,移动端不管是安卓还是iOS也没有问题。Websocket提供一种在单个TCP 连接上进行全双工通讯的协议,使得客户端和服务端只需要做一个握手的动作,然后,客户端和服务端之间就形成了一条快速通道,两者之间就直接可以进行双向的数据传输。看到这我们可以发现Websocket与gRPC双向数据流之间简直是天作之合!


整合Websocket服务端 + gRPC客户端

主流的各种语言都有库可供Websocket的服务端使用,为与前文(https://www.jianshu.com/p/5158d6686769)保持一致,我们仍然用Go语言来时实现。Go语言中常用的Websocket库是gorrila websocket (github.com/gorilla/websocket),为了使代码看起来更简洁一些,我们本次采用封装了gorrila websocket的微型框架 melody (github.com/olahol/melody)和 gin框架(github.com/gin-gonic/gin) 来实现。

gin在本程序中的作用比较简单,就是提供路由,一个是静态资源(index.html)的路由,一个是websocket的路由。

melody有三个最重要的函数:一个是HandleConnect,用于响应websocket客户端的连接事件;一个是HandleMessage,用于处理websocket客户端输入的消息;一个是HandleDisconnect,用于处理websocket客户端断开连接事件。melody封装了session,并且可以利用session存取自定义数据。

ws-server-grpc-client.go

package main

import (
    "context"
    "encoding/json"
    "flag"
    "io"
    "log"
    "net/http"

    "github.com/gin-gonic/gin"
    "google.golang.org/grpc"
    "gopkg.in/olahol/melody.v1"

    proto "chat" // 自动生成的 proto代码
)

var 服务端地址 string

func init() {
    flag.StringVar(&服务端地址, "server", "localhost:3000", "服务端地址")
}

func main() {
    // 解析命令行参数
    flag.Parse()

    // 设置log
    log.SetFlags(log.LstdFlags)

    // 创建gRPC连接
    conn, err := grpc.Dial(服务端地址, grpc.WithInsecure())
    if err != nil {
        log.Printf("连接失败: [%v]\n", err)
        return
    }
    defer conn.Close()

    // 声明客户端
    client := proto.NewChatClient(conn)

    r := gin.Default()
    m := melody.New()

    // 静态页面路由
    r.GET("/", func(c *gin.Context) {
        http.ServeFile(c.Writer, c.Request, "html/index.html")
    })

    // websocket路由
    r.GET("/ws", func(c *gin.Context) {
        m.HandleRequest(c.Writer, c.Request)
    })

    // 处理websocket客户端新连接,并为每一个新连接创建一个 双向数据流
    m.HandleConnect(func(s *melody.Session) {
        log.Println("有新用户接入")
        // 给每个连入的新用户创建一个数据流
        // 声明 context
        ctx := context.Background()

        // 创建双向数据流
        stream, err := client.BidStream(ctx)
        if err != nil {
            log.Printf("创建数据流失败: [%v]\n", err)
            // 如果创建数据流失败,向客户端发送失败信息 同时 关闭websocket连接
            s.CloseWithMsg([]byte("创建数据流失败:" + err.Error()))
            return
        }

        // 如果创建成功,将数据流保存在 session中
        s.Set("stream", stream)

        // 启动一个 goroutine 用于接收从服务端返回的消息
        go func() {
            for {
                // 接收从 服务端返回的数据流
                响应, err := stream.Recv()

                if err == io.EOF {
                    log.Println("⚠️ 收到服务端的结束信号")
                    s.CloseWithMsg([]byte("⚠️ 收到服务端的结束信号"))
                    return
                }

                if err != nil {
                    // TODO: 处理接收错误
                    log.Println("接收数据出错:", err)
                    s.CloseWithMsg([]byte("接收数据出错" + err.Error()))
                    return
                }

                log.Printf("[客户端收到]: %s", 响应.Output)
                // 如果成功收到从服务端返回的消息, 将消息序列化后返回给 websocket 客户端
                要返回的byte, _ := json.Marshal(响应)
                s.Write(要返回的byte)
            }
        }()
    })

    // 处理用户发来的消息
    m.HandleMessage(func(s *melody.Session, msg []byte) {
        log.Println("收到消息:", msg)
        // 把用户输入的信息原样返回 websocket 客户端
        s.Write(msg)

        // 将 []byte 类型的 msg 解析为 proto.Request
        var 输入信息 proto.Request
        if err := json.Unmarshal(msg, &输入信息); err != nil {
            log.Println("解析输入信息失败:", err)
            s.CloseWithMsg([]byte("输入信息解析失败"))
            return
        }

        // 从 session中取出 stream
        被保存的数据流, ok := s.Get("stream")
        if !ok {
            s.CloseWithMsg([]byte("没有找到stream!"))
            return
        }

        // 断言stream
        stream, ok := 被保存的数据流.(proto.Chat_BidStreamClient)
        if !ok {
            s.CloseWithMsg([]byte("被保存的数据流不是Chat_BidStreamClient!"))
            return
        }

        if err := stream.Send(&输入信息); err != nil {
            s.CloseWithMsg([]byte("向gRPC服务端发送消息失败:" + err.Error()))
            return
        }
    })

    // 处理 websocket 连接断开事件,并关闭session 中 stream的连接
    m.HandleDisconnect(func(s *melody.Session) {
        log.Println("websocket客户端断开连接")
        // 从 session中取出 stream
        被保存的数据流, ok := s.Get("stream")
        if !ok {
            log.Println("没有找到stream!")
            return
        }

        // 断言stream
        stream, ok := 被保存的数据流.(proto.Chat_BidStreamClient)
        if !ok {
            log.Println("被保存的数据流不是Chat_BidStreamClient!")
            return
        }

        if err := stream.CloseSend(); err != nil {
            log.Println("断开stream连接出错:", err)
        }
    })

    r.Run(":8080")
}

websocket客户端

为简便起见,本例中的客户端没有使用任何框架,而是用了最原始的html和javascript,所以界面比较简陋。我们完全可以用Angular、React、Vue、jQuery……或者基于移动端进行开发。

index.html

<html>
<head>
    <meta charset="UTF-8">
    <title>WebSocket + gRPC双向数据流</title>
     <style>
    #chat {
      text-align: left;
      background: #f1f1f1;
      width: 500px;
      min-height: 300px;
      padding: 20px;
    }
  </style>
</head>
<body>
    <center>
      <h3>对话</h3>
      <h4 id="clientId"></h4>
      <div id="output"/>
      <pre id="chat"></pre>
      <input placeholder="请输入信息,回车发送" id="text" type="text">
    </center>
    <script>
      var url = "ws://" + window.location.host + "/ws";
      var ws = new WebSocket(url);
      
      var chat = document.getElementById("chat");
      var text = document.getElementById("text");
      var output = document.getElementById("output");

      var clientId = document.getElementById("clientId");
      var name = "客户编号:" + Math.floor(Math.random() * 1000);
      clientId.innerHTML = name;
      
      // 打印连接状态
      var printStatus = function(状态) {
        var d       = document.createElement("div");
        d.innerHTML = 状态;
        output.appendChild(d);
      };

      // 获取当前时间
      var now = function () {
        var iso = new Date().toISOString();
        return iso.split("T")[1].split(".")[0];
      };

      // 处理websocket消息
      ws.onmessage = function (msg) {
        var msg = JSON.parse(msg.data);
        if (msg.input){
          var line =  now() + " " + msg.input + "\n";
        }

        if (msg.output){
          var line =  now() + " " + msg.output + "\n";
        }
        
        if(line){
          chat.innerText += line;
        }
      };
      
      // 处理连接事件
      ws.onopen = function(evt) {
        printStatus(now() +  ' ' + '<span style="color: green;">成功连接</span>');
      }

      // 处理断开连接事件
      ws.onclose = function(evt) {
        printStatus(now() +  ' ' + '<span style="color: red;">连接已关闭</span>');
        ws = null;
      }

      // 对话框监听回车键 并发送消息
      text.onkeydown = function (e) {
        if (e.keyCode === 13 && text.value !== "") {
          ws.send(JSON.stringify({"input": text.value}))
          text.value = "";
        }
      };
    </script>
</body>
</html>

gRPC服务端

与《gRPC双向数据流的交互控制系列(1).初步实现》(https://www.jianshu.com/p/5158d6686769)相比,server.go 做了小幅调整,主要是响应指令的条件上做了修改,读者们可以自行比较。

server.go

package main

import (
    "io"
    "log"
    "net"
    "strconv"

    "google.golang.org/grpc"
    
    proto "chat" // 自动生成的 proto代码
)

// Streamer 服务端
type Streamer struct{}

// BidStream 实现了 ChatServer 接口中定义的 BidStream 方法
func (s *Streamer) BidStream(stream proto.Chat_BidStreamServer) error {
    ctx := stream.Context()
    for {
        select {
        case <-ctx.Done():
            log.Println("收到客户端通过context发出的终止信号")
            return ctx.Err()
        default:
            // 接收从客户端发来的消息
            输入, err := stream.Recv()
            if err == io.EOF {
                log.Println("客户端发送的数据流结束")
                return nil
            }
            if err != nil {
                log.Println("接收数据出错:", err)
                return err
            }

            // 如果接收正常,则根据接收到的 字符串 执行相应的指令
            switch 输入.Input {
            case "结束对话\n", "结束对话":  //⚠️ 此处增加了匹配条件
                log.Println("收到'结束对话'指令")
                if err := stream.Send(&proto.Response{Output: "收到结束指令"}); err != nil {
                    return err
                }
                // 收到结束指令时,通过 return nil 终止双向数据流
                return nil

            case "返回数据流\n", "返回数据流":    //⚠️ 此处增加了匹配条件
                log.Println("收到'返回数据流'指令")
                // 收到 收到'返回数据流'指令, 连续返回 10 条数据
                for i := 0; i < 10; i++ {
                    if err := stream.Send(&proto.Response{Output: "数据流 #" + strconv.Itoa(i)}); err != nil {
                        return err
                    }
                }

            default:
                // 缺省情况下, 返回 '服务端返回: ' + 输入信息
                log.Printf("[收到消息]: %s", 输入.Input)
                if err := stream.Send(&proto.Response{Output: "服务端返回: " + 输入.Input}); err != nil {
                    return err
                }
            }
        }
    }
}

func main() {
    log.Println("启动服务端...")
    server := grpc.NewServer()

    // 注册 ChatServer
    proto.RegisterChatServer(server, &Streamer{})

    address, err := net.Listen("tcp", ":3000")
    if err != nil {
        panic(err)
    }

    if err := server.Serve(address); err != nil {
        panic(err)
    }
}


运行效果

先启动服务端程序 server.go
再启动客户端程序 ws-server-grpc-client.go

打开浏览器,进入指定的地址,如 127.0.0.1:8080
输入消息,结果类似下图:


浏览器运行效果

小结

本文的例子充分利用了Websocket全双工通信的特性,实现了前端程序与gRPC服务端通过双向数据流进行交互。在下一篇文章中,笔者将介绍如何利用nginx最新特性实现gRPC服务端的负载均衡

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,406评论 5 475
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,976评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,302评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,366评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,372评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,457评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,872评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,521评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,717评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,523评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,590评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,299评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,859评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,883评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,127评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,760评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,290评论 2 342

推荐阅读更多精彩内容