一、rpcx中的网关
在使用rpcx过程中可能会存在其他语言比如Java、Python、C#等来调用rpcx服务,这意味着需要提供不同的rpcx的协议来支撑,不过rpcx目前已提供了GateWay来为对应的rpcx服务提供了http网关服务,直接通过http的方式来完成不同语言对rpcx服务的调用。
目前rpcx提供了两种部署模式:
1、Gateway:将网关服务单独部署,所有client将http请求发送给gateway,由gateway来完成将http请求转换rpcx请求,再调用对应的rpcx服务,最终再将rpcx返回结果转换成http的response,返回给client
2、Agent:将网关程序与client程序部署在一起,agent以一个后端服务的形式存在client机器上,即使机器上已部署多个client,也只需一个agent来完成将client发送的http请求到本地的agent后端服务,再由agent后端服务将http请求转为rpcx请求,再转发给rpcx服务,最终将rpcx服务返回的结果转为http的response返回给client。
二、rpcx的网关应用
在server.go文件中有如下的代码来开启gateway: 在执行server.Serve(network,addrewss)时,会尝试启动一个gateway
func (s *Server) Serve(network, address string) (err error) {
s.startShutdownListener()
var ln net.Listener
ln, err = s.makeListener(network, address)
if err != nil {
return
}
if network == "http" {
s.serveByHTTP(ln, "")
return nil
}
// try to start gateway
ln = s.startGateway(network, ln)
return s.serveListener(ln)
}
接下来通过一个简单的demo来了解gateway:
server代码:相应的代码没有特殊的地方
package main
import (
"flag"
"github.com/smallnest/rpcx/server"
"rpcx/examples/models"
)
var (
addr = flag.String("addr","localhost:8972","server address")
)
func main() {
flag.Parse()
// 简单创建一个server 主要验证gateway
s := server.NewServer()
s.Register(new(models.Arith),"")
s.Serve("tcp",*addr)
//s.Serve("http", *addr)
}
client代码:不同于普通的rpcx请求,需要创建一个http的request,在针对http的request进行加工,主要是header的设置
package main
import (
"bytes"
"flag"
"io/ioutil"
"net/http"
"github.com/smallnest/rpcx/codec"
"github.com/rpcx-ecosystem/rpcx-gateway"
"rpcx/examples/models"
"log"
)
var (
addr = flag.String("addr","localhost:8972","server address")
)
func main() {
cc := &codec.MsgpackCodec{}
args := &models.Args{
A: 100,
B: 200,
}
data, _ := cc.Encode(args)
// request
req,err := http.NewRequest("POST","http://127.0.0.1:8972/", bytes.NewReader(data))
if err != nil{
log.Fatalf("failed to create request: ", err)
return
}
// 设置header
h := req.Header
h.Set(gateway.XMessageID,"10000")
h.Set(gateway.XMessageType,"0")
h.Set(gateway.XSerializeType,"3")
h.Set(gateway.XServicePath,"Arith")
h.Set(gateway.XServiceMethod,"Mul")
// 发送http请求
// http请求===>rpcx请求===>rpcx服务===>返回rpcx结果===>转换为http的response===>输出到client
res, err := http.DefaultClient.Do(req)
if err != nil{
log.Fatalf("failed to call: ", err)
}
defer res.Body.Close()
// 获取结果
replyData, err := ioutil.ReadAll(res.Body)
if err != nil{
log.Fatalf("failed to read response: ", err)
}
// 解码
reply := &models.Reply{}
err = cc.Decode(replyData, reply)
if err != nil{
log.Fatalf("failed to decode reply: ", err)
}
log.Printf("%d * %d = %d", args.A, args.B, reply.C)
}
关于gateway具体处理部分放到第三部分讲解,上面关于rpcx的gateway使用完成,直接启动server、client即可看到输出结果:
2019/02/20 14:35:33 response meta data = map[X-RPCX-MessageStatusType:Normal
X-RPCX-Meta:
X-RPCX-SerializeType:3
X-RPCX-MessageID:10000
X-RPCX-ServicePath:Arith
X-RPCX-ServiceMethod:Mul
X-RPCX-Version:0]
2019/02/20 14:35:33 100 * 200 = 20000
从输出结果的记录日志和上面client中代码可以看到这里面涉及到的http协议转换为rpcx协议涉及到如下的内容:
-
http请求request的header设置
X-RPCX-Version: rpcx 版本
X-RPCX-MesssageType: 设置为0,代表request
X-RPCX-Heartbeat: 是否是heartbeat请求, 默认=false
X-RPCX-Oneway: 是否是单向请求,默认=false.
X-RPCX-SerializeType: 0 as raw bytes, 1 as JSON, 2 as protobuf, 3 as msgpack
X-RPCX-MessageID: 消息id, uint64 类型
X-RPCX-ServicePath: service path
X-RPCX-ServiceMethod: service method
X-RPCX-Meta: 额外的metadata数据
-
http响应response的header设置
X-RPCX-Version: rpcx 版本
X-RPCX-MesssageType: 1 ,代表response
X-RPCX-Heartbeat: 是否是heartbeat请求
X-RPCX-MessageStatusType: Error 还是正常返回结果
X-RPCX-SerializeType: 0 as raw bytes, 1 as JSON, 2 as protobuf, 3 as msgpack
X-RPCX-MessageID: 消息id, uint64 类型
X-RPCX-ServicePath: service path
X-RPCX-ServiceMethod: service method
X-RPCX-Meta: 额外的metadata
X-RPCX-ErrorMessage: 错误信息, 需要设置X-RPCX-MessageStatusType为Error
三、rpcx的源码剖析
在server执行serve时,有一个启动gateway的操作
s.startGateway(network, ln)
而整个rpcx中关于gateway的部分是在server/gateway.go
startGateway主要完成开启http服务接收请求,同时也会增加一些http的路由:完成将http请求转为rpcx请求,以及返回结果转为http的response在输出给客户端client。
func (s *Server) startGateway(network string, ln net.Listener) net.Listener {
if network != "tcp" && network != "tcp4" && network != "tcp6" { // network检查
log.Infof("network is not tcp/tcp4/tcp6 so can not start gateway")
return ln
}
m := cmux.New(ln) // 将当前connection包装成多路复用的模式
// 指定优先级
httpLn := m.Match(cmux.HTTP1Fast()) // 只匹配http的请求
rpcxLn := m.Match(cmux.Any()) // 匹配任意请求
go s.startHTTP1APIGateway(httpLn) // http路由规则 真正处理:handleGatewayRequest
go m.Serve()
return rpcxLn
}
真正完成gateway的协议转换的操作在handleGatewayRequest方法中,也是在启动过程中绑定处理http请求的handler
func (s *Server) handleGatewayRequest(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
// 主要是对http请求头header中参数检查:比如XServicePath
...省略代码
servicePath := r.Header.Get(XServicePath) // 获取header中的XServicePath
...省略代码
req, err := HTTPRequest2RpcxRequest(r) // 将http请求转为rpcx请求
defer protocol.FreeMsg(req) // 为了复用protocol.Message
//设置响应头
...省略代码
// 通过context.Context传递开始请求时间 为了timeout操作的处理
ctx := context.WithValue(context.Background(), StartRequestContextKey, time.Now().UnixNano())
err = s.auth(ctx, req) // 对request进行认证(需要以Option方式提供AuthFunc)
if err != nil { // 认证过程出现error 直接将error返回给client
s.Plugins.DoPreWriteResponse(ctx, req, nil)
wh.Set(XMessageStatusType, "Error")
wh.Set(XErrorMessage, err.Error())
w.WriteHeader(401)
s.Plugins.DoPostWriteResponse(ctx, req, req.Clone(), err)
return
}
resMetadata := make(map[string]string)
newCtx := context.WithValue(context.WithValue(ctx, share.ReqMetaDataKey, req.Metadata),
share.ResMetaDataKey, resMetadata)
res, err := s.handleRequest(newCtx, req) // 处理请求
defer protocol.FreeMsg(res)
if err != nil { // 处理过程出现error 直接将error返回给client
log.Warnf("rpcx: failed to handle gateway request: %v", err)
wh.Set(XMessageStatusType, "Error")
wh.Set(XErrorMessage, err.Error())
w.WriteHeader(500)
return
}
// 针对rpcx的响应结果进行前置和输出操作
s.Plugins.DoPreWriteResponse(newCtx, req, nil) // 响应前置操作
if len(resMetadata) > 0 { //copy meta in context to request
meta := res.Metadata
if meta == nil {
res.Metadata = resMetadata
} else {
for k, v := range resMetadata {
meta[k] = v
}
}
}
meta := url.Values{}
for k, v := range res.Metadata {
meta.Add(k, v)
}
wh.Set(XMeta, meta.Encode())
w.Write(res.Payload)
s.Plugins.DoPostWriteResponse(newCtx, req, res, err) // 输出结果
}
将http请求转为rpcx请求之后的请求处理和普通rpcx请求处理无异:handleRequest
func (s *Server) handleRequest(ctx context.Context, req *protocol.Message) (res *protocol.Message, err error) {
// 获取请求中的参数
// 获取本地注册记录中具体处理业务的service,完成相应的处理
// 构建response对象 填充对应的参数内容
// 输出protocol.Message即是rpcx的返回结果
// service完成对应的操作:需注意提供service有method和function两类
mtype := service.method[methodName]
if mtype == nil {
if service.function[methodName] != nil { //service提供function来完成服务
return s.handleRequestForFunction(ctx, req)
}
err = errors.New("rpcx: can't find method " + methodName)
return handleError(res, err)
}
// service提供method完成服务
var argv = argsReplyPools.Get(mtype.ArgType) // 参数类型
codec := share.Codecs[req.SerializeType()] // 序列化方式
...省略代码
err = codec.Decode(req.Payload, argv) // 解码
...省略代码
replyv := argsReplyPools.Get(mtype.ReplyType) // 返回类型
if mtype.ArgType.Kind() != reflect.Ptr { // 参数类型是否为指针
err = service.call(ctx, mtype, reflect.ValueOf(argv).Elem(), reflect.ValueOf(replyv))
} else { // 输入参数为指针
err = service.call(ctx, mtype, reflect.ValueOf(argv), reflect.ValueOf(replyv))
}
// 响应体
if !req.IsOneway() {
data, err := codec.Encode(replyv) // 编码
argsReplyPools.Put(mtype.ReplyType, replyv)
if err != nil {
return handleError(res, err)
}
res.Payload = data
}
return res, nil
}
如上基本上罗列出采用gateway方式请求rpcx服务来支持不同的语言通过http协议的调用过程。
四、后记
在源码剖析中并没有将所有代码罗列出来,详细源码剖析见gateway源码