Rpcx源码之网关Gateway

一、rpcx中的网关

在使用rpcx过程中可能会存在其他语言比如Java、Python、C#等来调用rpcx服务,这意味着需要提供不同的rpcx的协议来支撑,不过rpcx目前已提供了GateWay来为对应的rpcx服务提供了http网关服务,直接通过http的方式来完成不同语言对rpcx服务的调用。
目前rpcx提供了两种部署模式:
1、Gateway:将网关服务单独部署,所有client将http请求发送给gateway,由gateway来完成将http请求转换rpcx请求,再调用对应的rpcx服务,最终再将rpcx返回结果转换成http的response,返回给client


gateway模式

2、Agent:将网关程序与client程序部署在一起,agent以一个后端服务的形式存在client机器上,即使机器上已部署多个client,也只需一个agent来完成将client发送的http请求到本地的agent后端服务,再由agent后端服务将http请求转为rpcx请求,再转发给rpcx服务,最终将rpcx服务返回的结果转为http的response返回给client。


agent模式

二、rpcx的网关应用

在server.go文件中有如下的代码来开启gateway: 在执行server.Serve(network,addrewss)时,会尝试启动一个gateway

func (s *Server) Serve(network, address string) (err error) {
    s.startShutdownListener()
    var ln net.Listener
    ln, err = s.makeListener(network, address)
    if err != nil {
        return
    }

    if network == "http" {
        s.serveByHTTP(ln, "")
        return nil
    }

    // try to start gateway
    ln = s.startGateway(network, ln)

    return s.serveListener(ln)
}

接下来通过一个简单的demo来了解gateway:
server代码:相应的代码没有特殊的地方

package main

import (
    "flag"
    "github.com/smallnest/rpcx/server"
    "rpcx/examples/models"
)

var (
    addr = flag.String("addr","localhost:8972","server address")
)

func main() {
    flag.Parse()

    // 简单创建一个server 主要验证gateway
    s := server.NewServer()
    s.Register(new(models.Arith),"")
    s.Serve("tcp",*addr) 
    //s.Serve("http", *addr)
}

client代码:不同于普通的rpcx请求,需要创建一个http的request,在针对http的request进行加工,主要是header的设置

package main

import (
    "bytes"
    "flag"
    "io/ioutil"
    "net/http"
    "github.com/smallnest/rpcx/codec"
    "github.com/rpcx-ecosystem/rpcx-gateway"
    "rpcx/examples/models"
    "log"
)

var (
    addr = flag.String("addr","localhost:8972","server address")
)

func main() {
    cc := &codec.MsgpackCodec{}

    args := &models.Args{
        A: 100,
        B: 200,
    }

    data, _ := cc.Encode(args)
    // request
    req,err := http.NewRequest("POST","http://127.0.0.1:8972/", bytes.NewReader(data))
    if err != nil{
        log.Fatalf("failed to create request: ", err)
        return
    }

    // 设置header
    h := req.Header
    h.Set(gateway.XMessageID,"10000")
    h.Set(gateway.XMessageType,"0")
    h.Set(gateway.XSerializeType,"3")
    h.Set(gateway.XServicePath,"Arith")
    h.Set(gateway.XServiceMethod,"Mul")

    // 发送http请求
    //  http请求===>rpcx请求===>rpcx服务===>返回rpcx结果===>转换为http的response===>输出到client
    res, err := http.DefaultClient.Do(req)
    if err != nil{
        log.Fatalf("failed to call: ", err)
    }
    defer res.Body.Close()
    // 获取结果
    replyData, err := ioutil.ReadAll(res.Body)
    if err != nil{
        log.Fatalf("failed to read response: ", err)
    }
        // 解码
    reply := &models.Reply{}
    err = cc.Decode(replyData, reply)
    if err != nil{
        log.Fatalf("failed to decode reply: ", err)
    }
    log.Printf("%d * %d = %d", args.A, args.B, reply.C)
}

关于gateway具体处理部分放到第三部分讲解,上面关于rpcx的gateway使用完成,直接启动server、client即可看到输出结果:

2019/02/20 14:35:33 response meta data = map[X-RPCX-MessageStatusType:Normal 
              X-RPCX-Meta: 
              X-RPCX-SerializeType:3 
              X-RPCX-MessageID:10000 
              X-RPCX-ServicePath:Arith 
              X-RPCX-ServiceMethod:Mul 
              X-RPCX-Version:0]
2019/02/20 14:35:33 100 * 200 = 20000

从输出结果的记录日志和上面client中代码可以看到这里面涉及到的http协议转换为rpcx协议涉及到如下的内容:

  • http请求request的header设置

X-RPCX-Version: rpcx 版本
X-RPCX-MesssageType: 设置为0,代表request
X-RPCX-Heartbeat: 是否是heartbeat请求, 默认=false
X-RPCX-Oneway: 是否是单向请求,默认=false.
X-RPCX-SerializeType: 0 as raw bytes, 1 as JSON, 2 as protobuf, 3 as msgpack
X-RPCX-MessageID: 消息id, uint64 类型
X-RPCX-ServicePath: service path
X-RPCX-ServiceMethod: service method
X-RPCX-Meta: 额外的metadata数据

  • http响应response的header设置

X-RPCX-Version: rpcx 版本
X-RPCX-MesssageType: 1 ,代表response
X-RPCX-Heartbeat: 是否是heartbeat请求
X-RPCX-MessageStatusType: Error 还是正常返回结果
X-RPCX-SerializeType: 0 as raw bytes, 1 as JSON, 2 as protobuf, 3 as msgpack
X-RPCX-MessageID: 消息id, uint64 类型
X-RPCX-ServicePath: service path
X-RPCX-ServiceMethod: service method
X-RPCX-Meta: 额外的metadata
X-RPCX-ErrorMessage: 错误信息, 需要设置X-RPCX-MessageStatusType为Error

三、rpcx的源码剖析

在server执行serve时,有一个启动gateway的操作

s.startGateway(network, ln)

而整个rpcx中关于gateway的部分是在server/gateway.go
startGateway主要完成开启http服务接收请求,同时也会增加一些http的路由:完成将http请求转为rpcx请求,以及返回结果转为http的response在输出给客户端client。

func (s *Server) startGateway(network string, ln net.Listener) net.Listener {
    if network != "tcp" && network != "tcp4" && network != "tcp6" { // network检查
        log.Infof("network is not tcp/tcp4/tcp6 so can not start gateway")
        return ln
    }

    m := cmux.New(ln) // 将当前connection包装成多路复用的模式

    // 指定优先级
    httpLn := m.Match(cmux.HTTP1Fast())  // 只匹配http的请求
    rpcxLn := m.Match(cmux.Any()) // 匹配任意请求

    go s.startHTTP1APIGateway(httpLn) // http路由规则 真正处理:handleGatewayRequest
    go m.Serve()

    return rpcxLn
}

真正完成gateway的协议转换的操作在handleGatewayRequest方法中,也是在启动过程中绑定处理http请求的handler

func (s *Server) handleGatewayRequest(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
       // 主要是对http请求头header中参数检查:比如XServicePath
       ...省略代码
    servicePath := r.Header.Get(XServicePath) // 获取header中的XServicePath
       ...省略代码
    req, err := HTTPRequest2RpcxRequest(r) // 将http请求转为rpcx请求
    defer protocol.FreeMsg(req) // 为了复用protocol.Message

    //设置响应头
    ...省略代码
       // 通过context.Context传递开始请求时间 为了timeout操作的处理
    ctx := context.WithValue(context.Background(), StartRequestContextKey, time.Now().UnixNano())
    err = s.auth(ctx, req) // 对request进行认证(需要以Option方式提供AuthFunc)
    if err != nil { // 认证过程出现error 直接将error返回给client
        s.Plugins.DoPreWriteResponse(ctx, req, nil)
        wh.Set(XMessageStatusType, "Error")
        wh.Set(XErrorMessage, err.Error())
        w.WriteHeader(401)
        s.Plugins.DoPostWriteResponse(ctx, req, req.Clone(), err)
        return
    }

    resMetadata := make(map[string]string)
    newCtx := context.WithValue(context.WithValue(ctx, share.ReqMetaDataKey, req.Metadata),
        share.ResMetaDataKey, resMetadata)

    res, err := s.handleRequest(newCtx, req) // 处理请求
    defer protocol.FreeMsg(res)

    if err != nil { // 处理过程出现error 直接将error返回给client
        log.Warnf("rpcx: failed to handle gateway request: %v", err)
        wh.Set(XMessageStatusType, "Error")
        wh.Set(XErrorMessage, err.Error())
        w.WriteHeader(500)
        return
    }

       // 针对rpcx的响应结果进行前置和输出操作

    s.Plugins.DoPreWriteResponse(newCtx, req, nil) // 响应前置操作
    if len(resMetadata) > 0 { //copy meta in context to request
        meta := res.Metadata
        if meta == nil {
            res.Metadata = resMetadata
        } else {
            for k, v := range resMetadata {
                meta[k] = v
            }
        }
    }

    meta := url.Values{}
    for k, v := range res.Metadata {
        meta.Add(k, v)
    }
    wh.Set(XMeta, meta.Encode())
    w.Write(res.Payload)
    s.Plugins.DoPostWriteResponse(newCtx, req, res, err) // 输出结果 
}

将http请求转为rpcx请求之后的请求处理和普通rpcx请求处理无异:handleRequest

func (s *Server) handleRequest(ctx context.Context, req *protocol.Message) (res *protocol.Message, err error) {
        // 获取请求中的参数
        // 获取本地注册记录中具体处理业务的service,完成相应的处理
        // 构建response对象 填充对应的参数内容
        // 输出protocol.Message即是rpcx的返回结果
    
        // service完成对应的操作:需注意提供service有method和function两类
    mtype := service.method[methodName]
    if mtype == nil { 
        if service.function[methodName] != nil { //service提供function来完成服务
            return s.handleRequestForFunction(ctx, req)
        }
        err = errors.New("rpcx: can't find method " + methodName)
        return handleError(res, err)
    }
        // service提供method完成服务
    var argv = argsReplyPools.Get(mtype.ArgType) // 参数类型

    codec := share.Codecs[req.SerializeType()] // 序列化方式
    ...省略代码
    err = codec.Decode(req.Payload, argv) // 解码
    ...省略代码
    replyv := argsReplyPools.Get(mtype.ReplyType) // 返回类型

    if mtype.ArgType.Kind() != reflect.Ptr { // 参数类型是否为指针
        err = service.call(ctx, mtype, reflect.ValueOf(argv).Elem(), reflect.ValueOf(replyv))
    } else { // 输入参数为指针
        err = service.call(ctx, mtype, reflect.ValueOf(argv), reflect.ValueOf(replyv))
    }
        // 响应体
    if !req.IsOneway() {
        data, err := codec.Encode(replyv) // 编码
        argsReplyPools.Put(mtype.ReplyType, replyv)
        if err != nil {
            return handleError(res, err)

        }
        res.Payload = data
    }

    return res, nil
}

如上基本上罗列出采用gateway方式请求rpcx服务来支持不同的语言通过http协议的调用过程。

四、后记

在源码剖析中并没有将所有代码罗列出来,详细源码剖析见gateway源码

调用过程

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 201,924评论 5 474
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,781评论 2 378
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,813评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,264评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,273评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,383评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,800评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,482评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,673评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,497评论 2 318
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,545评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,240评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,802评论 3 304
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,866评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,101评论 1 258
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,673评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,245评论 2 341

推荐阅读更多精彩内容

  • 原文https://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html...
    梁行之阅读 1,067评论 0 0
  • 深入浅出HTTP协议(WEB开发和面试必备) 1.基础概念篇 a.简介 HTTP是Hyper Text Trans...
    半世韶华忆阑珊阅读 1,210评论 0 7
  • 住在老的小区和新的小区有什么不同呢?新的小区有新的户型,有规划好的停车位,有物业管理,各种设施都是崭新的。老的小区...
    海皇的向日葵阅读 451评论 0 2
  • 租是对资产的付费,这里所说的资产含义非常广,包括土地、矿山、人的才能、发明创造、甚至是特权,只要能够带来收入的就叫...
    loveEconomics阅读 287评论 0 1